Skip to content

Check partner prefix lists

Task that verifies that every owner of an L3 service also has a prefix list subscription.

build_partner_list()

Build a list of partners that should have a Prefix List subscription.

Source code in gso/workflows/tasks/check_partner_prefix_lists.py
@step("Build list of partners")
def build_partner_list() -> State:
    """Build a list of partners that should have a Prefix List subscription.

    The returned state update stores, under ``partner_list``, the ``partner_id`` of every partner yielded by
    ``get_partners_that_should_have_prefix_list()``.
    """
    partners = get_partners_that_should_have_prefix_list()
    return {"partner_list": [partner.partner_id for partner in partners]}

every_partner_has_a_prefix_list(partner_list)

Verify that every found partner indeed has a Prefix List subscription.

Source code in gso/workflows/tasks/check_partner_prefix_lists.py
@step("Every found partner must have a Prefix List")
def every_partner_has_a_prefix_list(partner_list: list[UUIDstr]) -> State:
    """Report which partners in ``partner_list`` lack a Prefix List subscription.

    For every partner ID, Prefix List subscriptions with lifecycle ``PROVISIONING`` or ``ACTIVE`` are looked up. When
    that lookup returns nothing, the partner's name is added to the ``partners_without_prefix_list`` entry of the
    returned state update.
    """
    names_missing_prefix_list = [
        # The partner name is only resolved for partners whose subscription lookup came back empty.
        get_partner_by_id(partner_id).name
        for partner_id in partner_list
        if not get_subscriptions(
            product_types=[ProductType.PREFIX_LIST],
            lifecycles=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
            includes=["subscription_id"],
            partner_id=partner_id,
        )
    ]

    return {"partners_without_prefix_list": names_missing_prefix_list}

no_orphaned_prefix_lists(partner_list)

Verify that every partner with a Prefix List should actually have one.

Source code in gso/workflows/tasks/check_partner_prefix_lists.py
@step("Verify that no redundant Prefix Lists exist")
def no_orphaned_prefix_lists(partner_list: list[UUIDstr]) -> State:
    """Verify that every partner with a Prefix List should actually have one.

    All Prefix List subscriptions with lifecycle ``PROVISIONING`` or ``ACTIVE`` are fetched. For each one whose
    ``customer_id`` does not appear in ``partner_list``, the owning partner's name is added to the
    ``orphaned_prefix_lists`` entry of the returned state update. A partner that owns several such subscriptions is
    listed once per subscription.
    """
    # Build the lookup set once, so each membership test below is a hash lookup rather than a scan of the full list.
    expected_partner_ids = set(partner_list)

    partners_with_prefix_list = [
        item["customer_id"]
        for item in get_subscriptions(
            product_types=[ProductType.PREFIX_LIST],
            lifecycles=[SubscriptionLifecycle.PROVISIONING, SubscriptionLifecycle.ACTIVE],
            includes=["customer_id"],
        )
    ]

    orphaned_prefix_lists = [
        get_partner_by_id(partner_id).name
        for partner_id in partners_with_prefix_list
        if partner_id not in expected_partner_ids
    ]

    return {"orphaned_prefix_lists": orphaned_prefix_lists}

fail_with_incorrect_prefix_lists(partners_without_prefix_list, orphaned_prefix_lists)

Raise a ProcessFailureError if there are Prefix Lists which are configured incorrectly.

Source code in gso/workflows/tasks/check_partner_prefix_lists.py
@step("Fail on any found errors")
def fail_with_incorrect_prefix_lists(partners_without_prefix_list: list[str], orphaned_prefix_lists: list[str]) -> None:
    """Raise a ``ProcessFailureError`` if there are Prefix Lists which are configured incorrectly.

    This function itself raises unconditionally; whether it runs at all is decided by the workflow that includes it.
    ``partners_without_prefix_list`` is reported in the error details under ``partners_missing_prefix_list``, and
    ``orphaned_prefix_lists`` under the key of the same name.
    """
    msg = "Some Prefix Lists are configured incorrectly"
    raise ProcessFailureError(
        msg,
        details={
            "partners_missing_prefix_list": partners_without_prefix_list,
            "orphaned_prefix_lists": orphaned_prefix_lists,
        },
    )

task_check_partner_prefix_lists()

Every partner that has certain subscriptions must have a Prefix List subscription.

These are:
  • IAS
  • R&E Peer
  • GÉANT IP
Source code in gso/workflows/tasks/check_partner_prefix_lists.py
@workflow("Check partner Prefix Lists", target=Target.SYSTEM)
def task_check_partner_prefix_lists() -> StepList:
    """Every partner that has certain subscriptions must have a Prefix List subscription.

    These are:
        - IAS
        - R&E Peer
        - GÉANT IP

    The workflow builds the list of partners, runs both checks, and ends with the failing step. That last step is
    wrapped in a ``conditional`` whose predicate is truthy when either result list in the state is non-empty.
    """
    findings_present = conditional(
        lambda state: state["partners_without_prefix_list"] or state["orphaned_prefix_lists"]
    )

    # The checks are assembled first and then embedded as a single step group in the outer workflow.
    prefix_list_checks = (
        begin
        >> build_partner_list
        >> every_partner_has_a_prefix_list
        >> no_orphaned_prefix_lists
        >> findings_present(fail_with_incorrect_prefix_lists)
    )

    return begin >> step_group(name="Check partner Prefix Lists", steps=prefix_list_checks) >> done