Indexer

Solr indexer for GSO data.

get_engine()

Return a process-wide Engine, creating it on first use.

Source code in gso/services/solr/indexer.py
def get_engine() -> Engine:
    """Return a process-wide Engine, creating it on first use."""
    global _ENGINE  # noqa: PLW0603
    if _ENGINE is None:
        # created only when first needed, not at module import
        _ENGINE = create_engine(str(app_settings.DATABASE_URI))
    return _ENGINE
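
A short usage sketch (assuming app_settings.DATABASE_URI points at a reachable database): repeated calls return the same cached Engine.

from sqlalchemy import text

# First call creates the Engine; later calls return the cached instance.
engine = get_engine()
assert engine is get_engine()

# The Engine hands out pooled connections as usual.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))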

_to_scalar(val)

Normalize DB scalars to JSON-serializable values (mirroring the original behavior).

Source code in gso/services/solr/indexer.py
def _to_scalar(val: Any) -> Any:
    """Normalize DB scalars to JSON-serializable values (mirror original behavior)."""
    if isinstance(val, (int, bytes, uuid.UUID)):
        return str(val)
    return val
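
Illustrative inputs and outputs; note that bytes pass through str() and therefore stringify as their repr:

import uuid

u = uuid.UUID("12345678-1234-5678-1234-567812345678")
assert _to_scalar(42) == "42"        # ints become strings
assert _to_scalar(u) == str(u)       # UUIDs become their canonical string form
assert _to_scalar(b"ab") == "b'ab'"  # bytes stringify via their repr
assert _to_scalar("text") == "text"  # other values pass through unchanged
assert _to_scalar(None) is None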

_rows_stream(conn, sql)

Execute a SQL statement with streaming enabled and yield dict-like rows.

Works row-by-row without buffering the whole result.

Source code in gso/services/solr/indexer.py
def _rows_stream(conn: Connection, sql: str) -> Iterator[Mapping[str, Any]]:
    """Execute a SQL statement with streaming enabled and yield dict-like rows.

    Works row-by-row without buffering the whole result.
    """
    result = conn.execution_options(stream_results=True).execute(text(sql))
    for row in result:
        yield cast(Mapping[str, Any], row._mapping)  # noqa: SLF001
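
A consumption sketch (table and column names here are illustrative):

# Walk a large table one row at a time; stream_results=True asks the driver
# for a server-side cursor instead of buffering the full result client-side.
with get_engine().connect() as conn:
    for row in _rows_stream(conn, "SELECT subscription_id, status FROM subscriptions"):
        print(row["subscription_id"], row["status"])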

_init_entry(agg_docs, sub_id, base)

Create the aggregate entry for a subscription if missing, priming multi-value lists.

Source code in gso/services/solr/indexer.py
def _init_entry(
    agg_docs: dict[str, dict[str, Any]],
    sub_id: str,
    base: Mapping[str, Any],
) -> dict[str, Any]:
    """Create the aggregate entry for a subscription if missing, priming multi-value lists."""
    entry = agg_docs.get(sub_id)
    if entry is None:
        entry = dict(base)  # copy scalar base fields
        for f in _MULTI_FIELDS:
            entry[f] = []
        agg_docs[sub_id] = entry
    return entry
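
A minimal sketch of the priming behavior (_MULTI_FIELDS is defined elsewhere in the module and is not shown here):

agg = {}

# The first call copies the base fields and primes one empty list per multi-value field.
entry = _init_entry(agg, "sub-1", {"id": "sub-1", "status": "active"})

# A second call for the same subscription returns the existing entry untouched.
assert _init_entry(agg, "sub-1", {"id": "sub-1"}) is entry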

_append_fields(entry, src, fields)

Append non-null values for selected fields into the entry's multi-value lists.

Source code in gso/services/solr/indexer.py
def _append_fields(entry: dict[str, Any], src: Mapping[str, Any], fields: Iterable[str]) -> None:
    """Append non-null values for selected fields into the entry's multi-value lists."""
    for f in fields:
        v = src.get(f)
        if v is not None:
            entry[f].append(v)
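
For example, given an entry whose list has already been primed:

entry = {"resource_type": []}

_append_fields(entry, {"resource_type": "ipv4", "other": 1}, ["resource_type"])
_append_fields(entry, {"resource_type": None}, ["resource_type"])  # None values are skipped

assert entry["resource_type"] == ["ipv4"]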

_dedupe_multi(entry)

De-duplicate multi-value lists while preserving order and dropping Nones.

Source code in gso/services/solr/indexer.py
def _dedupe_multi(entry: dict[str, Any]) -> None:
    """De-duplicate multi-value lists while preserving order and dropping Nones."""
    for f in _MULTI_FIELDS:
        vals = entry.get(f)
        if isinstance(vals, list) and vals:
            entry[f] = list(dict.fromkeys(x for x in vals if x is not None))
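
Assuming resource_type is one of the names in _MULTI_FIELDS, the effect looks like this:

entry = {"resource_type": ["ipv4", None, "ipv6", "ipv4"]}
_dedupe_multi(entry)

# Duplicates collapse, first-seen order survives, and Nones are dropped.
assert entry["resource_type"] == ["ipv4", "ipv6"]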

_flatten_single_subscription(doc)

Flatten a single denormalized subscription document for Solr indexing.

Source code in gso/services/solr/indexer.py
def _flatten_single_subscription(doc: dict[str, Any]) -> dict[str, Any]:
    """Flatten a single denormalized subscription document for Solr indexing."""
    base = {k: v for k, v in doc.items() if k not in {"subscription_instances", "dependency_instances"}}
    sub_id = base["id"]

    agg_docs: dict[str, dict[str, Any]] = {}
    entry = _init_entry(agg_docs, sub_id, base)

    for section, fields, nested_key, nested_fields in _SECTIONS:
        for inst in doc.get(section, []) or []:
            _append_fields(entry, inst, fields)
            for nested in inst.get(nested_key, []) or []:
                _append_fields(entry, nested, nested_fields)

    _dedupe_multi(entry)
    return entry
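
A shape sketch, assuming _SECTIONS wires up subscription_instances (with nested subscription_instance_values) and dependency_instances (with nested dependency_instance_values) as assembled by _stream_subscriptions_denormalized below:

doc = {
    "id": "sub-1",
    "status": "active",
    "subscription_instances": [{
        "product_block_name": "Router",
        "subscription_instance_values": [
            {"resource_type": "fqdn", "instance_value": "rt1.example.net"},
        ],
    }],
    "dependency_instances": [],
}

flat = _flatten_single_subscription(doc)
# flat keeps the scalar fields ("id", "status", ...) and folds the nested
# instance fields into flat, de-duplicated multi-value lists.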

_stream_subscriptions_denormalized()

Stream and aggregate all rows, then yield one document per subscription.

The query text is kept identical to the original implementation (no alias or ordering changes).

Source code in gso/services/solr/indexer.py
def _stream_subscriptions_denormalized() -> Iterator[dict[str, Any]]:
    """Stream+aggregate all rows and finally yield one doc per subscription.

    Query text is kept IDENTICAL to your original (no alias/order changes).
    """
    query = """
        SELECT
            s.subscription_id,
            s.start_date,
            s.end_date,
            s.description,
            s.status,
            s.product_id,
            s.customer_id,
            s.insync,
            s.note,
            s.version,
            p.name AS product_name,
            p.description AS product_description,
            p.status AS product_status,
            p.product_type AS product_type,
            p.tag AS product_tag,
            partner.name AS customer_name,
            partner.email AS customer_email,
            si.subscription_instance_id,
            si.label AS instance_label,
            si.product_block_id,
            pb.name AS product_block_name,
            pb.description AS product_block_description,
            pb.status AS product_block_status,
            pb.tag AS product_block_tag,
            siv.subscription_instance_value_id,
            siv.value AS instance_value,
            rt.resource_type_id,
            rt.resource_type AS resource_type,
            rt.description AS resource_type_description,
            sm.metadata AS sm_metadata,
            si_dep.subscription_instance_id AS dep_subscription_instance_id,
            si_dep.label AS dep_instance_label,
            si_dep.product_block_id AS dep_product_block_id,
            pb_dep.name AS dep_product_block_name,
            pb_dep.description AS dep_product_block_description,
            pb_dep.status AS dep_product_block_status,
            pb_dep.tag AS dep_product_block_tag,
            siv_dep.subscription_instance_value_id AS dep_subscription_instance_value_id,
            siv_dep.value AS dep_instance_value,
            rt_dep.resource_type_id AS dep_resource_type_id,
            rt_dep.resource_type AS dep_resource_type,
            rt_dep.description AS dep_resource_type_description
        FROM subscriptions s
        LEFT JOIN products p ON s.product_id = p.product_id
        LEFT JOIN subscription_instances si ON s.subscription_id = si.subscription_id
        LEFT JOIN product_blocks pb ON si.product_block_id = pb.product_block_id
        LEFT JOIN subscription_instance_values siv ON si.subscription_instance_id = siv.subscription_instance_id
        LEFT JOIN resource_types rt ON siv.resource_type_id = rt.resource_type_id
        LEFT JOIN partners partner ON s.customer_id = partner.partner_id
        LEFT JOIN subscription_metadata sm ON s.subscription_id = sm.subscription_id
        LEFT JOIN subscription_instance_relations sir ON si.subscription_instance_id = sir.in_use_by_id
        LEFT JOIN subscription_instances si_dep ON sir.depends_on_id = si_dep.subscription_instance_id
        LEFT JOIN product_blocks pb_dep ON si_dep.product_block_id = pb_dep.product_block_id
        LEFT JOIN subscription_instance_values siv_dep ON si_dep.subscription_instance_id = siv_dep.subscription_instance_id
        LEFT JOIN resource_types rt_dep ON siv_dep.resource_type_id = rt_dep.resource_type_id
    """  # noqa: E501

    columns = [
        "subscription_id",
        "start_date",
        "end_date",
        "description",
        "status",
        "product_id",
        "customer_id",
        "insync",
        "note",
        "version",
        "product_name",
        "product_description",
        "product_status",
        "product_type",
        "product_tag",
        "customer_name",
        "customer_email",
        "subscription_instance_id",
        "instance_label",
        "product_block_id",
        "product_block_name",
        "product_block_description",
        "product_block_status",
        "product_block_tag",
        "subscription_instance_value_id",
        "instance_value",
        "resource_type_id",
        "resource_type",
        "resource_type_description",
        "sm_metadata",
        "dep_subscription_instance_id",
        "dep_instance_label",
        "dep_product_block_id",
        "dep_product_block_name",
        "dep_product_block_description",
        "dep_product_block_status",
        "dep_product_block_tag",
        "dep_subscription_instance_value_id",
        "dep_instance_value",
        "dep_resource_type_id",
        "dep_resource_type",
        "dep_resource_type_description",
    ]

    with get_engine().connect() as conn:
        # subscription → base doc (no instances yet)
        subs_map: dict[str, dict[str, Any]] = collections.defaultdict(
            lambda: {
                "type": "subscription",
                "subscription_instances": [],
                "dependency_instances": [],
            }
        )

        # subscription → { instance_id → instance_doc }
        instance_map: dict[str, dict[str, dict[str, Any]]] = collections.defaultdict(
            lambda: collections.defaultdict(lambda: {"subscription_instance_values": []})
        )

        # subscription → { dep_instance_id → dep_instance_doc }
        dep_instance_map: dict[str, dict[str, dict[str, Any]]] = collections.defaultdict(
            lambda: collections.defaultdict(lambda: {"dependency_instance_values": []})
        )

        for row in _rows_stream(conn, query):
            doc = {col: _to_scalar(row[col]) if col in row else None for col in columns}

            sub_id_val = doc["subscription_id"]
            if sub_id_val is None:
                # subscription_id should not be NULL; skip row defensively
                continue
            sub_id: str = cast(str, sub_id_val)

            si_id_val = doc["subscription_instance_id"]
            dep_si_id_val = doc["dep_subscription_instance_id"]

            # Add subscription-level fields only once
            sub_entry = subs_map[sub_id]
            if "id" not in sub_entry:
                sub_entry.update({
                    k: doc[k]
                    for k in columns
                    if k
                    not in {
                        "subscription_instance_id",
                        "instance_label",
                        "product_block_id",
                        "product_block_name",
                        "product_block_description",
                        "product_block_status",
                        "product_block_tag",
                        "subscription_instance_value_id",
                        "instance_value",
                        "resource_type_id",
                        "resource_type",
                        "resource_type_description",
                        "dep_subscription_instance_id",
                        "dep_instance_label",
                        "dep_product_block_id",
                        "dep_product_block_name",
                        "dep_product_block_description",
                        "dep_product_block_status",
                        "dep_product_block_tag",
                        "dep_subscription_instance_value_id",
                        "dep_instance_value",
                        "dep_resource_type_id",
                        "dep_resource_type",
                        "dep_resource_type_description",
                    }
                })
                sub_entry["id"] = sub_id

            # Aggregate instance-level fields
            if si_id_val is not None:
                si_id: str = cast(str, si_id_val)
                inst = instance_map[sub_id][si_id]
                if "subscription_instance_id" not in inst:
                    inst.update({
                        "subscription_instance_id": si_id,
                        "instance_label": doc["instance_label"],
                        "product_block_id": doc["product_block_id"],
                        "product_block_name": doc["product_block_name"],
                        "product_block_description": doc["product_block_description"],
                        "product_block_status": doc["product_block_status"],
                        "product_block_tag": doc["product_block_tag"],
                        "subscription_instance_values": [],
                    })
                if doc.get("subscription_instance_value_id") is not None:
                    inst["subscription_instance_values"].append({
                        "subscription_instance_value_id": doc["subscription_instance_value_id"],
                        "instance_value": doc["instance_value"],
                        "resource_type_id": doc["resource_type_id"],
                        "resource_type": doc["resource_type"],
                        "resource_type_description": doc["resource_type_description"],
                    })

            # Aggregate dependency instance-level fields
            if dep_si_id_val is not None:
                dep_si_id: str = cast(str, dep_si_id_val)
                dep_inst = dep_instance_map[sub_id][dep_si_id]
                if "dep_subscription_instance_id" not in dep_inst:
                    dep_inst.update({
                        "dep_subscription_instance_id": dep_si_id,
                        "dep_instance_label": doc["dep_instance_label"],
                        "dep_product_block_id": doc["dep_product_block_id"],
                        "dep_product_block_name": doc["dep_product_block_name"],
                        "dep_product_block_description": doc["dep_product_block_description"],
                        "dep_product_block_status": doc["dep_product_block_status"],
                        "dep_product_block_tag": doc["dep_product_block_tag"],
                        "dependency_instance_values": [],
                    })
                if doc.get("dep_subscription_instance_value_id") is not None:
                    dep_inst["dependency_instance_values"].append({
                        "dep_subscription_instance_value_id": doc["dep_subscription_instance_value_id"],
                        "dep_instance_value": doc["dep_instance_value"],
                        "dep_resource_type_id": doc["dep_resource_type_id"],
                        "dep_resource_type": doc["dep_resource_type"],
                        "dep_resource_type_description": doc["dep_resource_type_description"],
                    })

        # Yield one document per subscription
        for sub_id, sub_doc in subs_map.items():
            sub_doc["subscription_instances"] = list(instance_map[sub_id].values())
            sub_doc["dependency_instances"] = list(dep_instance_map[sub_id].values())
            yield sub_doc
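
Each yielded document carries the base subscription fields plus the two nested lists assembled above; a quick inspection sketch:

for sub_doc in _stream_subscriptions_denormalized():
    assert sub_doc["type"] == "subscription"
    assert isinstance(sub_doc["subscription_instances"], list)
    assert isinstance(sub_doc["dependency_instances"], list)
    break  # look at the first document only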

_stream_table(table, columns, type_name)

Stream a table and yield Solr-ready docs per row (no list accumulation).

Source code in gso/services/solr/indexer.py
def _stream_table(table: str, columns: list[str], type_name: str) -> Iterator[dict[str, Any]]:
    """Stream a table and yield Solr-ready docs per row (no list accumulation)."""
    sql = f"SELECT {', '.join(columns)} FROM {table}"  # noqa: S608
    with get_engine().connect() as conn:
        for row in _rows_stream(conn, sql):
            doc: dict[str, Any] = {}
            for col in columns:
                doc[col] = _to_scalar(row[col])
            doc["type"] = type_name
            if "subscription_id" in doc:
                doc["id"] = doc["subscription_id"]
            elif "product_id" in doc:
                doc["id"] = doc["product_id"]
            elif "partner_id" in doc:
                doc["id"] = doc["partner_id"]
            elif "product_block_id" in doc:
                doc["id"] = doc["product_block_id"]
            yield doc
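
A call sketch matching how stream_all_data() uses this helper:

for doc in _stream_table("partners", ["partner_id", "name", "email"], "customer"):
    # Each doc gains type="customer" and id=partner_id before being yielded.
    assert doc["type"] == "customer"
    assert doc["id"] == doc["partner_id"]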

_stream_task_denormalized()

Stream denormalized task documents; mirrors the original behavior.

Source code in gso/services/solr/indexer.py
def _stream_task_denormalized() -> Iterator[dict[str, Any]]:
    """Stream tasks; mirrors original behavior."""
    query = """
        SELECT
            p.pid AS process_id,
            p.workflow_id,
            w.name AS workflow_name,
            w.target AS workflow_target,
            w.description AS workflow_description,
            w.created_at AS workflow_created_at,
            w.deleted_at AS workflow_deleted_at,
            w.is_task AS workflow_is_task,
            p.is_task,
            p.created_by,
            p.failed_reason,
            p.started_at,
            p.last_status,
            p.last_step,
            p.assignee,
            p.last_modified_at,
            p.traceback,
            ps.stepid AS step_id,
            ps.name AS step_name,
            ps.status AS step_status,
            ps.created_by AS step_created_by,
            ps.started_at AS step_started_at,
            ps.completed_at AS step_completed_at,
            ps.state AS step_state,
            ps.commit_hash AS step_commit_hash,
            s.subscription_id AS linked_subscription_id,
            s.start_date AS linked_subscription_start_date,
            s.end_date AS linked_subscription_end_date,
            s.description AS linked_subscription_description,
            s.status AS linked_subscription_status,
            s.product_id AS linked_subscription_product_id,
            s.customer_id AS linked_subscription_customer_id,
            s.insync AS linked_subscription_insync,
            s.note AS linked_subscription_note,
            s.version AS linked_subscription_version,
            pdt.product_id AS linked_product_id,
            pdt.name AS linked_product_name,
            pdt.description AS linked_product_description,
            pdt.status AS linked_product_status,
            pdt.product_type AS linked_product_type,
            pdt.tag AS linked_product_tag,
            pdt.created_at AS linked_product_created_at,
            pdt.end_date AS linked_product_end_date
        FROM processes p
        LEFT JOIN workflows w ON p.workflow_id = w.workflow_id
        LEFT JOIN products_workflows pw ON w.workflow_id = pw.workflow_id
        LEFT JOIN products pdt ON pdt.product_id = pw.product_id
        LEFT JOIN process_steps ps ON p.pid = ps.pid
        LEFT JOIN processes_subscriptions psu ON p.pid = psu.pid
        LEFT JOIN subscriptions s ON psu.subscription_id = s.subscription_id
    """
    columns = [
        "process_id",
        "workflow_id",
        "workflow_name",
        "workflow_target",
        "workflow_description",
        "workflow_created_at",
        "workflow_deleted_at",
        "workflow_is_task",
        "is_task",
        "created_by",
        "failed_reason",
        "started_at",
        "last_status",
        "last_step",
        "assignee",
        "last_modified_at",
        "traceback",
        "step_id",
        "step_name",
        "step_status",
        "step_created_by",
        "step_started_at",
        "step_completed_at",
        "step_state",
        "step_commit_hash",
        "linked_subscription_id",
        "linked_subscription_start_date",
        "linked_subscription_end_date",
        "linked_subscription_description",
        "linked_subscription_status",
        "linked_subscription_product_id",
        "linked_subscription_customer_id",
        "linked_subscription_insync",
        "linked_subscription_note",
        "linked_subscription_version",
        "linked_product_id",
        "linked_product_name",
        "linked_product_description",
        "linked_product_status",
        "linked_product_type",
        "linked_product_tag",
        "linked_product_created_at",
        "linked_product_end_date",
    ]
    with get_engine().connect() as conn:
        for row in _rows_stream(conn, query):
            doc: dict[str, Any] = {}
            for col in columns:
                val = row[col]
                if col == "step_state" and val is not None:
                    doc["step_state_json"] = json.dumps(val, default=str) if not isinstance(val, str) else val
                elif col != "step_state":
                    doc[col] = _to_scalar(val)

            doc.pop("step_state", None)
            for k in list(doc.keys()):
                if k.startswith("step_state."):
                    del doc[k]

            doc["type"] = "task"
            doc["id"] = doc["process_id"]
            yield doc
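
A consumption sketch; note that the raw step_state column never appears on a document, only its JSON-encoded form:

for task in _stream_task_denormalized():
    assert task["type"] == "task"
    assert task["id"] == task["process_id"]
    assert "step_state" not in task  # replaced by step_state_json when non-NULL
    break  # inspect the first task only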

stream_all_data()

Stream all documents needed for Solr indexing.

- subscriptions (denormalized and flattened)
- products, product_blocks, partners
- tasks

Source code in gso/services/solr/indexer.py
def stream_all_data() -> Iterator[dict[str, Any]]:
    """Stream all documents needed for Solr indexing.

    - subscriptions (denormalized and flattened)
    - products, product_blocks, partners
    - tasks
    """
    for sub_doc in _stream_subscriptions_denormalized():
        yield _flatten_single_subscription(sub_doc)

    yield from _stream_table("products", ["product_id", "name", "description", "status"], "product")
    yield from _stream_table("product_blocks", ["product_block_id", "name", "description", "status"], "product_block")
    yield from _stream_table("partners", ["partner_id", "name", "email"], "customer")

    yield from _stream_task_denormalized()
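
The stream interleaves document types; a consumer can tally them without materializing the whole stream:

from collections import Counter

# Counts per document type, computed lazily over the generator.
counts = Counter(doc["type"] for doc in stream_all_data())
# e.g. Counter({"subscription": ..., "task": ..., "product": ..., "product_block": ..., "customer": ...})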

post_to_solr(docs, batch_size)

Post documents to Solr in batches.

Accepts any iterable (e.g., a generator) and never holds more than batch_size docs in memory.

Source code in gso/services/solr/indexer.py
def post_to_solr(docs: Iterable[dict[str, Any]], batch_size: int) -> None:
    """Post documents to Solr in batches.

    Accepts any iterable (e.g., a generator) and never holds more than `batch_size` docs in memory.
    """
    headers = {"Content-Type": "application/json"}

    with requests.Session() as session:
        for batch_no, chunk in enumerate(batched(docs, batch_size), start=1):
            batch = list(chunk)
            resp = session.post(
                SOLR_UPDATE_URL,
                data=json.dumps(batch, default=str),
                headers=headers,
                timeout=10,
            )
            if not resp.ok:
                logger.error("Error indexing batch %s: %s %s", batch_no, resp.status_code, resp.text)
                resp.raise_for_status()

            logger.info("Indexed %s documents to Solr (batch %s).", len(batch), batch_no)

    logger.info("Reindex complete.")

ensure_solr_copy_field()

Ensure the catch-all copy-field to _text_ exists on the configured core.

Source code in gso/services/solr/indexer.py
def ensure_solr_copy_field() -> None:
    """Ensure the catch-all copy-field to `_text_` exists on the configured core."""
    # First, retrieve the current copy fields from Solr schema
    headers = {"Content-Type": "application/json"}
    response = requests.get(f"{SOLR_SCHEMA_URL}/copyfields", headers=headers, timeout=10)
    response.raise_for_status()
    copyfields = response.json().get("copyFields", [])
    # Check if the catch-all copy-field exists
    exists = any(
        cf.get("source") == "*" and (cf.get("dest") == ["_text_"] or cf.get("dest") == "_text_") for cf in copyfields
    )
    if not exists:
        payload = {"add-copy-field": {"source": "*", "dest": ["_text_"]}}
        response = requests.post(SOLR_SCHEMA_URL, data=json.dumps(payload), headers=headers, timeout=10)
        response.raise_for_status()
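
The function is idempotent, so it is safe to run before every reindex; a verification sketch against the same schema endpoint:

import requests

ensure_solr_copy_field()

# The catch-all rule should now be present in the schema.
resp = requests.get(f"{SOLR_SCHEMA_URL}/copyfields", timeout=10)
resp.raise_for_status()
assert any(cf.get("source") == "*" for cf in resp.json().get("copyFields", []))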