Skip to content

Client

Solr client wrapper providing retry logic and project-specific helpers.

Lucene and Solr both reserve a set of operator characters that must be escaped when constructing user-facing queries. This module exposes two collections that enumerate these characters.

The reserved operator groups are:

TWO_CHAR_RESERVED:
    &&, ||

SINGLE_RESERVED:
    [, -, ^, ], +, {, :, ", ?, /, ~, *, (, }, !, )

A backslash character is also reserved and must always be escaped.

SearchPage dataclass

Represents a page of search results from Solr.

Source code in gso/services/solr/client.py
@dataclass(frozen=True)
class SearchPage:
    """Represents a page of search results from Solr.

    The dataclass is frozen, so fields cannot be reassigned after construction.
    """

    # Identifier values collected from the requested id field of each document.
    ids: list[str]
    # The `hits` value taken from the Solr response (0 for an empty query).
    hits: int
    # Pagination offset that was requested.
    start: int
    # Page size that was requested.
    rows: int
    # Documents from the Solr response, in result order.
    docs: list[dict[str, Any]]

SolrDisabledError

Bases: RuntimeError

Raised when Solr is disabled in configuration.

Source code in gso/services/solr/client.py
class SolrDisabledError(RuntimeError):
    """Raised when Solr is disabled in configuration.

    `SolrClient.search_ids` raises this when the client's `enabled` flag is false.
    """

SolrQueryError

Bases: RuntimeError

Raised when Solr query fails or returns an error.

Source code in gso/services/solr/client.py
class SolrQueryError(RuntimeError):
    """Raised when Solr query fails or returns an error.

    `SolrClient.search_ids` raises this to wrap a `pysolr.SolrError`.
    """

SolrUnavailableError

Bases: RuntimeError

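Raised when Solr is not reachable, or when the search call fails with an error other than pysolr.SolrError (which is reported as SolrQueryError instead).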

Source code in gso/services/solr/client.py
class SolrUnavailableError(RuntimeError):
    """Raised when Solr is not reachable or the search call fails unexpectedly.

    `SolrClient.search_ids` raises this to wrap any exception from the search
    call that is not a `pysolr.SolrError`.
    """

SolrClient

Thin, retrying wrapper around pysolr with project-friendly helpers.

Source code in gso/services/solr/client.py
class SolrClient:
    """Thin, retrying wrapper around `pysolr` with project-friendly helpers."""

    def __init__(self, config: SolrParams) -> None:
        """Initialize the Solr client with the given configuration.

        Credentials are forwarded to `pysolr` only when the configuration
        carries both a truthy `username` and a truthy `password`.
        """
        self.url = config.url
        self.enabled = config.enabled
        self.timeout = config.timeout
        self.always_commit = config.always_commit
        self.max_retries = config.max_retries
        self.backoff_seconds = config.backoff_seconds
        auth = None
        if getattr(config, "username", None) and getattr(config, "password", None):
            auth = (config.username, config.password)
        self._client = pysolr.Solr(self.url, always_commit=self.always_commit, timeout=self.timeout, auth=auth)
        # Query fields used by `search_ids` when the caller does not pass `qf`.
        self._default_qf = "fqdn_search^5 _text_^1"

    def _retry(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Call `fn(*args, **kwargs)`, retrying on any exception.

        Up to `max_retries + 1` attempts are made. Between attempts the method
        sleeps for `backoff_seconds * 2**attempt` seconds.

        Returns:
            Whatever `fn` returns on the first successful attempt.

        Raises:
            Exception: the exception from the final failed attempt.
            RuntimeError: if no attempt was made at all (negative `max_retries`).
        """
        last_exc: Exception | None = None
        for attempt in range(self.max_retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:  # noqa: BLE001
                last_exc = exc
                # No sleep after the final attempt; the loop simply ends.
                if attempt < self.max_retries:
                    time.sleep(self.backoff_seconds * (2**attempt))

        if last_exc is None:
            msg = "No exception raised, but retry failed"
            raise RuntimeError(msg)

        raise last_exc

    def ping(self) -> bool:
        """Check if Solr is reachable and responsive.

        Returns:
            `False` when the client is disabled or every ping attempt failed
            (the failure is logged as a warning), `True` otherwise.
        """
        if not self.enabled:
            return False

        try:
            self._retry(self._client.ping)
        except Exception as exc:  # noqa: BLE001
            logger.warning("Solr ping failed", url=self.url, error=str(exc))
            return False

        return True

    def search_ids(
        self,
        *,
        query: str,
        doc_type: str,
        id_field: str,
        start: int = 0,
        rows: int = 10,
        op: str = "OR",
        sort: Iterable[str] | None = None,
        extra_fq: Iterable[str] | None = None,
        extra_fields: Iterable[str] | None = None,
        qf: str | None = None,
    ) -> SearchPage:
        """Search Solr and return just IDs plus metadata.

        Parameters:
            query: user text; it is passed through `_quote_solr_query` before being sent.
            doc_type: value for `type:...` filter.
            id_field: the Solr field containing the id(s).
            start: start index for pagination.
            rows: number of results to return.
            op: `AND` or `OR` (Solr q.op).
            sort: List of Solr sort strings (for example, `start_date asc`).
            extra_fq: additional filter queries; blank entries are ignored and
                duplicates (compared case-insensitively after trimming) are dropped.
            extra_fields: additional `fl` to request.
            qf: override query fields string; defaults to `fqdn_search^5 _text_^1`.

        Returns:
            SearchPage with ids, hits, docs. An empty or whitespace-only query
            yields an empty page without contacting Solr.

        Raises:
            SolrDisabledError: if the client is disabled.
            SolrQueryError: if the search raised `pysolr.SolrError`.
            SolrUnavailableError: if the search raised any other exception.
        """
        if not self.enabled:
            msg = "Solr is disabled; cannot perform search."
            raise SolrDisabledError(msg)

        # Return empty results for empty/whitespace queries
        if not query or not query.strip():
            return SearchPage(ids=[], hits=0, start=start, rows=rows, docs=[])

        sanitized_query = _quote_solr_query(query)

        fl = [id_field]
        if extra_fields:
            fl.extend(extra_fields)

        # Materialize once: a one-shot iterable would otherwise be consumed by
        # the join below and show up empty in the debug log.
        sort_clauses = list(sort) if sort else []

        deduped_fq: list[str] = []
        seen_fq: set[str] = set()
        for fq in [*(extra_fq or []), f"type:{doc_type}"]:
            fq_key = str(fq or "").strip().lower()
            # Skip blank/None entries; otherwise they would be sent to Solr as
            # the literal filters "" or "None".
            if not fq_key or fq_key in seen_fq:
                continue
            seen_fq.add(fq_key)
            deduped_fq.append(str(fq))
        fq_filters = deduped_fq

        params = {
            "start": start,
            "rows": rows,
            "defType": "edismax",
            "qf": qf or self._default_qf,
            "q.op": op,
            "fq": fq_filters,
            "fl": ",".join(fl),
            "sow": "true",
            "sort": ",".join(sort_clauses) if sort_clauses else None,
            "df": "_text_",
        }
        # Drop unset parameters so they are not sent at all.
        params = {key: value for key, value in params.items() if value is not None}

        logger.debug(
            "solr.search_ids",
            url=self.url,
            doc_type=doc_type,
            id_field=id_field,
            start=start,
            rows=rows,
            defType="edismax",
            qf=params.get("qf"),
            op=op,
            sort=sort_clauses or None,
            fq=fq_filters,
        )

        try:
            res = self._retry(self._client.search, sanitized_query, **params)
        except pysolr.SolrError as exc:
            logger.exception("Solr query error", error=str(exc), params=params)
            raise SolrQueryError(str(exc)) from exc
        except Exception as exc:  # transport/unavailable
            logger.exception("Solr unavailable", error=str(exc), params=params)
            raise SolrUnavailableError(str(exc)) from exc

        # Flatten `id_field` (can be str or list); dict values are ignored.
        ids: list[str] = []
        docs: list[dict[str, Any]] = []
        for doc in res.docs:
            docs.append(doc)
            raw = doc.get(id_field)
            if raw is None:
                continue
            if isinstance(raw, list):
                ids.extend(str(v) for v in raw if not isinstance(v, dict))
            elif not isinstance(raw, dict):
                ids.append(str(raw))

        return SearchPage(ids=ids, hits=res.hits, start=start, rows=rows, docs=docs)

    def search_subscription_ids(
        self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
    ) -> SearchPage:
        """Search for subscription IDs in Solr.

        Delegates to `search_ids` with `doc_type="subscription"` and
        `id_field="subscription_id"`, after converting `filter_by` and
        `sort_by` with `solr_filter_strings` and `solr_sort_strings`.
        """
        extra_fq = solr_filter_strings(filter_by)
        sort = solr_sort_strings(sort_by)
        return self.search_ids(
            query=query,
            doc_type="subscription",
            id_field="subscription_id",
            start=start,
            rows=rows,
            extra_fq=extra_fq,
            sort=sort,
        )

    def search_process_ids(
        self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
    ) -> SearchPage:
        """Search for process IDs in Solr.

        Delegates to `search_ids` with `doc_type="task"` and
        `id_field="process_id"`, after converting `filter_by` and `sort_by`
        with `solr_filter_strings` and `solr_sort_strings`.
        """
        extra_fq = solr_filter_strings(filter_by)
        sort = solr_sort_strings(sort_by)
        return self.search_ids(
            query=query,
            doc_type="task",
            id_field="process_id",
            start=start,
            rows=rows,
            extra_fq=extra_fq,
            sort=sort,
        )

__init__(config)

Initialize the Solr client with the given configuration.

Source code in gso/services/solr/client.py
def __init__(self, config: SolrParams) -> None:
    """Build the underlying `pysolr.Solr` client from `config`.

    Connection settings are copied onto the instance. Credentials are passed
    to `pysolr` only when both `username` and `password` are present and truthy.
    """
    self.url = config.url
    self.enabled = config.enabled
    self.timeout = config.timeout
    self.always_commit = config.always_commit
    self.max_retries = config.max_retries
    self.backoff_seconds = config.backoff_seconds

    username = getattr(config, "username", None)
    password = getattr(config, "password", None)
    credentials = (username, password) if username and password else None

    self._client = pysolr.Solr(
        self.url,
        always_commit=self.always_commit,
        timeout=self.timeout,
        auth=credentials,
    )
    # Query fields used by `search_ids` when the caller does not pass `qf`.
    self._default_qf = "fqdn_search^5 _text_^1"

ping()

Check if Solr is reachable and responsive.

Source code in gso/services/solr/client.py
def ping(self) -> bool:
    """Report whether Solr answers a ping.

    Returns `False` without contacting Solr when the client is disabled, and
    `False` (after logging a warning) when every ping attempt raised.
    """
    if self.enabled:
        try:
            self._retry(self._client.ping)
        except Exception as exc:  # noqa: BLE001
            logger.warning("Solr ping failed", url=self.url, error=str(exc))
        else:
            return True
    return False

search_ids(*, query, doc_type, id_field, start=0, rows=10, op='OR', sort=None, extra_fq=None, extra_fields=None, qf=None)

Search Solr and return just IDs plus metadata.

Parameters:

Name Type Description Default
query str

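user text; it is trimmed, then wrapped in double quotes as a phrase with embedded quotes escaped (a bare `*` and an already-quoted query are passed through unchanged).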

required
doc_type str

value for type:... filter.

required
id_field str

the Solr field containing the id(s).

required
start int

start index for pagination.

0
rows int

number of results to return.

10
op str

AND or OR (Solr q.op).

'OR'
sort Iterable[str] | None

List of Solr sort strings (for example, start_date asc).

None
extra_fq Iterable[str] | None

additional filter queries.

None
extra_fields Iterable[str] | None

additional fl to request.

None
qf str | None

override for the edismax query fields (`qf`) string; defaults to `fqdn_search^5 _text_^1`.

None

Returns:

Type Description
SearchPage

SearchPage with ids, hits, docs.

Source code in gso/services/solr/client.py
def search_ids(
    self,
    *,
    query: str,
    doc_type: str,
    id_field: str,
    start: int = 0,
    rows: int = 10,
    op: str = "OR",
    sort: Iterable[str] | None = None,
    extra_fq: Iterable[str] | None = None,
    extra_fields: Iterable[str] | None = None,
    qf: str | None = None,
) -> SearchPage:
    """Search Solr and return just IDs plus metadata.

    Parameters:
        query: user text; it is passed through `_quote_solr_query` before being sent.
        doc_type: value for `type:...` filter.
        id_field: the Solr field containing the id(s).
        start: start index for pagination.
        rows: number of results to return.
        op: `AND` or `OR` (Solr q.op).
        sort: List of Solr sort strings (for example, `start_date asc`).
        extra_fq: additional filter queries; blank entries are ignored and
            duplicates (compared case-insensitively after trimming) are dropped.
        extra_fields: additional `fl` to request.
        qf: override query fields string; defaults to `fqdn_search^5 _text_^1`.

    Returns:
        SearchPage with ids, hits, docs. An empty or whitespace-only query
        yields an empty page without contacting Solr.

    Raises:
        SolrDisabledError: if the client is disabled.
        SolrQueryError: if the search raised `pysolr.SolrError`.
        SolrUnavailableError: if the search raised any other exception.
    """
    if not self.enabled:
        msg = "Solr is disabled; cannot perform search."
        raise SolrDisabledError(msg)

    # Return empty results for empty/whitespace queries
    if not query or not query.strip():
        return SearchPage(ids=[], hits=0, start=start, rows=rows, docs=[])

    sanitized_query = _quote_solr_query(query)

    fl = [id_field]
    if extra_fields:
        fl.extend(extra_fields)

    # Materialize once: a one-shot iterable would otherwise be consumed by
    # the join below and show up empty in the debug log.
    sort_clauses = list(sort) if sort else []

    deduped_fq: list[str] = []
    seen_fq: set[str] = set()
    for fq in [*(extra_fq or []), f"type:{doc_type}"]:
        fq_key = str(fq or "").strip().lower()
        # Skip blank/None entries; otherwise they would be sent to Solr as
        # the literal filters "" or "None".
        if not fq_key or fq_key in seen_fq:
            continue
        seen_fq.add(fq_key)
        deduped_fq.append(str(fq))
    fq_filters = deduped_fq

    params = {
        "start": start,
        "rows": rows,
        "defType": "edismax",
        "qf": qf or self._default_qf,
        "q.op": op,
        "fq": fq_filters,
        "fl": ",".join(fl),
        "sow": "true",
        "sort": ",".join(sort_clauses) if sort_clauses else None,
        "df": "_text_",
    }
    # Drop unset parameters so they are not sent at all.
    params = {key: value for key, value in params.items() if value is not None}

    logger.debug(
        "solr.search_ids",
        url=self.url,
        doc_type=doc_type,
        id_field=id_field,
        start=start,
        rows=rows,
        defType="edismax",
        qf=params.get("qf"),
        op=op,
        sort=sort_clauses or None,
        fq=fq_filters,
    )

    try:
        res = self._retry(self._client.search, sanitized_query, **params)
    except pysolr.SolrError as exc:
        logger.exception("Solr query error", error=str(exc), params=params)
        raise SolrQueryError(str(exc)) from exc
    except Exception as exc:  # transport/unavailable
        logger.exception("Solr unavailable", error=str(exc), params=params)
        raise SolrUnavailableError(str(exc)) from exc

    # Flatten `id_field` (can be str or list); dict values are ignored.
    ids: list[str] = []
    docs: list[dict[str, Any]] = []
    for doc in res.docs:
        docs.append(doc)
        raw = doc.get(id_field)
        if raw is None:
            continue
        if isinstance(raw, list):
            ids.extend(str(v) for v in raw if not isinstance(v, dict))
        elif not isinstance(raw, dict):
            ids.append(str(raw))

    return SearchPage(ids=ids, hits=res.hits, start=start, rows=rows, docs=docs)

search_subscription_ids(query, *, filter_by, sort_by, start=0, rows=10)

Search for subscription IDs in Solr.

Source code in gso/services/solr/client.py
def search_subscription_ids(
    self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
) -> SearchPage:
    """Look up subscription IDs matching `query`.

    Converts `filter_by` and `sort_by` to Solr strings and delegates to
    `search_ids` with `doc_type="subscription"` and `id_field="subscription_id"`.
    """
    return self.search_ids(
        query=query,
        doc_type="subscription",
        id_field="subscription_id",
        start=start,
        rows=rows,
        extra_fq=solr_filter_strings(filter_by),
        sort=solr_sort_strings(sort_by),
    )

search_process_ids(query, *, filter_by, sort_by, start=0, rows=10)

Search for process IDs in Solr.

Source code in gso/services/solr/client.py
def search_process_ids(
    self, query: str, *, filter_by: list[Filter], sort_by: list[Sort], start: int = 0, rows: int = 10
) -> SearchPage:
    """Look up process IDs matching `query`.

    Converts `filter_by` and `sort_by` to Solr strings and delegates to
    `search_ids` with `doc_type="task"` and `id_field="process_id"`.
    """
    return self.search_ids(
        query=query,
        doc_type="task",
        id_field="process_id",
        start=start,
        rows=rows,
        extra_fq=solr_filter_strings(filter_by),
        sort=solr_sort_strings(sort_by),
    )

solr_filter_strings(filters)

Convert filters to Solr filter query strings.

Handles splitting simple values on '-' or '|' (interpreted as OR) and joining with 'or', and converts field names to snake case. Values that look like range/date expressions (contain ' TO ', start with brackets, or include ':') are left untouched so their hyphens are not split.

Source code in gso/services/solr/client.py
def solr_filter_strings(filters: list[Filter]) -> list[str]:
    """Convert filters to Solr filter query strings.

    Each filter contributes `<field>:<value>`, where the field name (read from
    the filter's `field` or `name` attribute) is converted to snake case.
    Filters without a field, or with a `None` value, are skipped.

    Simple values containing '-' or '|' are split on those characters and the
    pieces are combined as alternatives: `field:(a OR b)`. The operator is
    upper-case because Solr's standard query parser only recognises boolean
    operators in capitals; a lower-case `or` would be searched for as a term.
    Values that look like range/date expressions (contain ' TO ', start with
    a bracket, or include ':') are left untouched so their hyphens are not
    split.

    A value consisting only of separators (for example `"-"`) yields no
    alternatives; such a filter is skipped rather than emitting the invalid
    clause `field:()`.
    """
    fq = []
    re_split = re.compile(r"[-|]")
    for f in filters:
        field = getattr(f, "field", None) or getattr(f, "name", None)
        value = getattr(f, "value", None)
        if not field or value is None:
            continue
        field_snake = camel_to_snake_case(str(field))
        value_str = str(value)
        # Only split on '-' or '|' when the value is a simple token; avoid
        # splitting range/date expressions that include brackets, colons, or
        # explicit TO clauses.
        is_expression = (
            " TO " in value_str or value_str.strip().startswith(("[", "{", "(")) or ":" in value_str
        )
        if re_split.search(value_str) and not is_expression:
            parts = [p.strip() for p in re_split.split(value_str) if p.strip()]
            if not parts:
                # Nothing but separators: there is no usable alternative.
                continue
            joined = " OR ".join(parts)
            fq.append(f"{field_snake}:({joined})")
        else:
            fq.append(f"{field_snake}:{value_str}")
    return fq

solr_sort_strings(sort_by)

Convert sort instructions to Solr-compatible sort strings.

Converts field names to snake case and formats as field order.

Source code in gso/services/solr/client.py
def solr_sort_strings(sort_by: list[Sort]) -> list[str]:
    """Translate sort instructions into Solr `field order` clauses.

    The field is read from the item's `field` or `name` attribute and converted
    to snake case; the direction is read from `order` or `direction`. Items
    missing either one are skipped. Any direction other than `asc`/`desc`
    (compared case-insensitively) falls back to `asc`.
    """
    clauses = []
    for item in sort_by:
        name = getattr(item, "field", None) or getattr(item, "name", None)
        direction = getattr(item, "order", None) or getattr(item, "direction", None)
        if name and direction:
            # Convert `camelCase` to snake case for Solr
            column = camel_to_snake_case(str(name))
            # Objects exposing `.value` contribute that value instead of their own str().
            raw_direction = direction.value if hasattr(direction, "value") else direction
            normalized = str(raw_direction).lower()
            if normalized not in {"asc", "desc"}:
                normalized = "asc"
            clauses.append(f"{column} {normalized}")
    return clauses

_quote_solr_query(query)

Wrap the query in double quotes and escape embedded quotes.

Special-case a bare '*' to preserve wildcard semantics.

Source code in gso/services/solr/client.py
def _quote_solr_query(query: str) -> str:
    """Wrap the query in double quotes and escape embedded quotes.

    Special-case a bare '*' to preserve wildcard semantics.
    """
    trimmed = query.strip()
    if trimmed == "*":
        return "*"
    if trimmed.startswith('"') and trimmed.endswith('"') and len(trimmed) >= SPECIAL_QUOTED_MIN_LENGTH:
        return trimmed
    escaped = trimmed.replace('"', r"\"")
    return f'"{escaped}"'

get_solr_client()

Get the singleton SolrClient instance, initializing it if necessary.

Source code in gso/services/solr/client.py
def get_solr_client() -> SolrClient:
    """Return the module-wide `SolrClient`, creating it on first use.

    On the first call the client is built from `load_oss_params().SOLR`, stored
    in `_client_singleton`, and its URL and enabled flag are logged.
    """
    global _client_singleton  # noqa: PLW0603
    if _client_singleton is not None:
        return _client_singleton

    _client_singleton = SolrClient(load_oss_params().SOLR)
    logger.info("Initialized SolrClient", url=_client_singleton.url, enabled=_client_singleton.enabled)
    return _client_singleton