Skip to content

Indexer

Solr indexer for GSO data.

_fetch_field(schema_url, field_name)

Fetch a field definition; return None if missing.

Source code in gso/services/solr/indexer.py
def _fetch_field(schema_url: str, field_name: str) -> dict[str, Any] | None:
    """Look up a single field definition in the Solr schema.

    Returns the field's definition dict, or ``None`` when Solr reports the
    field as absent (HTTP 404). Any other error status raises.
    """
    url = f"{schema_url}/fields/{field_name}?showDefaults=true"
    response = requests.get(url, timeout=60)
    if response.status_code == NOT_FOUND_STATUS:
        return None
    response.raise_for_status()
    return response.json().get("field")

get_engine()

Return a process-wide Engine, creating it on first use.

Source code in gso/services/solr/indexer.py
def get_engine() -> Engine:
    """Return a process-wide Engine, creating it on first use."""
    global _ENGINE  # noqa: PLW0603
    if _ENGINE is not None:
        return _ENGINE
    # Lazily constructed so importing this module has no connection side effects.
    _ENGINE = create_engine(str(app_settings.DATABASE_URI))
    return _ENGINE

_to_scalar(val)

Normalize DB scalars to JSON-serializable values (mirror original behavior).

Source code in gso/services/solr/indexer.py
def _to_scalar(val: Any) -> Any:
    """Normalize DB scalars to JSON-serializable values (mirror original behavior)."""
    if isinstance(val, (int, bytes, uuid.UUID)):
        return str(val)
    return val

_rows_stream(conn, sql)

Execute a SQL statement with streaming enabled and yield dict-like rows.

Works row-by-row without buffering the whole result.

Source code in gso/services/solr/indexer.py
def _rows_stream(conn: Connection, sql: str) -> Iterator[Mapping[str, Any]]:
    """Run *sql* with server-side streaming and yield each row as a mapping.

    Rows are surfaced one at a time; the full result set is never buffered.
    """
    streaming = conn.execution_options(stream_results=True)
    for record in streaming.execute(text(sql)):
        yield cast(Mapping[str, Any], record._mapping)  # noqa: SLF001

_init_entry(agg_docs, sub_id, base)

Create the aggregate entry for a subscription if missing, priming multi-value lists.

Source code in gso/services/solr/indexer.py
def _init_entry(
    agg_docs: dict[str, dict[str, Any]],
    sub_id: str,
    base: Mapping[str, Any],
) -> dict[str, Any]:
    """Return the aggregate entry for *sub_id*, creating and priming it if absent."""
    if sub_id not in agg_docs:
        fresh = dict(base)  # shallow copy of the scalar base fields
        # Prime every multi-value field with its own empty list.
        fresh.update((field, []) for field in _MULTI_FIELDS)
        agg_docs[sub_id] = fresh
    return agg_docs[sub_id]

_append_fields(entry, src, fields)

Append non-null values for selected fields into the entry's multi-value lists.

Source code in gso/services/solr/indexer.py
def _append_fields(entry: dict[str, Any], src: Mapping[str, Any], fields: Iterable[str]) -> None:
    """Append non-null values for selected fields into the entry's multi-value lists."""
    for f in fields:
        v = src.get(f)
        if v is not None:
            entry[f].append(v)

_dedupe_multi(entry)

De-duplicate multi-value lists while preserving order and dropping Nones.

Source code in gso/services/solr/indexer.py
def _dedupe_multi(entry: dict[str, Any]) -> None:
    """De-duplicate multi-value lists in place, keeping order and dropping Nones."""
    for name in _MULTI_FIELDS:
        current = entry.get(name)
        if not isinstance(current, list) or not current:
            continue
        # dict.fromkeys preserves first-seen order while removing duplicates.
        entry[name] = list(dict.fromkeys(item for item in current if item is not None))

_flatten_single_subscription(doc)

Flatten a single denormalised subscription document for Solr indexing.

Source code in gso/services/solr/indexer.py
def _flatten_single_subscription(doc: dict[str, Any]) -> dict[str, Any]:
    """Flatten a single `denormalised` subscription document for Solr indexing."""
    nested_keys = {"subscription_instances", "dependency_instances"}
    base = {key: value for key, value in doc.items() if key not in nested_keys}
    sub_id = base["id"]

    # A single-entry aggregate map keeps the helper contract identical.
    agg_docs: dict[str, dict[str, Any]] = {}
    entry = _init_entry(agg_docs, sub_id, base)

    for section, fields, nested_key, nested_fields in _SECTIONS:
        for instance in doc.get(section) or []:
            _append_fields(entry, instance, fields)
            for nested in instance.get(nested_key) or []:
                _append_fields(entry, nested, nested_fields)

    _dedupe_multi(entry)
    return entry

_stream_subscriptions_denormalized()

Stream and aggregate rows, then yield one document per subscription.

Query text is kept IDENTICAL to your original (no alias/order changes).

Source code in gso/services/solr/indexer.py
def _stream_subscriptions_denormalized() -> Iterator[dict[str, Any]]:
    """Stream and aggregate rows, then yield one document per subscription.

    A single LEFT-JOIN query fans each subscription out over its product,
    partner, metadata, instances, instance values, and dependency instances;
    the rows are re-aggregated here into one nested document per subscription.

    Query text is kept IDENTICAL to your original (no alias/order changes).
    """
    query = """
        SELECT
            s.subscription_id,
            s.start_date,
            s.end_date,
            s.description,
            s.status,
            s.product_id,
            s.customer_id,
            s.insync,
            s.note,
            s.version,
            p.name AS product_name,
            p.description AS product_description,
            p.status AS product_status,
            p.product_type AS product_type,
            p.tag AS product_tag,
            partner.name AS customer_name,
            partner.email AS customer_email,
            si.subscription_instance_id,
            si.label AS instance_label,
            si.product_block_id,
            pb.name AS product_block_name,
            pb.description AS product_block_description,
            pb.status AS product_block_status,
            pb.tag AS product_block_tag,
            siv.subscription_instance_value_id,
            siv.value AS instance_value,
            rt.resource_type_id,
            rt.resource_type AS resource_type,
            rt.description AS resource_type_description,
            sm.metadata AS sm_metadata,
            si_dep.subscription_instance_id AS dep_subscription_instance_id,
            si_dep.label AS dep_instance_label,
            si_dep.product_block_id AS dep_product_block_id,
            pb_dep.name AS dep_product_block_name,
            pb_dep.description AS dep_product_block_description,
            pb_dep.status AS dep_product_block_status,
            pb_dep.tag AS dep_product_block_tag,
            siv_dep.subscription_instance_value_id AS dep_subscription_instance_value_id,
            siv_dep.value AS dep_instance_value,
            rt_dep.resource_type_id AS dep_resource_type_id,
            rt_dep.resource_type AS dep_resource_type,
            rt_dep.description AS dep_resource_type_description
        FROM subscriptions s
        LEFT JOIN products p ON s.product_id = p.product_id
        LEFT JOIN subscription_instances si ON s.subscription_id = si.subscription_id
        LEFT JOIN product_blocks pb ON si.product_block_id = pb.product_block_id
        LEFT JOIN subscription_instance_values siv ON si.subscription_instance_id = siv.subscription_instance_id
        LEFT JOIN resource_types rt ON siv.resource_type_id = rt.resource_type_id
        LEFT JOIN partners partner ON s.customer_id = partner.partner_id
        LEFT JOIN subscription_metadata sm ON s.subscription_id = sm.subscription_id
        LEFT JOIN subscription_instance_relations sir ON si.subscription_instance_id = sir.in_use_by_id
        LEFT JOIN subscription_instances si_dep ON sir.depends_on_id = si_dep.subscription_instance_id
        LEFT JOIN product_blocks pb_dep ON si_dep.product_block_id = pb_dep.product_block_id
        LEFT JOIN subscription_instance_values siv_dep ON si_dep.subscription_instance_id = siv_dep.subscription_instance_id
        LEFT JOIN resource_types rt_dep ON siv_dep.resource_type_id = rt_dep.resource_type_id
    """  # noqa: E501

    columns = SUBSCRIPTION_QUERY_COLUMNS

    with get_engine().connect() as conn:
        # subscription → base doc (no instances yet)
        subs_map: dict[str, dict[str, Any]] = collections.defaultdict(
            lambda: {
                "type": "subscription",
                "subscription_instances": [],
                "dependency_instances": [],
            }
        )

        # subscription → { `instance_id` → `instance_doc` }
        instance_map: dict[str, dict[str, dict[str, Any]]] = collections.defaultdict(
            lambda: collections.defaultdict(lambda: {"subscription_instance_values": []})
        )

        # subscription → { `dep_instance_id` → `dep_instance_doc` }
        dep_instance_map: dict[str, dict[str, dict[str, Any]]] = collections.defaultdict(
            lambda: collections.defaultdict(lambda: {"dependency_instance_values": []})
        )

        for row in _rows_stream(conn, query):
            # Missing columns default to None so downstream lookups never KeyError.
            doc = {col: _to_scalar(row[col]) if col in row else None for col in columns}

            sub_id_val = doc["subscription_id"]
            if sub_id_val is None:
                # `subscription_id` should not be NULL; skip row defensively
                continue
            sub_id: str = cast(str, sub_id_val)

            si_id_val = doc["subscription_instance_id"]
            dep_si_id_val = doc["dep_subscription_instance_id"]

            # Add subscription-level fields only once
            # (instance- and dependency-level columns are excluded here; they
            # are aggregated into the nested instance docs below instead).
            sub_entry = subs_map[sub_id]
            if "id" not in sub_entry:
                sub_entry.update({
                    k: doc[k]
                    for k in columns
                    if k
                    not in {
                        "subscription_instance_id",
                        "instance_label",
                        "product_block_id",
                        "product_block_name",
                        "product_block_description",
                        "product_block_status",
                        "product_block_tag",
                        "subscription_instance_value_id",
                        "instance_value",
                        "resource_type_id",
                        "resource_type",
                        "resource_type_description",
                        "dep_subscription_instance_id",
                        "dep_instance_label",
                        "dep_product_block_id",
                        "dep_product_block_name",
                        "dep_product_block_description",
                        "dep_product_block_status",
                        "dep_product_block_tag",
                        "dep_subscription_instance_value_id",
                        "dep_instance_value",
                        "dep_resource_type_id",
                        "dep_resource_type",
                        "dep_resource_type_description",
                    }
                })
                sub_entry["id"] = sub_id

            # Aggregate instance-level fields
            if si_id_val is not None:
                si_id: str = cast(str, si_id_val)
                instance_doc = instance_map[sub_id][si_id]
                # Instance header fields are set on first sight of the instance id.
                if "subscription_instance_id" not in instance_doc:
                    instance_doc.update({
                        "subscription_instance_id": si_id,
                        "instance_label": doc["instance_label"],
                        "product_block_id": doc["product_block_id"],
                        "product_block_name": doc["product_block_name"],
                        "product_block_description": doc["product_block_description"],
                        "product_block_status": doc["product_block_status"],
                        "product_block_tag": doc["product_block_tag"],
                        "subscription_instance_values": [],
                    })
                if doc.get("subscription_instance_value_id") is not None:
                    instance_doc["subscription_instance_values"].append({
                        "subscription_instance_value_id": doc["subscription_instance_value_id"],
                        "instance_value": doc["instance_value"],
                        "resource_type_id": doc["resource_type_id"],
                        "resource_type": doc["resource_type"],
                        "resource_type_description": doc["resource_type_description"],
                    })

            # Aggregate dependency instance-level fields
            if dep_si_id_val is not None:
                dep_si_id: str = cast(str, dep_si_id_val)
                dep_instance_doc = dep_instance_map[sub_id][dep_si_id]
                # Same first-sight pattern as above, for dependency instances.
                if "dep_subscription_instance_id" not in dep_instance_doc:
                    dep_instance_doc.update({
                        "dep_subscription_instance_id": dep_si_id,
                        "dep_instance_label": doc["dep_instance_label"],
                        "dep_product_block_id": doc["dep_product_block_id"],
                        "dep_product_block_name": doc["dep_product_block_name"],
                        "dep_product_block_description": doc["dep_product_block_description"],
                        "dep_product_block_status": doc["dep_product_block_status"],
                        "dep_product_block_tag": doc["dep_product_block_tag"],
                        "dependency_instance_values": [],
                    })
                if doc.get("dep_subscription_instance_value_id") is not None:
                    dep_instance_doc["dependency_instance_values"].append({
                        "dep_subscription_instance_value_id": doc["dep_subscription_instance_value_id"],
                        "dep_instance_value": doc["dep_instance_value"],
                        "dep_resource_type_id": doc["dep_resource_type_id"],
                        "dep_resource_type": doc["dep_resource_type"],
                        "dep_resource_type_description": doc["dep_resource_type_description"],
                    })

        # Yield one document per subscription
        for sub_id, sub_doc in subs_map.items():
            sub_doc["subscription_instances"] = list(instance_map[sub_id].values())
            sub_doc["dependency_instances"] = list(dep_instance_map[sub_id].values())
            yield sub_doc

_stream_table(table, columns, type_name)

Stream a table and yield Solr-ready docs per row (no list accumulation).

Source code in gso/services/solr/indexer.py
def _stream_table(table: str, columns: list[str], type_name: str) -> Iterator[dict[str, Any]]:
    """Stream a table and yield Solr-ready docs per row (no list accumulation)."""
    sql = f"SELECT {', '.join(columns)} FROM {table}"  # noqa: S608
    # Priority order: the first of these present in the doc supplies the Solr id.
    id_candidates = ("subscription_id", "product_id", "partner_id", "product_block_id")
    with get_engine().connect() as conn:
        for row in _rows_stream(conn, sql):
            doc: dict[str, Any] = {col: _to_scalar(row[col]) for col in columns}
            doc["type"] = type_name
            for key in id_candidates:
                if key in doc:
                    doc["id"] = doc[key]
                    break
            yield doc

_stream_task_denormalized()

Stream tasks; mirrors original behavior.

Source code in gso/services/solr/indexer.py
def _stream_task_denormalized() -> Iterator[dict[str, Any]]:
    """Stream tasks; mirrors original behavior.

    Joins each process to its workflow, steps, linked subscription, and linked
    product, yielding one flat document per row. The `step_state` column is
    serialized into `step_state_json` instead of being indexed as-is.
    """
    query = """
        SELECT
            p.pid AS process_id,
            p.workflow_id,
            w.name AS workflow_name,
            w.target AS workflow_target,
            w.description AS workflow_description,
            w.created_at AS workflow_created_at,
            w.deleted_at AS workflow_deleted_at,
            w.is_task AS workflow_is_task,
            p.is_task,
            p.created_by,
            p.failed_reason,
            p.started_at,
            p.last_status,
            p.last_step,
            p.assignee,
            p.last_modified_at,
            p.traceback,
            ps.stepid AS step_id,
            ps.name AS step_name,
            ps.status AS step_status,
            ps.created_by AS step_created_by,
            ps.started_at AS step_started_at,
            ps.completed_at AS step_completed_at,
            ps.state AS step_state,
            ps.commit_hash AS step_commit_hash,
            s.subscription_id AS linked_subscription_id,
            s.start_date AS linked_subscription_start_date,
            s.end_date AS linked_subscription_end_date,
            s.description AS linked_subscription_description,
            s.status AS linked_subscription_status,
            s.product_id AS linked_subscription_product_id,
            s.customer_id AS linked_subscription_customer_id,
            s.insync AS linked_subscription_insync,
            s.note AS linked_subscription_note,
            s.version AS linked_subscription_version,
            pdt.product_id AS linked_product_id,
            pdt.name AS linked_product_name,
            pdt.description AS linked_product_description,
            pdt.status AS linked_product_status,
            pdt.product_type AS linked_product_type,
            pdt.tag AS linked_product_tag,
            pdt.created_at AS linked_product_created_at,
            pdt.end_date AS linked_product_end_date
        FROM processes p
        LEFT JOIN workflows w ON p.workflow_id = w.workflow_id
        LEFT JOIN products_workflows pw ON w.workflow_id = pw.workflow_id
        LEFT JOIN products pdt ON pdt.product_id = pw.product_id
        LEFT JOIN process_steps ps ON p.pid = ps.pid
        LEFT JOIN processes_subscriptions psu ON p.pid = psu.pid
        LEFT JOIN subscriptions s ON psu.subscription_id = s.subscription_id
    """
    columns = TASK_QUERY_COLUMNS
    with get_engine().connect() as conn:
        for row in _rows_stream(conn, query):
            doc: dict[str, Any] = {}
            for col in columns:
                val = row[col]
                # `step_state` is JSON-encoded into a dedicated field; strings
                # pass through unchanged. All other columns are normalized.
                if col == "step_state" and val is not None:
                    doc["step_state_json"] = json.dumps(val, default=str) if not isinstance(val, str) else val
                elif col != "step_state":
                    doc[col] = _to_scalar(val)

            # Defensive cleanup: `step_state` itself is never copied into doc
            # above; also drop any flattened `step_state.*` keys in case
            # TASK_QUERY_COLUMNS contains such entries — TODO confirm.
            doc.pop("step_state", None)
            for k in list(doc.keys()):
                if k.startswith("step_state."):
                    del doc[k]

            doc["type"] = "task"
            doc["id"] = doc["process_id"]
            yield doc

stream_all_data()

Stream all documents needed for Solr indexing.

  • subscriptions (denormalised and flattened)
  • products, product_blocks, partners
  • tasks
Source code in gso/services/solr/indexer.py
def stream_all_data() -> Iterator[dict[str, Any]]:
    """Stream all documents needed for Solr indexing.

    - subscriptions (`denormalised` and flattened)
    - products, `product_blocks`, partners
    - tasks
    """
    yield from map(_flatten_single_subscription, _stream_subscriptions_denormalized())

    simple_tables = (
        ("products", ["product_id", "name", "description", "status"], "product"),
        ("product_blocks", ["product_block_id", "name", "description", "status"], "product_block"),
        ("partners", ["partner_id", "name", "email"], "customer"),
    )
    for table, columns, type_name in simple_tables:
        yield from _stream_table(table, columns, type_name)

    yield from _stream_task_denormalized()

post_to_solr(docs, batch_size)

Post documents to Solr in batches.

Accepts any iterable (e.g., a generator) and never holds more than batch_size docs in memory.

Source code in gso/services/solr/indexer.py
def post_to_solr(docs: Iterable[dict[str, Any]], batch_size: int) -> None:
    """Post documents to Solr in batches.

    Accepts any `iterable` (e.g., a generator) and never holds more than `batch_size` docs in memory.
    """
    headers = {"Content-Type": "application/json"}

    with requests.Session() as session:
        for batch_no, chunk in enumerate(batched(docs, batch_size), start=1):
            batch = list(chunk)
            payload = json.dumps(batch, default=str)
            response = session.post(SOLR_UPDATE_URL, data=payload, headers=headers, timeout=60)
            if not response.ok:
                logger.error("Error indexing batch %s: %s %s", batch_no, response.status_code, response.text)
                response.raise_for_status()

            logger.info("Indexed %s documents to Solr (batch %s).", len(batch), batch_no)

    logger.info("Reindex complete.")

ensure_solr_copy_field()

Ensure the catch-all copy-field to _text_ exists on the configured core.

Source code in gso/services/solr/indexer.py
def ensure_solr_copy_field() -> None:
    """Ensure the catch-all copy-field to `_text_` exists on the configured core."""
    headers = {"Content-Type": "application/json"}
    # Ask Solr which copy-fields are already configured.
    response = requests.get(f"{SOLR_SCHEMA_URL}/copyfields", headers=headers, timeout=60)
    response.raise_for_status()
    copyfields = response.json().get("copyFields", [])

    def _is_catch_all(cf: dict[str, Any]) -> bool:
        # Solr may report dest as a string or a one-element list.
        return cf.get("source") == "*" and cf.get("dest") in (["_text_"], "_text_")

    if any(_is_catch_all(cf) for cf in copyfields):
        return
    payload = {"add-copy-field": {"source": "*", "dest": ["_text_"]}}
    response = requests.post(SOLR_SCHEMA_URL, data=json.dumps(payload), headers=headers, timeout=60)
    response.raise_for_status()

ensure_fqdn_schema()

Ensure FQDN-friendly analyzers and field types exist on the Solr core.

  • Adds the fqdn_text field type (lowercase + ClassicTokenizer + EdgeNGram for indexing).
  • Replaces schema for description and instance_value to fqdn_text with correct flags.
Source code in gso/services/solr/indexer.py
def _fields_match(current: dict[str, Any], desired: dict[str, Any]) -> bool:
    """Compare a live Solr field definition against the desired definition.

    `indexed`, `stored`, and `multiValued` are compared as booleans, with a
    missing flag treated as False to align with Solr defaults; `type` is
    compared directly.
    """

    def _flag(d: dict[str, Any], key: str) -> bool:
        value = d.get(key)
        return bool(value) if value is not None else False

    return current.get("type") == desired.get("type") and all(
        _flag(current, key) == _flag(desired, key) for key in ("indexed", "stored", "multiValued")
    )


def _ensure_field_type(
    name: str,
    payload: dict[str, Any],
    field_types: list[dict[str, Any]],
    headers: dict[str, str],
) -> None:
    """Add the named fieldType unless *field_types* already contains it (idempotent)."""
    if any(ft.get("name") == name for ft in field_types):
        logger.info("Solr fieldType %s already present; skipping add", name)
        return
    resp = requests.post(SOLR_SCHEMA_URL, data=json.dumps(payload), headers=headers, timeout=60)
    resp.raise_for_status()
    logger.info("Added Solr fieldType %s", name)


def _apply_fqdn_replace_fields(headers: dict[str, str]) -> None:
    """Point each field in `_FQDN_REPLACE_FIELDS` at `fqdn_text`, adding missing fields."""
    for payload in _FQDN_REPLACE_FIELDS:
        desired = payload["replace-field"]
        field_name = desired["name"]
        current = _fetch_field(SOLR_SCHEMA_URL, field_name)

        # If missing entirely, add-field and continue.
        if current is None:
            add_payload = {"add-field": desired}
            resp = requests.post(SOLR_SCHEMA_URL, data=json.dumps(add_payload), headers=headers, timeout=60)
            if not resp.ok:
                logger.error("Failed to add missing field %s: %s", field_name, resp.text)
            resp.raise_for_status()
            logger.info("Added missing field %s with fqdn_text", field_name)
            continue

        if _fields_match(current, desired):
            logger.info("Field %s already matches desired fqdn schema; skipping.", field_name)
            continue

        resp = requests.post(SOLR_SCHEMA_URL, data=json.dumps(payload), headers=headers, timeout=60)
        if not resp.ok:
            logger.error("Failed to apply schema change for field %s: %s", field_name, resp.text)
        resp.raise_for_status()
        logger.info("Applied schema change for field %s", field_name)


def _ensure_fqdn_search_field(headers: dict[str, str]) -> None:
    """Create or update the `fqdn_search` field to match `_FQDN_SEARCH_FIELD_PAYLOAD`."""
    desired = _FQDN_SEARCH_FIELD_PAYLOAD["add-field"]
    current = _fetch_field(SOLR_SCHEMA_URL, "fqdn_search")
    if current is None:
        resp = requests.post(SOLR_SCHEMA_URL, data=json.dumps(_FQDN_SEARCH_FIELD_PAYLOAD), headers=headers, timeout=60)
        if not resp.ok:
            logger.error("Failed to add fqdn_search field: %s", resp.text)
        resp.raise_for_status()
        logger.info("Added fqdn_search field")
        return
    if _fields_match(current, desired):
        logger.info("fqdn_search field already matches desired schema; skipping")
        return
    replace_payload = {"replace-field": desired}
    resp = requests.post(SOLR_SCHEMA_URL, data=json.dumps(replace_payload), headers=headers, timeout=60)
    if not resp.ok:
        logger.error("Failed to update fqdn_search field: %s", resp.text)
    resp.raise_for_status()
    logger.info("Updated fqdn_search field definition")


def _ensure_fqdn_copy_fields(headers: dict[str, str]) -> None:
    """Ensure copy-fields into `fqdn_search` exist for each `_FQDN_COPY_SOURCES` entry."""
    cf_resp = requests.get(f"{SOLR_SCHEMA_URL}/copyfields", headers=headers, timeout=60)
    cf_resp.raise_for_status()
    copyfields = cf_resp.json().get("copyFields", [])

    def _copy_field_exists(source: str, dest: str) -> bool:
        # Solr may report dest as a string or a list of destinations.
        for cf in copyfields:
            if cf.get("source") != source:
                continue
            dest_val = cf.get("dest")
            if dest_val == dest or (isinstance(dest_val, list) and dest in dest_val):
                return True
        return False

    for source in _FQDN_COPY_SOURCES:
        if _copy_field_exists(source, "fqdn_search"):
            logger.info("Copy-field %s -> fqdn_search already present; skipping", source)
            continue
        payload = {"add-copy-field": {"source": source, "dest": "fqdn_search"}}
        resp = requests.post(SOLR_SCHEMA_URL, data=json.dumps(payload), headers=headers, timeout=60)
        if not resp.ok:
            logger.error("Failed to add copy-field %s -> fqdn_search: %s", source, resp.text)
        resp.raise_for_status()
        logger.info("Added copy-field %s -> fqdn_search", source)


def ensure_fqdn_schema() -> None:
    """Ensure FQDN-friendly analyzers and field types exist on the Solr core.

    - Adds the `fqdn_text` field type (lowercase + ClassicTokenizer + EdgeNGram for indexing).
    - Replaces schema for description and `instance_value` to `fqdn_text` with correct flags.
    - Adds the `fqdn_substring` field type, the `fqdn_search` field, and its copy-fields.

    Previously one 120-line function with two duplicated, subtly different
    field-comparison helpers; now delegates to single-purpose private helpers
    sharing one `_fields_match` predicate.
    """
    headers = {"Content-Type": "application/json"}

    # One /fieldtypes fetch serves both field-type existence checks.
    ft_resp = requests.get(f"{SOLR_SCHEMA_URL}/fieldtypes", headers=headers, timeout=60)
    ft_resp.raise_for_status()
    field_types = ft_resp.json().get("fieldTypes", [])

    _ensure_field_type(_FQDN_FIELD_TYPE_NAME, _FQDN_FIELD_TYPE_PAYLOAD, field_types, headers)
    _apply_fqdn_replace_fields(headers)
    _ensure_field_type(_FQDN_SUBSTRING_FIELD_TYPE_NAME, _FQDN_SUBSTRING_FIELD_TYPE_PAYLOAD, field_types, headers)
    _ensure_fqdn_search_field(headers)
    _ensure_fqdn_copy_fields(headers)

_get_solr_endpoints()

Return Solr endpoint URLs (lazy, to allow monkey patching in tests).

Source code in gso/services/solr/indexer.py
def _get_solr_endpoints() -> tuple[str, str]:
    """Return Solr endpoint URLs (lazy, to allow monkey patching in tests)."""
    global _SOLR_ENDPOINTS  # noqa: PLW0603
    if _SOLR_ENDPOINTS is not None:
        return _SOLR_ENDPOINTS
    _SOLR_ENDPOINTS = _derive_solr_endpoints()
    return _SOLR_ENDPOINTS