Skip to content

Validate prefix list

A workflow for validating a Prefix List. Will redeploy drifted config if it is below a set threshold.

build_partner_map(subscription)

Build a map of FQDNs and BGP peer addresses where all subscriptions of a partner are deployed.

Source code in gso/workflows/prefix_list/validate_prefix_list.py
@step("Build map of partner services")
def build_partner_map(subscription: PrefixList) -> State:
    """Build a map of FQDNs and BGP peer addresses where all subscriptions of a partner are deployed."""
    partner_name = get_partner_by_id(subscription.customer_id).name
    # Get a list of partner's subscriptions
    partner_subscription_ids = get_subscriptions(
        product_types=[ProductType.IAS, ProductType.R_AND_E_PEER, ProductType.GEANT_IP],
        lifecycles=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
        partner_id=subscription.customer_id,
        includes=["subscription_id"],
    )

    # Build the list of all subscription objects
    partner_subscriptions = [
        SubscriptionModel.from_subscription(sub["subscription_id"]) for sub in partner_subscription_ids
    ]

    peer_map: dict[str, dict] = defaultdict(dict)
    for sub in partner_subscriptions:
        #  For each subscription belonging to this partner
        for ap in sub.l3_core.ap_list:  # type: ignore[attr-defined]
            #   And for each AP that is part of the subscription
            ap_fqdn = Router.from_subscription(ap.sbp.edge_port.node.owner_subscription_id).router.router_fqdn
            service_name = sub.service_name_attribute  # type: ignore[attr-defined]

            service_peers = peer_map[ap_fqdn].get(service_name, {"ipv4_peers": [], "ipv6_peers": []})
            service_peers["ipv4_peers"].extend([
                bgp_session.peer_address
                for bgp_session in ap.sbp.bgp_session_list
                if bgp_session.ip_type is IPTypes.IPV4
            ])
            service_peers["ipv6_peers"].extend([
                bgp_session.peer_address
                for bgp_session in ap.sbp.bgp_session_list
                if bgp_session.ip_type is IPTypes.IPV6
            ])
            peer_map[ap_fqdn][service_name] = service_peers

    email_settings = load_oss_params().EMAIL
    email_destination = email_settings.notification_email_destinations

    return {
        "partner_name": partner_name,
        "prefix_list_was_redeployed": None,
        "peer_map": peer_map,
        "notification_destination_email_address": email_destination,
    }

run_validate_prefix_list_playbook(subscription, process_id, peer_map, partner_name, notification_destination_email_address)

Workflow step for running a playbook that validates prefix lists on a list of FQDNs.

Source code in gso/workflows/prefix_list/validate_prefix_list.py
@step("Validate Prefix List")
def run_validate_prefix_list_playbook(
    subscription: dict,
    process_id: UUIDstr,
    peer_map: dict,
    partner_name: str,
    notification_destination_email_address: str,
) -> LSOState:
    """Workflow step for running a playbook that validates prefix lists on a list of FQDNs."""
    return {
        "playbook_name": "gap_ansible/playbooks/validate_prefix_list.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(peer_map.keys())}},
        "extra_vars": {
            "subscription": subscription,
            "notification_destination_email_address": notification_destination_email_address,
            "peer_map": peer_map,
            "partner_name": partner_name,
            "dry_run": True,
            "verb": "deploy",
            "object": "prefix_list",
            "is_verification_workflow": True,
            "commit_comment": f"GSO_PROCESS_ID: {process_id} - Validate {subscription['description']}",
        },
    }

validate_prefix_list()

Validate a Prefix List of a partner.

If the list has drifted below the threshold, changes will be applied automatically.

Source code in gso/workflows/prefix_list/validate_prefix_list.py
@validate_workflow("Validate Prefix List")
def validate_prefix_list() -> StepList:
    """Validate a Prefix List of a partner.

    If the list has drifted below the threshold, changes will be applied automatically.
    """
    return begin >> build_partner_map >> anonymous_lso_interaction(run_validate_prefix_list_playbook)