Skip to content

Create imported opengear

A creation workflow that adds an existing opengear to the service database.

create_subscription(partner)

Create a new subscription object.

Source code in gso/workflows/opengear/create_imported_opengear.py
@step("Create subscription")
def create_subscription(partner: str) -> State:
    """Create a new subscription object."""
    partner_id = get_partner_by_name(partner).partner_id
    product_id = get_product_id_by_name(ProductName.IMPORTED_OPENGEAR)
    subscription = ImportedOpengearInactive.from_product_id(product_id, partner_id)

    return {
        "subscription": subscription,
        "subscription_id": subscription.subscription_id,
    }

initial_input_form_generator()

Generate a form that is filled in using information passed through the API endpoint.

Source code in gso/workflows/opengear/create_imported_opengear.py
def initial_input_form_generator() -> FormGenerator:
    """Generate a form that is filled in using information passed through the API endpoint."""

    class ImportOpengear(SubmitFormPage):
        model_config = ConfigDict(title="Import Opengear")

        partner: str
        opengear_site: str
        opengear_hostname: str
        opengear_wan_address: IPv4AddressType
        opengear_wan_netmask: IPv4AddressType
        opengear_wan_gateway: IPv4AddressType

    user_input = yield ImportOpengear

    return user_input.model_dump()

initialize_subscription(subscription, opengear_site, opengear_hostname, opengear_wan_address, opengear_wan_netmask, opengear_wan_gateway)

Initialise the Imported Opengear subscription using input data.

Source code in gso/workflows/opengear/create_imported_opengear.py
@step("Initialize subscription")
def initialize_subscription(
    subscription: ImportedOpengearInactive,
    opengear_site: str,
    opengear_hostname: str,
    opengear_wan_address: IPv4AddressType | None,
    opengear_wan_netmask: IPv4AddressType | None,
    opengear_wan_gateway: IPv4AddressType | None,
) -> State:
    """Initialise the Imported Opengear subscription using input data."""
    subscription.opengear.opengear_site = get_site_by_name(opengear_site).site
    subscription.opengear.opengear_hostname = opengear_hostname
    subscription.opengear.opengear_wan_address = opengear_wan_address
    subscription.opengear.opengear_wan_netmask = opengear_wan_netmask
    subscription.opengear.opengear_wan_gateway = opengear_wan_gateway

    return {"subscription": subscription}

create_imported_opengear()

Import an Opengear without provisioning it.

Source code in gso/workflows/opengear/create_imported_opengear.py
@workflow(
    "Import Opengear",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_opengear() -> StepList:
    """Import an Opengear without provisioning it."""
    return (
        init
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
        >> set_status(SubscriptionLifecycle.ACTIVE)
        >> resync
        >> done
    )