A cleanup task for pruning validation workflow runs.
This task removes past entries of running the cleanup task, but also cleans up out-of-date validation workflow runs.
It will abort any validation workflow that has run on a subscription that is now terminated. It also aborts duplicate
validation workflows that have run on the same subscription multiple times.
abort_terminated_validation_workflows()
Abort any failed validation task that is connected to a subscription that has been terminated.
Source code in gso/workflows/tasks/prune_validation_workflows.py
| @step("Abort validation workflows on terminated subscriptions")
def abort_terminated_validation_workflows() -> State:
"""Abort any failed validation task that is connected to a subscription that has been terminated."""
validation_tasks = get_failed_tasks()
terminated_subscriptions = defaultdict(list)
for task in validation_tasks:
subscription = get_subscription_by_process_id(task.process_id)
if subscription and subscription.status == SubscriptionLifecycle.TERMINATED:
task.last_status = ProcessStatus.ABORTED
terminated_subscriptions[subscription.subscription_id].append(task)
return {"terminated_subscriptions": terminated_subscriptions}
|
abort_duplicate_validation_workflows()
Abort any duplicate failed validation task, except for the most recently started one.
Source code in gso/workflows/tasks/prune_validation_workflows.py
| @step("Abort duplicate Validation workflows")
def abort_duplicate_validation_workflows() -> State:
"""Abort any duplicate failed validation task, except for the most recently started one."""
validation_tasks = get_failed_tasks()
tasks_per_subscription = defaultdict(list)
for task in validation_tasks:
# Group validation tasks by ``subscription_id``
subscription = get_subscription_by_process_id(task.process_id)
if subscription:
tasks_per_subscription[subscription.subscription_id].append(task)
stale_tasks = {}
for subscription_id, tasks in tasks_per_subscription.items():
# Get the latest validation task, and remove all other past runs
most_recent = max(tasks, key=lambda t: t.started_at)
stale = [task for task in tasks if task is not most_recent]
if stale:
for task in stale:
task.last_status = ProcessStatus.ABORTED
stale_tasks[subscription_id] = stale
return {"stale_validation_tasks": stale_tasks}
|
send_notification_email(terminated_subscriptions, stale_validation_tasks)
Send an email notification containing all pruned validation tasks.
Source code in gso/workflows/tasks/prune_validation_workflows.py
| @step("Send notification email")
def send_notification_email(terminated_subscriptions: dict, stale_validation_tasks: dict[str, list]) -> None:
"""Send an email notification containing all pruned validation tasks."""
settings = load_oss_params()
def _remove_verbose_keys(ts: list[dict]) -> list[dict]:
for t in ts:
t.pop("input_states", None)
t.pop("process_subscriptions", None)
t.pop("steps", None)
t.pop("traceback", None)
return ts
pruned_terminated_subscriptions = ""
for sub, tasks in terminated_subscriptions.items():
subscription = SubscriptionModel.from_subscription(sub)
pruned_terminated_subscriptions += (
f"\n{subscription.description}: {json.dumps(_remove_verbose_keys(tasks), indent=4)}"
)
if pruned_terminated_subscriptions:
pruned_terminated_subscriptions = (
"The following validation tasks have been pruned since the parent subscription has been terminated:\n"
+ pruned_terminated_subscriptions
+ "\n\n------\n"
)
pruned_stale_validation_tasks = ""
for sub, tasks in stale_validation_tasks.items():
subscription = SubscriptionModel.from_subscription(sub)
pruned_stale_validation_tasks += (
f"\n{subscription.description}: {json.dumps(_remove_verbose_keys(tasks), indent=4)}"
)
if pruned_stale_validation_tasks:
pruned_stale_validation_tasks = (
"The following duplicate validation tasks have been pruned:\n"
+ pruned_stale_validation_tasks
+ "\n\n------\n"
)
email_body = (
pruned_terminated_subscriptions
+ pruned_stale_validation_tasks
+ "\nRegards, the GÉANT Automation Platform.\n\n"
)
send_mail(
f"GAP {settings.GENERAL.environment} environment - Validation tasks have been pruned",
email_body,
destination=settings.EMAIL.system_email_destinations,
)
|
task_prune_validation_workflows()
Prune runs of validation workflows.
This task removes duplicate validation tasks, and those that have run on terminated subscriptions.
Source code in gso/workflows/tasks/prune_validation_workflows.py
| @workflow("Prune old validation workflows", target=Target.SYSTEM)
def task_prune_validation_workflows() -> StepList:
"""Prune runs of validation workflows.
This task removes duplicate validation tasks, and those that have run on terminated subscriptions.
"""
tasks_have_been_pruned = conditional(
lambda state: state["terminated_subscriptions"] or state["stale_validation_tasks"]
)
return (
begin
>> abort_terminated_validation_workflows
>> abort_duplicate_validation_workflows
>> tasks_have_been_pruned(send_notification_email)
>> done
)
|