API endpoint for fetching different types of subscriptions.
subscription_routers()
Retrieve all active or provisioning router subscriptions.
Source code in gso/api/v1/subscriptions.py
@router.get(
    "/routers",
    status_code=status.HTTP_200_OK,
    response_model=list[SubscriptionDomainModelSchema],
)
def subscription_routers() -> list[dict[str, Any]]:
    """Retrieve all active or provisioning router subscriptions."""
    # Only router subscriptions in one of these lifecycle states are reported.
    wanted_lifecycles = [SubscriptionLifecycle.ACTIVE, SubscriptionLifecycle.PROVISIONING]
    # Each matching row is loaded by its subscription ID and expanded into the
    # extended domain model, in the order the query returned the rows.
    return [
        build_extended_domain_model(SubscriptionModel.from_subscription(row["subscription_id"]))
        for row in get_router_subscriptions(lifecycles=wanted_lifecycles)
    ]
|
subscription_dashboard_devices()
Retrieve FQDN for all dashboard devices that are monitored.
Source code in gso/api/v1/subscriptions.py
@router.get(
    "/dashboard_devices",
    status_code=status.HTTP_200_OK,
    response_class=Response,
    responses={
        200: {
            "content": {"text/plain": {}},
            "description": "Return a flat file of FQDNs.",
        }
    },
)
def subscription_dashboard_devices() -> Response:
    """Retrieve FQDN for all dashboard devices that are monitored."""
    # For every supported product type: the section of the extended model that
    # holds the device, and the field inside that section carrying its FQDN.
    fqdn_locations = (
        (ProductType.ROUTER, "router", "router_fqdn"),
        (ProductType.SUPER_POP_SWITCH, "super_pop_switch", "super_pop_switch_fqdn"),
        (ProductType.OFFICE_ROUTER, "office_router", "office_router_fqdn"),
    )
    monitored = get_subscriptions(
        product_types=[ProductType.ROUTER, ProductType.SUPER_POP_SWITCH, ProductType.OFFICE_ROUTER],
        lifecycles=[SubscriptionLifecycle.ACTIVE, SubscriptionLifecycle.PROVISIONING],
    )

    collected = []
    for entry in monitored:
        model = build_extended_domain_model(SubscriptionModel.from_subscription(entry["subscription_id"]))
        product_type = model["product"]["product_type"]
        # First matching product type wins; a device whose product type is not
        # listed above contributes nothing to the output.
        for candidate, section, fqdn_field in fqdn_locations:
            if product_type == candidate:
                collected.append(model[section][fqdn_field])
                break

    # One FQDN per line, no trailing newline.
    return Response(content="\n".join(collected), media_type="text/plain")
|