Skip to content

Terminate edge port

Terminate an edge port in the network.

initial_input_form_generator()

Ask the operator for the TT number (`tt_number`) before the termination workflow starts.

Source code in gso/workflows/edge_port/terminate_edge_port.py
def initial_input_form_generator() -> FormGenerator:
    """Present the termination form and return what the operator submitted.

    The form has a single field, ``tt_number``. The submitted form is
    converted to a plain dictionary with ``model_dump()`` before being returned.
    """

    class TerminateForm(SubmitFormPage):
        # The only value requested from the operator.
        tt_number: TTNumber

    submitted_form = yield TerminateForm
    return submitted_form.model_dump()

remove_edge_port_dry(subscription, tt_number, process_id)

Perform a dry run of removing an edge port from the network.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@step("[DRY RUN] Remove Edge Port")
def remove_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> dict[str, Any]:
    """Build the playbook parameters for a dry run of the edge port removal.

    Args:
        subscription: The edge port subscription as a dictionary; the router
            FQDN is read from ``subscription["edge_port"]["node"]["router_fqdn"]``.
        tt_number: Ticket number, embedded in the commit comment.
        process_id: Identifier of the running process, embedded in the commit comment.

    Returns:
        A dictionary holding the subscription, the playbook name, an inventory
        with the router as its only host, and the extra variables for the
        playbook (``dry_run`` is ``True``, ``verb`` is ``"terminate"``).
    """
    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port"
    router_fqdn = subscription["edge_port"]["node"]["router_fqdn"]

    return {
        "subscription": subscription,
        "playbook_name": "gap_ansible/playbooks/edge_port.yaml",
        "inventory": {"all": {"hosts": {router_fqdn: None}}},
        "extra_vars": {
            "subscription": subscription,
            "dry_run": True,
            "verb": "terminate",
            "commit_comment": commit_comment,
        },
    }

remove_edge_port_real(subscription, tt_number, process_id)

Remove an edge port from the network for real (not a dry run).

Source code in gso/workflows/edge_port/terminate_edge_port.py
@step("[FOR REAL] Remove Edge Port")
def remove_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
    """Build the playbook parameters for the actual (non-dry-run) edge port removal.

    Args:
        subscription: The edge port subscription as a dictionary; the router
            FQDN is read from ``subscription["edge_port"]["node"]["router_fqdn"]``.
        tt_number: Ticket number, embedded in the commit comment.
        process_id: Identifier of the running process, embedded in the commit comment.

    Returns:
        A dictionary holding the subscription, the playbook name, an inventory
        with the router as its only host, and the extra variables for the
        playbook (``dry_run`` is ``False``, ``verb`` is ``"terminate"``).
    """
    playbook_vars = {
        "subscription": subscription,
        "dry_run": False,
        "verb": "terminate",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
    }

    # The inventory contains a single host: the router this edge port lives on.
    target_router = subscription["edge_port"]["node"]["router_fqdn"]
    single_host_inventory = {"all": {"hosts": {target_router: None}}}

    return {
        "subscription": subscription,
        "playbook_name": "gap_ansible/playbooks/edge_port.yaml",
        "inventory": single_host_inventory,
        "extra_vars": playbook_vars,
    }

netbox_clean_up(subscription)

Update Netbox to free all the LAG member interfaces and delete the edge port LAG interface.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@step("Netbox Clean Up")
def netbox_clean_up(subscription: EdgePort) -> None:
    """Free the edge port's member interfaces in Netbox and delete the edge port interface.

    Every entry in ``edge_port_ae_members`` is freed by interface name on the
    router, after which the interface named ``edge_port_name`` is deleted from
    the same router.
    """
    netbox = NetboxClient()
    edge_port = subscription.edge_port

    # Members are released first; the edge port interface itself is deleted last.
    for ae_member in edge_port.edge_port_ae_members:
        netbox.free_interface(edge_port.node.router_fqdn, ae_member.interface_name)

    netbox.delete_interface(edge_port.node.router_fqdn, edge_port.edge_port_name)

terminate_edge_port()

Terminate an edge port in the network.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@workflow(
    "Terminate Edge Port",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.TERMINATE,
)
def terminate_edge_port() -> StepList:
    """Assemble the step list that terminates an edge port.

    The steps run in this order: store the process subscription, unsync,
    the dry-run removal, the real removal, the Netbox clean-up, setting the
    subscription status to ``TERMINATED``, and resync.
    """
    # Bookkeeping before anything is changed.
    steps = begin >> store_process_subscription(Target.TERMINATE) >> unsync

    # Removal on the router: dry run first, then the real run.
    steps = steps >> lso_interaction(remove_edge_port_dry) >> lso_interaction(remove_edge_port_real)

    # Netbox clean-up, then the lifecycle change.
    steps = steps >> netbox_clean_up >> set_status(SubscriptionLifecycle.TERMINATED)

    return steps >> resync >> done