Skip to content

Base validate l3 core service

Validation workflow for L3 Core Service subscription objects.

build_fqdn_list(subscription)

Build the list of all FQDNs that are in the list of access ports of a L3 Core Service subscription.

Source code in gso/workflows/l3_core_service/base_validate_l3_core_service.py
@step("Prepare list of all Access Ports")
def build_fqdn_list(subscription: SubscriptionModel) -> dict[str, list[str]]:
    """Build the list of all FQDNs that are in the list of access ports of a L3 Core Service subscription."""
    ap_list = subscription.l3_core.ap_list  # type: ignore[attr-defined]
    ap_fqdn_list = [
        ap.sbp.edge_port.node.router_fqdn for ap in ap_list if ap.sbp.edge_port.node.vendor != Vendor.JUNIPER
    ]
    return {"ap_fqdn_list": ap_fqdn_list}

validate_sbp_config(subscription, process_id, ap_fqdn_list)

Workflow step for running a playbook that checks whether SBP config has drifted.

Source code in gso/workflows/l3_core_service/base_validate_l3_core_service.py
@step("Check SBP config for drift")
def validate_sbp_config(subscription: dict[str, Any], process_id: UUIDstr, ap_fqdn_list: list[str]) -> LSOState:
    """Workflow step for running a playbook that checks whether SBP config has drifted."""
    extra_vars = {
        "subscription": subscription,
        "partner_name": get_partner_by_id(subscription["customer_id"]).name,
        "dry_run": True,
        "verb": "deploy",
        "object": "sbp",
        "is_verification_workflow": "true",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - Validate config for {subscription['description']}",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(ap_fqdn_list)}},
        "extra_vars": extra_vars,
    }

validate_bgp_peers(subscription, process_id, ap_fqdn_list)

Workflow step for running a playbook that checks whether BGP peer config has drifted.

Source code in gso/workflows/l3_core_service/base_validate_l3_core_service.py
@step("Check BGP peer config for drift")
def validate_bgp_peers(subscription: dict[str, Any], process_id: UUIDstr, ap_fqdn_list: list[str]) -> LSOState:
    """Workflow step for running a playbook that checks whether BGP peer config has drifted."""
    extra_vars = {
        "subscription": subscription,
        "partner_name": get_partner_by_id(subscription["customer_id"]).name,
        "verb": "deploy",
        "object": "bgp",
        "dry_run": True,
        "is_verification_workflow": "true",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - Validate BGP peer config for {subscription['description']}",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(ap_fqdn_list)}},
        "extra_vars": extra_vars,
    }

validate_dns_records()

Validate DNS records in Infoblox.

Source code in gso/workflows/l3_core_service/base_validate_l3_core_service.py
@step("Validate DNS records")
def validate_dns_records() -> None:
    """Validate DNS records in Infoblox."""