Skip to content

Import peering connections

A modification workflow for importing peering connections to a Commercial Peer subscription.

initial_input_form_generator(subscription_id)

Get the initial input form for importing peering connections to a Commercial Peer subscription.

Source code in gso/workflows/commercial_peer/import_peering_connections.py
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Get the initial input form for importing peering connections to a Commercial Peer subscription."""
    subscription = CommercialPeer.from_subscription(subscription_id)

    class ImportPeeringConnectionsForm(SubmitFormPage):
        peering_connection_list: list[PeeringConnectionModel] = Field(default_factory=list)

    user_input = yield ImportPeeringConnectionsForm

    return user_input.model_dump() | {"subscription": subscription}

add_peering_connections_to_subscription(subscription, peering_connection_list)

Add peering connections to the Commercial Peer subscription.

Source code in gso/workflows/commercial_peer/import_peering_connections.py
@step("Modify subscription to add peering connections")
def add_peering_connections_to_subscription(
    subscription: CommercialPeer,
    peering_connection_list: list[dict[str, Any]],
) -> dict:
    """Add peering connections to the Commercial Peer subscription."""
    added_peering_connections = [
        PeeringConnection.new(
            subscription_id=uuid4(),
            bgp_session_v4=BGPSession.new(
                subscription_id=uuid4(),
                ip_type=IPTypes.IPV4,
                prefix_limit=subscription.commercial_peer.prefix_limit_v4,
                **peering_connection["bgp_session_v4"],
            ),
            bgp_session_v6=BGPSession.new(
                subscription_id=uuid4(),
                ip_type=IPTypes.IPV6,
                prefix_limit=subscription.commercial_peer.prefix_limit_v6,
                **peering_connection["bgp_session_v6"],
            ),
            minimum_hold_timer=peering_connection.get("minimum_hold_timer", None),
            session_state=peering_connection["session_state"],
            placement_port=SubscriptionModel.from_subscription(peering_connection["placement_port"]).product_block,  # type: ignore[attr-defined]
        )
        for peering_connection in peering_connection_list
    ]
    subscription.commercial_peer.peering_connection_list.extend(added_peering_connections)
    return {"subscription": subscription}

import_peering_connections()

Import Peering connections to an existing Commercial Peer subscription.

Source code in gso/workflows/commercial_peer/import_peering_connections.py
@modify_workflow("Import Peering Connections", initial_input_form_generator)
def import_peering_connections() -> StepList:
    """Import Peering connections to an existing Commercial Peer subscription."""
    return begin >> add_peering_connections_to_subscription >> resync >> done