A creation workflow that adds an existing router to the service database.
create_subscription(partner)
Create a new subscription object.
Source code in gso/workflows/router/create_imported_router.py
| @step("Create subscription")
def create_subscription(partner: str) -> State:
"""Create a new subscription object."""
partner_id = get_partner_by_name(partner).partner_id
product_id = get_product_id_by_name(ProductName.IMPORTED_ROUTER)
subscription = ImportedRouterInactive.from_product_id(product_id, partner_id)
return {"subscription": subscription, "subscription_id": subscription.subscription_id}
|
Generate a form that is filled in using information passed through the API endpoint.
Source code in gso/workflows/router/create_imported_router.py
| def initial_input_form_generator() -> FormGenerator:
"""Generate a form that is filled in using information passed through the API endpoint."""
class ImportRouter(SubmitFormPage):
model_config = ConfigDict(title="Import Router")
partner: str
router_site: str
hostname: str
ts_port: int
router_vendor: Vendor
router_role: RouterRole
router_lo_ipv4_address: IPv4AddressType
router_lo_ipv6_address: IPv6AddressType
router_lo_iso_address: str
user_input = yield ImportRouter
return user_input.model_dump()
|
initialize_subscription(subscription, hostname, ts_port, router_site, router_role, router_vendor, router_lo_ipv4_address=None, router_lo_ipv6_address=None, router_lo_iso_address=None)
Initialise the router subscription using input data.
Source code in gso/workflows/router/create_imported_router.py
| @step("Initialize subscription")
def initialize_subscription(
subscription: ImportedRouterInactive,
hostname: str,
ts_port: PortNumber,
router_site: str,
router_role: RouterRole,
router_vendor: Vendor,
router_lo_ipv4_address: IPv4AddressType | None = None,
router_lo_ipv6_address: IPv6AddressType | None = None,
router_lo_iso_address: str | None = None,
) -> State:
"""Initialise the router subscription using input data."""
subscription.router.router_ts_port = ts_port
router_site_obj = get_site_by_name(router_site).site
subscription.router.router_site = router_site_obj
fqdn = generate_fqdn(hostname, router_site_obj.site_name, router_site_obj.site_country_code)
subscription.router.router_fqdn = fqdn
subscription.router.router_role = router_role
subscription.router.router_access_via_ts = False
subscription.description = f"Router {fqdn}"
subscription.router.router_lo_ipv4_address = router_lo_ipv4_address
subscription.router.router_lo_ipv6_address = router_lo_ipv6_address
subscription.router.router_lo_iso_address = router_lo_iso_address
subscription.router.vendor = router_vendor
return {"subscription": subscription}
|
create_imported_router()
Import a router without provisioning it.
Source code in gso/workflows/router/create_imported_router.py
| @workflow(
"Import router",
initial_input_form=initial_input_form_generator,
target=Target.CREATE,
)
def create_imported_router() -> StepList:
"""Import a router without provisioning it."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
|