Module that sets up GSO as a Celery worker. This will allow for the scheduling of regular task workflows.
OrchestratorWorker
Bases: Celery
A GSO instance that functions as a Celery worker.
Source code in gso/worker.py
class OrchestratorWorker(Celery):
    """Celery application subclass that lets a GSO instance act as a Celery worker.

    Both annotated attributes are assigned in ``on_init``.
    """

    # Holds the result of ``init_websocket_manager(app_settings)``.
    websocket_manager: WebSocketManager
    # Bound to the module-level ``process_broadcast_fn`` callable.
    process_broadcast_fn: BroadcastFunc

    def on_init(self) -> None:
        """Prepare the database, websocket manager, products and workflows for this worker."""
        init_database(app_settings)

        # Only the websocket manager object is created at this point. The redis connections
        # are not prepared here because broadcasting is async.
        self.websocket_manager = init_websocket_manager(app_settings)
        self.process_broadcast_fn = process_broadcast_fn

        # Imported locally so that the products and the workflows get loaded.
        import gso.products  # noqa: PLC0415
        import gso.workflows  # noqa: PLC0415,F401

        workflow_count = len(ALL_WORKFLOWS.values())
        product_count = len(SUBSCRIPTION_MODEL_REGISTRY.values())
        logger.info(
            "Loaded the workflows and products",
            workflows=workflow_count,
            products=product_count,
        )

    def close(self) -> None:
        """Close the Celery worker cleanly by delegating to the parent implementation."""
        super().close()
|
on_init()
Initialise a new Celery worker.
Source code in gso/worker.py
def on_init(self) -> None:
    """Prepare the database, websocket manager, products and workflows for this worker."""
    init_database(app_settings)

    # Only the websocket manager object is created at this point. The redis connections
    # are not prepared here because broadcasting is async.
    self.websocket_manager = init_websocket_manager(app_settings)
    self.process_broadcast_fn = process_broadcast_fn

    # Imported locally so that the products and the workflows get loaded.
    import gso.products  # noqa: PLC0415
    import gso.workflows  # noqa: PLC0415,F401

    workflow_count = len(ALL_WORKFLOWS.values())
    product_count = len(SUBSCRIPTION_MODEL_REGISTRY.values())
    logger.info(
        "Loaded the workflows and products",
        workflows=workflow_count,
        products=product_count,
    )
|
close()
Close Celery worker cleanly.
Source code in gso/worker.py
def close(self) -> None:
    """Close the Celery worker cleanly by delegating to the parent implementation."""
    super().close()
|
on_setup_logging(**kwargs)
Set up logging for the Celery worker.
Source code in gso/worker.py
@setup_logging.connect  # type: ignore[misc]
def on_setup_logging(**kwargs: Any) -> None:  # noqa: ARG001
    """Configure logging for the Celery worker when ``setup_logging`` is dispatched.

    The keyword arguments supplied with the signal are accepted but not used. Logging is
    initialised with the ``LOGGER_OVERRIDES_CELERY`` overrides as additional loggers.
    """
    initialise_logging(additional_loggers=LOGGER_OVERRIDES_CELERY)
|
process_broadcast_fn(process_id)
Broadcast process update to WebSocket.
Source code in gso/worker.py
def process_broadcast_fn(process_id: UUID) -> None:
    """Broadcast an update for the given process to the WebSocket.

    Every exception raised by the broadcast is logged and not re-raised, because a failed
    broadcast is noncritical to workflow completion.
    """
    try:
        broadcast_process_update_to_websocket(process_id)
    except Exception as exc:
        logger.exception(exc)  # noqa: TRY401
|
worker_shutting_down_handler(sig, how, exitcode, **kwargs)
Handle the Celery worker shutdown event.
Source code in gso/worker.py
@worker_shutting_down.connect  # type: ignore[misc]
def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001
    """Close the ``celery`` application when ``worker_shutting_down`` is dispatched.

    The signal arguments (``sig``, ``how``, ``exitcode`` and any extra keyword arguments)
    are accepted but not used.
    """
    celery.close()
|