Skip to content

Modify VRF router list

Modify VRF to add or remove routers.

initial_input_form_generator(subscription_id)

Modify VRF to add or remove routers.

Source code in gso/workflows/vrf/modify_vrf_router_list.py
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Present the form used to change which routers belong to a VRF.

    The VRF subscription is loaded from ``subscription_id`` and its current router list is used to pre-fill the form.
    The submitted form is returned as a plain dictionary via ``model_dump()``.
    """
    subscription = VRF.from_subscription(subscription_id)
    current_routers = subscription.vrf.vrf_router_list
    # A VRF that already has routers may have its list emptied; one without routers needs at least one entry.
    minimum_routers = 0 if current_routers else 1

    class RouterSelection(BaseModel):
        router_id: active_router_selector()  # type: ignore[valid-type]

    # Pre-select the routers that are currently part of the VRF.
    preselected_routers = [RouterSelection(router_id=str(router.owner_subscription_id)) for router in current_routers]

    class ModifyVRFRouterListForm(SubmitFormPage):
        model_config = ConfigDict(title=f"Modify the {subscription.vrf.vrf_name} VRF to add or remove routers.")
        tt_number: TTNumber

        router_list: Annotated[
            list[RouterSelection],
            AfterValidator(validate_unique_list),
            Field(
                description="A list of routers to add or remove from the VRF.",
                min_length=minimum_routers,
                json_schema_extra={"uniqueItems": True},
            ),
        ] = preselected_routers

    submitted_form = yield ModifyVRFRouterListForm

    return submitted_form.model_dump()

update_subscription_model(subscription, router_list)

Update the subscription's database model with the new router list.

Source code in gso/workflows/vrf/modify_vrf_router_list.py
@step("Update subscription model")
def update_subscription_model(subscription: VRF, router_list: list[dict[str, UUIDstr]]) -> State:
    """Replace the VRF's router list with the routers selected in ``router_list``.

    Each entry of ``router_list`` is a mapping holding a ``"router_id"``; the matching router subscription is loaded
    and its ``router`` attribute is stored on the VRF. The updated subscription is returned in the step state.
    """
    selected_ids = (entry["router_id"] for entry in router_list)
    subscription.vrf.vrf_router_list = [Router.from_subscription(router_id).router for router_id in selected_ids]

    return {"subscription": subscription}

update_vrf_on_routers_dry(subscription, process_id, tt_number, router_list)

Deploy VRF on a list of routers - Dry run.

Source code in gso/workflows/vrf/modify_vrf_router_list.py
@step("[DRY RUN] Update VRF on list of routers")
def update_vrf_on_routers_dry(
    subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, router_list: list[dict[str, UUIDstr]]
) -> LSOState:
    """Build the playbook payload for a dry-run VRF update on the selected routers.

    Returns the playbook name, an inventory keyed by router FQDN, and the extra variables with ``dry_run`` set to
    ``True``.
    """
    selected_routers = [Router.from_subscription(entry["router_id"]) for entry in router_list]
    # NOTE(review): the inventory holds only the routers in ``router_list``; a router that was previously part of the
    # VRF but is absent from this list is not targeted - confirm this is intended.
    hosts = dict.fromkeys(selected.router.router_fqdn for selected in selected_routers)
    description = subscription["description"]
    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for {description}"

    return {
        "playbook_name": "gap_ansible/playbooks/vrf_update.yaml",
        "inventory": {"all": {"hosts": hosts}},
        "extra_vars": {
            "subscription": subscription,
            "vrf_router_list": selected_routers,
            "dry_run": True,
            "verb": "update",
            "commit_comment": commit_comment,
        },
    }

update_vrf_on_routers_real(subscription, process_id, tt_number, router_list)

Deploy VRF on a list of routers - with commit.

Source code in gso/workflows/vrf/modify_vrf_router_list.py
@step("[FOR REAL] Update VRF on list of routers")
def update_vrf_on_routers_real(
    subscription: dict[str, Any], process_id: UUIDstr, tt_number: str, router_list: list[dict[str, UUIDstr]]
) -> LSOState:
    """Build the playbook payload for a committed VRF update on the selected routers.

    Returns the playbook name, an inventory keyed by router FQDN, and the extra variables with ``dry_run`` set to
    ``False``.
    """
    selected_routers = [Router.from_subscription(entry["router_id"]) for entry in router_list]
    # NOTE(review): the inventory holds only the routers in ``router_list``; a router that was previously part of the
    # VRF but is absent from this list is not targeted - confirm this is intended.
    hosts = dict.fromkeys(selected.router.router_fqdn for selected in selected_routers)
    description = subscription["description"]
    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for {description}"

    return {
        "playbook_name": "gap_ansible/playbooks/vrf_update.yaml",
        "inventory": {"all": {"hosts": hosts}},
        "extra_vars": {
            "subscription": subscription,
            "vrf_router_list": selected_routers,
            "dry_run": False,
            "verb": "update",
            "commit_comment": commit_comment,
        },
    }

modify_vrf_router_list()

Modify the VRF router list.

Source code in gso/workflows/vrf/modify_vrf_router_list.py
@workflow(
    "Modify VRF router list",
    initial_input_form=wrap_modify_initial_input_form(initial_input_form_generator),
    target=Target.MODIFY,
)
def modify_vrf_router_list() -> StepList:
    """Assemble the step list of the workflow that modifies a VRF's router list.

    The steps run in this order: store the process subscription, unsync, the dry-run update, the real update, the
    subscription model update, and resync.
    """
    # Each phase extends the previous one, so the chain is still built strictly left to right.
    preparation = begin >> store_process_subscription(Target.MODIFY) >> unsync
    deployment = preparation >> lso_interaction(update_vrf_on_routers_dry) >> lso_interaction(update_vrf_on_routers_real)
    return deployment >> update_subscription_model >> resync >> done