A collection of methods that make interaction with coreDB more straightforward.
This saves callers from rewriting the same database statements many times, which could otherwise turn out to be
erroneous or inconsistent if written carelessly. These methods cover operations on processes and workflows.
get_processes_by_workflow_name(workflow_name)
Get all processes for a given workflow name.
Source code in gso/services/processes.py
def get_processes_by_workflow_name(workflow_name: str) -> Query:
    """Build a query for every process that belongs to the named workflow.

    The query is returned without being executed, so callers can refine it
    with further filters before fetching rows.

    Args:
        workflow_name: Value compared for equality against ``WorkflowTable.name``.

    Returns:
        A query over ``ProcessTable`` joined to ``WorkflowTable``.
    """
    processes_with_workflow = ProcessTable.query.join(WorkflowTable)
    return processes_with_workflow.filter(WorkflowTable.name == workflow_name)
get_failed_tasks()
Get all tasks that have failed.
Source code in gso/services/processes.py
def get_failed_tasks() -> list[ProcessTable]:
    """Fetch every task whose last status is failed.

    Returns:
        All ``ProcessTable`` rows flagged as a task and whose ``last_status``
        equals ``ProcessStatus.FAILED``.
    """
    is_task = ProcessTable.is_task.is_(True)
    has_failed = ProcessTable.last_status == ProcessStatus.FAILED
    return ProcessTable.query.filter(and_(is_task, has_failed)).all()
get_suspended_tasks_by_workflow_name(workflow_name)
Get all tasks that have gone into a suspended state, for a specific workflow name.
Source code in gso/services/processes.py
def get_suspended_tasks_by_workflow_name(workflow_name: str) -> list[ProcessTable]:
    """Fetch the suspended tasks of the named workflow.

    Args:
        workflow_name: Name of the workflow whose processes are searched.

    Returns:
        All ``ProcessTable`` rows of that workflow that are flagged as a task
        and whose ``last_status`` equals ``ProcessStatus.SUSPENDED``.
    """
    workflow_processes = get_processes_by_workflow_name(workflow_name)
    is_suspended_task = and_(
        ProcessTable.is_task.is_(True),
        ProcessTable.last_status == ProcessStatus.SUSPENDED,
    )
    return workflow_processes.filter(is_suspended_task).all()
get_all_cleanup_tasks()
Get a list of all cleanup tasks that run on a schedule.
Source code in gso/services/processes.py
def get_all_cleanup_tasks() -> list[WorkflowTable]:
    """Fetch the workflows that act as scheduled cleanup tasks.

    Cleanup tasks are recognised purely by name: ``task_clean_up_tasks`` and
    ``task_clean_old_tasks``.

    Returns:
        All ``WorkflowTable`` rows whose name is one of the two cleanup names.
    """
    is_cleanup_workflow = or_(
        WorkflowTable.name == "task_clean_up_tasks",
        WorkflowTable.name == "task_clean_old_tasks",
    )
    return WorkflowTable.query.filter(is_cleanup_workflow).all()
get_created_and_completed_processes_by_id(workflow_id)
Get all task processes that are either created or completed, for a given workflow ID.
Source code in gso/services/processes.py
def get_created_and_completed_processes_by_id(workflow_id: UUIDstr) -> ScalarResult:
    """Get the created or completed task processes of one workflow.

    Only rows flagged as a task are considered.

    Args:
        workflow_id: ID of the workflow the processes must belong to.

    Returns:
        The scalar result of the select, yielding ``ProcessTable`` rows whose
        ``last_status`` is either completed or created.
    """
    created_or_completed = or_(
        ProcessTable.last_status == ProcessStatus.COMPLETED.value,
        ProcessTable.last_status == ProcessStatus.CREATED.value,
    )
    statement = (
        select(ProcessTable)
        .filter(ProcessTable.is_task.is_(True))
        .filter(ProcessTable.workflow_id == workflow_id)
        .filter(created_or_completed)
    )
    return db.session.scalars(statement)
get_stopped_process_by_id(process_id)
Get a stopped process by its ID.
Source code in gso/services/processes.py
def get_stopped_process_by_id(process_id: UUID | UUIDstr) -> ProcessTable | None:
    """Look up a stopped process by its ID.

    A process qualifies when it has at least one step whose status is abort,
    failed or complete. The process's steps, its process subscriptions, and
    the subscription behind each of those are loaded eagerly with the process.

    Args:
        process_id: ID of the process to look up.

    Returns:
        The matching ``ProcessTable`` row, or ``None`` when no row matches.
    """
    stopped_step_statuses = {StepStatus.ABORT, StepStatus.FAILED, StepStatus.COMPLETE}
    eager_steps = joinedload(ProcessTable.steps)
    eager_subscriptions = joinedload(ProcessTable.process_subscriptions).joinedload(
        ProcessSubscriptionTable.subscription
    )

    # This join exists only for the status filter; the eager loads above
    # populate the relationships independently of it.
    query = db.session.query(ProcessTable).join(ProcessTable.steps)
    query = query.filter(
        ProcessTable.process_id == process_id,
        ProcessStepTable.status.in_(stopped_step_statuses),
    )
    query = query.options(eager_steps, eager_subscriptions)
    return query.order_by(ProcessTable.last_modified_at).one_or_none()