Skip to content

Terminate DDoS mitigation

A termination workflow for DDoS Single Upstream Setup Mitigation subscriptions.

input_form_generator(subscription_id)

Gather input from the operator.

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
def input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Collect the termination details from the operator.

    Presents a form with a single ``tt_number`` field, followed by a summary page, and
    returns the submitted values together with the subscription and its partner details.

    Args:
        subscription_id: ID of the DDoS mitigation subscription that is being terminated.

    Returns:
        The submitted form data merged with the ``subscription``, ``partner``,
        ``partner_id`` and ``partner_name`` keys.
    """
    subscription = DDoSMitigation.from_subscription(subscription_id)

    class DDoSMitigationTerminationForm(FormPage):
        model_config = ConfigDict(title="Terminate DDoS Single Upstream Setup Mitigation")

        tt_number: TTNumber

    submitted_form = yield DDoSMitigationTerminationForm
    form_data = submitted_form.model_dump()
    yield from create_summary_form(form_data, subscription.product.name, ["tt_number"])

    customer_id = subscription.customer_id
    partner_details = {
        "subscription": subscription,
        # "partner" and "partner_id" both carry the customer ID of the subscription.
        "partner": customer_id,
        "partner_id": customer_id,
        "partner_name": get_partner_by_id(customer_id).name,
    }
    return {**form_data, **partner_details}

restore_exit_export_policy_real(subscription, process_id, partner_name, tt_number)

Restores the original export policy on the DDoS exit port.

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
@step("[FOR REAL] Restore Exit Port export policy")
def restore_exit_export_policy_real(
    subscription: dict[str, Any],
    process_id: UUIDstr,
    partner_name: str,
    tt_number: str,
) -> LSOState:
    """Build the LSO request that restores the export policy on the DDoS exit port.

    This is the non-dry-run variant: ``dry_run`` is ``False`` and the playbook is
    invoked with the ``delete`` verb against the configured exit port host.

    Args:
        subscription: The subscription, passed through to the playbook as-is.
        process_id: ID of the running process, embedded in the commit comment.
        partner_name: Name of the partner, passed through to the playbook.
        tt_number: Ticket number, embedded in the commit comment.

    Returns:
        The playbook name, inventory and extra variables for the LSO interaction.
    """
    target_host = load_oss_params().GENERAL.exit_port_fqdn
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Terminate DDoS Single Upstream Setup Mitigation"
    )

    return {
        "playbook_name": "gap_ansible/playbooks/ddos_mitigation.yaml",
        # Only the exit port host is targeted by this playbook run.
        "inventory": {"all": {"hosts": {target_host: None}}},
        "extra_vars": {
            "dry_run": False,
            "subscription": subscription,
            "partner_name": partner_name,
            "commit_comment": commit_comment,
            "verb": "delete",
        },
    }

update_exit_export_policy_dry(subscription, process_id, partner_name, tt_number)

Perform a dry run of updating the export policy on a DDoS Single Upstream Setup Mitigation exit port.

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
@step("[DRY RUN] Update Exit Port export policy")
def update_exit_export_policy_dry(
    subscription: dict[str, Any],
    process_id: UUIDstr,
    partner_name: str,
    tt_number: str,
) -> LSOState:
    """Perform a dry run of updating the export policy on a DDoS Single Upstream Setup Mitigation exit port.

    The playbook is invoked with the ``delete`` verb and ``dry_run`` set to ``True``
    against the configured exit port host.

    Args:
        subscription: The subscription, passed through to the playbook as-is.
        process_id: ID of the running process, embedded in the commit comment.
        partner_name: Name of the partner, passed to the playbook and embedded in the commit comment.
        tt_number: Ticket number, embedded in the commit comment.

    Returns:
        The playbook name, inventory and extra variables for the LSO interaction.
    """
    params = load_oss_params()
    exit_port_fqdn = params.GENERAL.exit_port_fqdn

    extra_vars = {
        "dry_run": True,
        "subscription": subscription,
        "partner_name": partner_name,
        # This step belongs to the termination workflow and sends verb "delete",
        # so the commit comment must not claim that a mitigation is being created.
        "commit_comment": (
            f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
            f"Terminate DDoS Single Upstream Setup Mitigation for {partner_name}"
        ),
        "verb": "delete",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/ddos_upstream_setup.yaml",
        "inventory": {"all": {"hosts": {exit_port_fqdn: None}}},
        "extra_vars": extra_vars,
    }

update_exit_export_policy_real(subscription, process_id, partner_name, tt_number)

Updates the export policy on a DDoS Single Upstream Setup Mitigation exit port.

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
@step("[FOR REAL] Update Exit Port export policy")
def update_exit_export_policy_real(
    subscription: dict[str, Any],
    process_id: UUIDstr,
    partner_name: str,
    tt_number: str,
) -> LSOState:
    """Update the export policy on a DDoS Single Upstream Setup Mitigation exit port.

    The playbook is invoked with the ``delete`` verb and ``dry_run`` set to ``False``
    against the configured exit port host.

    Args:
        subscription: The subscription, passed through to the playbook as-is.
        process_id: ID of the running process, embedded in the commit comment.
        partner_name: Name of the partner, passed to the playbook and embedded in the commit comment.
        tt_number: Ticket number, embedded in the commit comment.

    Returns:
        The playbook name, inventory and extra variables for the LSO interaction.
    """
    params = load_oss_params()
    exit_port_fqdn = params.GENERAL.exit_port_fqdn

    extra_vars = {
        "dry_run": False,
        "subscription": subscription,
        "partner_name": partner_name,
        # This step belongs to the termination workflow and sends verb "delete",
        # so the commit comment must not claim that a mitigation is being created.
        "commit_comment": (
            f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
            f"Terminate DDoS Single Upstream Setup Mitigation for {partner_name}"
        ),
        "verb": "delete",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/ddos_upstream_setup.yaml",
        "inventory": {"all": {"hosts": {exit_port_fqdn: None}}},
        "extra_vars": extra_vars,
    }

mark_ias_import_policy_dry(subscription, process_id, tt_number, partner_name, ias_subscription_endpoints, peer_map)

Perform a dry run of removing the DDoS victim policy from all IAS subscriptions of the partner.

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
# NOTE(review): the step name still says "Add" although the playbook runs with verb "delete";
# it is left unchanged here because step names are runtime-visible — confirm before renaming.
@step("[DRY RUN] Add Prefix List on all partner IAS subscriptions")
def mark_ias_import_policy_dry(
    subscription: dict[str, Any],
    process_id: UUIDstr,
    tt_number: TTNumber,
    partner_name: str,
    ias_subscription_endpoints: list[str],
    peer_map: dict[str, dict[str, str | None]],
) -> LSOState:
    """Perform a dry run of the ``delete`` playbook on all IAS subscription endpoints of the partner.

    Args:
        subscription: The subscription, passed through to the playbook as-is.
        process_id: ID of the running process, embedded in the commit comment.
        tt_number: Ticket number, embedded in the commit comment.
        partner_name: Name of the partner, passed through to the playbook.
        ias_subscription_endpoints: Hosts that make up the playbook inventory.
        peer_map: Peer mapping, passed through to the playbook as-is.

    Returns:
        The playbook name, inventory and extra variables for the LSO interaction.
    """
    extra_vars = {
        "dry_run": True,
        "subscription": subscription,
        "partner_name": partner_name,
        "peer_map": peer_map,
        # Kept identical to the commit comment of the matching "[FOR REAL]" step, so
        # the dry run previews exactly the change that will be committed.
        "commit_comment": (
            f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Delete DDoS Single Upstream Setup Mitigation"
        ),
        "verb": "delete",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/ddos_nren_setup.yaml",
        # Every endpoint becomes a host entry without host-specific variables.
        "inventory": {"all": {"hosts": dict.fromkeys(ias_subscription_endpoints)}},
        "extra_vars": extra_vars,
    }

mark_ias_import_policy_real(subscription, process_id, tt_number, partner_name, ias_subscription_endpoints, peer_map)

Removes the DDoS victim policy from all IAS subscriptions of the partner.

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
@step("[FOR REAL] Add Prefix List on all partner IAS subscriptions")
def mark_ias_import_policy_real(
    subscription: dict[str, Any],
    process_id: UUIDstr,
    tt_number: str,
    partner_name: str,
    ias_subscription_endpoints: list[str],
    peer_map: dict[str, dict[str, str | None]],
) -> LSOState:
    """Run the ``delete`` playbook for real on all IAS subscription endpoints of the partner.

    Args:
        subscription: The subscription, passed through to the playbook as-is.
        process_id: ID of the running process, embedded in the commit comment.
        tt_number: Ticket number, embedded in the commit comment.
        partner_name: Name of the partner, passed through to the playbook.
        ias_subscription_endpoints: Hosts that make up the playbook inventory.
        peer_map: Peer mapping, passed through to the playbook as-is.

    Returns:
        The playbook name, inventory and extra variables for the LSO interaction.
    """
    commit_comment = (
        f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        "Delete DDoS Single Upstream Setup Mitigation"
    )
    # Every endpoint becomes a host entry without host-specific variables.
    target_hosts = dict.fromkeys(ias_subscription_endpoints)

    return {
        "playbook_name": "gap_ansible/playbooks/ddos_nren_setup.yaml",
        "inventory": {"all": {"hosts": target_hosts}},
        "extra_vars": {
            "dry_run": False,
            "subscription": subscription,
            "partner_name": partner_name,
            "peer_map": peer_map,
            "commit_comment": commit_comment,
            "verb": "delete",
        },
    }

terminate_ddos_mitigation()

A Termination workflow for a DDoS Single Upstream Setup Mitigation.

This workflow: (1) removes mitigation prefixes from NREN IAS Access Ports; and (2) conditionally restores the exit upstream port export policy (only if no other active mitigations are using the same exit port).

Source code in gso/workflows/ddos_mitigation/terminate_ddos_mitigation.py
@terminate_workflow("Terminate DDoS Single Upstream Setup Mitigation", input_form_generator)
def terminate_ddos_mitigation() -> StepList:
    """Terminate a DDoS Single Upstream Setup Mitigation.

    The workflow runs these stages in order:
    1. Build the IAS peer map, then run the IAS playbook as a dry run and for real.
    2. Run the exit port export policy playbook as a dry run and for real, but only
       when ``has_no_other_active_mitigations`` holds for this subscription.
    """

    def _is_last_active_mitigation(state: dict[str, Any]) -> bool:
        return has_no_other_active_mitigations(state["subscription_id"])

    only_when_last_mitigation = conditional(_is_last_active_mitigation)

    # Stage 1: always executed.
    ias_stage = (
        begin
        >> build_ias_peer_map
        >> lso_interaction(mark_ias_import_policy_dry)
        >> lso_interaction(mark_ias_import_policy_real)
    )

    # Stage 2: each exit port step is guarded individually by the same predicate.
    return (
        ias_stage
        >> only_when_last_mitigation(lso_interaction(update_exit_export_policy_dry))
        >> only_when_last_mitigation(lso_interaction(update_exit_export_policy_real))
    )