A creation workflow for adding an existing Imported IAS to the service database.
initial_input_form_generator()
Initial input form generator for creating a new imported IAS subscription.
Source code in gso/workflows/l3_core_service/ias/create_imported_ias.py
def initial_input_form_generator() -> FormGenerator:
    """Collect the details needed to create a new imported IAS subscription.

    Yields one form asking for the partner, the service binding ports, the
    product name and the IAS flavor (defaulting to
    ``IASFlavor.IAS_PS_OPT_OUT``).

    Returns:
        The submitted form contents as a dictionary, produced by
        ``model_dump()``.
    """

    class ImportL3CoreServiceForm(SubmitFormPage):
        partner: str
        service_binding_ports: list[ServiceBindingPort]
        product_name: L3ProductNameType
        ias_flavor: IASFlavor = IASFlavor.IAS_PS_OPT_OUT

    # The yield hands the form class out and receives the filled-in form back.
    submitted_form = yield ImportL3CoreServiceForm
    form_state = submitted_form.model_dump()
    return form_state
create_subscription(partner, ias_flavor)
Create a new subscription object in the database.
Source code in gso/workflows/l3_core_service/ias/create_imported_ias.py
@step("Create subscription")
def create_subscription(partner: str, ias_flavor: IASFlavor) -> dict:
    """Create a new ``ImportedIASInactive`` subscription for a partner.

    Args:
        partner: Name of the partner; used to look up the partner ID.
        ias_flavor: IAS flavor to store on the subscription's ``ias`` block.

    Returns:
        A dictionary holding the new ``subscription`` object and its
        ``subscription_id``.
    """
    # Resolve the partner first, then the product, matching the lookup order
    # callers have always observed when either lookup fails.
    owner_id = get_partner_by_name(partner).partner_id
    new_subscription = ImportedIASInactive.from_product_id(
        get_product_id_by_name(ProductName.IMPORTED_IAS),
        owner_id,
    )
    new_subscription.ias.ias_flavor = ias_flavor

    return {
        "subscription": new_subscription,
        "subscription_id": new_subscription.subscription_id,
    }
create_imported_ias()
Import an IAS without provisioning it.
Source code in gso/workflows/l3_core_service/ias/create_imported_ias.py
@workflow(
    "Create Imported IAS",
    initial_input_form=initial_input_form_generator,
    target=Target.CREATE,
)
def create_imported_ias() -> StepList:
    """Import an IAS without provisioning it.

    Returns:
        The step list for this workflow: create the subscription, store the
        process subscription, initialize the subscription, set its lifecycle
        status to ``ACTIVE`` and resync.
    """
    # Phase 1: create the subscription record and fill it in.
    creation_steps = (
        begin
        >> create_subscription
        >> store_process_subscription()
        >> initialize_subscription
    )
    # Phase 2: mark the subscription active and finish the workflow.
    activation_steps = creation_steps >> set_status(SubscriptionLifecycle.ACTIVE) >> resync
    return activation_steps >> done