Bulk wf

Bulk workflow tasks.

BulkWfPayload

Bases: BaseModel

Payload for bulk workflow operations.

Source code in gso/tasks/bulk_wf.py
class BulkWfPayload(BaseModel):
    """Payload for bulk workflow operations."""

    workflow_key: str
    user_inputs: list[State]
    callback_route: str | None = None
    identifier: str
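
A minimal sketch of building one payload and preparing it for a Celery subtask, assuming pydantic v2's model_dump(); the workflow key and inputs are hypothetical:

# A hypothetical payload for a single workflow run; user_inputs follows
# the orchestrator's State shape (plain dicts are assumed here).
payload = BulkWfPayload(
    workflow_key="create_iptrunk",  # hypothetical workflow name
    user_inputs=[{"tt_number": "TT#001"}],
    identifier="trunk-001",
)

# Celery arguments must be JSON-serializable, so the model travels as a
# dict and is re-validated by BulkWfPayload(**wf_payload) in the subtask.
wf_payload = payload.model_dump()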

BulkRunMode

Bases: StrEnum

Enumeration for bulk workflow run modes.

Source code in gso/tasks/bulk_wf.py
class BulkRunMode(StrEnum):
    """Enumeration for bulk workflow run modes."""

    PARALLEL = "parallel"  # dispatch all subtasks immediately
    RANDOM_DELAY = "random_delay"  # delay each subtask by a random countdown
    STAGGERED = "staggered"  # space subtasks at a fixed interval
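
As a quick illustration of how bulk_wf_task (below) turns each mode into a Celery countdown, here are the effective delays for subtask i, with illustrative parameter values:

import random

delay_min, delay_max = 1.0, 10.0  # RANDOM_DELAY bounds
initial_delay, stagger_step = 0.0, 2.0  # STAGGERED parameters

for i in range(3):
    parallel = 0.0  # PARALLEL: every subtask is dispatched immediately
    random_delay = random.uniform(delay_min, delay_max)  # drawn per subtask
    staggered = initial_delay + i * stagger_step  # fixed spacing per index
    print(f"subtask {i}: parallel={parallel}, "
          f"random_delay={random_delay:.1f}, staggered={staggered}")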

process_one_wf(wf_payload)

Celery subtask to start & wait for a single workflow.

Returns (identifier, succeeded: bool, message: str).

Source code in gso/tasks/bulk_wf.py
@shared_task(ignore_result=False)
def process_one_wf(wf_payload: dict) -> tuple[str, bool, str]:
    """Celery subtask to start & wait for a single workflow.

    Returns (identifier, succeeded: bool, message: str).
    """
    payload = BulkWfPayload(**wf_payload)
    succeeded = False
    try:
        pid = start_process(workflow_key=payload.workflow_key, user_inputs=payload.user_inputs)

        proc = wait_for_workflow_to_stop(pid, check_interval=5, max_retries=60)

        if proc is None:
            message = "Timed out waiting for workflow to complete"
        elif proc.last_step == "Done" and proc.last_status == ProcessStatus.COMPLETED:
            succeeded = True
            message = "Done"
        elif proc.last_status == ProcessStatus.ABORTED:
            message = "Workflow was aborted"
        elif proc.last_status == ProcessStatus.FAILED:
            message = proc.failed_reason or "Workflow failed without a reason"
        else:
            message = f"Workflow status: {proc.last_status}, last step: {proc.last_step}"

    except FormValidationError as e:
        message = f"Validation error: {e}"
    except Exception as e:  # noqa: BLE001
        message = f"Unexpected error: {e}, identifier: {payload.identifier}"

    return payload.identifier, succeeded, message
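
A sketch of calling the subtask eagerly (e.g. in a test where start_process and wait_for_workflow_to_stop are mocked); Celery task objects are directly callable, and the payload values are hypothetical:

identifier, succeeded, message = process_one_wf(
    BulkWfPayload(
        workflow_key="create_iptrunk",  # hypothetical workflow name
        user_inputs=[{"tt_number": "TT#001"}],
        identifier="trunk-001",
    ).model_dump()
)
# e.g. ("trunk-001", True, "Done") on success, or
# ("trunk-001", False, "Timed out waiting for workflow to complete").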

finalize_bulk_wf(results, callback_route, identifiers, success_key, failure_key)

Called once after all process_one_wf tasks. results is a list of (identifier, succeeded, message) tuples.

Source code in gso/tasks/bulk_wf.py
@shared_task(ignore_result=False)
def finalize_bulk_wf(
    results: list[tuple[str, bool, str]],
    callback_route: str,
    identifiers: list[str],
    success_key: str,
    failure_key: str,
) -> None:
    """Called once after all process_one_wf tasks.`results` is a list of (identifier, succeeded, message)."""
    successful_wfs = {}
    failed_wfs = {}
    for identifier, ok, msg in results:
        if ok:
            successful_wfs[identifier] = msg
        else:
            failed_wfs[identifier] = msg

    # Fire the callback with the aggregated results.
    oss = settings.load_oss_params()
    callback_url = f"{oss.GENERAL.internal_hostname}{callback_route}"
    payload = {
        failure_key: failed_wfs,
        success_key: successful_wfs,
    }

    try:
        response = requests.post(callback_url, json=payload, timeout=30)
        if not response.ok:
            logger.error(
                "Callback failed",
                extra={
                    "status_code": response.status_code,
                    "response_text": response.text,
                    "callback_url": callback_url,
                    failure_key: failed_wfs,
                    "identifiers": identifiers,
                },
            )
    except Exception as e:
        msg = f"Failed to post callback: {e}"
        logger.exception(
            msg,
            extra={
                "callback_url": callback_url,
                failure_key: failed_wfs,
                "identifiers": identifiers,
            },
        )
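
The callback endpoint receives a JSON body with one entry per workflow, keyed by failure_key and success_key. A sketch of the POSTed payload, assuming success_key="successful" and failure_key="failed" with illustrative identifiers:

# Body POSTed to callback_url; each identifier maps to its final message.
payload = {
    "failed": {"trunk-002": "Timed out waiting for workflow to complete"},
    "successful": {"trunk-001": "Done"},
}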

bulk_wf_task(wf_payloads, callback_route, success_key, failure_key, run_mode=BulkRunMode.RANDOM_DELAY, delay_min=1.0, delay_max=10.0, stagger_step=2.0, initial_delay=0.0)

Kicks off one Celery subtask per workflow, then runs the final callback.

Source code in gso/tasks/bulk_wf.py
@shared_task(ignore_result=False)
def bulk_wf_task(  # noqa: PLR0917
    wf_payloads: list[dict],
    callback_route: str,
    success_key: str,
    failure_key: str,
    run_mode: str = BulkRunMode.RANDOM_DELAY,
    delay_min: float = 1.0,
    delay_max: float = 10.0,
    stagger_step: float = 2.0,
    initial_delay: float = 0.0,
) -> None:
    """Kicks off one Celery subtask per workflow, then runs the final callback."""
    identifiers = {wf_payload["identifier"] for wf_payload in wf_payloads}

    if run_mode == BulkRunMode.RANDOM_DELAY:
        lo, hi = (delay_min, delay_max) if delay_min <= delay_max else (delay_max, delay_min)
        header = [process_one_wf.s(p).set(countdown=random.uniform(lo, hi)) for p in wf_payloads]  # type: ignore[attr-defined]

    elif run_mode == BulkRunMode.STAGGERED:
        header = [
            process_one_wf.s(p).set(countdown=initial_delay + i * float(stagger_step))  # type: ignore[attr-defined]
            for i, p in enumerate(wf_payloads)
        ]

    else:
        header = [process_one_wf.s(p) for p in wf_payloads]  # type: ignore[attr-defined]

    chord(header)(finalize_bulk_wf.s(callback_route, list(identifiers), success_key, failure_key))  # type: ignore[attr-defined]
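
A sketch of dispatching a bulk run end to end; the route, result keys, and payloads are illustrative:

payloads = [
    {
        "workflow_key": "create_iptrunk",  # hypothetical workflow name
        "user_inputs": [{"tt_number": f"TT#{i:03}"}],
        "identifier": f"trunk-{i:03}",
    }
    for i in range(1, 4)
]

# Each payload becomes one process_one_wf subtask; once all have finished,
# finalize_bulk_wf POSTs the aggregated results to the callback route.
bulk_wf_task.delay(
    wf_payloads=payloads,
    callback_route="/api/v1/processes/bulk-callback",  # hypothetical route
    success_key="successful",
    failure_key="failed",
    run_mode=BulkRunMode.STAGGERED,
    stagger_step=5.0,
)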