Search

Search API endpoints: column metadata and Solr-backed search for subscriptions, workflows, and tasks.

WorkflowProductSchema

Bases: BaseModel

Product associated with a workflow.

Source code in gso/api/v1/search.py
class WorkflowProductSchema(BaseModel):
    """Product associated with a workflow."""

    model_config = ConfigDict(from_attributes=True)

    product_type: str
    product_id: UUID
    name: str

WorkflowSearchSchema

Bases: BaseModel

Schema for workflow search results.

Source code in gso/api/v1/search.py
class WorkflowSearchSchema(BaseModel):
    """Schema for workflow search results."""

    model_config = ConfigDict(from_attributes=True)

    name: str
    products: list[WorkflowProductSchema]
    description: str | None = None
    created_at: datetime | None = None

ProcessSearchSchema

Bases: BaseModel

Schema for process search results.

Source code in gso/api/v1/search.py
class ProcessSearchSchema(BaseModel):
    """Schema for process search results."""

    model_config = ConfigDict(from_attributes=True)

    process_id: UUID
    workflow_name: str
    workflow_id: UUID
    last_status: str
    is_task: bool
    created_by: str | None = None
    started_at: datetime
    last_modified_at: datetime
    last_step: str | None = None
    failed_reason: str | None = None
    subscription_ids: list[UUID] | None = None

SubscriptionColumnPrefix

Bases: StrEnum

Prefix applied to subscription-linked fields.

Source code in gso/api/v1/search.py
class SubscriptionColumnPrefix(StrEnum):
    """Prefix applied to subscription-linked fields."""

    LINKED_SUBSCRIPTION = "linked_subscription"

LabelPrefix

Bases: StrEnum

Display labels applied to linked column names.

Source code in gso/api/v1/search.py
class LabelPrefix(StrEnum):
    """Display labels applied to linked column names."""

    SUBSCRIPTION = "Subscription"
    PRODUCT = "Product"

ProductColumnPrefix

Bases: StrEnum

Prefixes applied to product-linked fields for different contexts.

Source code in gso/api/v1/search.py
class ProductColumnPrefix(StrEnum):
    """Prefixes applied to product-linked fields for different contexts."""

    PROCESS_PREFIX = "linked_product"
    SUBSCRIPTION_PREFIX = "product"

SortModel

Bases: BaseModel

Sorting instruction for a single field.

Source code in gso/api/v1/search.py
class SortModel(BaseModel):
    """Sorting instruction for a single field."""

    field: str
    order: SortOrder = SortOrder.ASC

FilterModel

Bases: BaseModel

Filter instruction pairing a field with a value.

Source code in gso/api/v1/search.py
class FilterModel(BaseModel):
    """Filter instruction pairing a field with a value."""

    field: str
    # Accept any JSON-serializable value to be flexible with legacy clients
    # that may send non-string filter values. Validation of the content is
    # delegated to the filter layer.
    value: object

SubscriptionSearchRequest

Bases: BaseModel

Request payload for subscription search.

Source code in gso/api/v1/search.py
class SubscriptionSearchRequest(BaseModel):
    """Request payload for subscription search."""

    query: str = Field("", description="Solr query string")
    sort_by: list[SortModel] = Field(default_factory=list)
    filter_by: list[FilterModel] = Field(default_factory=list)
    start: int = 0
    size: int = 25

    @field_validator("filter_by", mode="before")
    @classmethod
    def normalize_legacy_filter_strings(cls, v: object) -> object:
        """Support legacy shorthand filters like ["type:subscription"].

        This allows existing clients to keep working while the preferred
        format is a list of objects: [{"field": "type", "value": "subscription"}].

        Additionally, support `resource_type` field syntax such as
        "resource_type.site_country:(*NL*)" by translating it to two filters:
        - {"field": "resource_type", "value": "site_country"}
        - {"field": "instance_value", "value": "(*NL*)"}

        Also support payloads where values already include surrounding
        parentheses, e.g. ``resource_type.site_country_code:(NL)`` which will
        be preserved as-is and passed through to the filter layer.
        """
        if v is None:
            return []

        # Already a list of `dicts` / FilterModel-compatible objects
        if isinstance(v, list) and (not v or isinstance(v[0], dict)):
            return v

        # Legacy string form such as ``["field:value", ...]``
        if isinstance(v, list) and all(isinstance(item, str) for item in v):
            normalized: list[dict[str, object]] = []
            for item in v:
                # Special handling for resource_type.<name>:(<pattern>) syntax
                if item.startswith("resource_type.") and ":" in item:
                    field_part, value_part = item.split(":", 1)
                    # `field_part` is like `resource_type.site_country`
                    _, _, resource_field = field_part.partition(".")
                    value_part = value_part.strip()
                    if resource_field:
                        normalized.extend([
                            {"field": "resource_type", "value": resource_field},
                            {"field": "instance_value", "value": value_part},
                        ])
                        continue

                if ":" in item:
                    field, value = item.split(":", 1)
                    normalized.append({"field": field, "value": value})
                else:
                    # If no colon, treat whole string as value with a generic field
                    normalized.append({"field": "_raw", "value": item})
            return normalized

        return v

normalize_legacy_filter_strings(v) classmethod

Support legacy shorthand filters like ["type:subscription"].

This allows existing clients to keep working while the preferred format is a list of objects: [{"field": "type", "value": "subscription"}].

Additionally, support resource_type field syntax such as "resource_type.site_country:(*NL*)" by translating it to two filters:

  • {"field": "resource_type", "value": "site_country"}
  • {"field": "instance_value", "value": "(*NL*)"}

Also support payloads where values already include surrounding parentheses, e.g. resource_type.site_country_code:(NL) which will be preserved as-is and passed through to the filter layer.

Source code in gso/api/v1/search.py
@field_validator("filter_by", mode="before")
@classmethod
def normalize_legacy_filter_strings(cls, v: object) -> object:
    """Support legacy shorthand filters like ["type:subscription"].

    This allows existing clients to keep working while the preferred
    format is a list of objects: [{"field": "type", "value": "subscription"}].

    Additionally, support `resource_type` field syntax such as
    "resource_type.site_country:(*NL*)" by translating it to two filters:
    - {"field": "resource_type", "value": "site_country"}
    - {"field": "instance_value", "value": "(*NL*)"}

    Also support payloads where values already include surrounding
    parentheses, e.g. ``resource_type.site_country_code:(NL)`` which will
    be preserved as-is and passed through to the filter layer.
    """
    if v is None:
        return []

    # Already a list of `dicts` / FilterModel-compatible objects
    if isinstance(v, list) and (not v or isinstance(v[0], dict)):
        return v

    # Legacy string form such as ``["field:value", ...]``
    if isinstance(v, list) and all(isinstance(item, str) for item in v):
        normalized: list[dict[str, object]] = []
        for item in v:
            # Special handling for resource_type.<name>:(<pattern>) syntax
            if item.startswith("resource_type.") and ":" in item:
                field_part, value_part = item.split(":", 1)
                # `field_part` is like `resource_type.site_country`
                _, _, resource_field = field_part.partition(".")
                value_part = value_part.strip()
                if resource_field:
                    normalized.extend([
                        {"field": "resource_type", "value": resource_field},
                        {"field": "instance_value", "value": value_part},
                    ])
                    continue

            if ":" in item:
                field, value = item.split(":", 1)
                normalized.append({"field": field, "value": value})
            else:
                # If no colon, treat whole string as value with a generic field
                normalized.append({"field": "_raw", "value": item})
        return normalized

    return v
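
Example (a minimal sketch, assuming the schema is importable from gso.api.v1.search):

from gso.api.v1.search import SubscriptionSearchRequest

req = SubscriptionSearchRequest(
    query="*",
    filter_by=["type:subscription", "resource_type.site_country:(*NL*)"],
)
# The before-mode validator rewrites the legacy strings, so req.filter_by holds:
# [FilterModel(field="type", value="subscription"),
#  FilterModel(field="resource_type", value="site_country"),
#  FilterModel(field="instance_value", value="(*NL*)")]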

SubscriptionSearchResponse

Bases: BaseModel

Subscription search results including paging info.

Source code in gso/api/v1/search.py
class SubscriptionSearchResponse(BaseModel):
    """Subscription search results including paging info."""

    total: int
    start: int
    size: int
    items: list[dict[str, object]]

ProcessSearchRequest

Bases: BaseModel

Request payload for process search.

Source code in gso/api/v1/search.py
class ProcessSearchRequest(BaseModel):
    """Request payload for process search."""

    query: str = Field("", description="Solr query string")
    sort_by: list[SortModel] = Field(default_factory=list)
    filter_by: list[FilterModel] = Field(default_factory=list)
    start: int = 0
    size: int = 25

    @field_validator("filter_by", mode="before")
    @classmethod
    def normalize_legacy_filter_strings(cls, v: object) -> object:
        """Support legacy shorthand filters like ["is_task:true"]."""
        if v is None:
            return []

        if isinstance(v, list) and (not v or isinstance(v[0], dict)):
            return v

        if isinstance(v, list) and all(isinstance(item, str) for item in v):
            normalized: list[dict[str, object]] = []
            for item in v:
                if ":" in item:
                    field, value = item.split(":", 1)
                    normalized.append({"field": field, "value": value})
                else:
                    normalized.append({"field": "_raw", "value": item})
            return normalized

        return v

normalize_legacy_filter_strings(v) classmethod

Support legacy shorthand filters like ["is_task:true"].

Source code in gso/api/v1/search.py
@field_validator("filter_by", mode="before")
@classmethod
def normalize_legacy_filter_strings(cls, v: object) -> object:
    """Support legacy shorthand filters like ["is_task:true"]."""
    if v is None:
        return []

    if isinstance(v, list) and (not v or isinstance(v[0], dict)):
        return v

    if isinstance(v, list) and all(isinstance(item, str) for item in v):
        normalized: list[dict[str, object]] = []
        for item in v:
            if ":" in item:
                field, value = item.split(":", 1)
                normalized.append({"field": field, "value": value})
            else:
                normalized.append({"field": "_raw", "value": item})
        return normalized

    return v

ProcessSearchResponse

Bases: BaseModel

Process search results including paging info.

Source code in gso/api/v1/search.py
class ProcessSearchResponse(BaseModel):
    """Process search results including paging info."""

    total: int
    start: int
    size: int
    items: list[dict[str, object]]

_field_type_str(annotation)

Map a Pydantic/typing annotation to a simple type string for the UI.

Source code in gso/api/v1/search.py
def _field_type_str(annotation: object) -> str:
    """Map a Pydantic/typing annotation to a simple type string for the UI."""
    origin = get_origin(annotation)
    args = get_args(annotation)

    typ = origin or annotation
    result = "string"
    if typ is bool:
        result = "boolean"
    elif typ is int:
        result = "integer"
    elif typ in {float, Decimal}:
        result = "number"
    elif typ in {datetime, date}:
        result = "datetime"
    elif typ is UUID:
        result = "uuid"
    elif typ in {list, tuple, set}:
        nested = _field_type_str(args[0]) if args else "string"
        result = f"array<{nested}>"

    return result
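
Illustrative inputs and outputs (a sketch; anything outside the mapping falls back to "string"):

from datetime import datetime
from uuid import UUID

_field_type_str(bool)        # -> "boolean"
_field_type_str(int)         # -> "integer"
_field_type_str(datetime)    # -> "datetime"
_field_type_str(UUID)        # -> "uuid"
_field_type_str(list[UUID])  # -> "array<uuid>"
_field_type_str(str | None)  # -> "string" (unions are not unwrapped)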

_ensure_new_search_enabled()

Return 404 when the new search feature flag is disabled.

Source code in gso/api/v1/search.py
def _ensure_new_search_enabled() -> None:
    """Return 404 when the new search feature flag is disabled."""
    if not load_oss_params().SOLR.enable_new_search:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Search API is disabled")

_column_definitions_from_names(names)

Convert field names to {field, name} structures expected by the UI.

We title-case the display name; customize per field later if needed.

Source code in gso/api/v1/search.py
def _column_definitions_from_names(names: Iterable[str]) -> list[dict[str, str]]:
    """Convert field names to {field, name} structures expected by the UI.

    We title-case the display name; customize per field later if needed.
    """
    return [{"field": name, "name": name.replace("_", " ").title()} for name in names]
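
Example:

_column_definitions_from_names(["last_status", "started_at"])
# -> [{"field": "last_status", "name": "Last Status"},
#     {"field": "started_at", "name": "Started At"}]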

_get_subscription_column_names()

Return the list of subscription table column names.

This uses SQLAlchemy table metadata so it stays in sync with the DB schema.

Source code in gso/api/v1/search.py
def _get_subscription_column_names() -> list[str]:
    """Return the list of subscription table column names.

    This uses `SQLAlchemy` table metadata so it stays in sync with the DB schema.
    """
    return [col.name for col in SubscriptionTable.__table__.columns]

_get_schema_field_names(schema)

Return Pydantic field or alias names for the provided schema.

Source code in gso/api/v1/search.py
def _get_schema_field_names(schema: type[BaseModel]) -> list[str]:
    """Return Pydantic field or alias names for the provided schema."""
    names = [field.alias or name for name, field in schema.model_fields.items()]
    # Remove duplicates while preserving order
    return list(dict.fromkeys(names))
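
Example (a sketch; aliases take precedence and duplicates are dropped while preserving order):

from uuid import UUID
from pydantic import BaseModel, Field

class AliasedModel(BaseModel):
    process_id: UUID = Field(alias="pid")
    name: str

_get_schema_field_names(AliasedModel)  # -> ["pid", "name"]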

_filter_out_id_suffix(fields)

Filter out fields that end with _id suffix.

Used for the /columns endpoint so internal identifier fields like process_id or workflow_id are not exposed as selectable columns.

Source code in gso/api/v1/search.py
def _filter_out_id_suffix(fields: list[str]) -> list[str]:
    """Filter out fields that end with `_id` suffix.

    Used for the /columns endpoint so internal identifier fields like
    `process_id` or `workflow_id` are not exposed as selectable columns.
    """
    return [name for name in fields if not name.endswith("_id")]

_column_definitions_from_schema(schema, *, include_fields=None)

Build column definitions (field, name, type) from a Pydantic schema.

Optionally restrict to a whitelist of field names/aliases when include_fields is provided.

Source code in gso/api/v1/search.py
def _column_definitions_from_schema(
    schema: type[BaseModel],
    *,
    include_fields: set[str] | None = None,
) -> list[dict[str, str]]:
    """Build column definitions (field, name, type) from a Pydantic schema.

    Optionally restrict to a whitelist of field names/aliases when `include_fields` is provided.
    """
    columns: list[dict[str, str]] = []
    for name, field in schema.model_fields.items():
        field_name = field.alias or name
        if include_fields is not None and field_name not in include_fields:
            continue
        if field_name.endswith("_id"):
            continue
        columns.append({
            "field": field_name,
            "name": field_name.replace("_", " ").title(),
            "type": _field_type_str(field.annotation),
        })
    return columns
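
Example (a sketch; id-suffixed fields are skipped, and types come from _field_type_str):

from datetime import datetime
from uuid import UUID
from pydantic import BaseModel

class DemoSchema(BaseModel):
    process_id: UUID
    last_status: str
    started_at: datetime

_column_definitions_from_schema(DemoSchema)
# -> [{"field": "last_status", "name": "Last Status", "type": "string"},
#     {"field": "started_at", "name": "Started At", "type": "datetime"}]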

_get_linked_columns(schema, *, prefix, label_prefix)

Generic helper to build linked column definitions from a schema.

Source code in gso/api/v1/search.py
def _get_linked_columns(schema: type[BaseModel], *, prefix: str, label_prefix: str) -> list[dict[str, str]]:
    """Generic helper to build linked column definitions from a schema."""
    try:
        base_columns = _column_definitions_from_schema(schema)
    except Exception as exc:  # pragma: no cover - DB errors bubble to API
        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, str(exc)) from exc

    columns: list[dict[str, str]] = []
    for col in base_columns:
        field_name = f"{prefix}_{col['field']}"
        display_name = f"{label_prefix} {col['name']}"
        columns.append({"field": field_name, "name": display_name, "type": col["type"]})
    return columns

_get_linked_subscription_column(*, prefix=SubscriptionColumnPrefix.LINKED_SUBSCRIPTION, label_prefix=LabelPrefix.SUBSCRIPTION)

Return column definitions for linked subscriptions.

Source code in gso/api/v1/search.py
def _get_linked_subscription_column(
    *,
    prefix: SubscriptionColumnPrefix = SubscriptionColumnPrefix.LINKED_SUBSCRIPTION,
    label_prefix: LabelPrefix = LabelPrefix.SUBSCRIPTION,
) -> list[dict[str, str]]:
    """Return column definitions for linked subscriptions."""
    return _get_linked_columns(SubscriptionBaseSchema, prefix=prefix.value, label_prefix=label_prefix.value)

_get_linked_product_column(*, prefix=ProductColumnPrefix.PROCESS_PREFIX, label_prefix=LabelPrefix.PRODUCT)

Return column definitions for linked products.

Source code in gso/api/v1/search.py
def _get_linked_product_column(
    *, prefix: ProductColumnPrefix = ProductColumnPrefix.PROCESS_PREFIX, label_prefix: LabelPrefix = LabelPrefix.PRODUCT
) -> list[dict[str, str]]:
    """Return column definitions for linked products."""
    return _get_linked_columns(ProductBaseSchema, prefix=prefix.value, label_prefix=label_prefix.value)

_get_resource_type_columns()

Return column definitions derived from all resource types.

Each entry is of the form {"field": "resource_type.<resource_type>", "name": description}.

Source code in gso/api/v1/search.py
def _get_resource_type_columns() -> list[dict[str, str]]:
    """Return column definitions derived from all resource types.

    Each entry is of the form {"field": "resource_type.<resource_type>", "name": description}.
    """
    try:
        rows = list_resource_types()
    except Exception as exc:  # pragma: no cover - DB errors bubble to API
        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, str(exc)) from exc

    columns: list[dict[str, str]] = []
    for row in rows:
        # Safety: cast to str in case of unexpected types
        field_name = f"resource_type.{row.resource_type}"
        display_name = str(row.description or row.resource_type)
        columns.append({"field": field_name, "name": display_name, "type": "string"})
    return columns
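
Sketch of the returned shape (the resource type names and descriptions below are illustrative, not actual data):

_get_resource_type_columns()
# -> [{"field": "resource_type.site_country", "name": "Country of the site", "type": "string"},
#     {"field": "resource_type.vlan_id", "name": "vlan_id", "type": "string"},  # no description: falls back to the name
#     ...]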

_ensure_columns(columns, extras)

Append missing columns from extras while preserving order.

Source code in gso/api/v1/search.py
def _ensure_columns(columns: list[dict[str, str]], extras: list[dict[str, str]]) -> list[dict[str, str]]:
    """Append missing columns from `extras` while preserving order."""
    existing_fields = {col["field"] for col in columns}
    missing = [extra for extra in extras if extra["field"] not in existing_fields]
    existing_fields.update(extra["field"] for extra in missing)
    columns.extend(missing)
    return columns
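
Example:

base = [{"field": "name", "name": "Name", "type": "string"}]
extras = [
    {"field": "name", "name": "Name", "type": "string"},      # already present, skipped
    {"field": "status", "name": "Status", "type": "string"},  # appended
]
_ensure_columns(base, extras)
# -> [{"field": "name", ...}, {"field": "status", ...}]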

get_search_columns() async

Return available columns for subscriptions, workflows, and tasks.

  • Subscriptions: base subscription schema (excluding id-suffixed fields) plus resource-type columns.
  • Workflows: fields from WorkflowSearchSchema (excluding id-suffixed fields) plus linked subscription/product columns.
  • Tasks (processes): selected fields from ProcessSchema (excluding id-suffixed fields) plus linked subscription/product columns.

Source code in gso/api/v1/search.py
@router.get("/columns", status_code=status.HTTP_200_OK)
async def get_search_columns() -> dict:
    """Return available columns for subscriptions, workflows, and tasks.

    - Subscriptions: base subscription schema (excluding id-suffixed fields) plus
      resource-type columns.
    - Workflows: fields from `WorkflowSearchSchema` (excluding id-suffixed
      fields) plus linked subscription/product columns.
    - Tasks (processes): selected fields from `ProcessSchema` (excluding
      id-suffixed fields) plus linked subscription/product columns.
    """
    # Subscriptions: base schema only, without *_id fields
    subscription_columns = _column_definitions_from_schema(SubscriptionBaseSchema)
    subscription_columns.extend(_get_resource_type_columns())
    subscription_columns.extend(_get_linked_product_column(prefix=ProductColumnPrefix.SUBSCRIPTION_PREFIX))

    # Processes: base process search schema fields + linked subscription/product
    process_columns = _column_definitions_from_schema(ProcessSchema, include_fields=_PROCESS_INCLUDE_FIELDS)
    process_columns = _ensure_columns(process_columns, _WORKFLOW_EXTRA_COLUMNS)
    process_columns.extend(_get_linked_subscription_column())
    process_columns.extend(_get_linked_product_column())

    return {
        "subscriptions": subscription_columns,
        "workflows": process_columns,
        "tasks": process_columns,
    }
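
Example call (a minimal sketch; the host and the /api/v1/search mount path are assumptions):

import httpx

columns = httpx.get("http://localhost:8080/api/v1/search/columns").json()
# columns["subscriptions"], columns["workflows"], and columns["tasks"] each hold
# a list of {"field": ..., "name": ..., "type": ...} definitions.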

_linked_subscription_filter_fields()

Return the set of linked subscription field names for filter validation.

Source code in gso/api/v1/search.py
def _linked_subscription_filter_fields() -> set[str]:
    """Return the set of linked subscription field names for filter validation."""
    return {col["field"] for col in _get_linked_subscription_column()}

_linked_product_filter_fields(*, prefix=ProductColumnPrefix.PROCESS_PREFIX)

Return the set of linked product field names for filter validation.

Source code in gso/api/v1/search.py
def _linked_product_filter_fields(*, prefix: ProductColumnPrefix = ProductColumnPrefix.PROCESS_PREFIX) -> set[str]:
    """Return the set of linked product field names for filter validation."""
    return {col["field"] for col in _get_linked_product_column(prefix=prefix)}

search_subscriptions(payload) async

Search subscriptions using Solr.

Source code in gso/api/v1/search.py
@router.post("/subscriptions", response_model=SubscriptionSearchResponse, status_code=status.HTTP_200_OK)
async def search_subscriptions(payload: SubscriptionSearchRequest) -> SubscriptionSearchResponse:
    """Search subscriptions using Solr."""
    size = max(1, min(payload.size, 200))
    start = max(0, payload.start)

    sort_models = _validate_sort_fields(payload.sort_by)
    filter_models = _validate_filter_fields(payload.filter_by)

    try:
        solr_client = get_solr_client()
        page = solr_client.search_subscription_ids(
            payload.query,
            filter_by=filter_models,
            sort_by=sort_models,
            start=start,
            rows=size,
        )
    except SolrDisabledError as exc:
        raise HTTPException(status.HTTP_503_SERVICE_UNAVAILABLE, "Solr search is disabled") from exc
    except (SolrUnavailableError, SolrQueryError) as exc:
        raise HTTPException(status.HTTP_502_BAD_GATEWAY, f"Solr search failed: {exc}") from exc

    if not page.ids:
        return SubscriptionSearchResponse(total=page.hits, start=start, size=size, items=[])

    subscriptions = get_subscriptions(subscription_ids=page.ids)
    return SubscriptionSearchResponse(total=page.hits, start=start, size=size, items=subscriptions)
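
Example request (a minimal sketch; the host, mount path, and field values are assumptions):

import httpx

payload = {
    "query": "status:active",
    "filter_by": [{"field": "type", "value": "subscription"}],
    "start": 0,
    "size": 25,
}
resp = httpx.post("http://localhost:8080/api/v1/search/subscriptions", json=payload)
data = resp.json()  # {"total": ..., "start": 0, "size": 25, "items": [...]}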

search_tasks(payload) async

Search processes using Solr and return matching processes enriched with related data.

Source code in gso/api/v1/search.py
@router.post("/processes", response_model=ProcessSearchResponse, status_code=status.HTTP_200_OK)
async def search_tasks(payload: ProcessSearchRequest) -> ProcessSearchResponse:
    """Search processes using Solr and return matching processes enriched with related data."""
    size = max(1, min(payload.size, 200))
    start = max(0, payload.start)

    sort_models = _validate_task_sort_fields(payload.sort_by)
    filter_models = _validate_task_filter_fields(payload.filter_by or [])

    try:
        solr_client = get_solr_client()
        page = solr_client.search_process_ids(
            payload.query,
            filter_by=filter_models,
            sort_by=sort_models,
            start=start,
            rows=size,
        )
    except SolrDisabledError as exc:
        raise HTTPException(status.HTTP_503_SERVICE_UNAVAILABLE, "Solr search is disabled") from exc
    except (SolrUnavailableError, SolrQueryError) as exc:
        raise HTTPException(status.HTTP_502_BAD_GATEWAY, f"Solr search failed: {exc}") from exc

    if not page.ids:
        return ProcessSearchResponse(total=page.hits, start=start, size=size, items=[])

    process_ids = [UUID(pid) for pid in page.ids]
    detailed_flag = False
    rows = get_enrichable_processes_by_ids(process_ids)
    processes = [
        p.model_dump() if hasattr(p, "model_dump") else p.dict()
        for p in [_enrich_process(row, detailed_flag) for row in rows]
    ]

    return ProcessSearchResponse(total=page.hits, start=start, size=size, items=processes)
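
The /processes endpoint follows the same request pattern and also accepts the legacy string filter form (a minimal sketch; the host and mount path are assumptions):

import httpx

payload = {"query": "*", "filter_by": ["is_task:true"], "size": 10}
resp = httpx.post("http://localhost:8080/api/v1/search/processes", json=payload)
resp.json()  # {"total": ..., "start": 0, "size": 10, "items": [...]}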