Skip to content

Create imported LAN Switch Interconnect

Import an existing LAN Switch Interconnect into the subscription database.

create_subscription(partner)

Create a new subscription object.

Source code in gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py
@step("Create subscription")
def create_subscription(partner: str) -> State:
    """Instantiate an inactive imported LAN Switch Interconnect subscription.

    Args:
        partner: Name of the partner; it is resolved through ``get_partner_by_name`` and the
            ``partner_id`` of the result is used for the new subscription.

    Returns:
        A state update containing the new ``subscription`` object and its ``subscription_id``.
    """
    # Resolve the partner first, then the product, before building the subscription.
    owner_id = get_partner_by_name(partner)["partner_id"]
    imported_product_id = get_product_id_by_name(ProductName.IMPORTED_LAN_SWITCH_INTERCONNECT)

    new_subscription = ImportedLanSwitchInterconnectInactive.from_product_id(imported_product_id, owner_id)

    state_update = {
        "subscription": new_subscription,
        "subscription_id": new_subscription.subscription_id,
    }
    return state_update

initialize_subscription(subscription, lan_switch_interconnect_description, minimum_links, switch_management_vlan_id, dcn_management_vlan_id, router_side, switch_side)

Initialize the subscription using input data.

Source code in gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py
@step("Initialize subscription")
def initialize_subscription(
    subscription: ImportedLanSwitchInterconnectInactive,
    lan_switch_interconnect_description: str,
    minimum_links: int,
    switch_management_vlan_id: VLAN_ID,
    dcn_management_vlan_id: VLAN_ID,
    router_side: dict,
    switch_side: dict,
) -> State:
    """Populate the subscription's LAN Switch Interconnect block from the input data.

    Args:
        subscription: The inactive imported subscription whose product block is filled in.
        lan_switch_interconnect_description: Description stored on the interconnect block.
        minimum_links: Value stored as the block's ``minimum_links``.
        switch_management_vlan_id: Value stored as the block's ``switch_management_vlan_id``.
        dcn_management_vlan_id: Value stored as the block's ``dcn_management_vlan_id``.
        router_side: Mapping that must contain ``"node"`` and ``"ae_members"``; any remaining
            keys are forwarded as keyword arguments to the router side block.
        switch_side: Mapping that must contain ``"switch"`` and ``"ae_members"``; any remaining
            keys are forwarded as keyword arguments to the switch side block.

    Returns:
        A state update containing the modified ``subscription``.

    Note:
        The ``"node"``/``"switch"`` and ``"ae_members"`` entries are popped from the given
        mappings, so ``router_side`` and ``switch_side`` are modified in place.
    """
    interconnect = subscription.lan_switch_interconnect

    # Plain attributes of the interconnect block.
    interconnect.lan_switch_interconnect_description = lan_switch_interconnect_description
    interconnect.minimum_links = minimum_links
    interconnect.switch_management_vlan_id = switch_management_vlan_id
    interconnect.dcn_management_vlan_id = dcn_management_vlan_id

    # Router side: load the referenced router, create one interface block per AE member,
    # then pass whatever is left in ``router_side`` straight to the side block.
    router_subscription = Router.from_subscription(router_side.pop("node"))
    router_node = router_subscription.router
    router_members = [
        LanSwitchInterconnectInterfaceBlockInactive.new(uuid4(), **member_fields)
        for member_fields in router_side.pop("ae_members")
    ]
    interconnect.router_side = LanSwitchInterconnectRouterSideBlockInactive.new(
        uuid4(),
        **router_side,
        node=router_node,
        ae_members=router_members,
    )

    # Switch side: same procedure, keyed on ``"switch"`` instead of ``"node"``.
    switch_subscription = Switch.from_subscription(switch_side.pop("switch"))
    switch_node = switch_subscription.switch
    switch_members = [
        LanSwitchInterconnectInterfaceBlockInactive.new(uuid4(), **member_fields)
        for member_fields in switch_side.pop("ae_members")
    ]
    interconnect.switch_side = LanSwitchInterconnectSwitchSideBlockInactive.new(
        uuid4(),
        **switch_side,
        switch=switch_node,
        ae_members=switch_members,
    )

    return {"subscription": subscription}

create_imported_lan_switch_interconnect()

Create an imported LAN Switch Interconnect without provisioning it.

Source code in gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py
@workflow(
    "Create Imported LAN Switch Interconnect",
    initial_input_form=_initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_lan_switch_interconnect() -> StepList:
    """Assemble the step list that imports a LAN Switch Interconnect without provisioning it.

    Returns:
        The step list: create the subscription, store the process subscription, initialize the
        subscription, set its status to ``ACTIVE``, resync, and finish.
    """
    # Creation phase: make the subscription and attach it to the running process.
    steps = begin >> create_subscription
    steps = steps >> store_process_subscription(Target.CREATE)

    # Fill in the product block from the input form data.
    steps = steps >> initialize_subscription

    # Finalisation phase: activate, resync and close the workflow.
    steps = steps >> set_status(SubscriptionLifecycle.ACTIVE)
    steps = steps >> resync
    return steps >> done