Skip to content

Shared

Shared steps for DDos Mitigation workflows.

build_ias_peer_map(partner)

Gather BGP peer information from partner's IAS GWS routers.

Collects router endpoints and BGP peer addresses (IPv4/IPv6) for all IAS GWS subscriptions belonging to the partner. Only GWS flavor IAS is included as DDoS mitigation requires GWS infrastructure.

Parameters:

Name Type Description Default
partner UUIDstr

Partner UUID

required

Returns:

Type Description
State

State containing:

State
  • ias_subscription_endpoints: List of router FQDNs where IAS GWS is delivered
State
  • peer_map: BGP peer addresses per router Format: {router_fqdn: {"ipv4_peer": "x.x.x.x", "ipv6_peer": "dead::beef"}}
Source code in gso/workflows/ddos_mitigation/shared.py
@step("Build IAS peer map")
def build_ias_peer_map(partner: UUIDstr) -> State:
    """Gather BGP peer information from partner's IAS GWS routers.

    Collects router endpoints and BGP peer addresses (IPv4/IPv6) for all IAS GWS
    subscriptions belonging to the partner. Only GWS flavor IAS is included as
    DDoS mitigation requires GWS infrastructure.

    Args:
        partner: Partner UUID

    Returns:
        State containing:
        - ias_subscription_endpoints: List of router FQDNs where IAS GWS is delivered
        - peer_map: BGP peer addresses per router
          Format: {router_fqdn: {"ipv4_peer": "x.x.x.x", "ipv6_peer": "dead::beef"}}
    """
    # Get all IAS subscriptions and filter for GWS only
    ias_subscriptions = [
        IAS.from_subscription(sub.subscription_id)
        for sub in get_active_l3_subscriptions_by_partner_and_product(partner_id=partner, product_name=ProductName.IAS)
    ]

    # Filter to only IAS GWS subscriptions (DDoS mitigation only works with GWS)
    gws_subscriptions = [sub for sub in ias_subscriptions if sub.ias.ias_flavor == IASFlavor.IASGWS]

    # Build peer map: router_fqdn -> {ipv4_peer, ipv6_peer, ap_type}
    peer_map: dict[str, dict[str, Any]] = {}
    for sub in gws_subscriptions:
        for ap in sub.ias.l3_core.ap_list:
            router_fqdn = ap.sbp.edge_port.node.router_fqdn
            router_access_via_ts = ap.sbp.edge_port.node.router_access_via_ts
            site_ts_address = ap.sbp.edge_port.node.router_site.site_ts_address
            router_ts_port = ap.sbp.edge_port.node.router_ts_port
            site_name = ap.sbp.edge_port.node.router_site.site_name

            # Initialize peer map entry for this router if not present
            if router_fqdn not in peer_map:
                peer_map[router_fqdn] = {
                    "ipv4_peer": None,
                    "ipv6_peer": None,
                    "ap_type": ap.ap_type,
                    "site_name": site_name,
                    "router_access_via_ts": router_access_via_ts,
                    "site_ts_address": site_ts_address,
                    "router_ts_port": router_ts_port,
                }

            # Extract BGP peer addresses from the SBP's BGP sessions
            for bgp_session in ap.sbp.bgp_session_list:
                # If multiple peers exist for the same router, keep the first one
                if bgp_session.ip_type == IPTypes.IPV4 and peer_map[router_fqdn]["ipv4_peer"] is None:
                    peer_map[router_fqdn]["ipv4_peer"] = str(bgp_session.peer_address)
                elif bgp_session.ip_type == IPTypes.IPV6 and peer_map[router_fqdn]["ipv6_peer"] is None:
                    peer_map[router_fqdn]["ipv6_peer"] = str(bgp_session.peer_address)

    all_edge_port_fqdns = list(peer_map.keys())

    return {"ias_subscription_endpoints": all_edge_port_fqdns, "peer_map": peer_map}

check_other_active_mitigations(subscription_id)

Check if other DDoS mitigation subscriptions are active.

This is used during termination to determine whether we should restore the exit upstream port's export policy or skip that step because other active mitigations still exist.

Parameters:

Name Type Description Default
subscription_id UUID

The DDoS mitigation subscription ID being terminated

required

Returns:

Type Description
State

State containing:

State
  • should_restore_upstream: Boolean indicating if upstream restoration should proceed
Source code in gso/workflows/ddos_mitigation/shared.py
@step("Check if other active mitigations exist")
def check_other_active_mitigations(subscription_id: UUID) -> State:
    """Check if other DDoS mitigation subscriptions are active.

    This is used during termination to determine whether we should restore the
    exit upstream port's export policy or skip that step because other active
    mitigations still exist.

    Args:
        subscription_id: The DDoS mitigation subscription ID being terminated

    Returns:
        State containing:
        - should_restore_upstream: Boolean indicating if upstream restoration should proceed
    """
    # Query for all other active/provisioning DDoS mitigation subscriptions
    other_mitigations = get_subscriptions(
        product_types=[ProductType.DDOS_MITIGATION_SUS],
        lifecycles=[SubscriptionLifecycle.ACTIVE, SubscriptionLifecycle.PROVISIONING],
        includes=["subscription_id"],
        exclude_subscription_ids=[str(subscription_id)],
    )

    # If no other mitigations exist, safe to restore upstream
    return {"should_restore_upstream": len(other_mitigations) == 0}

check_prefix_list_ownership(subscription, partner_name, ias_subscription_endpoints)

Check if the provided prefix lists belong to the selected partner.

Parameters:

Name Type Description Default
subscription dict

DDoS mitigation subscription dict

required
partner_name str

Name of the partner

required
ias_subscription_endpoints list[str]

List of router FQDNs

required

Returns:

Type Description
State

LSO state with playbook name, inventory, and extra vars

Source code in gso/workflows/ddos_mitigation/shared.py
@step("Check if prefix belongs to the selected partner")
def check_prefix_list_ownership(subscription: dict, partner_name: str, ias_subscription_endpoints: list[str]) -> State:
    """Check if the provided prefix lists belong to the selected partner.

    Args:
        subscription: DDoS mitigation subscription dict
        partner_name: Name of the partner
        ias_subscription_endpoints: List of router FQDNs

    Returns:
        LSO state with playbook name, inventory, and extra vars
    """
    extra_vars = {
        "subscription": subscription,
        "partner_name": partner_name,
    }

    return {
        "playbook_name": "gap_ansible/playbooks/check_prefix_ownership.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(ias_subscription_endpoints)}},
        "extra_vars": extra_vars,
    }