Skip to content

Prune validation workflows

A cleanup task for pruning validation workflow runs.

This task removes past runs of the cleanup task itself, and also cleans up out-of-date validation workflow runs. It aborts any failed validation workflow that ran on a subscription that has since been terminated. It also aborts duplicate failed validation workflows that ran on the same subscription, keeping only the most recently started one.

abort_terminated_validation_workflows()

Abort any failed validation task that is connected to a subscription that has been terminated.

Source code in gso/workflows/tasks/prune_validation_workflows.py
@step("Abort validation workflows on terminated subscriptions")
def abort_terminated_validation_workflows() -> State:
    """Mark failed validation tasks of terminated subscriptions as aborted.

    Each task returned by ``get_failed_tasks`` is resolved to its subscription through its ``process_id``. When a
    subscription is found and its status is ``TERMINATED``, the task's ``last_status`` is set to ``ABORTED``.

    Returns:
        A state update with the key ``terminated_subscriptions``, mapping each affected subscription ID to the list of
        tasks that were aborted for it.
    """
    aborted_by_subscription = defaultdict(list)
    for failed_task in get_failed_tasks():
        owner = get_subscription_by_process_id(failed_task.process_id)
        # Skip tasks without a subscription, and tasks whose subscription is not terminated.
        if not owner or owner.status != SubscriptionLifecycle.TERMINATED:
            continue

        failed_task.last_status = ProcessStatus.ABORTED
        aborted_by_subscription[owner.subscription_id].append(failed_task)

    return {"terminated_subscriptions": aborted_by_subscription}

abort_duplicate_validation_workflows()

Abort any duplicate failed validation task, except for the most recently started one.

Source code in gso/workflows/tasks/prune_validation_workflows.py
@step("Abort duplicate Validation workflows")
def abort_duplicate_validation_workflows() -> State:
    """Abort all but the most recently started failed validation task of every subscription.

    Failed tasks are grouped by the subscription they resolve to; tasks that resolve to no subscription are ignored.
    Within each group the task with the greatest ``started_at`` is kept, and every other task gets its
    ``last_status`` set to ``ABORTED``.

    Returns:
        A state update with the key ``stale_validation_tasks``, mapping each subscription ID that had duplicates to
        the list of tasks that were aborted for it.
    """
    runs_by_subscription = defaultdict(list)
    for failed_task in get_failed_tasks():
        owner = get_subscription_by_process_id(failed_task.process_id)
        if owner:
            runs_by_subscription[owner.subscription_id].append(failed_task)

    superseded_runs = {}
    for subscription_id, runs in runs_by_subscription.items():
        newest = max(runs, key=lambda run: run.started_at)
        older_runs = [run for run in runs if run is not newest]
        # A subscription with a single failed run has nothing to prune.
        if not older_runs:
            continue

        for run in older_runs:
            run.last_status = ProcessStatus.ABORTED
        superseded_runs[subscription_id] = older_runs

    return {"stale_validation_tasks": superseded_runs}

send_notification_email(terminated_subscriptions, stale_validation_tasks)

Send an email notification containing all pruned validation tasks.

Source code in gso/workflows/tasks/prune_validation_workflows.py
@step("Send notification email")
def send_notification_email(terminated_subscriptions: dict, stale_validation_tasks: dict[str, list]) -> None:
    """Send an email notification containing all pruned validation tasks.

    The email body consists of up to two sections, one for tasks pruned because their subscription was terminated and
    one for pruned duplicate tasks, followed by a closing line. A section is left out when its mapping is empty.

    Args:
        terminated_subscriptions: Mapping of subscription identifier to the list of task dictionaries that were
            aborted because the subscription is terminated.
        stale_validation_tasks: Mapping of subscription identifier to the list of duplicate task dictionaries that
            were aborted.

    Note:
        The task dictionaries in both arguments are modified in place: the keys ``input_states``,
        ``process_subscriptions``, ``steps`` and ``traceback`` are removed before the tasks are rendered.
    """
    settings = load_oss_params()
    verbose_keys = ("input_states", "process_subscriptions", "steps", "traceback")

    def _remove_verbose_keys(ts: list[dict]) -> list[dict]:
        """Strip the bulky keys from every task dictionary in ``ts`` (in place) and return ``ts``."""
        for t in ts:
            for key in verbose_keys:
                t.pop(key, None)
        return ts

    def _build_section(header: str, pruned: dict) -> str:
        """Render one section of the email body, or an empty string when ``pruned`` has no entries."""
        entries = [
            f"\n{SubscriptionModel.from_subscription(sub).description}: "
            f"{json.dumps(_remove_verbose_keys(tasks), indent=4)}"
            for sub, tasks in pruned.items()
        ]
        if not entries:
            return ""
        return header + "".join(entries) + "\n\n------\n"

    pruned_terminated_subscriptions = _build_section(
        "The following validation tasks have been pruned since the parent subscription has been terminated:\n",
        terminated_subscriptions,
    )
    pruned_stale_validation_tasks = _build_section(
        "The following duplicate validation tasks have been pruned:\n",
        stale_validation_tasks,
    )

    email_body = (
        pruned_terminated_subscriptions
        + pruned_stale_validation_tasks
        + "\nRegards, the GÉANT Automation Platform.\n\n"
    )

    send_mail(
        f"GAP {settings.GENERAL.environment} environment - Validation tasks have been pruned",
        email_body,
        destination=settings.EMAIL.system_email_destinations,
    )

task_prune_validation_workflows()

Prune runs of validation workflows.

This task removes duplicate validation tasks, and those that have run on terminated subscriptions.

Source code in gso/workflows/tasks/prune_validation_workflows.py
@workflow("Prune old validation workflows", target=Target.SYSTEM)
def task_prune_validation_workflows() -> StepList:
    """Prune runs of validation workflows.

    The workflow first aborts validation tasks on terminated subscriptions, then aborts duplicate validation tasks.
    The notification email step is wrapped in a condition on whether either of those steps reported pruned tasks.
    """

    def _anything_pruned(state: State) -> dict:
        # Truthy when at least one of the two pruning steps put a non-empty mapping into the state.
        return state["terminated_subscriptions"] or state["stale_validation_tasks"]

    tasks_have_been_pruned = conditional(_anything_pruned)
    notify_when_pruned = tasks_have_been_pruned(send_notification_email)

    return (
        begin
        >> abort_terminated_validation_workflows
        >> abort_duplicate_validation_workflows
        >> notify_when_pruned
        >> done
    )