Skip to content

Processes

A collection of methods that make interaction with coreDB more straightforward.

This saves callers from rewriting the same database statements over and over, which easily leads to erroneous or inconsistent queries. The methods here cover operations on processes and workflows.

get_enrichable_processes_by_ids(process_ids)

Fetch processes with relationships needed for enrichment.

Returns raw ORM rows in whatever order the database returns them. Callers that need the rows in the order of `process_ids` should sort them after enrichment.

Source code in gso/services/processes.py
def get_enrichable_processes_by_ids(process_ids: list[UUID]) -> list[ProcessTable]:
    """Load the given processes together with the relationships used for enrichment.

    Each process is loaded with its workflow and its process subscriptions eagerly joined. Every
    process subscription also carries its subscription, and every subscription carries its product.

    Args:
        process_ids: IDs of the processes to load. An empty list returns immediately without querying.

    Returns:
        Raw ORM rows in the order the database yields them, which need not match ``process_ids``.
        Callers that need the requested order should sort the rows themselves after enrichment.
    """
    if not process_ids:
        return []

    # Walk process -> process_subscription -> subscription -> product in one eager-load chain.
    subscription_products = (
        joinedload(ProcessTable.process_subscriptions)
        .joinedload(ProcessSubscriptionTable.subscription)
        .joinedload(SubscriptionTable.product)
    )
    query = (
        db.session.query(ProcessTable)
        .filter(ProcessTable.process_id.in_(process_ids))
        .options(joinedload(ProcessTable.workflow), subscription_products)
    )
    return query.all()

get_processes_by_ids(process_ids)

Return minimal process data ordered to match the given IDs.

Source code in gso/services/processes.py
def get_processes_by_ids(process_ids: list[UUID]) -> list[dict[str, Any]]:
    """Return minimal process data ordered to match the given IDs.

    Each process becomes a plain dict with the keys ``process_id``, ``workflow_id``,
    ``workflow_name``, ``last_status``, ``is_task``, ``created_by``, ``started_at``,
    ``last_modified_at``, ``last_step``, ``failed_reason`` and ``subscription_ids``.

    Args:
        process_ids: IDs of the processes to fetch. Their order defines the order of the result.

    Returns:
        One dict per process found, sorted by the position of its ID in ``process_ids``. Any row
        whose ID has no position sorts after all positioned rows. ``workflow_name`` is ``None``
        when the process has no workflow, and ``subscription_ids`` is ``None`` when it has no
        linked subscriptions. An empty ``process_ids`` returns ``[]`` without querying.
    """
    if not process_ids:
        return []

    position = {pid: index for index, pid in enumerate(process_ids)}
    unknown_position = len(position)

    fetched = (
        db.session.query(ProcessTable)
        .filter(ProcessTable.process_id.in_(process_ids))
        .options(
            joinedload(ProcessTable.workflow),
            joinedload(ProcessTable.process_subscriptions),
        )
        .all()
    )
    # Stable sort, so rows sharing a key keep the order the database returned them in.
    fetched = sorted(fetched, key=lambda proc: position.get(proc.process_id, unknown_position))

    return [
        {
            "process_id": proc.process_id,
            "workflow_id": proc.workflow_id,
            "workflow_name": proc.workflow.name if proc.workflow else None,
            "last_status": str(proc.last_status),
            "is_task": bool(proc.is_task),
            "created_by": proc.created_by,
            "started_at": proc.started_at,
            "last_modified_at": proc.last_modified_at,
            "last_step": proc.last_step,
            "failed_reason": proc.failed_reason,
            "subscription_ids": [link.subscription_id for link in proc.process_subscriptions] or None,
        }
        for proc in fetched
    ]

get_processes_by_workflow_name(workflow_name)

Get all processes for a given workflow name.

Source code in gso/services/processes.py
def get_processes_by_workflow_name(workflow_name: str) -> Query:
    """Build a query for every process whose workflow is named ``workflow_name``.

    The query is returned without being executed, so callers can narrow it further
    (for example with extra filters) before fetching rows.
    """
    name_matches = WorkflowTable.name == workflow_name
    return ProcessTable.query.join(WorkflowTable).filter(name_matches)

get_failed_tasks()

Get all tasks that have failed.

Source code in gso/services/processes.py
def get_failed_tasks() -> list[ProcessTable]:
    """Return every task (``is_task`` is true) whose last status is ``ProcessStatus.FAILED``."""
    is_failed_task = and_(
        ProcessTable.is_task.is_(True),
        ProcessTable.last_status == ProcessStatus.FAILED,
    )
    return ProcessTable.query.filter(is_failed_task).all()

get_suspended_tasks_by_workflow_name(workflow_name)

Get all tasks that have gone into a suspended state, for a specific workflow name.

Source code in gso/services/processes.py
def get_suspended_tasks_by_workflow_name(workflow_name: str) -> list[ProcessTable]:
    """Return the suspended tasks that belong to the workflow named ``workflow_name``.

    Only processes flagged as tasks whose last status is ``ProcessStatus.SUSPENDED`` are included.
    """
    workflow_processes = get_processes_by_workflow_name(workflow_name)
    is_suspended_task = and_(
        ProcessTable.is_task.is_(True),
        ProcessTable.last_status == ProcessStatus.SUSPENDED,
    )
    return workflow_processes.filter(is_suspended_task).all()

get_all_cleanup_tasks()

Get a list of all cleanup tasks that run on a schedule.

Source code in gso/services/processes.py
def get_all_cleanup_tasks() -> list[WorkflowTable]:
    """Return the workflow rows of the scheduled cleanup tasks.

    These are the workflows named ``task_clean_up_tasks`` and ``task_clean_old_tasks``.
    """
    cleanup_workflow_names = ("task_clean_up_tasks", "task_clean_old_tasks")
    return WorkflowTable.query.filter(WorkflowTable.name.in_(cleanup_workflow_names)).all()

get_created_and_completed_processes_by_id(workflow_id)

Get all tasks (processes flagged with `is_task`) of a given workflow ID whose last status is either created or completed.

Source code in gso/services/processes.py
def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult:
    """Get all tasks of a workflow whose last status is either created or completed.

    Despite the function name, only processes flagged as tasks (``is_task`` is true) are
    returned; regular workflow processes are excluded.

    Args:
        workflow_id: ID of the workflow whose tasks should be returned.

    Returns:
        A ``ScalarResult`` yielding the matching ``ProcessTable`` rows.
    """
    # Compare against the enum values, since the statuses are matched by their stored value.
    wanted_statuses = (ProcessStatus.COMPLETED.value, ProcessStatus.CREATED.value)
    return db.session.scalars(
        select(ProcessTable).where(
            ProcessTable.is_task.is_(True),
            ProcessTable.workflow_id == workflow_id,
            ProcessTable.last_status.in_(wanted_statuses),
        )
    )

get_stopped_process_by_id(process_id)

Get a stopped process by its ID.

Source code in gso/services/processes.py
def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None:
    """Look up a process by ID, provided it has at least one step in a stopping status.

    A process matches when its ID equals ``process_id`` and at least one of its steps has status
    ``StepStatus.ABORT``, ``StepStatus.FAILED`` or ``StepStatus.COMPLETE``. The returned process
    has its steps and its process subscriptions (each with its subscription) eagerly loaded.

    Args:
        process_id: ID of the process, either as a ``UUID`` or in string form.

    Returns:
        The matching process, or ``None`` when no process satisfies both conditions.
    """
    stopping_statuses = (StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE)
    eager_loads = (
        joinedload(ProcessTable.steps),
        joinedload(ProcessTable.process_subscriptions).joinedload(ProcessSubscriptionTable.subscription),
    )
    query = (
        db.session.query(ProcessTable)
        # Inner join on steps so that only processes with a step in a stopping status qualify.
        .join(ProcessTable.steps)
        .filter(
            ProcessTable.process_id == process_id,
            ProcessStepTable.status.in_(stopping_statuses),
        )
        .options(*eager_loads)
        # NOTE(review): if process_id is the primary key (not visible here; confirm), this
        # ordering cannot influence the single row returned by one_or_none().
        .order_by(ProcessTable.last_modified_at)
    )
    return query.one_or_none()