Skip to content

Client

Solr client wrapper with retry logic and project-specific helpers.

logger = structlog.get_logger(__name__) module-attribute

Lucene/Solr reserved operators (multi-char first).

_TWO_CHAR_RESERVED = {"&&", "||"}

_SINGLE_RESERVED = {"[", "\\", "-", "^", "]", "+", "{", ":", '"', "?", "/", "~", "*", "(", "}", "!", ")"}

SearchPage dataclass

Represents a page of search results from Solr.

Source code in gso/services/solr/client.py
@dataclass(frozen=True)
class SearchPage:
    """Represents a page of search results from Solr.

    Instances are immutable (`frozen=True`); `SolrClient.search_ids` builds one per request.
    """

    # IDs collected from the requested id field of each returned document (list values are flattened).
    ids: list[str]
    # Hit count as reported by the Solr response.
    hits: int
    # Pagination offset that was requested.
    start: int
    # Page size that was requested.
    rows: int
    # The returned Solr documents, unmodified.
    docs: list[dict[str, Any]]

SolrDisabledError

Bases: RuntimeError

Raised when Solr is disabled in configuration.

Source code in gso/services/solr/client.py
class SolrDisabledError(RuntimeError):
    """Raised when Solr is disabled in configuration.

    `SolrClient.search_ids` raises this when the client's `enabled` flag is false.
    """

SolrQueryError

Bases: RuntimeError

Raised when Solr query fails or returns an error.

Source code in gso/services/solr/client.py
class SolrQueryError(RuntimeError):
    """Raised when Solr query fails or returns an error.

    `SolrClient.search_ids` raises this when the search ends in a `pysolr.SolrError`.
    """

SolrUnavailableError

Bases: RuntimeError

Raised when Solr is not reachable or query fails.

Source code in gso/services/solr/client.py
class SolrUnavailableError(RuntimeError):
    """Raised when Solr is not reachable or query fails.

    `SolrClient.search_ids` raises this for any search failure that is not a `pysolr.SolrError`.
    """

SolrClient

Thin, retrying wrapper around pysolr with project-friendly helpers.

Source code in gso/services/solr/client.py
class SolrClient:
    """Thin, retrying wrapper around pysolr with project-friendly helpers.

    Calls to the underlying pysolr handle are routed through `_retry`, which re-attempts
    failures with exponential backoff before giving up.
    """

    def __init__(self, config: SolrParams) -> None:
        """Initialize the Solr client with given configuration.

        Args:
            config: Solr settings; `url`, `enabled`, `timeout`, `always_commit`, `max_retries`
                and `backoff_seconds` are copied onto the instance.
        """
        self.url = config.url
        self.enabled = config.enabled
        self.timeout = config.timeout
        self.always_commit = config.always_commit
        self.max_retries = config.max_retries
        self.backoff_seconds = config.backoff_seconds
        self._client = pysolr.Solr(self.url, always_commit=self.always_commit, timeout=self.timeout)

    def _retry(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Call `fn(*args, **kwargs)`, retrying on any exception.

        One initial attempt plus up to `max_retries` retries are made. After failed attempt `n`
        (0-based) the method sleeps `backoff_seconds * 2**n` seconds before trying again.

        Args:
            fn: the callable to invoke.
            *args: positional arguments forwarded to `fn`.
            **kwargs: keyword arguments forwarded to `fn`.

        Returns:
            Whatever `fn` returns on its first successful call.

        Raises:
            Exception: the exception raised by the final failed attempt.
            RuntimeError: if no attempt was made at all (i.e. `max_retries` is negative).
        """
        last_exc: Exception | None = None
        for attempt in range(self.max_retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:  # noqa: BLE001
                last_exc = exc
                # No point sleeping after the final attempt; the loop simply ends.
                if attempt < self.max_retries:
                    time.sleep(self.backoff_seconds * (2**attempt))

        if last_exc is None:
            msg = "No exception raised, but retry failed"
            raise RuntimeError(msg)

        raise last_exc

    def ping(self) -> bool:
        """Check if Solr is reachable and responsive.

        Returns:
            False when the client is disabled or when the ping still fails after all retries
            (the failure is logged as a warning); True otherwise.
        """
        if not self.enabled:
            return False

        try:
            self._retry(self._client.ping)
        except Exception as exc:  # noqa: BLE001
            logger.warning("Solr ping failed", url=self.url, error=str(exc))
            return False

        return True

    def search_ids(
        self,
        *,
        query: str,
        doc_type: str,
        id_field: str,
        start: int = 0,
        rows: int = 10,
        default_field: str = "_text_",
        op: str = "OR",
        add_wildcards: bool = True,
        sort: Iterable[str] | None = None,
        extra_fq: Iterable[str] | None = None,
        extra_fields: Iterable[str] | None = None,
    ) -> SearchPage:
        """Search Solr and return just IDs + metadata.

        Args:
            query: query text; handed to pysolr's `search` unchanged (this method does not
                tokenize or escape it). Empty/whitespace-only text short-circuits to an empty page.
            doc_type: value for `type:...` filter.
            id_field: the solr field containing the id(s).
            start: start index for pagination.
            rows: number of results to return.
            default_field: `df` parameter for Solr query.
            op: 'AND' or 'OR' (Solr q.op).
            add_wildcards: accepted for API compatibility; in this method it is only written to the
                debug log and does not alter the query. NOTE(review): confirm whether callers are
                expected to pre-process the query themselves.
            sort: List of Solr sort strings (e.g., ['start_date asc']).
            extra_fq: additional filter queries.
            extra_fields: additional 'fl' to request.

        Returns:
            SearchPage with ids, hits, docs.

        Raises:
            SolrDisabledError: if the client is disabled.
            SolrQueryError: if the search ends in a `pysolr.SolrError`.
            SolrUnavailableError: if the search fails with any other exception.
        """
        if not self.enabled:
            msg = "Solr is disabled; cannot perform search."
            raise SolrDisabledError(msg)

        # Return empty results for empty/whitespace queries
        if not query or not query.strip():
            return SearchPage(ids=[], hits=0, start=start, rows=rows, docs=[])

        fl = [id_field]
        if extra_fields:
            fl.extend(extra_fields)

        # Materialize once: `extra_fq` may be a one-shot iterable and is needed for both the
        # request and the debug log.
        fq = [*(extra_fq or []), f"type:{doc_type}"]

        params: dict[str, Any] = {
            "start": start,
            "rows": rows,
            "df": default_field,
            "q.op": op,
            "fq": fq,
            "fl": ",".join(fl),
            "sow": "true",
        }
        # Only send `sort` when there is one: a None value would be url-encoded as the literal
        # string "None" and reach Solr as a bogus sort spec.
        sort_spec = ",".join(sort) if sort else None
        if sort_spec:
            params["sort"] = sort_spec

        logger.debug(
            "solr.search_ids",
            url=self.url,
            doc_type=doc_type,
            id_field=id_field,
            start=start,
            rows=rows,
            df=default_field,
            op=op,
            add_wildcards=add_wildcards,
            sort=sort,
            fq=fq,
        )

        try:
            res = self._retry(self._client.search, query, **params)
        except pysolr.SolrError as exc:
            logger.exception("Solr query error", error=str(exc), params=params)
            raise SolrQueryError(str(exc)) from exc
        except Exception as exc:  # transport/unavailable
            logger.exception("Solr unavailable", error=str(exc), params=params)
            raise SolrUnavailableError(str(exc)) from exc

        # Flatten `id_field` (can be str or list); dict values are ignored.
        docs: list[dict[str, Any]] = list(res.docs)
        ids: list[str] = []
        for doc in docs:
            raw = doc.get(id_field)
            if raw is None:
                continue
            if isinstance(raw, list):
                ids.extend(str(v) for v in raw if not isinstance(v, dict))
            elif not isinstance(raw, dict):
                ids.append(str(raw))

        return SearchPage(ids=ids, hits=res.hits, start=start, rows=rows, docs=docs)

    def search_subscription_ids(
        self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
    ) -> SearchPage:
        """Search for subscription IDs in Solr.

        Converts `filter_by`/`sort_by` with `solr_filter_strings`/`solr_sort_strings` and delegates
        to `search_ids` with `doc_type="subscription"` and `id_field="subscription_id"`.
        """
        extra_fq = solr_filter_strings(filter_by)
        sort = solr_sort_strings(sort_by)
        return self.search_ids(
            query=query,
            doc_type="subscription",
            id_field="subscription_id",
            start=start,
            rows=rows,
            extra_fq=extra_fq,
            sort=sort,
        )

    def search_process_ids(
        self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
    ) -> SearchPage:
        """Search for process IDs in Solr.

        Converts `filter_by`/`sort_by` with `solr_filter_strings`/`solr_sort_strings` and delegates
        to `search_ids` with `doc_type="task"` and `id_field="process_id"`.
        """
        extra_fq = solr_filter_strings(filter_by)
        sort = solr_sort_strings(sort_by)
        return self.search_ids(
            query=query,
            doc_type="task",
            id_field="process_id",
            start=start,
            rows=rows,
            extra_fq=extra_fq,
            sort=sort,
        )

__init__(config)

Initialize the Solr client with given configuration.

Source code in gso/services/solr/client.py
def __init__(self, config: SolrParams) -> None:
    """Initialize the Solr client with given configuration.

    Args:
        config: Solr settings; `url`, `enabled`, `timeout`, `always_commit`, `max_retries`
            and `backoff_seconds` are copied onto the instance.
    """
    self.url = config.url
    self.enabled = config.enabled
    self.timeout = config.timeout
    self.always_commit = config.always_commit
    self.max_retries = config.max_retries
    self.backoff_seconds = config.backoff_seconds
    # The pysolr handle every request goes through.
    self._client = pysolr.Solr(self.url, always_commit=self.always_commit, timeout=self.timeout)

ping()

Check if Solr is reachable and responsive.

Source code in gso/services/solr/client.py
def ping(self) -> bool:
    """Report whether Solr answers a ping.

    Returns:
        True if the client is enabled and the (retried) ping succeeds; False if the client
        is disabled or the ping keeps failing, in which case a warning is logged.
    """
    if self.enabled:
        try:
            self._retry(self._client.ping)
        except Exception as exc:  # noqa: BLE001
            logger.warning("Solr ping failed", url=self.url, error=str(exc))
        else:
            return True

    return False

search_ids(*, query, doc_type, id_field, start=0, rows=10, default_field='_text_', op='OR', add_wildcards=True, sort=None, extra_fq=None, extra_fields=None)

Search Solr and return just IDs + metadata.

Parameters:

Name Type Description Default
query str

user text; will be tokenized & escaped.

required
doc_type str

value for type:... filter.

required
id_field str

the solr field containing the id(s).

required
start int

start index for pagination.

0
rows int

number of results to return.

10
default_field str

df parameter for Solr query.

'_text_'
op str

'AND' or 'OR' (Solr q.op).

'OR'
add_wildcards bool

wrap each token with `*...*` for contains-like search.

True
sort Iterable[str] | None

List of Solr sort strings (e.g., ['start_date asc']).

None
extra_fq Iterable[str] | None

additional filter queries.

None
extra_fields Iterable[str] | None

additional 'fl' to request.

None

Returns:

Type Description
SearchPage

SearchPage with ids, hits, docs.

Source code in gso/services/solr/client.py
def search_ids(
    self,
    *,
    query: str,
    doc_type: str,
    id_field: str,
    start: int = 0,
    rows: int = 10,
    default_field: str = "_text_",
    op: str = "OR",
    add_wildcards: bool = True,
    sort: Iterable[str] | None = None,
    extra_fq: Iterable[str] | None = None,
    extra_fields: Iterable[str] | None = None,
) -> SearchPage:
    """Search Solr and return just IDs + metadata.

    Args:
        query: query text; handed to pysolr's `search` unchanged (this method does not
            tokenize or escape it). Empty/whitespace-only text short-circuits to an empty page.
        doc_type: value for `type:...` filter.
        id_field: the solr field containing the id(s).
        start: start index for pagination.
        rows: number of results to return.
        default_field: `df` parameter for Solr query.
        op: 'AND' or 'OR' (Solr q.op).
        add_wildcards: accepted for API compatibility; in this method it is only written to the
            debug log and does not alter the query. NOTE(review): confirm whether callers are
            expected to pre-process the query themselves.
        sort: List of Solr sort strings (e.g., ['start_date asc']).
        extra_fq: additional filter queries.
        extra_fields: additional 'fl' to request.

    Returns:
        SearchPage with ids, hits, docs.

    Raises:
        SolrDisabledError: if the client is disabled.
        SolrQueryError: if the search ends in a `pysolr.SolrError`.
        SolrUnavailableError: if the search fails with any other exception.
    """
    if not self.enabled:
        msg = "Solr is disabled; cannot perform search."
        raise SolrDisabledError(msg)

    # Return empty results for empty/whitespace queries
    if not query or not query.strip():
        return SearchPage(ids=[], hits=0, start=start, rows=rows, docs=[])

    fl = [id_field]
    if extra_fields:
        fl.extend(extra_fields)

    # Materialize once: `extra_fq` may be a one-shot iterable and is needed for both the
    # request and the debug log.
    fq = [*(extra_fq or []), f"type:{doc_type}"]

    params: dict[str, Any] = {
        "start": start,
        "rows": rows,
        "df": default_field,
        "q.op": op,
        "fq": fq,
        "fl": ",".join(fl),
        "sow": "true",
    }
    # Only send `sort` when there is one: a None value would be url-encoded as the literal
    # string "None" and reach Solr as a bogus sort spec.
    sort_spec = ",".join(sort) if sort else None
    if sort_spec:
        params["sort"] = sort_spec

    logger.debug(
        "solr.search_ids",
        url=self.url,
        doc_type=doc_type,
        id_field=id_field,
        start=start,
        rows=rows,
        df=default_field,
        op=op,
        add_wildcards=add_wildcards,
        sort=sort,
        fq=fq,
    )

    try:
        res = self._retry(self._client.search, query, **params)
    except pysolr.SolrError as exc:
        logger.exception("Solr query error", error=str(exc), params=params)
        raise SolrQueryError(str(exc)) from exc
    except Exception as exc:  # transport/unavailable
        logger.exception("Solr unavailable", error=str(exc), params=params)
        raise SolrUnavailableError(str(exc)) from exc

    # Flatten `id_field` (can be str or list); dict values are ignored.
    docs: list[dict[str, Any]] = list(res.docs)
    ids: list[str] = []
    for doc in docs:
        raw = doc.get(id_field)
        if raw is None:
            continue
        if isinstance(raw, list):
            ids.extend(str(v) for v in raw if not isinstance(v, dict))
        elif not isinstance(raw, dict):
            ids.append(str(raw))

    return SearchPage(ids=ids, hits=res.hits, start=start, rows=rows, docs=docs)

search_subscription_ids(query, *, filter_by, sort_by, start=0, rows=10)

Search for subscription IDs in Solr.

Source code in gso/services/solr/client.py
def search_subscription_ids(
    self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
) -> SearchPage:
    """Look up subscription IDs matching `query`.

    The filters and sort criteria are translated to Solr strings and the search is delegated to
    `search_ids`, restricted to documents of type "subscription" and reading IDs from `subscription_id`.
    """
    return self.search_ids(
        query=query,
        doc_type="subscription",
        id_field="subscription_id",
        start=start,
        rows=rows,
        extra_fq=solr_filter_strings(filter_by),
        sort=solr_sort_strings(sort_by),
    )

search_process_ids(query, *, filter_by, sort_by, start=0, rows=10)

Search for process IDs in Solr.

Source code in gso/services/solr/client.py
def search_process_ids(
    self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
) -> SearchPage:
    """Look up process IDs matching `query`.

    The filters and sort criteria are translated to Solr strings and the search is delegated to
    `search_ids`, restricted to documents of type "task" and reading IDs from `process_id`.
    """
    return self.search_ids(
        query=query,
        doc_type="task",
        id_field="process_id",
        start=start,
        rows=rows,
        extra_fq=solr_filter_strings(filter_by),
        sort=solr_sort_strings(sort_by),
    )

solr_filter_strings(filters)

Convert a list of Filter objects to Solr-compatible filter query strings.

Handles splitting values on '-' or '|' and joining with 'or', and converts field names to snake_case.

Parameters:

Name Type Description Default
filters list[Filter]

List of Filter objects.

required

Returns:

Type Description
list[str]

List of Solr filter query strings.

Source code in gso/services/solr/client.py
def solr_filter_strings(filters: list[Filter]) -> list[str]:
    """Convert a list of Filter objects to Solr-compatible filter query strings.

    Each usable filter yields one `field:value` clause, with the field name passed through
    `camel_to_snake_case`. A value containing '-' or '|' is treated as a list of alternatives and
    becomes `field:(a OR b ...)`; the operator is upper-case because Lucene only recognizes
    boolean operators in capitals.

    Filters are skipped when they have no field name (neither a `field` nor a `name` attribute),
    when their value is `None`, or when the value is blank / consists only of separators, since
    those would produce a syntactically invalid clause such as `field:()`.

    Values are inserted verbatim; no Lucene escaping is applied here.

    Args:
        filters: List of Filter objects.

    Returns:
        List of Solr filter query strings.
    """
    fq: list[str] = []
    re_split = re.compile(r"[-|]")
    for f in filters:
        field = getattr(f, "field", None) or getattr(f, "name", None)
        value = getattr(f, "value", None)
        if not field or value is None:
            continue
        text = str(value)
        alternatives = [p.strip() for p in re_split.split(text) if p.strip()]
        if not alternatives:
            # Nothing left to match on (e.g. "" or "-"); emitting it would break the whole query.
            continue
        field_snake = camel_to_snake_case(str(field))
        if re_split.search(text):
            fq.append(f"{field_snake}:({' OR '.join(alternatives)})")
        else:
            fq.append(f"{field_snake}:{value}")
    return fq

solr_sort_strings(sort_by)

Convert a list of Sort objects to Solr-compatible sort strings.

Converts field names to snake_case and formats as 'field order'.

Parameters:

Name Type Description Default
sort_by list[Sort]

List of Sort objects.

required

Returns:

Type Description
list[str]

List of Solr sort strings.

Source code in gso/services/solr/client.py
def solr_sort_strings(sort_by: list[Sort]) -> list[str]:
    """Translate Sort objects into Solr sort clauses of the form 'field direction'.

    The field name is taken from the `field` (or else `name`) attribute and passed through
    `camel_to_snake_case`; the direction comes from `order` (or else `direction`). Entries
    missing either one are ignored, and any direction other than asc/desc falls back to asc.

    Args:
        sort_by: List of Sort objects.

    Returns:
        List of Solr sort strings.
    """
    valid_directions = {"asc", "desc"}
    clauses: list[str] = []
    for entry in sort_by:
        field = getattr(entry, "field", None) or getattr(entry, "name", None)
        order = getattr(entry, "order", None) or getattr(entry, "direction", None)
        if field and order:
            # Solr field names are snake_case while incoming names may be camelCase.
            solr_field = camel_to_snake_case(str(field))
            # Objects carrying a `.value` attribute (e.g. enum members) are unwrapped first.
            raw_direction = order.value if hasattr(order, "value") else order
            direction = str(raw_direction).lower()
            if direction not in valid_directions:
                direction = "asc"
            clauses.append(f"{solr_field} {direction}")
    return clauses

get_solr_client()

Get the singleton SolrClient instance, initializing it if necessary.

Source code in gso/services/solr/client.py
def get_solr_client() -> SolrClient:
    """Return the module-wide SolrClient, creating it on first use.

    On the first call the client is built from the `SOLR` section of `load_oss_params()` and
    stored in the module-level `_client_singleton`; later calls return that same instance.
    """
    global _client_singleton  # noqa: PLW0603
    if _client_singleton is not None:
        return _client_singleton

    _client_singleton = SolrClient(load_oss_params().SOLR)
    logger.info("Initialized SolrClient", url=_client_singleton.url, enabled=_client_singleton.enabled)
    return _client_singleton