Import an existing LAN Switch Interconnect into the subscription database.
create_subscription(partner)
Create a new subscription object.
Source code in gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py
| @step("Create subscription")
def create_subscription(partner: str) -> State:
    """Create a fresh, inactive imported LAN Switch Interconnect subscription.

    The partner is looked up by name and the product by its
    ``ProductName.IMPORTED_LAN_SWITCH_INTERCONNECT`` entry; both identifiers are
    then used to instantiate the inactive subscription model.

    Args:
        partner: Name of the partner that will own the subscription.

    Returns:
        State containing the new ``subscription`` object and its
        ``subscription_id``.
    """
    # Resolve the partner first so a failing partner lookup surfaces before the product lookup.
    partner_id = get_partner_by_name(partner)["partner_id"]
    new_subscription = ImportedLanSwitchInterconnectInactive.from_product_id(
        get_product_id_by_name(ProductName.IMPORTED_LAN_SWITCH_INTERCONNECT),
        partner_id,
    )
    return {
        "subscription": new_subscription,
        "subscription_id": new_subscription.subscription_id,
    }
|
initialize_subscription(subscription, lan_switch_interconnect_description, minimum_links, switch_management_vlan_id, dcn_management_vlan_id, router_side, switch_side)
Initialize the subscription using input data.
Source code in gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py
| @step("Initialize subscription")
def initialize_subscription(
    subscription: ImportedLanSwitchInterconnectInactive,
    lan_switch_interconnect_description: str,
    minimum_links: int,
    switch_management_vlan_id: VLAN_ID,
    dcn_management_vlan_id: VLAN_ID,
    router_side: dict,
    switch_side: dict,
) -> State:
    """Initialize the subscription using input data.

    Fills in the scalar attributes of the LAN Switch Interconnect block and
    builds new inactive router-side and switch-side blocks, each with its own
    list of interface blocks.

    The ``router_side`` and ``switch_side`` dictionaries are only read: the keys
    that need special handling are looked up, and the remaining keys are
    forwarded through a filtered copy, so the caller's dictionaries are left
    intact.

    Args:
        subscription: Inactive subscription whose ``lan_switch_interconnect``
            block is populated.
        lan_switch_interconnect_description: Description stored on the block.
        minimum_links: Minimum number of links stored on the block.
        switch_management_vlan_id: VLAN ID stored as switch management VLAN.
        dcn_management_vlan_id: VLAN ID stored as DCN management VLAN.
        router_side: Must contain ``"node"`` (passed to
            ``Router.from_subscription``; presumably a subscription ID, confirm
            against the input form) and ``"ae_members"`` (an iterable of
            keyword-argument dicts, one per interface). Any other keys are
            forwarded as keyword arguments to the router-side block.
        switch_side: Must contain ``"switch"`` (passed to
            ``Switch.from_subscription``) and ``"ae_members"``. Any other keys
            are forwarded as keyword arguments to the switch-side block.

    Returns:
        State containing the updated ``subscription``.

    Raises:
        KeyError: If one of the required keys is missing from ``router_side``
            or ``switch_side``.
    """
    interconnect = subscription.lan_switch_interconnect
    interconnect.lan_switch_interconnect_description = lan_switch_interconnect_description
    interconnect.minimum_links = minimum_links
    interconnect.switch_management_vlan_id = switch_management_vlan_id
    interconnect.dcn_management_vlan_id = dcn_management_vlan_id

    # Router side: read the special keys without popping them from the caller's dict.
    router_block = Router.from_subscription(router_side["node"]).router
    router_side_interfaces = [
        LanSwitchInterconnectInterfaceBlockInactive.new(uuid4(), **ae_member)
        for ae_member in router_side["ae_members"]
    ]
    router_side_extra = {key: value for key, value in router_side.items() if key not in {"node", "ae_members"}}
    interconnect.router_side = LanSwitchInterconnectRouterSideBlockInactive.new(
        uuid4(), **router_side_extra, node=router_block, ae_members=router_side_interfaces
    )

    # Switch side: same approach, with "switch" as the key that references the device.
    switch_block = Switch.from_subscription(switch_side["switch"]).switch
    switch_side_interfaces = [
        LanSwitchInterconnectInterfaceBlockInactive.new(uuid4(), **ae_member)
        for ae_member in switch_side["ae_members"]
    ]
    switch_side_extra = {key: value for key, value in switch_side.items() if key not in {"switch", "ae_members"}}
    interconnect.switch_side = LanSwitchInterconnectSwitchSideBlockInactive.new(
        uuid4(), **switch_side_extra, switch=switch_block, ae_members=switch_side_interfaces
    )

    return {"subscription": subscription}
|
create_imported_lan_switch_interconnect()
Create an imported LAN Switch Interconnect without provisioning it.
Source code in gso/workflows/lan_switch_interconnect/create_imported_lan_switch_interconnect.py
| @workflow(
"Create Imported LAN Switch Interconnect", initial_input_form=_initial_input_form_generator, target=Target.CREATE
)
def create_imported_lan_switch_interconnect() -> StepList:
    """Assemble the workflow that imports a LAN Switch Interconnect without provisioning it.

    The resulting step list creates the subscription, stores it on the process,
    initializes it from the input data, sets its lifecycle status to active and
    finally resyncs it.

    Returns:
        The ordered list of workflow steps.
    """
    # Built up one step at a time; ``>>`` is left-associative so this matches a single chained expression.
    steps = begin >> create_subscription
    steps = steps >> store_process_subscription(Target.CREATE)
    steps = steps >> initialize_subscription
    steps = steps >> set_status(SubscriptionLifecycle.ACTIVE)
    steps = steps >> resync
    return steps >> done
|