Skip to content

Create prefix list

Workflow for creating a new Prefix List subscription.

initial_input_form_generator(product_name)

Get the partner name from the operator to create a Prefix List for.

Source code in gso/workflows/prefix_list/create_prefix_list.py
def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Get the partner name from the operator to create a Prefix List for."""
    partners_with_prefix_list = [
        item["customer_id"]
        for item in get_subscriptions(
            product_types=[ProductType.PREFIX_LIST],
            lifecycles=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
            includes=["customer_id"],
        )
    ]

    available_partners = {
        partner.partner_id: partner.name
        for partner in get_partners_that_should_have_prefix_list()
        if partner.partner_id not in partners_with_prefix_list
    }
    partner_selector = Choice.__call__(
        "Select a partner", zip(available_partners.values(), available_partners.items(), strict=True)
    )

    class PrefixListForm(SubmitFormPage):
        model_config = ConfigDict(title=f"Create new {product_name}")

        partner: partner_selector  # type: ignore[valid-type]

        @field_validator("partner")
        def partner_can_have_only_one_prefix_list(cls, partner: str) -> str:
            if partner in partners_with_prefix_list:
                msg = "Each partner can have only one Prefix List subscription!"
                raise ValueError(msg)
            return partner

    user_input = yield PrefixListForm
    return user_input.model_dump()

create_subscription(product, partner)

Create a Prefix List subscription in the database.

Source code in gso/workflows/prefix_list/create_prefix_list.py
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: UUIDstr) -> State:
    """Create a Prefix List subscription in the database."""
    subscription = PrefixListInactive.from_product_id(product, partner)
    subscription.description = f"Prefix List for {get_partner_by_id(partner).name}"

    return {"subscription": subscription, "subscription_id": subscription.subscription_id}

create_prefix_list()

Create a Prefix List service.

Source code in gso/workflows/prefix_list/create_prefix_list.py
@create_workflow("Create Prefix List", initial_input_form=initial_input_form_generator)
def create_prefix_list() -> StepList:
    """Create a Prefix List service."""
    return begin >> create_subscription >> store_process_subscription()