A creation workflow for adding a new virtual routing and forwarding (VRF) service.
Gather information about the new VRF from the operator.
Source code in gso/workflows/vrf/create_vrf.py
def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather information about the new VRF from the operator.

    Args:
        product_name: Name of the product; used as the title of the input form
            and passed on to the summary form.

    Returns:
        The submitted form contents as a plain dictionary (the result of
        ``model_dump()`` on the submitted form).
    """

    class CreateVRFForm(FormPage):
        model_config = ConfigDict(title=product_name)

        tt_number: TTNumber
        # The partner field is declared through ``ReadOnlyField`` with the value "GEANT".
        partner: ReadOnlyField("GEANT", default_type=str)  # type: ignore[valid-type]
        vrf_name: UniqueField[str]
        route_distinguisher: str
        route_target: str
        vrf_as_number: int

    submitted_form = yield CreateVRFForm
    form_data = submitted_form.model_dump()

    # Only the VRF-specific fields are listed for the summary form.
    yield from create_summary_form(
        form_data,
        product_name,
        ["vrf_name", "route_distinguisher", "route_target", "vrf_as_number"],
    )

    return form_data
|
create_subscription(product, partner)
Create a new VRF subscription.
Source code in gso/workflows/vrf/create_vrf.py
@step("Create subscription")
def create_subscription(product: UUIDstr, partner: str) -> State:
    """Create a new VRF subscription for the given product and partner.

    Args:
        product: ID of the product the subscription is created from.
        partner: Name of the partner; looked up with ``get_partner_by_name``
            to obtain the partner ID.

    Returns:
        State holding the new subscription and its subscription ID.
    """
    partner_id = get_partner_by_name(partner).partner_id
    vrf_subscription = VRFInactive.from_product_id(product, partner_id)

    return {
        "subscription": vrf_subscription,
        "subscription_id": vrf_subscription.subscription_id,
    }
|
initialize_subscription(subscription, vrf_name, route_distinguisher, route_target, vrf_as_number)
Initialise the subscription object in the service database.
Source code in gso/workflows/vrf/create_vrf.py
@step("Initialize subscription")
def initialize_subscription(
    subscription: VRFInactive,
    vrf_name: str,
    route_distinguisher: str,
    route_target: str,
    vrf_as_number: int,
) -> State:
    """Populate the VRF subscription with the values supplied by the operator.

    Args:
        subscription: The inactive VRF subscription to fill in.
        vrf_name: Name of the VRF.
        route_distinguisher: Route distinguisher of the VRF.
        route_target: Route target of the VRF.
        vrf_as_number: AS number of the VRF.

    Returns:
        State holding the updated subscription.
    """
    # Bind the VRF product block once instead of re-reading it for every field.
    vrf_block = subscription.vrf
    vrf_block.vrf_name = vrf_name
    vrf_block.route_distinguisher = route_distinguisher
    vrf_block.route_target = route_target
    vrf_block.vrf_as_number = vrf_as_number

    subscription.description = f"VRF {vrf_name} - {vrf_as_number}"

    return {"subscription": subscription}
|
create_vrf()
Create a virtual routing and forwarding (VRF) service.
Source code in gso/workflows/vrf/create_vrf.py
@workflow(
    "Create VRF",
    initial_input_form=wrap_create_initial_input_form(initial_input_form_generator),
    target=Target.CREATE,
)
def create_vrf() -> StepList:
    """Create a virtual routing and forwarding (VRF) service.

    The steps run in this order: create the subscription, store the process
    subscription, initialise the subscription, set its status to active, and
    resync.

    Returns:
        The step list that makes up the workflow.
    """
    # Stage 1: create the subscription and fill it with the operator's input.
    setup_steps = (
        begin
        >> create_subscription
        >> store_process_subscription(Target.CREATE)
        >> initialize_subscription
    )

    # Stage 2: set the lifecycle status to active and resync.
    activation_steps = setup_steps >> set_status(SubscriptionLifecycle.ACTIVE) >> resync

    return activation_steps >> done
|