Modify VRF to add or remove routers.
This workflow allows for adding or removing one router to the VRF router list.
Operation
Bases: strEnum
The two operations that can be performed to modify a VRF router list.
Source code in gso/workflows/vrf/modify_vrf_router_list.py
| class Operation(strEnum):
"""The two operations that can be performed to modify a VRF router list."""
ADD = "Add a router"
REMOVE = "Remove a router"
|
Modify VRF to add or remove routers.
Source code in gso/workflows/vrf/modify_vrf_router_list.py
| def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
"""Modify VRF to add or remove routers."""
subscription = VRF.from_subscription(subscription_id)
def existing_router_list() -> type[list]:
return cast(
type[list],
ReadOnlyField([router.router_fqdn for router in subscription.vrf.vrf_router_list], default_type=list[str]),
)
class OperationSelectionForm(FormPage):
model_config = ConfigDict(title="Modify VRF router list.")
tt_number: TTNumber
operation: Operation
initial_input = yield OperationSelectionForm
match initial_input.operation:
case Operation.ADD:
class AddVRFRouterListForm(SubmitFormPage):
model_config = ConfigDict(title=f"Modify the {subscription.vrf.vrf_name} VRF to add a router.")
existing_routers: existing_router_list() # type: ignore[valid-type]
divider: Divider = Field(None, exclude=True)
selected_router: active_router_selector( # type: ignore[valid-type]
excludes=[str(router.owner_subscription_id) for router in subscription.vrf.vrf_router_list]
)
user_input = yield AddVRFRouterListForm
case Operation.REMOVE:
def existing_router_selector() -> TypeAlias:
router_subscriptions = {
str(router.owner_subscription_id): router.router_fqdn for router in subscription.vrf.vrf_router_list
}
return cast(
type[Choice],
Choice.__call__(
"Select a router", zip(router_subscriptions.keys(), router_subscriptions.items(), strict=True)
),
)
class RemoveVRFRouterListForm(SubmitFormPage):
model_config = ConfigDict(title=f"Modify the {subscription.vrf.vrf_name} VRF to remove a router.")
existing_routers: existing_router_list() # type: ignore[valid-type]
divider: Divider = Field(None, exclude=True)
selected_router: existing_router_selector() # type: ignore[valid-type]
user_input = yield RemoveVRFRouterListForm
case _:
msg = f"Invalid operation selected: {initial_input.operation}. Only addition or removal are supported."
raise ValueError(msg)
return {"tt_number": initial_input.tt_number, "operation": initial_input.operation} | user_input.model_dump()
|
update_subscription_model(subscription, selected_router, operation)
Update the database model to update the router list.
Source code in gso/workflows/vrf/modify_vrf_router_list.py
| @step("Update subscription model")
def update_subscription_model(subscription: VRF, selected_router: UUIDstr, operation: Operation) -> State:
"""Update the database model to update the router list."""
selected_router_block = Router.from_subscription(selected_router).router
if operation == Operation.ADD:
subscription.vrf.vrf_router_list.append(selected_router_block)
elif operation == Operation.REMOVE:
logger = logging.getLogger()
logger.error(selected_router_block)
logger.error(subscription.vrf.vrf_router_list)
subscription.vrf.vrf_router_list.remove(selected_router_block)
return {"subscription": subscription}
|
update_vrf_on_router_dry(subscription, process_id, tt_number, selected_router, operation)
Deploy VRF on a router - dry run.
Source code in gso/workflows/vrf/modify_vrf_router_list.py
| @step("[DRY RUN] Update VRF on selected router")
def update_vrf_on_router_dry(
subscription: dict[str, Any],
process_id: UUIDstr,
tt_number: str,
selected_router: UUIDstr,
operation: Operation,
) -> LSOState:
"""Deploy VRF on a router - dry run."""
extra_vars = {
"subscription": subscription,
"dry_run": True,
"verb": "add" if operation is Operation.ADD else "remove",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"{operation} for {subscription["description"]}",
}
return {
"playbook_name": "gap_ansible/playbooks/vrf_update.yaml",
"inventory": {"all": {"hosts": {Router.from_subscription(selected_router).router.router_fqdn: None}}},
"extra_vars": extra_vars,
}
|
update_vrf_on_router_real(subscription, process_id, tt_number, selected_router, operation)
Deploy VRF on a router - with commit.
Source code in gso/workflows/vrf/modify_vrf_router_list.py
| @step("[FOR REAL] Update VRF on selected router")
def update_vrf_on_router_real(
subscription: dict[str, Any],
process_id: UUIDstr,
tt_number: str,
selected_router: UUIDstr,
operation: Operation,
) -> LSOState:
"""Deploy VRF on a router - with commit."""
extra_vars = {
"subscription": subscription,
"dry_run": False,
"verb": "add" if operation is Operation.ADD else "remove",
"commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
f"{operation} for {subscription["description"]}",
}
return {
"playbook_name": "gap_ansible/playbooks/vrf_update.yaml",
"inventory": {"all": {"hosts": {Router.from_subscription(selected_router).router.router_fqdn: None}}},
"extra_vars": extra_vars,
}
|
modify_vrf_router_list()
Modify the VRF router list.
Source code in gso/workflows/vrf/modify_vrf_router_list.py
| @workflow(
"Modify VRF router list",
initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
target=Target.MODIFY,
)
def modify_vrf_router_list() -> StepList:
"""Modify the VRF router list."""
return (
begin
>> store_process_subscription(Target.MODIFY)
>> unsync
>> update_subscription_model
>> lso_interaction(update_vrf_on_router_dry)
>> lso_interaction(update_vrf_on_router_real)
>> resync
>> done
)
|