Skip to content

Create imported r and e lhcone

A creation workflow for adding an existing Imported R&E LHCONE to the service database.

initial_input_form_generator()

Initial input form generator for creating a new imported R&E LHCOne subscription.

Source code in gso/workflows/l3_core_service/r_and_e_lhcone/create_imported_r_and_e_lhcone.py
def initial_input_form_generator() -> FormGenerator:
    """Initial input form generator for creating a new imported R&E LHCOne subscription."""

    class ImportL3CoreServiceForm(SubmitFormPage):
        partner: str
        service_binding_ports: list[ServiceBindingPort]
        product_name: L3ProductNameType
        v4_bgp_local_preference: NonNegativeInt
        v4_bgp_med: MultiExitDiscriminator
        v6_bgp_local_preference: NonNegativeInt
        v6_bgp_med: MultiExitDiscriminator

    user_input = yield ImportL3CoreServiceForm

    return user_input.model_dump()

create_subscription(partner)

Create a new subscription object in the database.

Source code in gso/workflows/l3_core_service/r_and_e_lhcone/create_imported_r_and_e_lhcone.py
@step("Create subscription")
def create_subscription(partner: str) -> dict:
    """Create a new subscription object in the database."""
    partner_id = get_partner_by_name(partner).partner_id
    product_id = get_product_id_by_name(ProductName.IMPORTED_R_AND_E_LHCONE)
    subscription = ImportedRAndELHCOneInactive.from_product_id(product_id, partner_id)
    return {"subscription": subscription, "subscription_id": subscription.subscription_id}

create_imported_r_and_e_lhcone()

Import an R&E LHCONE without provisioning it.

Source code in gso/workflows/l3_core_service/r_and_e_lhcone/create_imported_r_and_e_lhcone.py
@workflow(
    "Create Imported R&E LHCONE",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_r_and_e_lhcone() -> StepList:
    """Import an R&E LHCONE without provisioning it."""
    return (
        begin
        >> create_subscription
        >> store_process_subscription()
        >> initialize_subscription
        >> update_r_and_e_lhcone_subscription_model
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )