Scheduled task that runs a validation workflow for all active subscriptions.
validate_subscriptions()
Validate all subscriptions using their corresponding validation workflow.
Source code in gso/schedules/validate_subscriptions.py
| @shared_task
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="3"))
def validate_subscriptions() -> None:
"""Validate all subscriptions using their corresponding validation workflow."""
subscriptions = get_active_insync_subscriptions()
if not subscriptions:
logger.info("No subscriptions to validate")
return
for subscription in subscriptions:
validation_workflow = None
for workflow in subscription.product.workflows:
if workflow.target == Target.SYSTEM:
validation_workflow = workflow.name
if validation_workflow:
# Validation workflows only run on subscriptions that are active, even when they could be run on
# provisioning subscriptions. E.g. for routers, they can manually be validated when provisioning, but are
# not included in this schedule.
usable_when = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]
if subscription.status in usable_when:
json = [{"subscription_id": str(subscription.subscription_id)}]
validate_func = get_execution_context()["validate"]
validate_func(validation_workflow, json=json)
else:
logger.warning(
"SubscriptionTable has no validation workflow",
subscription=subscription,
product=subscription.product.name,
)
|