Skip to content

Terminate edge port

Terminate an edge port in the network.

initial_input_form_generator(subscription_id)

Let the operator decide whether the workflow should run the Ansible playbooks that remove configuration from the router.

Source code in gso/workflows/edge_port/terminate_edge_port.py
def initial_input_form_generator(subscription_id: UUID) -> FormGenerator:
    """Collect the operator's input for terminating an edge port.

    Two pages are presented. The first asks for a trouble ticket number and whether the workflow should run the
    Ansible playbooks that remove configuration from the router. The second is a summary of these answers together
    with the description of the edge port.

    :param subscription_id: ID of the edge port subscription that is being terminated.
    :return: The operator's answers from the first page, dumped to a dictionary.
    """
    edge_port = EdgePort.from_subscription(subscription_id)

    class TerminateForm(FormPage):
        tt_number: TTNumber

        # Display-only label that precedes the ``run_ansible_steps`` field; enabled by default.
        label: Label = "Should this workflow run Ansible playbooks to remove configuration from the router?"
        run_ansible_steps: bool = True

    form_data = yield TerminateForm

    # The summary shows the answers plus the edge port description, so the operator can verify the target.
    yield from create_summary_form(
        form_data.model_dump() | {"description": edge_port.description},
        edge_port.product.name,
        ["tt_number", "run_ansible_steps", "description"],
    )

    return form_data.model_dump()

remove_edge_port_dry(subscription, tt_number, process_id)

Prepare a dry run of the playbook that removes an edge port from the network.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@step("[DRY RUN] Remove Edge Port")
def remove_edge_port_dry(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> dict[str, Any]:
    """Build the playbook parameters for a dry run of the edge port removal.

    This step only assembles the playbook name, inventory and extra variables; it does not contact the router itself.

    :param subscription: The subscription as a dictionary; ``edge_port.node.router_fqdn`` must be present.
    :param tt_number: Trouble ticket number that is included in the commit comment.
    :param process_id: ID of the running process, included in the commit comment.
    :return: The subscription, playbook name, inventory and extra variables, with ``dry_run`` set to ``True``.
    """
    router_fqdn = subscription["edge_port"]["node"]["router_fqdn"]
    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port"

    return {
        "subscription": subscription,
        "playbook_name": "gap_ansible/playbooks/edge_port.yaml",
        # The inventory holds a single host: the router FQDN of the edge port's node.
        "inventory": {"all": {"hosts": {router_fqdn: None}}},
        "extra_vars": {
            "subscription": subscription,
            "dry_run": True,
            "verb": "terminate",
            "commit_comment": commit_comment,
        },
    }

remove_edge_port_real(subscription, tt_number, process_id)

Prepare the real (non-dry) run of the playbook that removes an edge port from the network.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@step("[FOR REAL] Remove Edge Port")
def remove_edge_port_real(subscription: dict[str, Any], tt_number: str, process_id: UUIDstr) -> LSOState:
    """Build the playbook parameters for the real removal of the edge port.

    The result matches the dry run step, except that ``dry_run`` is ``False``.

    :param subscription: The subscription as a dictionary; ``edge_port.node.router_fqdn`` must be present.
    :param tt_number: Trouble ticket number that is included in the commit comment.
    :param process_id: ID of the running process, included in the commit comment.
    :return: The subscription, playbook name, inventory and extra variables for the playbook run.
    """
    node = subscription["edge_port"]["node"]

    playbook_vars = {
        "subscription": subscription,
        "dry_run": False,
        "verb": "terminate",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete Edge Port",
    }
    # Only the router of the edge port's node is targeted.
    target_hosts = {node["router_fqdn"]: None}

    return {
        "subscription": subscription,
        "playbook_name": "gap_ansible/playbooks/edge_port.yaml",
        "inventory": {"all": {"hosts": target_hosts}},
        "extra_vars": playbook_vars,
    }

netbox_clean_up(subscription)

Update Netbox to remove the edge port LAG interface and all the LAG members. This is only for Nokia routers.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@step("Netbox Clean Up")
def netbox_clean_up(subscription: EdgePort) -> None:
    """Release the LAG member interfaces of the edge port in Netbox, then delete the LAG interface itself.

    According to the workflow definition, this step is intended for Nokia routers only.

    :param subscription: The edge port subscription whose interfaces are cleaned up.
    """
    netbox = NetboxClient()
    edge_port = subscription.edge_port
    router_fqdn = edge_port.node.router_fqdn

    # Free every member first; the LAG interface is deleted only after all members have been released.
    for lag_member in edge_port.edge_port_ae_members:
        netbox.free_interface(router_fqdn, lag_member.interface_name)

    netbox.delete_interface(router_fqdn, edge_port.edge_port_name)

terminate_edge_port()

Terminate an existing edge port in the network.

Source code in gso/workflows/edge_port/terminate_edge_port.py
@terminate_workflow("Terminate Edge Port", initial_input_form=initial_input_form_generator)
def terminate_edge_port() -> StepList:
    """Terminate an existing edge port in the network.

    The workflow consists of the following steps, in order:

    * A dry run of the playbook that removes the edge port.
    * The real run of that same playbook.
    * A clean up of Netbox, only when the vendor of the edge port's router is Nokia.

    Both playbook steps are wrapped in ``run_ansible_steps``, which is defined outside this function; presumably it
    skips them when the operator chose not to run Ansible playbooks (to be confirmed against its definition).
    """
    router_is_nokia = conditional(lambda state: state["subscription"]["edge_port"]["node"]["vendor"] == Vendor.NOKIA)

    dry_run_removal = run_ansible_steps(lso_interaction(remove_edge_port_dry))
    real_removal = run_ansible_steps(lso_interaction(remove_edge_port_real))
    nokia_netbox_clean_up = router_is_nokia(netbox_clean_up)

    return begin >> dry_run_removal >> real_removal >> nokia_netbox_clean_up