Base functionality for modifying an L3 Core Service subscription.
Get input on which Access Port should be re-deployed.
Source code in gso/workflows/l3_core_service/redeploy_l3_core_service.py
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Ask the operator which Access Port of the subscription should be re-deployed.

    The subscription is loaded from ``subscription_id`` and a form is yielded that
    asks for a TT number and one of the Access Ports in the subscription's
    ``l3_core.ap_list``.

    Returns the dumped form input, extended with:

    * ``edge_port_fqdn_list``: a one-element list holding the router FQDN of the node
      of the Edge Port that the selected Access Port refers to,
    * ``active_ap``: the subscription instance ID of the selected Access Port,
    * ``verb``: always ``"deploy"``.
    """
    subscription = SubscriptionModel.from_subscription(subscription_id)
    product_name = subscription.product.name

    def access_port_selector() -> TypeAlias:
        """Build a dropdown type listing every Access Port of the subscription."""
        labels_by_id: dict[str, str] = {}
        for ap in subscription.l3_core.ap_list:  # type: ignore[attr-defined]
            ap_id = str(ap.subscription_instance_id)
            gs_id = ap.sbp.gs_id
            edge_port = EdgePort.from_subscription(ap.sbp.edge_port.owner_subscription_id)
            labels_by_id[ap_id] = f"{gs_id} on {edge_port.description} ({ap.ap_type})"

        # Every choice is named after the instance ID and carries an (ID, label) pair.
        choices = zip(labels_by_id.keys(), labels_by_id.items(), strict=True)
        return cast(type[Choice], Choice.__call__("Select an Access Port", choices))

    class AccessPortSelectionForm(FormPage):
        model_config = ConfigDict(title=f"Re-deploy {product_name} subscription")

        tt_number: TTNumber
        access_port: access_port_selector()  # type: ignore[valid-type]

    form_data = yield AccessPortSelectionForm

    # Resolve the chosen Access Port, then the router FQDN of its Edge Port's node.
    selected_ap = AccessPort.from_db(form_data.access_port)
    edge_port_subscription = EdgePort.from_subscription(selected_ap.sbp.edge_port.owner_subscription_id)
    router_fqdn = edge_port_subscription.edge_port.node.router_fqdn

    return {
        **form_data.model_dump(),
        "edge_port_fqdn_list": [router_fqdn],
        "active_ap": selected_ap.subscription_instance_id,
        "verb": "deploy",
    }
|
redeploy_l3_core_service()
Redeploy a Layer 3 subscription.
Source code in gso/workflows/l3_core_service/redeploy_l3_core_service.py
@redeploy_workflow("Redeploy Layer 3 service", initial_input_form=initial_input_form_generator)
def redeploy_l3_core_service() -> StepList:
    """Redeploy a Layer 3 subscription.

    The step list starts with ``populate_partner_and_scope_subscription``, followed by
    the SBP provisioning steps and the BGP peer deployment steps (dry then real variant
    of each), every one of them wrapped in ``lso_interaction``.
    """
    workflow_steps = begin >> populate_partner_and_scope_subscription

    # Appended strictly in this order, one LSO interaction per step.
    lso_steps = (
        provision_sbp_dry,
        provision_sbp_real,
        deploy_bgp_peers_dry,
        deploy_bgp_peers_real,
    )
    for lso_step in lso_steps:
        workflow_steps = workflow_steps >> lso_interaction(lso_step)

    return workflow_steps
|