Skip to content

Processes

A collection of methods that make interaction with coreDB more straight-forward.

This prevents someone from having to re-write database statements many times, that might turn out to be erroneous or inconsistent when not careful. These methods are related to operations regarding processes and workflows.

count_incomplete_validate_products()

Count the number of incomplete validate_geant_products processes.

Returns:

Type Description
int

The count of incomplete 'validate_geant_products' processes.

Source code in gso/services/processes.py
def count_incomplete_validate_products() -> int:
    """Count the number of incomplete validate_geant_products processes.

    Returns:
        The count of incomplete 'validate_geant_products' processes.
    """
    return ProcessTable.query.filter(
        ProcessTable.workflow_name == "validate_geant_products",
        ProcessTable.last_status != ProcessStatus.COMPLETED.value,
    ).count()

get_failed_tasks()

Get all tasks that have failed.

Source code in gso/services/processes.py
def get_failed_tasks() -> list[ProcessTable]:
    """Get all tasks that have failed."""
    return ProcessTable.query.filter(
        ProcessTable.is_task.is_(True), ProcessTable.last_status == ProcessStatus.FAILED.value
    ).all()

get_all_cleanup_tasks()

Get a list of all cleanup tasks that run on a schedule.

Source code in gso/services/processes.py
def get_all_cleanup_tasks() -> list[WorkflowTable]:
    """Get a list of all cleanup tasks that run on a schedule."""
    return WorkflowTable.query.filter(
        or_(WorkflowTable.name == "task_clean_up_tasks", WorkflowTable.name == "task_clean_old_tasks")
    ).all()

get_created_and_completed_processes_by_id(workflow_id)

Get all processes that are either created or completed, by workflow ID.

Source code in gso/services/processes.py
def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult:
    """Get all processes that are either created or completed, by workflow ID."""
    return db.session.scalars(
        select(ProcessTable)
        .filter(ProcessTable.is_task.is_(True))
        .filter(ProcessTable.workflow_id == workflow_id)
        .filter(
            or_(
                ProcessTable.last_status == ProcessStatus.COMPLETED.value,
                ProcessTable.last_status == ProcessStatus.CREATED.value,
            )
        )
    )