A creation workflow that adds an existing switch to the service database.
create_subscription(partner)
Create a new subscription object.
Source code in gso/workflows/switch/create_imported_switch.py
| @step("Create subscription")
def create_subscription(partner: str) -> State:
    """Build a fresh, inactive imported-switch subscription for a partner.

    Args:
        partner: Name of the partner, resolved through ``get_partner_by_name``.

    Returns:
        State holding the new subscription under ``"subscription"`` and its
        identifier under ``"subscription_id"``.
    """
    # Resolve the partner first, then the product, so lookup failures surface
    # in the same order as the inputs are consumed.
    owner_id = get_partner_by_name(partner).partner_id
    imported_switch_product = get_product_id_by_name(ProductName.IMPORTED_SWITCH)

    new_subscription = ImportedSwitchInactive.from_product_id(imported_switch_product, owner_id)

    state_update = {
        "subscription": new_subscription,
        "subscription_id": new_subscription.subscription_id,
    }
    return state_update
|
initialize_subscription(subscription, fqdn, ts_port, site, switch_vendor, switch_model)
Initialize the switch subscription using input data.
Source code in gso/workflows/switch/create_imported_switch.py
| @step("Initialize subscription")
def initialize_subscription(
    subscription: ImportedSwitchInactive,
    fqdn: str,
    ts_port: PortNumber,
    site: UUIDstr,
    switch_vendor: Vendor,
    switch_model: SwitchModel,
) -> State:
    """Populate the switch block of the subscription from the workflow input.

    Args:
        subscription: The imported-switch subscription whose switch block is filled in.
        fqdn: Value stored as the switch's ``fqdn``.
        ts_port: Value stored as the switch's ``ts_port``.
        site: Identifier handed to ``Site.from_subscription``; the ``site``
            attribute of the result is attached to the switch.
        switch_vendor: Value stored as the switch's ``switch_vendor``.
        switch_model: Value stored as the switch's ``switch_model``.

    Returns:
        State holding the updated subscription under ``"subscription"``.
    """
    switch_block = subscription.switch

    switch_block.fqdn = fqdn
    switch_block.ts_port = ts_port
    # The site is supplied as an identifier; load it and attach its ``site`` attribute.
    switch_block.site = Site.from_subscription(site).site
    switch_block.switch_vendor = switch_vendor
    switch_block.switch_model = switch_model

    return {"subscription": subscription}
|
create_imported_switch()
Create an imported switch without provisioning it.
Source code in gso/workflows/switch/create_imported_switch.py
| @workflow("Create Imported Switch", initial_input_form=_initial_input_form_generator, target=Target.CREATE)
def create_imported_switch() -> StepList:
    """Assemble the step list that imports a switch without provisioning it.

    The steps run in this order: create the subscription, store the process
    subscription for ``Target.CREATE``, initialize the subscription from the
    input, set its status to ``SubscriptionLifecycle.ACTIVE``, resync, and finish.

    Returns:
        The composed step list, from ``begin`` through ``done``.
    """
    # Stage 1: bring the subscription into existence and tie it to the process.
    creation = begin >> create_subscription >> store_process_subscription(Target.CREATE)

    # Stage 2: fill in the switch details and set the lifecycle status.
    activation = creation >> initialize_subscription >> set_status(SubscriptionLifecycle.ACTIVE)

    # Stage 3: resync and close the workflow.
    return activation >> resync >> done
|