Skip to content

Validate edge port

Workflow for validating an existing Edge port subscription.

prepare_state(subscription_id)

Add required keys to the state for the workflow to run successfully.

Source code in gso/workflows/edge_port/validate_edge_port.py
@step("Prepare required keys in state")
def prepare_state(subscription_id: UUIDstr) -> State:
    """Add required keys to the state for the workflow to run successfully."""
    edge_port = EdgePort.from_subscription(subscription_id)

    return {"subscription": edge_port}

verify_netbox_entries(subscription)

Validate required entries for an edge port in NetBox.

Source code in gso/workflows/edge_port/validate_edge_port.py
@step("Verify NetBox entries")
def verify_netbox_entries(subscription: EdgePort) -> None:
    """Validate required entries for an edge port in NetBox."""
    nbclient = NetboxClient()
    netbox_errors = []

    #  Raises en exception when not found.
    lag = nbclient.get_interface_by_name_and_device(
        subscription.edge_port.edge_port_name, subscription.edge_port.node.router_fqdn
    )
    if lag.description != str(subscription.subscription_id):
        netbox_errors.append(
            f"Incorrect description for '{lag}', expected "
            f"'{subscription.subscription_id}' but got '{lag.description}'"
        )
    if not lag.enabled:
        netbox_errors.append(f"NetBox interface '{lag}' is not enabled.")
    for member in subscription.edge_port.edge_port_ae_members:
        interface = nbclient.get_interface_by_name_and_device(
            member.interface_name, subscription.edge_port.node.router_fqdn
        )
        if interface.description != str(subscription.subscription_id):
            netbox_errors.append(
                f"Incorrect description for '{member.interface_name}', expected "
                f"'{subscription.subscription_id}' but got '{interface.description}'"
            )
        if not interface.enabled:
            netbox_errors.append(f"NetBox interface '{member.interface_name}' is not enabled.")

    if netbox_errors:
        raise ProcessFailureError(message="NetBox misconfiguration(s) found", details=str(netbox_errors))

verify_base_config(subscription)

Workflow step for running a playbook that checks whether base config has drifted.

Source code in gso/workflows/edge_port/validate_edge_port.py
@step("Check base config for drift")
def verify_base_config(subscription: dict[str, Any]) -> LSOState:
    """Workflow step for running a playbook that checks whether base config has drifted."""
    partner_name = get_partner_by_id(subscription["customer_id"]).name
    return {
        "playbook_name": "gap_ansible/playbooks/edge_port.yaml",
        "inventory": {"all": {"hosts": {subscription["edge_port"]["node"]["router_fqdn"]: None}}},
        "extra_vars": {
            "dry_run": True,
            "subscription": subscription,
            "partner_name": partner_name,
            "verb": "create",
            "is_verification_workflow": "true",
        },
    }

validate_edge_port()

Validate an existing, active Edge port subscription.

  • Check correct configuration of interfaces in NetBox.
  • Verify create Edge port configuration.
Source code in gso/workflows/edge_port/validate_edge_port.py
@workflow(
    "Validate Edge Port Configuration", target=Target.SYSTEM, initial_input_form=wrap_modify_initial_input_form(None)
)
def validate_edge_port() -> StepList:
    """Validate an existing, active Edge port subscription.

    * Check correct configuration of interfaces in NetBox.
    * Verify create Edge port configuration.
    """
    return (
        begin
        >> store_process_subscription(Target.SYSTEM)
        >> unsync
        >> prepare_state
        >> verify_netbox_entries
        >> anonymous_lso_interaction(verify_base_config)
        >> resync
        >> done
    )