A creation workflow for adding a new site to the service database.
The create_site
workflow creates a new site object in the service database, and sets the subscription lifecycle to
ACTIVE
. The attributes that are input using the intake form of the workflow are stored, and nothing else happens.
Get input from the operator about the new site subscription.
Source code in gso/workflows/site/create_site.py
| def initial_input_form_generator(product_name: str) -> FormGenerator:
"""Get input from the operator about the new site subscription."""
class CreateSiteForm(FormPage, BaseSiteValidatorModel):
model_config = ConfigDict(title=product_name)
partner: ReadOnlyField("GEANT", default_type=str) # type: ignore[valid-type]
user_input = yield CreateSiteForm
user_input = user_input.model_dump()
summary_fields = [
"site_name",
"site_bgp_community_id",
"site_internal_id",
"site_tier",
"site_ts_address",
"site_country_code",
"site_city",
"site_country",
"site_latitude",
"site_longitude",
"partner",
]
yield from create_summary_form(user_input, product_name, summary_fields)
return user_input
|
create_subscription(product, partner)
Create a new subscription object in the service database.
Source code in gso/workflows/site/create_site.py
| @step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
"""Create a new subscription object in the service database."""
subscription = site.SiteInactive.from_product_id(product, get_partner_by_name(partner).partner_id)
return {
"subscription": subscription,
"subscription_id": subscription.subscription_id,
}
|
initialize_subscription(subscription, site_name, site_city, site_country, site_country_code, site_latitude, site_longitude, site_bgp_community_id, site_internal_id, site_ts_address, site_tier)
Initialise the subscription object with all user input.
Source code in gso/workflows/site/create_site.py
| @step("Initialize subscription")
def initialize_subscription(
subscription: site.SiteInactive,
site_name: str,
site_city: str,
site_country: str,
site_country_code: str,
site_latitude: LatitudeCoordinate,
site_longitude: LongitudeCoordinate,
site_bgp_community_id: int,
site_internal_id: int,
site_ts_address: IPAddress,
site_tier: site_pb.SiteTier,
) -> State:
"""Initialise the subscription object with all user input."""
subscription.site.site_name = site_name
subscription.site.site_city = site_city
subscription.site.site_country = site_country
subscription.site.site_country_code = site_country_code
subscription.site.site_latitude = site_latitude
subscription.site.site_longitude = site_longitude
subscription.site.site_bgp_community_id = site_bgp_community_id
subscription.site.site_internal_id = site_internal_id
subscription.site.site_tier = site_tier
subscription.site.site_ts_address = site_ts_address
subscription.site.site_contains_optical_equipment = True
subscription.description = f"Site in {site_city}, {site_country}"
subscription = site.SiteProvisioning.from_other_lifecycle(subscription, SubscriptionLifecycle.PROVISIONING)
return {"subscription": subscription}
|
create_site()
Create a new site subscription.
Source code in gso/workflows/site/create_site.py
| @workflow(
"Create Site",
initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
target=Target.CREATE,
)
def create_site() -> StepList:
"""Create a new site subscription."""
return (
begin
>> create_subscription
>> store_process_subscription(Target.CREATE)
>> initialize_subscription
>> set_status(SubscriptionLifecycle.ACTIVE)
>> resync
>> done
)
|