Skip to content

Create port label

Workflow for creating a Port Label.

initial_input_form_generator(product_name)

Gather operator input on which Edge Port the label should be applied.

Source code in gso/workflows/port_label/create_port_label.py
def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather operator input on which Edge Port the label should be applied."""

    class CreatePortLabelForm(FormPage):
        model_config = ConfigDict(title=f"Apply label {product_name} to Edge Port")

        edge_port: active_edge_port_selector(exclude_subscription_ids=get_edge_ports_by_label(product_name))  # type: ignore[valid-type]

    user_input = yield CreatePortLabelForm
    return {"product_name": product_name, "edge_port": user_input.edge_port}

create_subscription(product)

Create a new Port Label subscription.

Source code in gso/workflows/port_label/create_port_label.py
@step("Create subscription")
def create_subscription(product: UUIDstr) -> State:
    """Create a new Port Label subscription."""
    geant_partner_id = get_partner_by_name("GEANT").partner_id
    subscription = PortLabelInactive.from_product_id(product, geant_partner_id)

    return {"subscription": subscription, "subscription_id": subscription.subscription_id}

initialize_subscription(subscription, product_name, edge_port)

Initialize Port Label subscription object.

Source code in gso/workflows/port_label/create_port_label.py
@step("Initialize subscription")
def initialize_subscription(subscription: PortLabelInactive, product_name: str, edge_port: UUIDstr) -> State:
    """Initialize Port Label subscription object."""
    subscription.port_label.edge_port = EdgePort.from_subscription(edge_port).edge_port
    subscription.description = f"Port Label - {product_name}"

    subscription = PortLabel.from_other_lifecycle(subscription, SubscriptionLifecycle.ACTIVE)

    return {"subscription": subscription}

create_port_label()

Create a new Port Label subscription.

Source code in gso/workflows/port_label/create_port_label.py
@create_workflow("Create Port Label", initial_input_form=initial_input_form_generator)
def create_port_label() -> StepList:
    """Create a new Port Label subscription."""
    return begin >> create_subscription >> store_process_subscription() >> initialize_subscription