Skip to content

Create imported switch

A creation workflow that adds an existing switch to the service database.

create_subscription(partner)

Create a new subscription object.

Source code in gso/workflows/switch/create_imported_switch.py
@step("Create subscription")
def create_subscription(partner: str) -> State:
    """Create a new subscription object."""
    partner_id = get_partner_by_name(partner).partner_id
    product_id = get_product_id_by_name(ProductName.IMPORTED_SWITCH)
    subscription = ImportedSwitchInactive.from_product_id(product_id, partner_id)

    return {"subscription": subscription, "subscription_id": subscription.subscription_id}

initialize_subscription(subscription, fqdn, ts_port, site, switch_vendor, switch_model)

Initialize the switch subscription using input data.

Source code in gso/workflows/switch/create_imported_switch.py
@step("Initialize subscription")
def initialize_subscription(
    subscription: ImportedSwitchInactive,
    fqdn: str,
    ts_port: PortNumber,
    site: UUIDstr,
    switch_vendor: Vendor,
    switch_model: SwitchModel,
) -> State:
    """Initialize the switch subscription using input data."""
    subscription.switch.fqdn = fqdn
    subscription.switch.ts_port = ts_port
    subscription.switch.site = Site.from_subscription(site).site
    subscription.switch.switch_vendor = switch_vendor
    subscription.switch.switch_model = switch_model

    return {"subscription": subscription}

create_imported_switch()

Create an imported switch without provisioning it.

Source code in gso/workflows/switch/create_imported_switch.py
@workflow("Create Imported Switch", initial_input_form=_initial_input_form_generator, target=Target.CREATE)
def create_imported_switch() -> StepList:
    """Create an imported switch without provisioning it."""
    return (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )