
Task update commercial peer prefix limits

Task workflow that updates Commercial Peer prefix limits from PeeringDB.

ResolvedLimits

Bases: BaseModel

Resolved prefix limits after comparing PeeringDB data with existing values.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
class ResolvedLimits(BaseModel):
    """Resolved prefix limits after comparing PeeringDB data with existing values."""

    v4: int | None = None
    v6: int | None = None
    v4_changed: bool = False
    v6_changed: bool = False

    @property
    def needs_update(self) -> bool:
        """Check if any limit changed."""
        return self.v4_changed or self.v6_changed

    @property
    def is_valid(self) -> bool:
        """Check if both limits are set (required for workflow)."""
        return self.v4 is not None and self.v6 is not None

needs_update property

Check if any limit changed.

is_valid property

Check if both limits are set (required for workflow).

PeerProcessResult dataclass

Result of processing a single commercial peer subscription.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@dataclass
class PeerProcessResult:
    """Result of processing a single commercial peer subscription."""

    subscription_id: UUID
    description: str
    asn: int | None = None
    status: Literal["success", "planned", "skipped", "no_change"] = "skipped"
    skip_reason: str | None = None
    old_v4: int | None = None
    old_v6: int | None = None
    new_v4: int | None = None
    new_v6: int | None = None
    process_id: UUID | None = None
    significant_change: bool = False

SyncRunSummary dataclass

Summary of a complete PeeringDB prefix limit sync run.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@dataclass
class SyncRunSummary:
    """Summary of a complete PeeringDB prefix limit sync run."""

    scanned: int = 0
    changed: int = 0
    skipped: int = 0
    results: list[PeerProcessResult] = field(default_factory=list)

    @property
    def planned_results(self) -> list[PeerProcessResult]:
        """Peers planned for update (before bulk execution)."""
        return [r for r in self.results if r.status == "planned"]

    @property
    def success_results(self) -> list[PeerProcessResult]:
        """Peers that were successfully updated."""
        return [r for r in self.results if r.status == "success"]

    @property
    def skipped_results(self) -> list[PeerProcessResult]:
        """Peers that were skipped due to errors or missing data."""
        return [r for r in self.results if r.status == "skipped"]

    @property
    def significant_changes(self) -> list[PeerProcessResult]:
        """Peers with changes >10% that warrant notification."""
        return [r for r in self.results if r.significant_change]

planned_results property

Peers planned for update (before bulk execution).

success_results property

Peers that were successfully updated.

skipped_results property

Peers that were skipped due to errors or missing data.

significant_changes property

Peers with changes >10% that warrant notification.
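The status-filtering properties can be exercised with a trimmed-down, self-contained sketch; the PeerProcessResult stand-in below keeps only the fields the example needs:

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4

@dataclass
class PeerProcessResult:
    """Trimmed stand-in: only the fields this example touches."""

    subscription_id: UUID
    description: str
    status: str = "skipped"
    significant_change: bool = False

@dataclass
class SyncRunSummary:
    scanned: int = 0
    results: list[PeerProcessResult] = field(default_factory=list)

    @property
    def planned_results(self) -> list[PeerProcessResult]:
        return [r for r in self.results if r.status == "planned"]

    @property
    def significant_changes(self) -> list[PeerProcessResult]:
        return [r for r in self.results if r.significant_change]

summary = SyncRunSummary(scanned=3, results=[
    PeerProcessResult(uuid4(), "peer-a", status="planned"),
    PeerProcessResult(uuid4(), "peer-b", status="no_change"),
    PeerProcessResult(uuid4(), "peer-c", status="skipped", significant_change=True),
])
assert [r.description for r in summary.planned_results] == ["peer-a"]
assert [r.description for r in summary.significant_changes] == ["peer-c"]
```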

EmailSummaryCounters dataclass

Summary counters for email reporting.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@dataclass
class EmailSummaryCounters:
    """Summary counters for email reporting."""

    scanned: int
    updated: int
    no_change: int
    significant_changes: int
    failures: int
    workflow_start_failures: int
    successful_significant_changes: int
    failed_significant_changes: int

compute_new_limit(peeringdb_value)

Compute new prefix limit with safety factor.

Parameters:

- peeringdb_value (int | None, required): The prefix count from PeeringDB, or None if not available.

Returns:

- int | None: Ceiling of (peeringdb_value * 1.5) as an integer, or None if input is None.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def compute_new_limit(peeringdb_value: int | None) -> int | None:
    """Compute new prefix limit with safety factor.

    Args:
        peeringdb_value: The prefix count from PeeringDB, or None if not available.

    Returns:
        Ceiling of (peeringdb_value * 1.5) as an integer, or None if input is None.
    """
    if peeringdb_value is None:
        return None
    return ceil(peeringdb_value * SAFETY_FACTOR)

should_notify(old, new)

Determine if the change warrants a notification (>10% delta).

Notification rules:

- If old is None and new is not None: notify (first-time limit)
- If old == 0 and new != 0: notify
- If old > 0: notify if delta > 10%

Parameters:

- old (int | None, required): The current/old prefix limit value
- new (int | None, required): The new prefix limit value to be set

Returns:

- bool: True if the change should trigger a notification, False otherwise.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def should_notify(old: int | None, new: int | None) -> bool:
    """Determine if the change warrants a notification (>10% delta).

    Notification rules:
    - If old is None and new is not None: notify (first-time limit)
    - If old == 0 and new != 0: notify
    - If old > 0: notify if delta > 10%

    Args:
        old: The current/old prefix limit value
        new: The new prefix limit value to be set

    Returns:
        True if the change should trigger a notification, False otherwise.
    """
    if new is None:
        return False

    if old is None:
        return True

    if old == 0:
        return new != 0

    if new < old:
        # For decreases, we want to notify for any decrease (even <10%) since it could impact existing peers.
        return True

    delta_ratio = abs(new - old) / old
    return delta_ratio > NOTIFICATION_THRESHOLD

format_change_display(old, new)

Format a limit change for display in notifications.

Parameters:

- old (int | None, required): Old/current limit value
- new (int | None, required): New limit value

Returns:

- str: Human-readable change string (e.g., "+15.0%" or "None -> 100")

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def format_change_display(old: int | None, new: int | None) -> str:
    """Format a limit change for display in notifications.

    Args:
        old: Old/current limit value
        new: New limit value

    Returns:
        Human-readable change string (e.g., "+15.0%" or "None -> 100")
    """
    if old is None or new is None:
        return f"{old} -> {new}"

    if old == 0:
        return f"{old} -> {new}"

    percent = (new - old) / old * 100
    return f"{percent:+.1f}%"

resolve_new_limits(old_v4, old_v6, peeringdb_info)

Resolve new prefix limits based on PeeringDB data and existing values.

If PeeringDB returns None for a family, the existing value is kept. Both limits must be non-None for the result to be valid.

Parameters:

- old_v4 (int | None, required): Current IPv4 limit
- old_v6 (int | None, required): Current IPv6 limit
- peeringdb_info (PeeringDBPrefixInfo | None, required): PeeringDB data, or None if lookup failed

Returns:

- ResolvedLimits: New values and change flags

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def resolve_new_limits(
    old_v4: int | None, old_v6: int | None, peeringdb_info: PeeringDBPrefixInfo | None
) -> ResolvedLimits:
    """Resolve new prefix limits based on PeeringDB data and existing values.

    If PeeringDB returns None for a family, the existing value is kept.
    Both limits must be non-None for the result to be valid.

    Args:
        old_v4: Current IPv4 limit
        old_v6: Current IPv6 limit
        peeringdb_info: PeeringDB data, or None if lookup failed

    Returns:
        ResolvedLimits with new values and change flags
    """
    if peeringdb_info is None:
        # No PeeringDB data, keep existing
        return ResolvedLimits(v4=old_v4, v6=old_v6, v4_changed=False, v6_changed=False)

    # Compute new limits from PeeringDB
    computed_v4 = compute_new_limit(peeringdb_info.prefixes4) if old_v4 != peeringdb_info.prefixes4 else None
    computed_v6 = compute_new_limit(peeringdb_info.prefixes6) if old_v6 != peeringdb_info.prefixes6 else None

    # Resolve final values: use PeeringDB if available, else keep existing
    new_v4 = computed_v4 if computed_v4 is not None else old_v4
    new_v6 = computed_v6 if computed_v6 is not None else old_v6

    return ResolvedLimits(
        v4=new_v4,
        v6=new_v6,
        v4_changed=(new_v4 != old_v4),
        v6_changed=(new_v6 != old_v6),
    )

process_peer(sub_data, session)

Process a single commercial peer subscription.

Parameters:

- sub_data (dict, required): Subscription dict with subscription_id and description
- session (Session, required): Requests session for PeeringDB API calls

Returns:

- PeerProcessResult: Outcome of processing this peer

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def process_peer(sub_data: dict, session: requests.Session) -> PeerProcessResult:
    """Process a single commercial peer subscription.

    Args:
        sub_data: Subscription dict with subscription_id and description
        session: Requests session for PeeringDB API calls

    Returns:
        PeerProcessResult with outcome of processing this peer
    """
    subscription_id = sub_data["subscription_id"]
    description = sub_data.get("description", "Unknown")

    result = PeerProcessResult(subscription_id=subscription_id, description=description)

    try:
        # Load full subscription
        subscription = CommercialPeer.from_subscription(subscription_id)
        commercial_peer = subscription.commercial_peer

        # Check for ASN
        if not commercial_peer.partner_asn:
            result.skip_reason = "No partner ASN"
            logger.warning("Skipping subscription: no ASN", subscription_id=str(subscription_id))
            return result

        result.asn = commercial_peer.partner_asn
        result.old_v4 = commercial_peer.prefix_limit_v4
        result.old_v6 = commercial_peer.prefix_limit_v6

        # Fetch PeeringDB data
        try:
            peeringdb_info = fetch_prefix_info(result.asn, session)
        except requests.RequestException as e:
            result.skip_reason = f"PeeringDB API error: {e}"
            logger.warning(
                "PeeringDB lookup failed",
                subscription_id=str(subscription_id),
                asn=result.asn,
                error=str(e),
            )
            return result

        if peeringdb_info is None:
            result.skip_reason = "No data in PeeringDB"
            logger.warning("No PeeringDB data found", subscription_id=str(subscription_id), asn=result.asn)
            return result

        # Resolve new limits
        limits = resolve_new_limits(result.old_v4, result.old_v6, peeringdb_info)

        if not limits.is_valid:
            result.skip_reason = f"Missing limit values (v4={limits.v4}, v6={limits.v6})"
            logger.warning(
                "Cannot set limits: missing values",
                subscription_id=str(subscription_id),
                asn=result.asn,
                v4=limits.v4,
                v6=limits.v6,
            )
            return result

        if not limits.needs_update:
            result.status = "no_change"
            logger.debug("No change needed", subscription_id=str(subscription_id), asn=result.asn)
            return result

        # Update result with new limits
        result.new_v4 = limits.v4
        result.new_v6 = limits.v6

        # Check if significant change
        result.significant_change = should_notify(result.old_v4, limits.v4) or should_notify(result.old_v6, limits.v6)

        # Mark as planned (executed via the bulk workflow) unless a significant change requires manual review
        if result.significant_change:
            result.status = "skipped"
            result.skip_reason = "Significant or negative change detected, manual review required"
            logger.debug(
                "Significant change detected, skipping automatic update",
                subscription_id=str(subscription_id),
                asn=result.asn,
                description=description,
                old_v4=result.old_v4,
                new_v4=limits.v4,
                old_v6=result.old_v6,
                new_v6=limits.v6,
            )
        else:
            result.status = "planned"
            logger.debug(
                "Peer update planned for bulk execution",
                subscription_id=str(subscription_id),
                asn=result.asn,
                description=description,
                old_v4=result.old_v4,
                new_v4=limits.v4,
                old_v6=result.old_v6,
                new_v6=limits.v6,
            )

    except Exception as e:
        result.skip_reason = f"Processing error: {e}"
        logger.exception(
            "Error processing subscription",
            subscription_id=str(subscription_id),
            error=str(e),
        )
    return result

compute_email_counters(summary)

Compute all email summary counters from sync results.

Parameters:

- summary (SyncRunSummary, required): Sync run summary with results

Returns:

- EmailSummaryCounters: All computed counter values

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def compute_email_counters(summary: SyncRunSummary) -> EmailSummaryCounters:
    """Compute all email summary counters from sync results.

    Args:
        summary: Sync run summary with results

    Returns:
        EmailSummaryCounters with all computed values
    """
    no_change_count = len([r for r in summary.results if r.status == "no_change"])
    workflow_start_failures = len([
        r for r in summary.skipped_results if r.skip_reason and "Workflow start failed" in r.skip_reason
    ])

    # Count successful vs failed significant changes
    successful_significant_changes = len([r for r in summary.significant_changes if r.status == "success"])
    failed_significant_changes = len([r for r in summary.significant_changes if r.status == "skipped"])

    return EmailSummaryCounters(
        scanned=summary.scanned,
        updated=summary.changed,
        no_change=no_change_count,
        significant_changes=len(summary.significant_changes),
        failures=summary.skipped,
        workflow_start_failures=workflow_start_failures,
        successful_significant_changes=successful_significant_changes,
        failed_significant_changes=failed_significant_changes,
    )

_build_successful_significant_changes_section(successful_significant)

Build email section for successfully applied significant changes.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def _build_successful_significant_changes_section(successful_significant: list[PeerProcessResult]) -> list[str]:
    """Build email section for successfully applied significant changes."""
    if not successful_significant:
        return []

    lines = [
        "=" * 60,
        "SUCCESSFULLY APPLIED SIGNIFICANT CHANGES (>10% delta):",
        "=" * 60,
        "",
    ]

    for result in successful_significant:
        v4_change = format_change_display(result.old_v4, result.new_v4)
        v6_change = format_change_display(result.old_v6, result.new_v6)

        lines.extend([
            f"Description: {result.description}",
            f"ASN: {result.asn}",
            f"  IPv4: {result.old_v4} -> {result.new_v4} ({v4_change})",
            f"  IPv6: {result.old_v6} -> {result.new_v6} ({v6_change})",
        ])

        if result.process_id:
            task_url = generate_task_link(result.process_id)
            lines.append(f"  Task URL: {task_url}")

        if result.subscription_id:
            subscription_link = generate_subscription_link(result.subscription_id)
            lines.append(f"  Subscription URL: {subscription_link}")

        lines.append("")

    return lines

_build_failed_significant_changes_section(failed_significant)

Build email section for failed updates with significant changes detected.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def _build_failed_significant_changes_section(failed_significant: list[PeerProcessResult]) -> list[str]:
    """Build email section for failed updates with significant changes detected."""
    if not failed_significant:
        return []

    lines = [
        "=" * 60,
        "FAILED UPDATES WITH SIGNIFICANT CHANGES DETECTED (>10% delta):",
        "=" * 60,
        "",
    ]

    for result in failed_significant:
        v4_change = format_change_display(result.old_v4, result.new_v4)
        v6_change = format_change_display(result.old_v6, result.new_v6)

        lines.extend([
            f"Description: {result.description}",
            f"ASN: {result.asn}",
            f"  Detected changes: IPv4: {result.old_v4} -> {result.new_v4} ({v4_change})",
            f"                    IPv6: {result.old_v6} -> {result.new_v6} ({v6_change})",
            f"  Reason: {result.skip_reason}",
        ])

        if result.subscription_id:
            subscription_link = generate_subscription_link(result.subscription_id)
            lines.append(f"  Subscription URL: {subscription_link}")

        lines.append("")

    return lines

_build_other_failures_section(non_significant_failures)

Build email section for other failures/skipped peers (without significant changes).

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def _build_other_failures_section(non_significant_failures: list[PeerProcessResult]) -> list[str]:
    """Build email section for other failures/skipped peers (without significant changes)."""
    if not non_significant_failures:
        return []

    lines = [
        "=" * 60,
        "OTHER FAILURES / SKIPPED PEERS:",
        "=" * 60,
        "",
    ]

    for result in non_significant_failures:
        asn_info = f" (ASN {result.asn})" if result.asn else ""
        lines.extend([
            f"Description: {result.description}{asn_info}",
            f"Reason: {result.skip_reason}",
        ])

        if result.process_id:
            task_url = generate_task_link(result.process_id)
            lines.append(f"  Task URL: {task_url}")

        if result.subscription_id:
            subscription_link = generate_subscription_link(result.subscription_id)
            lines.append(f"  Subscription URL: {subscription_link}")

        lines.append("")

    return lines

_build_started_workflows_section(success_with_process)

Build email section for started modify_prefix_limit workflows.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def _build_started_workflows_section(success_with_process: list[PeerProcessResult]) -> list[str]:
    """Build email section for started modify_prefix_limit workflows."""
    if not success_with_process:
        return []

    lines = [
        "=" * 60,
        "STARTED MODIFY_PREFIX_LIMIT WORKFLOWS:",
        "=" * 60,
        "",
    ]

    for result in success_with_process:
        lines.extend([
            f"  Peer: {result.description}",
            f"  Process ID: {result.process_id}",
            f"  Task URL: {generate_task_link(result.process_id) if result.process_id else 'N/A'}",
            "",
        ])

    return lines

build_email_content(summary)

Build email subject and body for sync run summary.

Parameters:

- summary (SyncRunSummary, required): Sync run summary with results

Returns:

- tuple[str, str]: Tuple of (subject, body)

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
def build_email_content(summary: SyncRunSummary) -> tuple[str, str]:
    """Build email subject and body for sync run summary.

    Args:
        summary: Sync run summary with results

    Returns:
        Tuple of (subject, body)
    """
    # Get environment info
    params = load_oss_params()
    environment = str(params.GENERAL.environment)
    public_hostname = params.GENERAL.public_hostname

    timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC")

    # Compute all counters
    counters = compute_email_counters(summary)

    # Build body sections
    body_lines = [
        "PeeringDB Commercial Peer Prefix Limit Sync Report",
        "=" * 60,
        f"Environment: {environment}",
        f"Public Hostname: {public_hostname}",
        f"Timestamp: {timestamp}",
        "",
        "SUMMARY COUNTERS:",
        f"  - Scanned Peers: {counters.scanned}",
        f"  - Updated Peers: {counters.updated}",
        f"  - No Change: {counters.no_change}",
        f"  - Significant Changes Detected (>10%): {counters.significant_changes}",
        f"    • Successfully Applied: {counters.successful_significant_changes}",
        f"    • Failed to Apply: {counters.failed_significant_changes}",
        f"  - Failures/Skipped: {counters.failures}",
    ]

    if counters.workflow_start_failures > 0:
        body_lines.append(f"    (including {counters.workflow_start_failures} workflow start failures)")

    body_lines.append("")

    # Add all detail sections
    successful_significant = [r for r in summary.significant_changes if r.status == "success"]
    body_lines.extend(_build_successful_significant_changes_section(successful_significant))

    failed_significant = [r for r in summary.significant_changes if r.status == "skipped"]
    body_lines.extend(_build_failed_significant_changes_section(failed_significant))

    non_significant_failures = [r for r in summary.skipped_results if not r.significant_change]
    body_lines.extend(_build_other_failures_section(non_significant_failures))

    success_with_process = [r for r in summary.success_results if r.process_id]
    body_lines.extend(_build_started_workflows_section(success_with_process))

    body = "\n".join(body_lines)

    # Build subject
    subject_prefix = "[ALERT]" if summary.significant_changes else "[INFO]"
    subject = f"{subject_prefix} [{environment}] PeeringDB Prefix Limit Sync: {counters.updated} peers updated"

    return subject, body

collect_cp_subscriptions()

Collect all active Commercial Peer subscriptions.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@step("Collect Commercial Peer subscriptions")
def collect_cp_subscriptions() -> State:
    """Collect all active Commercial Peer subscriptions."""
    subscriptions = get_subscriptions(
        product_types=[ProductType.COMMERCIAL_PEER],
        lifecycles=[SubscriptionLifecycle.ACTIVE],
        includes=["subscription_id", "description"],
    )

    logger.info("Collected Commercial Peer subscriptions", count=len(subscriptions))

    return {
        "subscriptions": subscriptions,
        "scanned_count": len(subscriptions),
    }

plan_peer_updates(subscriptions)

Process all peers: fetch PeeringDB data and plan updates.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@step("Plan peer updates")
def plan_peer_updates(subscriptions: list[dict]) -> State:
    """Process all peers: fetch PeeringDB data and plan updates."""
    summary = SyncRunSummary(scanned=len(subscriptions))
    session = create_session()

    for sub_data in subscriptions:
        result = process_peer(sub_data, session)
        summary.results.append(result)

        # Count skipped peers
        if result.status == "skipped":
            summary.skipped += 1

    # At this point, planned updates haven't been executed yet
    # changed count will be updated after bulk execution
    logger.info(
        "PeeringDB prefix limit planning completed",
        scanned=summary.scanned,
        planned=len(summary.planned_results),
        skipped=summary.skipped,
        no_change=len([r for r in summary.results if r.status == "no_change"]),
        significant_changes=len(summary.significant_changes),
    )

    return {
        "summary_scanned": summary.scanned,
        "summary_changed": 0,  # Will be updated after bulk execution
        "summary_skipped": summary.skipped,
        "summary_results": [
            {
                "subscription_id": str(r.subscription_id),
                "description": r.description,
                "asn": r.asn,
                "status": r.status,
                "skip_reason": r.skip_reason,
                "old_v4": r.old_v4,
                "old_v6": r.old_v6,
                "new_v4": r.new_v4,
                "new_v6": r.new_v6,
                "process_id": str(r.process_id) if r.process_id else None,
                "significant_change": r.significant_change,
            }
            for r in summary.results
        ],
    }

start_modify_workflows_staggered(callback_route, summary_results)

Start modify_prefix_limit workflows for planned updates using staggered bulk execution.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@step("Start Prefix List modification workflows (staggered)")
def start_modify_workflows_staggered(callback_route: str, summary_results: list[dict]) -> State:
    """Start modify_prefix_limit workflows for planned updates using staggered bulk execution."""
    # Filter for planned updates only
    planned_updates = [r for r in summary_results if r["status"] == "planned"]

    if not planned_updates:
        logger.info("No planned updates, bulk workflow will complete immediately")

    # Build bulk workflow payloads
    wf_payloads = []
    for result_dict in planned_updates:
        subscription_id = result_dict["subscription_id"]
        new_v4 = result_dict["new_v4"]
        new_v6 = result_dict["new_v6"]
        description = result_dict["description"]

        # Build payload for modify_prefix_limit workflow
        wf_payload = BulkWfPayload(
            workflow_key="modify_prefix_limit",
            identifier=subscription_id,  # Use subscription_id as identifier
            user_inputs=[
                {"subscription_id": subscription_id},
                {
                    "tt_number": AUTOMATION_TT_NUMBER,
                    "prefix_limit_v4": new_v4,
                    "prefix_limit_v6": new_v6,
                    "is_human_initiated_task": False,
                },
            ],
        )
        wf_payloads.append(wf_payload.model_dump())

        logger.debug(
            "Planned workflow for bulk execution",
            subscription_id=subscription_id,
            description=description,
            new_v4=new_v4,
            new_v6=new_v6,
        )

    # Start bulk workflows with staggered execution
    oss_params = load_oss_params()
    stagger_step = oss_params.GENERAL.commercial_peer_prefix_limit_stagger_step_seconds

    logger.info(
        "Starting bulk modify_prefix_limit workflows with staggered execution",
        count=len(wf_payloads),
        stagger_step_seconds=stagger_step,
    )

    bulk_wf_task.apply_async(
        kwargs={
            "wf_payloads": wf_payloads,
            "callback_route": callback_route,
            "success_key": _BULK_SUCCESS_KEY,
            "failure_key": _BULK_FAILURE_KEY,
            "run_mode": BulkRunMode.STAGGERED.value,
            "stagger_step": stagger_step,
            "initial_delay": 0.0,
        },
    )

    return {_BULK_SUCCESS_KEY: {}, _BULK_FAILURE_KEY: {}}

evaluate_bulk_results(callback_result, summary_scanned, summary_skipped, summary_results)

Evaluate results from bulk modify_prefix_limit workflows and update summary.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@step("Evaluate bulk workflow results")
def evaluate_bulk_results(
    callback_result: dict, summary_scanned: int, summary_skipped: int, summary_results: list[dict]
) -> State:
    """Evaluate results from bulk modify_prefix_limit workflows and update summary."""
    # Extract success/failure maps from callback
    successful_wfs = callback_result.pop(_BULK_SUCCESS_KEY, {})
    failed_wfs = callback_result.pop(_BULK_FAILURE_KEY, {})

    logger.info(
        "Bulk workflow results received",
        successful=len(successful_wfs),
        failed=len(failed_wfs),
    )

    # Update results based on bulk execution outcomes
    for result_dict in summary_results:
        subscription_id_str = result_dict["subscription_id"]

        # If this was a planned update, check bulk execution result
        if result_dict["status"] == "planned":
            if subscription_id_str in successful_wfs:
                # Successfully executed
                result_dict["status"] = "success"
                # Note: process_id would need to be extracted from bulk results if needed
                logger.debug("Peer update succeeded", subscription_id=subscription_id_str)
            elif subscription_id_str in failed_wfs:
                # Failed during bulk execution
                result_dict["status"] = "skipped"
                result_dict["skip_reason"] = f"Workflow execution failed: {failed_wfs[subscription_id_str]}"
                logger.warning(
                    "Peer update failed during bulk execution",
                    subscription_id=subscription_id_str,
                    reason=failed_wfs[subscription_id_str],
                )
            # else: still planned (shouldn't happen, but handle gracefully)

    # Recount summary stats after bulk execution
    summary_changed = len([r for r in summary_results if r["status"] == "success"])
    summary_skipped = len([r for r in summary_results if r["status"] == "skipped"])

    # Log any failures for visibility (but don't fail the whole task)
    if failed_wfs:
        logger.warning(
            "Some modify_prefix_limit workflows failed",
            failed_count=len(failed_wfs),
            failed_subscriptions=list(failed_wfs.keys()),
        )

    return {
        "callback_result": callback_result,
        "summary_scanned": summary_scanned,
        "summary_changed": summary_changed,
        "summary_skipped": summary_skipped,
        "summary_results": summary_results,
        _BULK_SUCCESS_KEY: successful_wfs,
        _BULK_FAILURE_KEY: failed_wfs,
    }

send_summary_notification(summary_scanned, summary_changed, summary_skipped, summary_results)

Send email notification with summary of sync run, including significant changes and failures.

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@step("Send summary notification")
def send_summary_notification(
    summary_scanned: int,
    summary_changed: int,
    summary_skipped: int,
    summary_results: list[dict],
) -> State:
    """Send email notification with summary of sync run, including significant changes and failures."""
    # Reconstruct summary from state
    summary = SyncRunSummary(scanned=summary_scanned, changed=summary_changed, skipped=summary_skipped)

    for result_dict in summary_results:
        result = PeerProcessResult(
            subscription_id=UUID(result_dict["subscription_id"]),
            description=result_dict["description"],
            asn=result_dict["asn"],
            status=result_dict["status"],
            skip_reason=result_dict["skip_reason"],
            old_v4=result_dict["old_v4"],
            old_v6=result_dict["old_v6"],
            new_v4=result_dict["new_v4"],
            new_v6=result_dict["new_v6"],
            process_id=UUID(result_dict["process_id"]) if result_dict["process_id"] else None,
            significant_change=result_dict["significant_change"],
        )
        summary.results.append(result)

    # Only send email if there are items to report
    if not summary.significant_changes and not summary.skipped_results:
        logger.info("No significant changes or skipped peers, skipping email notification")
        return {}

    subject, body = build_email_content(summary)

    send_mail(subject=subject, body=body)
    logger.info("Summary email notification sent", subject=subject)

    return {"email_subject": subject, "email_body": body}

task_update_commercial_peer_prefix_limits()

Update Commercial Peer prefix limits from PeeringDB.

This task workflow:

1. Collects all active Commercial Peer subscriptions
2. Fetches prefix info from PeeringDB for each peer's ASN
3. Computes new limits with 1.5x safety factor (ceiling)
4. Starts modify_prefix_limit workflows using staggered bulk execution
5. Evaluates bulk execution results
6. Sends email notification for significant changes (>10%) or errors

Source code in gso/workflows/tasks/task_update_commercial_peer_prefix_limits.py
@workflow(
    "Update Commercial Peer Prefix Limits from PeeringDB",
    target=Target.SYSTEM,
    authorize_callback=lambda _: load_oss_params().PEERINGDB.enable_prefix_limit_sync,
)
def task_update_commercial_peer_prefix_limits() -> StepList:
    """Update Commercial Peer prefix limits from PeeringDB.

    This task workflow:
    1. Collects all active Commercial Peer subscriptions
    2. Fetches prefix info from PeeringDB for each peer's ASN
    3. Computes new limits with 1.5x safety factor (ceiling)
    4. Starts modify_prefix_limit workflows using staggered bulk execution
    5. Evaluates bulk execution results
    6. Sends email notification for significant changes (>10%) or errors
    """
    return (
        begin
        >> collect_cp_subscriptions
        >> plan_peer_updates
        >> callback_step(
            name="Execute modify_prefix_limit workflows (staggered)",
            action_step=start_modify_workflows_staggered,
            validate_step=evaluate_bulk_results,
        )
        >> send_summary_notification
        >> done
    )