Skip to content

Common

A collection of methods that make interaction with coreDB more straightforward.

This saves developers from rewriting the same database statements many times, which can easily lead to errors or inconsistencies. These methods relate to operations on subscriptions.

get_product_id_by_name(product_name)

Retrieve the UUID of a product by its name.

Parameters:

Name Type Description Default
product_name ProductName

The name of the product.

required

Returns:

Type Description
UUID

The UUID of the product.

Source code in gso/services/subscriptions/common.py
def get_product_id_by_name(product_name: ProductName) -> UUID:
    """Look up a product by its name and return its identifier.

    Args:
        product_name: Name of the product to look up.

    Returns:
        The ``product_id`` of the first product whose name matches.
    """
    matching_products = ProductTable.query.filter_by(name=product_name)
    # NOTE(review): there is no handling for an unknown product name; presumably
    # ``first()`` yields ``None`` in that case and the attribute access below
    # fails - confirm whether callers rely on that.
    product = matching_products.first()
    return product.product_id

get_subscriptions(*, product_types=None, lifecycles=None, includes=None, excludes=None, partner_id=None, subscription_ids=None, exclude_subscription_ids=None, subscription_filters=None, instance_filters=None)

Retrieve subscriptions, optionally filtered by product type, lifecycle, partner, subscription ID, and column or resource-type values.

Parameters:

Name Type Description Default
product_types list[ProductType] | None

The types of the product for which to retrieve subscriptions.

None
lifecycles list[SubscriptionLifecycle] | None

The lifecycles that the subscriptions must be in.

None
includes list[str] | None

List of fields to be included in the returned Subscription objects.

None
excludes list[str] | None

List of fields to be excluded from the returned Subscription objects.

None
partner_id UUIDstr | None

The customer id of subscriptions.

None
subscription_ids list[UUIDstr] | None

A list of specific subscription IDs to filter by.

None
exclude_subscription_ids list[UUIDstr] | None

A list of subscription IDs to exclude from the results.

None
subscription_filters list[dict[str, str]] | None

A list of single-entry dicts where each key is a SubscriptionTable column name, e.g. [{"status": "ACTIVE"}].

None
instance_filters list[dict[str, str]] | None

A list of single-entry dicts where each key is a resource-type (in SubscriptionInstanceValueTable), e.g. [{"edge_port_name": "ae11"}]. TODO: We can add nested filters here in the future, e.g. [{"node__router_fqdn": "router.example.com"}].

None

Returns:

Type Description
list[SubscriptionType]

A list of SubscriptionType objects that match the query.

Source code in gso/services/subscriptions/common.py
def get_subscriptions(
    *,
    product_types: list[ProductType] | None = None,
    lifecycles: list[SubscriptionLifecycle] | None = None,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    partner_id: UUIDstr | None = None,
    subscription_ids: list[UUIDstr] | None = None,
    exclude_subscription_ids: list[UUIDstr] | None = None,
    subscription_filters: list[dict[str, str]] | None = None,
    instance_filters: list[dict[str, str]] | None = None,
) -> list[SubscriptionType]:
    """Retrieve subscriptions, optionally narrowed down by a set of filters.

    Every filter is optional; a filter that is ``None`` or empty is not applied. All filters that are given are
    combined, so a subscription must satisfy each of them to be returned.

    Args:
        product_types: The types of the product for which to retrieve subscriptions.
        lifecycles: The lifecycles that the subscriptions must be in.
        includes: List of fields to be included in the returned Subscription objects. When ``None`` or empty, all
                  columns of SubscriptionTable are included.
        excludes: List of fields to be excluded from the returned Subscription objects.
        partner_id: The customer id of subscriptions.
        subscription_ids: A list of specific subscription IDs to filter by.
        exclude_subscription_ids: A list of subscription IDs to exclude from the results.
        subscription_filters: A list of dicts where each key is a
                        SubscriptionTable column name, e.g. [{"status": "ACTIVE"}].
                        Every entry of every dict is applied as an equality filter.
        instance_filters: A list of dicts where each key is a
                          resource-type (in SubscriptionInstanceValueTable), e.g.
                          [{"edge_port_name": "ae11"}].
                          Every entry of every dict is applied as its own filter.
                          TODO: We can add nested filters here in the future, e.g.
                            [{"node__router_fqdn": "router.example.com"}].

    Returns:
        A list of dicts, one per matching subscription, mapping each included field name to its value.

    Raises:
        ValueError: If a key in ``subscription_filters`` is not an attribute of SubscriptionTable.
    """
    # 1) build the list of columns to select
    if not includes:
        includes = [col.name for col in SubscriptionTable.__table__.columns]

    if excludes:
        includes = [field for field in includes if field not in excludes]

    dynamic_fields = [getattr(SubscriptionTable, field) for field in includes]

    # 2) base query joining to products
    query = db.session.query(SubscriptionTable).join(ProductTable)

    # 3) apply the standard filters
    if product_types:
        query = query.filter(ProductTable.product_type.in_([str(product_type) for product_type in product_types]))

    if lifecycles:
        query = query.filter(SubscriptionTable.status.in_([str(lifecycle) for lifecycle in lifecycles]))

    if partner_id:
        query = query.filter(SubscriptionTable.customer_id == partner_id)

    if subscription_ids:
        query = query.filter(SubscriptionTable.subscription_id.in_(subscription_ids))

    if exclude_subscription_ids:
        query = query.filter(SubscriptionTable.subscription_id.not_in(exclude_subscription_ids))

    # 4a) apply direct column filters
    # Iterate over every entry of each dict: taking only the first entry would silently drop the remaining
    # conditions of a multi-entry dict, and would raise StopIteration on an empty one.
    for filt in subscription_filters or []:
        for key, val in filt.items():
            col = getattr(SubscriptionTable, key, None)
            if col is None:
                msg = f"No such SubscriptionTable column: {key}"
                raise ValueError(msg)
            query = query.filter(col == val)

    # 4b) apply instance-value filters: each as its own EXISTS subquery
    # A separate subquery per condition means different conditions may be satisfied by different
    # subscription instances of the same subscription.
    for filt in instance_filters or []:
        for key, val in filt.items():
            exists_subquery = (
                select(1)
                .select_from(SubscriptionInstanceTable)
                .join(
                    SubscriptionInstanceValueTable,
                    SubscriptionInstanceValueTable.subscription_instance_id
                    == SubscriptionInstanceTable.subscription_instance_id,
                )
                .join(
                    ResourceTypeTable,
                    ResourceTypeTable.resource_type_id == SubscriptionInstanceValueTable.resource_type_id,
                )
                # correlate the subquery with the outer subscription row
                .where(SubscriptionInstanceTable.subscription_id == SubscriptionTable.subscription_id)
                .where(ResourceTypeTable.resource_type == key)
                .where(SubscriptionInstanceValueTable.value == val)
                .exists()
            )
            query = query.filter(exists_subquery)

    # 5) execute and return list of dicts
    rows = query.with_entities(*dynamic_fields).all()
    return [dict(zip(includes, row, strict=True)) for row in rows]