Skip to content

Processes

A collection of methods that make interaction with coreDB more straightforward.

This saves developers from rewriting the same database statements many times, which can easily lead to erroneous or inconsistent queries. These methods cover operations on processes and workflows.

get_processes_by_workflow_name(workflow_name)

Get all processes for a given workflow name.

Source code in gso/services/processes.py
def get_processes_by_workflow_name(workflow_name: str) -> Query:
    """Build a query for all processes that belong to the workflow with the given name.

    The query is returned without being executed, so callers can chain further filters onto it.

    Args:
        workflow_name: Name of the workflow whose processes should be selected.

    Returns:
        A query over ``ProcessTable`` joined with ``WorkflowTable`` and filtered on the workflow name.
    """
    processes_with_workflow = ProcessTable.query.join(WorkflowTable)
    name_matches = WorkflowTable.name == workflow_name
    return processes_with_workflow.filter(name_matches)

get_failed_tasks()

Get all tasks that have failed.

Source code in gso/services/processes.py
def get_failed_tasks() -> list[ProcessTable]:
    """Return every process that is flagged as a task and whose last status is failed."""
    is_task = ProcessTable.is_task.is_(True)
    has_failed = ProcessTable.last_status == ProcessStatus.FAILED
    return ProcessTable.query.filter(and_(is_task, has_failed)).all()

get_suspended_tasks_by_workflow_name(workflow_name)

Get all tasks that have gone into a suspended state, for a specific workflow name.

Source code in gso/services/processes.py
def get_suspended_tasks_by_workflow_name(workflow_name: str) -> list[ProcessTable]:
    """Return the suspended tasks that belong to the workflow with the given name.

    Args:
        workflow_name: Name of the workflow whose suspended tasks should be fetched.

    Returns:
        All processes of that workflow that are flagged as a task and whose last status is suspended.
    """
    workflow_processes = get_processes_by_workflow_name(workflow_name)
    is_task = ProcessTable.is_task.is_(True)
    is_suspended = ProcessTable.last_status == ProcessStatus.SUSPENDED
    return workflow_processes.filter(and_(is_task, is_suspended)).all()

get_all_cleanup_tasks()

Get a list of all cleanup tasks that run on a schedule.

Source code in gso/services/processes.py
def get_all_cleanup_tasks() -> list[WorkflowTable]:
    """Return the workflows that represent the scheduled cleanup tasks.

    The cleanup tasks are identified purely by name: ``task_clean_up_tasks`` and ``task_clean_old_tasks``.
    """
    is_clean_up_tasks = WorkflowTable.name == "task_clean_up_tasks"
    is_clean_old_tasks = WorkflowTable.name == "task_clean_old_tasks"
    return WorkflowTable.query.filter(or_(is_clean_up_tasks, is_clean_old_tasks)).all()

get_created_and_completed_processes_by_id(workflow_id)

Get all processes that are either created or completed, by workflow ID.

Source code in gso/services/processes.py
def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult:
    """Select the tasks of a workflow whose last status is either created or completed.

    Only processes flagged as a task are considered.

    Args:
        workflow_id: ID of the workflow whose tasks should be selected.

    Returns:
        The scalar result of the select statement, yielding ``ProcessTable`` rows.
    """
    created_or_completed = or_(
        ProcessTable.last_status == ProcessStatus.COMPLETED.value,
        ProcessTable.last_status == ProcessStatus.CREATED.value,
    )
    statement = (
        select(ProcessTable)
        .filter(ProcessTable.is_task.is_(True))
        .filter(ProcessTable.workflow_id == workflow_id)
        .filter(created_or_completed)
    )
    return db.session.scalars(statement)

get_stopped_process_by_id(process_id)

Get a stopped process by its ID.

Source code in gso/services/processes.py
def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None:
    """Look up a process by ID, provided it has a step in a stopped state.

    A process matches when it has a step whose status is abort, failed, or complete. The steps of the
    process, and its process subscriptions together with their subscription, are eager-loaded.

    Args:
        process_id: ID of the process to look up.

    Returns:
        The matching process, or ``None`` when no process matches.
    """
    stopped_statuses = {StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}
    eager_steps = joinedload(ProcessTable.steps)
    eager_subscriptions = joinedload(ProcessTable.process_subscriptions).joinedload(
        ProcessSubscriptionTable.subscription
    )

    # The explicit join is what the step status filter applies to; the joinedload options only control loading.
    stopped_process_query = (
        db.session.query(ProcessTable)
        .join(ProcessTable.steps)
        .filter(
            ProcessTable.process_id == process_id,
            ProcessStepTable.status.in_(stopped_statuses),
        )
        .options(eager_steps, eager_subscriptions)
        .order_by(ProcessTable.last_modified_at)
    )
    return stopped_process_query.one_or_none()