Skip to content

Worker

Module that sets up GSO as a Celery worker. This will allow for the scheduling of regular task workflows.

OrchestratorWorker

Bases: Celery

A GSO instance that functions as a Celery worker.

Source code in gso/worker.py
class OrchestratorWorker(Celery):
    """A GSO instance that functions as a Celery worker."""

    websocket_manager: WebSocketManager
    process_broadcast_fn: BroadcastFunc

    def on_init(self) -> None:
        """Initialise a new Celery worker."""
        init_database(app_settings)

        # Prepare the wrapped_websocket_manager
        # Note: cannot prepare the redis connections here as broadcasting is async
        self.websocket_manager = init_websocket_manager(app_settings)
        self.process_broadcast_fn = process_broadcast_fn

        # Load the products and load the workflows
        import gso.products  # noqa: PLC0415
        import gso.workflows  # noqa: PLC0415,F401

        logger.info(
            "Loaded the workflows and products",
            workflows=len(ALL_WORKFLOWS.values()),
            products=len(SUBSCRIPTION_MODEL_REGISTRY.values()),
        )

    def close(self) -> None:
        """Close Celery worker cleanly."""
        super().close()

on_init()

Initialise a new Celery worker.

Source code in gso/worker.py
def on_init(self) -> None:
    """Initialise a new Celery worker."""
    init_database(app_settings)

    # Prepare the wrapped_websocket_manager
    # Note: cannot prepare the redis connections here as broadcasting is async
    self.websocket_manager = init_websocket_manager(app_settings)
    self.process_broadcast_fn = process_broadcast_fn

    # Load the products and load the workflows
    import gso.products  # noqa: PLC0415
    import gso.workflows  # noqa: PLC0415,F401

    logger.info(
        "Loaded the workflows and products",
        workflows=len(ALL_WORKFLOWS.values()),
        products=len(SUBSCRIPTION_MODEL_REGISTRY.values()),
    )

close()

Close Celery worker cleanly.

Source code in gso/worker.py
def close(self) -> None:
    """Close Celery worker cleanly."""
    super().close()

on_setup_logging(**kwargs)

Set up logging for the Celery worker.

Source code in gso/worker.py
@setup_logging.connect  # type: ignore[misc]
def on_setup_logging(**kwargs: Any) -> None:  # noqa: ARG001
    """Set up logging for the Celery worker."""
    initialise_logging(additional_loggers=LOGGER_OVERRIDES_CELERY)

process_broadcast_fn(process_id)

Broadcast process update to WebSocket.

Source code in gso/worker.py
def process_broadcast_fn(process_id: UUID) -> None:
    """Broadcast process update to WebSocket."""
    # Catch all exceptions as broadcasting failure is noncritical to workflow completion
    try:
        broadcast_process_update_to_websocket(process_id)
    except Exception as e:
        logger.exception(e)  # noqa: TRY401

worker_shutting_down_handler(sig, how, exitcode, **kwargs)

Handle the Celery worker shutdown event.

Source code in gso/worker.py
@worker_shutting_down.connect  # type: ignore[misc]
def worker_shutting_down_handler(sig, how, exitcode, **kwargs) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001
    """Handle the Celery worker shutdown event."""
    celery.close()