Skip to content

Redeploy commercial peer

Workflow to redeploy an existing Commercial Peer.

initial_input_form_generator(subscription_id)

Generate an initial input form where an operator selects the peering connection that should get redeployed.

Source code in gso/workflows/commercial_peer/redeploy_commercial_peer.py
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Build the initial input form for redeploying one peering connection of a Commercial Peer.

    The form asks the operator for a ``tt_number`` and for one of the peering connections found in
    the ``peering_connection_list`` of the Commercial Peer subscription given by ``subscription_id``.

    Yields:
        The form page class that should be presented to the operator.

    Returns:
        A dictionary with the keys ``verb`` (always ``"deploy"``), ``selected_peering_connection``
        (the ``PeeringConnection`` loaded for the operator's choice) and ``tt_number``.
    """
    peer_subscription = CommercialPeer.from_subscription(subscription_id)
    peer_block = peer_subscription.commercial_peer
    product_name = peer_subscription.product.name

    # Map the subscription instance ID of every peering connection to the label shown in the dropdown.
    connection_labels: dict[str, str] = {}
    for connection in peer_block.peering_connection_list:
        connection_key = str(connection.subscription_instance_id)
        port_owner = SubscriptionModel.from_subscription(connection.placement_port.owner_subscription_id)
        connection_labels[connection_key] = (
            f"{port_owner.description} - "
            f"{connection.bgp_session_v4.peer_address} - "
            f"{connection.bgp_session_v6.peer_address}"
        )

    # Each member is created from a (name, (key, label)) pair, so the ID is used as the member name too.
    peering_connection_choice = cast(
        type[Choice],
        Choice.__call__(
            "Select a Peering Connection",
            zip(connection_labels.keys(), connection_labels.items(), strict=True),
        ),
    )

    form_title = f"Redeploy {product_name}"
    instruction = (
        f"Please select one of the peering connections associated with this {product_name} that "
        f"should get redeployed."
    )

    class RedeployCommercialPeerForm(FormPage):
        model_config = ConfigDict(title=form_title)

        tt_number: TTNumber
        # The divider and label are declared with exclude=True.
        divider: Divider = Field(None, exclude=True)
        label: Label = Field(instruction, exclude=True)
        peering_connection: peering_connection_choice  # type: ignore[valid-type]

    form_data = yield RedeployCommercialPeerForm
    chosen_connection = PeeringConnection.from_db(form_data.peering_connection)

    return {
        "verb": "deploy",
        "selected_peering_connection": chosen_connection,
        "tt_number": form_data.tt_number,
    }

redeploy_commercial_peer()

Redeploy a Commercial Peer subscription.

Source code in gso/workflows/commercial_peer/redeploy_commercial_peer.py
@redeploy_workflow("Redeploy Commercial Peer", initial_input_form=initial_input_form_generator)
def redeploy_commercial_peer() -> StepList:
    """Assemble the step list used to redeploy a Commercial Peer subscription.

    The steps run in this order: ``scope_subscription``, then ``deploy_commercial_peer_dry`` and
    ``deploy_commercial_peer_real``, each wrapped in ``lso_interaction``.
    """
    steps = begin >> scope_subscription
    # Dry run first, then the real deployment.
    steps = steps >> lso_interaction(deploy_commercial_peer_dry)
    steps = steps >> lso_interaction(deploy_commercial_peer_real)
    return steps