Skip to content

Validate subscriptions

Scheduled task that runs a validation workflow for all active subscriptions.

validate_subscriptions()

Validate all subscriptions using their corresponding validation workflow.

Source code in gso/schedules/validate_subscriptions.py
@shared_task
@scheduler(CronScheduleConfig(name="Subscriptions Validator", minute="10", hour="3"))
def validate_subscriptions() -> None:
    """Validate all subscriptions using their corresponding validation workflow."""
    subscriptions = get_active_insync_subscriptions()
    if not subscriptions:
        logger.info("No subscriptions to validate")
        return

    for subscription in subscriptions:
        validation_workflow = None

        for workflow in subscription.product.workflows:
            if workflow.target == Target.SYSTEM:
                validation_workflow = workflow.name

        if validation_workflow:
            #  Validation workflows only run on subscriptions that are active, even when they could be run on
            #  provisioning subscriptions. E.g. for routers, they can manually be validated when provisioning, but are
            #  not included in this schedule.
            usable_when = TARGET_DEFAULT_USABLE_MAP[Target.SYSTEM]

            if subscription.status in usable_when:
                json = [{"subscription_id": str(subscription.subscription_id)}]

                validate_func = get_execution_context()["validate"]
                validate_func(validation_workflow, json=json)
        else:
            logger.warning(
                "SubscriptionTable has no validation workflow",
                subscription=subscription,
                product=subscription.product.name,
            )