Skip to content

Create imported Super PoP switch

A creation workflow that adds existing Super PoP switches to the coreDB.

create_subscription(partner)

Create a new subscription object.

Source code in gso/workflows/super_pop_switch/create_imported_super_pop_switch.py
@step("Create subscription")
def create_subscription(partner: str) -> State:
    """Instantiate an inactive imported Super PoP switch subscription for a partner.

    :param partner: Name of the partner; used to look up its ``partner_id``.
    :return: State update containing the new ``subscription`` object and its ``subscription_id``.
    """
    # Resolve the partner first, then the product, before building the subscription.
    owner_id = get_partner_by_name(partner)["partner_id"]
    imported_product_id = subscriptions.get_product_id_by_name(ProductName.IMPORTED_SUPER_POP_SWITCH)
    new_subscription = ImportedSuperPopSwitchInactive.from_product_id(
        imported_product_id,
        owner_id,
    )

    return {
        "subscription": new_subscription,
        "subscription_id": new_subscription.subscription_id,
    }

initial_input_form_generator()

Generate a form that is filled in using information passed through the API endpoint.

Source code in gso/workflows/super_pop_switch/create_imported_super_pop_switch.py
def initial_input_form_generator() -> FormGenerator:
    """Build the input form for importing a Super PoP switch.

    The form is yielded once; the submitted form instance that is sent back into
    the generator is dumped to a plain dictionary and returned.
    """

    class ImportSuperPopSwitch(SubmitFormPage):
        # Title configured for this form page.
        model_config = ConfigDict(title="Import a Super PoP switch")

        # Fields collected for the import; names match the keys of the returned dict.
        partner: str
        super_pop_switch_site: str
        hostname: str
        super_pop_switch_ts_port: PortNumber
        super_pop_switch_mgmt_ipv4_address: IPv4AddressType

    # Hand the form class to the caller and wait for the filled-in instance.
    submitted_form = yield ImportSuperPopSwitch

    return submitted_form.model_dump()

initialize_subscription(subscription, hostname, super_pop_switch_ts_port, super_pop_switch_site, super_pop_switch_mgmt_ipv4_address=None)

Initialise the Super PoP switch subscription using input data.

Source code in gso/workflows/super_pop_switch/create_imported_super_pop_switch.py
@step("Initialize subscription")
def initialize_subscription(
    subscription: ImportedSuperPopSwitchInactive,
    hostname: str,
    super_pop_switch_ts_port: PortNumber,
    super_pop_switch_site: str,
    super_pop_switch_mgmt_ipv4_address: IPv4AddressType | None = None,
) -> State:
    """Populate the Super PoP switch product block from the supplied input values.

    :param subscription: The inactive imported Super PoP switch subscription to fill in.
    :param hostname: Hostname used, together with the site details, to build the FQDN.
    :param super_pop_switch_ts_port: Value stored as ``super_pop_switch_ts_port``.
    :param super_pop_switch_site: Name of the site, resolved through ``get_site_by_name``.
    :param super_pop_switch_mgmt_ipv4_address: Value stored as the management IPv4 address; may be ``None``.
    :return: State update containing the modified ``subscription``.
    """
    switch_block = subscription.super_pop_switch
    switch_block.super_pop_switch_ts_port = super_pop_switch_ts_port

    # Look up the site by name and attach it to the switch.
    site = get_site_by_name(super_pop_switch_site).site
    switch_block.super_pop_switch_site = site

    # The FQDN is built from the hostname, site name and site country code;
    # it is also used in the subscription description.
    fqdn = generate_fqdn(hostname, site.site_name, site.site_country_code)
    switch_block.super_pop_switch_fqdn = fqdn
    subscription.description = f"Super PoP switch {fqdn}"

    switch_block.super_pop_switch_mgmt_ipv4_address = super_pop_switch_mgmt_ipv4_address
    # The vendor is always set to Juniper by this step.
    switch_block.vendor = Vendor.JUNIPER

    return {"subscription": subscription}

create_imported_super_pop_switch()

Import a Super PoP switch without provisioning it.

Source code in gso/workflows/super_pop_switch/create_imported_super_pop_switch.py
@workflow(
    "Import Super PoP switch",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_super_pop_switch() -> StepList:
    """Assemble the step list that imports a Super PoP switch without provisioning it.

    Steps, in order: create the subscription, store the process subscription,
    initialise the subscription from the form input, set its status to active,
    resync, and finish.
    """
    # Stage 1: create the subscription and link it to the process.
    creation_steps = begin >> create_subscription >> store_process_subscription(Target.CREATE)
    # Stage 2: fill in the product block from the form input.
    populated_steps = creation_steps >> initialize_subscription
    # Stage 3: activate, resync and finish.
    return populated_steps >> set_status(SubscriptionLifecycle.ACTIVE) >> resync >> done