Skip to content

Migrate edge port services

A modification workflow that migrates all L2 and L3 services linked to an EdgePort to a different endpoint.

initial_input_form_generator(subscription_id)

Gather input from the operator on the new router that the EdgePort should connect to.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
def initial_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Ask the operator for the destination EdgePort and show a summary before the workflow starts.

    The first page collects ``tt_number`` and a destination EdgePort. The destination is picked from the EdgePort
    subscriptions returned by ``get_active_edge_port_subscriptions`` for the partner of the source EdgePort, with the
    source EdgePort itself excluded. A summary page is presented afterwards.

    Args:
        subscription_id: ID of the source EdgePort subscription.

    Returns:
        The submitted form data, extended with the source and destination EdgePort models and with empty containers
        under the L2/L3 success and failure keys.
    """
    subscription = EdgePort.from_subscription(subscription_id)
    # Prefer the human-readable description; fall back to the name when the description is empty.
    source_label = subscription.edge_port.edge_port_description or subscription.edge_port.edge_port_name
    form_title = f"Migrating All Layer 2 and Layer 3 Services from {source_label}"
    partner = get_partner_by_id(subscription.customer_id)

    def _destination_edge_port_selector(partner_id: UUIDstr) -> TypeAlias:
        candidates = get_active_edge_port_subscriptions(
            partner_id=partner_id, exclude_subscription_ids=[str(subscription.subscription_id)]
        )
        labels_by_id = {str(candidate.subscription_id): candidate.description for candidate in candidates}
        # Each member is named after the subscription ID and carries an (ID, description) pair as its value.
        choice_members = zip(labels_by_id.keys(), labels_by_id.items(), strict=True)

        return cast(type[Choice], Choice.__call__("Select destination Edge Port", choice_members))

    class MigrateServicesForm(FormPage):
        model_config = ConfigDict(title=form_title)

        tt_number: TTNumber
        partner_name: read_only_field(partner.name)  # type: ignore[valid-type]
        divider: Divider = Field(None, exclude=True)
        destination_edge_port: _destination_edge_port_selector(partner.partner_id) | str  # type: ignore[valid-type]

    initial_user_input = yield MigrateServicesForm

    destination_edge_port = EdgePort.from_subscription(initial_user_input.destination_edge_port)
    input_forms_data = initial_user_input.model_dump()

    # The keys of this mapping double as the list of fields shown on the summary page.
    summary_extras = {
        "partner_name": initial_user_input.partner_name,
        "destination_edge_port_name": destination_edge_port.edge_port.edge_port_name,
        "destination_edge_port_description": destination_edge_port.edge_port.edge_port_description,
    }
    summary_form_data = {**input_forms_data, **summary_extras}
    summary_fields = list(summary_extras)
    yield from create_summary_form(summary_form_data, subscription.product.name, summary_fields)

    return {
        **input_forms_data,
        "subscription": subscription,
        "source_edge_port": subscription,
        # Replaces the raw ID submitted in the form with the loaded EdgePort model.
        "destination_edge_port": destination_edge_port,
        _L3_SUCCESS_KEY: {},
        _L3_FAILURE_KEY: {},
        _L2_SUCCESS_KEY: {},
        _L2_FAILURE_KEY: {},
    }

confirm_graphs_looks_good_in_moodi()

Wait for confirmation from an operator that the new Migration looks good so far.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_graphs_looks_good_in_moodi() -> FormGenerator:
    """Pause the workflow until an operator confirms that the migration looks good in Moodi.

    Returns:
        An empty dict; this step adds nothing to the workflow state.
    """
    page_title = "Please confirm before continuing"
    question = "Do you confirm that everything looks good in Moodi before continuing the workflow?"

    class ProvisioningResultPage(SubmitFormPage):
        model_config = ConfigDict(title=page_title)

        info_label: Label = question

    # The submitted form carries no data of interest, so it is discarded.
    _ = yield ProvisioningResultPage
    return {}

migrate_l3_core_services_to_new_node(tt_number, callback_route, source_edge_port, destination_edge_port)

Migrate all L3 core services from the old EdgePort to the new EdgePort.

The migration playbook is executed once for each service to apply the configuration on the new node. As a result, the service binding ports and BGP sessions of each service that relate to this edge port are moved to the new node.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("Migrate L3 core services to new node")
def migrate_l3_core_services_to_new_node(
    tt_number: TTNumber, callback_route: str, source_edge_port: EdgePort, destination_edge_port: EdgePort
) -> State:
    """Dispatch one migration workflow per L3 core service linked to the source EdgePort.

    A payload is built for every service returned by ``get_active_l3_services_linked_to_edge_port`` and the whole
    batch is handed to ``bulk_wf_task`` in staggered run mode. The results are reported back on ``callback_route``
    under the L3 success and failure keys.

    Args:
        tt_number: Ticket number forwarded to every child workflow.
        callback_route: Route on which the bulk task reports its results.
        source_edge_port: EdgePort the services are currently linked to.
        destination_edge_port: EdgePort the services are moved to.

    Returns:
        The dispatched payloads under ``l3_wf_payloads``, plus reset (empty) L3 success and failure containers.
    """
    source_id = str(source_edge_port.subscription_id)
    destination_id = str(destination_edge_port.subscription_id)

    wf_payloads = [
        BulkWfPayload(
            identifier=_wf_identifier(l3_service),
            workflow_key=L3_MIGRATION_WF_MAP[ProductName(l3_service.product.name).value],
            # One entry per input form page of the child workflow.
            user_inputs=[
                {"subscription_id": str(l3_service.subscription_id)},
                {"edge_port_partner": source_edge_port.customer_id},
                {
                    "tt_number": tt_number,
                    "is_human_initiated_wf": False,
                    "source_edge_port": source_id,
                },
                {
                    "destination_edge_port": destination_id,
                },
            ],
        ).model_dump()
        for l3_service in get_active_l3_services_linked_to_edge_port(source_edge_port.subscription_id)
    ]

    oss_params = load_oss_params()
    task_kwargs = {
        "wf_payloads": wf_payloads,
        "callback_route": callback_route,
        "success_key": _L3_SUCCESS_KEY,
        "failure_key": _L3_FAILURE_KEY,
        "run_mode": BulkRunMode.STAGGERED.value,
        "stagger_step": oss_params.GENERAL.l3_migration_stagger_step_seconds,
        "initial_delay": 0.0,
    }
    # The task is scheduled with a small random delay of 2 to 5 seconds.
    bulk_wf_task.apply_async(  # type: ignore[attr-defined]
        kwargs=task_kwargs,
        countdown=random.choice([2, 3, 4, 5]),
    )

    return {"l3_wf_payloads": wf_payloads, _L3_SUCCESS_KEY: {}, _L3_FAILURE_KEY: {}}

migrate_l2_circuits_to_new_node(tt_number, callback_route, source_edge_port, destination_edge_port)

Migrate Layer2 circuits from the old EdgePort to the new EdgePort.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("Migrate L2 circuits to new node")
def migrate_l2_circuits_to_new_node(
    tt_number: TTNumber, callback_route: str, source_edge_port: EdgePort, destination_edge_port: EdgePort
) -> State:
    """Migrate Layer2 circuits from the old EdgePort to the new EdgePort.

    A ``migrate_layer_2_circuit`` payload is built for every circuit returned by
    ``get_active_l2_circuit_services_linked_to_edge_port`` and the whole batch is handed to ``bulk_wf_task`` in
    staggered run mode. The results are reported back on ``callback_route`` under the L2 success and failure keys.

    Args:
        tt_number: Ticket number forwarded to every child workflow.
        callback_route: Route on which the bulk task reports its results.
        source_edge_port: EdgePort the circuits are currently linked to.
        destination_edge_port: EdgePort the circuits are moved to.

    Returns:
        The dispatched payloads under ``l2_wf_payloads``, plus reset (empty) L2 success and failure containers.
    """
    services = get_active_l2_circuit_services_linked_to_edge_port(source_edge_port.subscription_id)

    # These values depend only on the two EdgePorts, so they are computed once instead of once per circuit.
    different_site = source_edge_port.edge_port.node.router_site != destination_edge_port.edge_port.node.router_site
    source_id = str(source_edge_port.subscription_id)
    destination_id = str(destination_edge_port.subscription_id)

    wf_payloads = [
        BulkWfPayload(
            workflow_key="migrate_layer_2_circuit",
            identifier=_wf_identifier(service),
            # One entry per input form page of the child workflow.
            user_inputs=[
                {"subscription_id": str(service.subscription_id)},
                {
                    "tt_number": tt_number,
                    "is_human_initiated_wf": False,
                    "source_edge_port": source_id,
                    "migrate_to_different_site": different_site,
                    "run_old_side_ansible": True,
                    "run_new_side_ansible": True,
                },
                {
                    "destination_edge_port": destination_id,
                    "generate_new_vc_id": False,
                },
            ],
        ).model_dump()
        for service in services
    ]

    oss_params = load_oss_params()
    bulk_wf_task.apply_async(  # type: ignore[attr-defined]
        kwargs={
            "wf_payloads": wf_payloads,
            "callback_route": callback_route,
            "success_key": _L2_SUCCESS_KEY,
            "failure_key": _L2_FAILURE_KEY,
            "run_mode": BulkRunMode.STAGGERED.value,
            "stagger_step": oss_params.GENERAL.l2_migration_stagger_step_seconds,
            "initial_delay": 0.0,
        },
        # The task is scheduled with a small random delay of 2 to 5 seconds.
        countdown=random.choice([2, 3, 4, 5]),
    )
    return {"l2_wf_payloads": wf_payloads, _L2_SUCCESS_KEY: {}, _L2_FAILURE_KEY: {}}

disable_old_config_dry(subscription, process_id, tt_number)

Perform a dry run of disabling the old configuration on the routers.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("[DRY RUN] Disable configuration on old router")
def disable_old_config_dry(
    subscription: EdgePort,
    process_id: UUIDstr,
    tt_number: str,
) -> LSOState:
    """Build the playbook request for a dry run that deactivates the old configuration.

    All L3 services and L2 circuits linked to the EdgePort are serialised into the extra vars of the playbook.

    Args:
        subscription: The source EdgePort subscription.
        process_id: ID of the running process, embedded in the commit comment.
        tt_number: Ticket number, embedded in the commit comment.

    Returns:
        The playbook name, inventory and extra vars for the run. The inventory host list is still an empty
        placeholder in the code.
    """
    l3_models = get_active_l3_services_linked_to_edge_port(subscription.subscription_id)
    l2_models = get_active_l2_circuit_services_linked_to_edge_port(subscription.subscription_id)

    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for #TODO"
    inventory = {
        "all": {
            "hosts": {
                # TODO
            }
        }
    }

    return {
        "playbook_name": "gap_ansible/playbooks/iptrunks_migration.yaml",
        "inventory": inventory,
        "extra_vars": {
            "verb": "deactivate",
            "config_object": "deactivate",
            "dry_run": True,
            "commit_comment": commit_comment,
            # Round-trip through JSON to turn the subscription models into plain data structures.
            "l3_core_services": [json.loads(json_dumps(model)) for model in l3_models],
            "l2_circuits": [json.loads(json_dumps(model)) for model in l2_models],
        },
    }

disable_old_config_real(subscription, process_id, tt_number)

Disable old configuration on the routers.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("[FOR REAL] Disable configuration on old router")
def disable_old_config_real(
    subscription: EdgePort,
    process_id: UUIDstr,
    tt_number: str,
) -> LSOState:
    """Build the playbook request that actually deactivates the old configuration.

    All L3 services and L2 circuits linked to the EdgePort are serialised into the extra vars of the playbook.

    Args:
        subscription: The source EdgePort subscription.
        process_id: ID of the running process, embedded in the commit comment.
        tt_number: Ticket number, embedded in the commit comment.

    Returns:
        The playbook name, inventory and extra vars for the run. The inventory host list is still an empty
        placeholder in the code.
    """
    l3_models = get_active_l3_services_linked_to_edge_port(subscription.subscription_id)
    l2_models = get_active_l2_circuit_services_linked_to_edge_port(subscription.subscription_id)

    commit_comment = f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - Deploy config for # TODO"
    inventory = {
        "all": {
            "hosts": {
                # TODO
            }
        }
    }

    return {
        "playbook_name": "gap_ansible/playbooks/iptrunks_migration.yaml",
        "inventory": inventory,
        "extra_vars": {
            "verb": "deactivate",
            "config_object": "deactivate",
            "dry_run": False,
            "commit_comment": commit_comment,
            # Round-trip through JSON to turn the subscription models into plain data structures.
            "l3_core_services": [json.loads(json_dumps(model)) for model in l3_models],
            "l2_circuits": [json.loads(json_dumps(model)) for model in l2_models],
        },
    }

inform_operator_traffic_check()

Wait for confirmation from an operator that the results from the pre-checks look OK.

In case the results look OK, the workflow can continue. If the results don't look OK, the workflow can still be aborted at this time, without the subscription going out of sync. Moodi will also not start, and the subscription model has not been updated yet. Effectively, this prevents any changes inside the orchestrator from occurring. The one thing that must be rolled back manually, is the deactivated configuration that sits on the source device.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@inputstep("Verify pre-check results", assignee=Assignee.SYSTEM)
def inform_operator_traffic_check() -> FormGenerator:
    """Pause the workflow until an operator confirms that the pre-check results look OK.

    When the results look fine the workflow continues. When they do not, this is the point at which the workflow
    can still be aborted without the subscription going out of sync: Moodi has not been started and the
    subscription model has not been updated, so nothing has changed inside the orchestrator. Only the deactivated
    configuration on the source device has to be rolled back by hand.

    Returns:
        An empty dict; this step adds nothing to the workflow state.
    """
    page_title = "Please confirm before continuing"
    traffic_prompt = "Please verify that traffic has moved as expected."
    abort_notice = (
        "If traffic misbehaves, this is the last chance to cleanly abort the workflow—note that "
        "the deactivated config on the source device must be manually rolled back."
    )

    class PreCheckPage(SubmitFormPage):
        model_config = ConfigDict(title=page_title)

        info_label_1: Label = traffic_prompt
        info_label_2: Label = abort_notice

    # The submitted form carries no data of interest, so it is discarded.
    _ = yield PreCheckPage
    return {}

evaluate_l3_wfs_results(callback_result)

Evaluate the results of the bulk L3 migration workflows that are reported back in the callback.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("Evaluate provisioning proxy result")
def evaluate_l3_wfs_results(callback_result: dict) -> State:
    """Evaluate the results of the bulk L3 migration workflows reported in the callback.

    The failure and success entries are popped from ``callback_result``, so the dict is mutated in place. The
    success entry is read with ``_L3_SUCCESS_KEY``, the same key that is handed to the bulk task as
    ``success_key``, so that reader and writer cannot drift apart.

    Args:
        callback_result: Result payload delivered on the callback route.

    Returns:
        The remaining callback result and the successful workflows stored under ``_L3_SUCCESS_KEY``.

    Raises:
        ProcessFailureError: If at least one L3 migration workflow failed. The error details list both the
            failed and the successful workflows.
    """
    failed_wfs = callback_result.pop(_L3_FAILURE_KEY, {})
    successful_wfs = callback_result.pop(_L3_SUCCESS_KEY, {})
    details = {
        "failed_l3_wfs": failed_wfs,
        "successful_l3_wfs": successful_wfs,
    }

    if failed_wfs:
        msg = _generate_failure_message()
        raise ProcessFailureError(msg, details=details)

    return {"callback_result": callback_result, _L3_SUCCESS_KEY: successful_wfs}

evaluate_l2_wfs_results(callback_result)

Evaluate the results of the bulk L2 circuit migration workflows that are reported back in the callback.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("Evaluate provisioning proxy result")
def evaluate_l2_wfs_results(callback_result: dict) -> State:
    """Evaluate the results of the bulk L2 circuit migration workflows reported in the callback.

    The failure and success entries are popped from ``callback_result``, so the dict is mutated in place.

    Args:
        callback_result: Result payload delivered on the callback route.

    Returns:
        The remaining callback result and the successful workflows stored under ``_L2_SUCCESS_KEY``.

    Raises:
        ProcessFailureError: If at least one L2 migration workflow failed. The error details list both the
            failed and the successful workflows.
    """
    failures = callback_result.pop(_L2_FAILURE_KEY, {})
    successes = callback_result.pop(_L2_SUCCESS_KEY, {})

    # Happy path first: nothing failed, so hand the successes on to the next step.
    if not failures:
        return {"callback_result": callback_result, _L2_SUCCESS_KEY: successes}

    raise ProcessFailureError(
        _generate_failure_message(),
        details={
            "failed_l2_wfs": failures,
            "successful_l2_wfs": successes,
        },
    )

generate_scoped_subscription_models(source_edge_port, destination_edge_port)

Preview the updated L2 and L3 service subscription models for Moodi without applying the changes.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("Generate updated subscription models")
def generate_scoped_subscription_models(
    source_edge_port: EdgePort,
    destination_edge_port: EdgePort,
) -> State:
    """Preview the updated L3 and L2 service subscription models for Moodi without applying the changes.

    Args:
        source_edge_port: EdgePort the services are currently linked to.
        destination_edge_port: EdgePort the services are moved to.

    Returns:
        The scoped L3 models followed by the scoped L2 models under ``_MONITORED_OBJECTS_KEY``, and the extra
        keyword arguments produced for the L3 services under ``MOODI_EXTRA_KWARGS_KEY``.
    """
    l3_models, moodi_kwargs = _generate_scoped_subscription_models_for_l3_services(
        destination_edge_port, source_edge_port
    )
    l2_models = _generate_scoped_subscription_for_l2_services(source_edge_port, destination_edge_port)

    # L3 models come first, followed by the L2 models.
    monitored_objects = list(l3_models)
    monitored_objects.extend(l2_models)

    return {_MONITORED_OBJECTS_KEY: monitored_objects, MOODI_EXTRA_KWARGS_KEY: moodi_kwargs}

_generate_scoped_subscription_for_l2_services(source_edge_port, destination_edge_port)

Calculate what the updated subscription model will look like, but don't update the actual subscription yet.

The new subscription is used for Moodi, but the updated subscription model is not stored yet, to avoid issues recovering when the workflow is aborted.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
def _generate_scoped_subscription_for_l2_services(
    source_edge_port: EdgePort,
    destination_edge_port: EdgePort,
) -> list[dict[str, Any]]:
    """Work out what the L2 circuit subscriptions will look like after the migration, without storing anything.

    The resulting models are used for Moodi only. The real subscriptions stay untouched so that recovery stays
    simple when the workflow is aborted.

    Args:
        source_edge_port: EdgePort the circuits are currently linked to.
        destination_edge_port: EdgePort the circuits are moved to.

    Returns:
        One scoped model per L2 circuit linked to the source EdgePort.
    """
    return [
        generate_scoped_subscription_for_l2_service(circuit, source_edge_port, destination_edge_port)
        for circuit in get_active_l2_circuit_services_linked_to_edge_port(source_edge_port.subscription_id)
    ]

_generate_scoped_subscription_models_for_l3_services(destination_edge_port, source_edge_port)

Calculate what the updated subscription model will look like, but don't update the actual subscription yet.

The new subscription is used for Moodi, but the updated subscription model is not stored yet, to avoid issues recovering when the workflow is aborted.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
def _generate_scoped_subscription_models_for_l3_services(
    destination_edge_port: EdgePort, source_edge_port: EdgePort
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Work out what the L3 service subscriptions will look like after the migration, without storing anything.

    The resulting models are used for Moodi only. The real subscriptions stay untouched so that recovery stays
    simple when the workflow is aborted.

    Args:
        destination_edge_port: EdgePort the services are moved to.
        source_edge_port: EdgePort the services are currently linked to.

    Returns:
        A tuple of the scoped service models and a mapping from subscription ID to the values returned by
        ``get_expected_bgp_values`` for the migrated entry.

    Raises:
        ProcessFailureError: If a service has no ``ap_list`` entry that points at the source EdgePort.
    """
    source_id = str(source_edge_port.subscription_id)
    scoped_models: list[dict[str, Any]] = []
    moodi_kwargs: dict[str, Any] = {}

    for l3_service in get_active_l3_services_linked_to_edge_port(source_edge_port.subscription_id):
        attribute = l3_service.service_name_attribute  # type: ignore[attr-defined]
        # Work on a plain-data copy so that the stored subscription is never modified.
        model = json.loads(json_dumps(l3_service))
        matched = False

        # The loop walks the original list object; rebinding ``ap_list`` below does not affect the iteration.
        # NOTE(review): if several entries point at the source EdgePort, the same model is appended once per match
        # and only the last match remains in ``ap_list`` - confirm whether that can occur.
        for ap_entry in model[attribute]["l3_core"]["ap_list"]:
            if ap_entry["sbp"]["edge_port"]["owner_subscription_id"] != source_id:
                continue

            matched = True
            # This is the entry to migrate: swap in the destination EdgePort and scope the model down to it.
            ap_entry["sbp"]["edge_port"] = json.loads(json_dumps(destination_edge_port.edge_port))
            model[attribute]["l3_core"]["ap_list"] = [ap_entry]
            scoped_models.append(model)
            moodi_kwargs[model["subscription_id"]] = get_expected_bgp_values(ap_entry["sbp"]["bgp_session_list"])

        if not matched:
            raise ProcessFailureError(
                f"Failed to find selected EdgePort in {attribute}: {l3_service.subscription_id}",
                details=source_edge_port,
            )

    return scoped_models, moodi_kwargs

validate_edge_port_migration_eligibility(subscription)

Validate that all services linked to the EdgePort can be migrated.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@step("Validate that all L2 & L3 services linked to the EdgePort can be migrated.")
def validate_edge_port_migration_eligibility(subscription: EdgePort) -> State:
    """Check that a migration workflow can be started for every service linked to the EdgePort.

    Every linked L3 service and L2 circuit is checked with ``validate_subscription_workflow_eligibility``. All
    failures are collected first, so that a single error reports every service that blocks the migration.

    Args:
        subscription: The source EdgePort subscription.

    Returns:
        An empty dict when every service is eligible.

    Raises:
        ProcessFailureError: If at least one service is not eligible. The error details hold the messages per
            service, split into ``layer_3_errors`` and ``layer_2_errors``.
    """
    l3_problems = {}
    for l3_service in get_active_l3_services_linked_to_edge_port(subscription.subscription_id):
        try:
            # The workflow name lookup stays inside the ``try``: a ValueError raised there is recorded as well.
            validate_subscription_workflow_eligibility(
                subscription_id=l3_service.subscription_id,
                workflow_target=Target.MODIFY,
                workflow_name=L3_MIGRATION_WF_MAP[ProductName(l3_service.product.name).value],
            )
        except ValueError as error:
            l3_problems[_wf_identifier(l3_service)] = str(error)

    l2_problems = {}
    for l2_circuit in get_active_l2_circuit_services_linked_to_edge_port(subscription.subscription_id):
        try:
            validate_subscription_workflow_eligibility(
                subscription_id=l2_circuit.subscription_id,
                workflow_target=Target.MODIFY,
                workflow_name="migrate_layer_2_circuit",
            )
        except ValueError as error:
            l2_problems[_wf_identifier(l2_circuit)] = str(error)

    if not (l3_problems or l2_problems):
        return {}

    raise ProcessFailureError(
        _generate_failure_message(),
        details={
            "layer_3_errors": l3_problems,
            "layer_2_errors": l2_problems,
        },
    )

confirm_continue_move_fiber()

Wait for confirmation from an operator that the physical fiber has been moved.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@inputstep("Wait for confirmation", assignee=Assignee.SYSTEM)
def confirm_continue_move_fiber() -> FormGenerator:
    """Pause the workflow until an operator confirms that the physical fiber has been moved.

    Returns:
        An empty dict; this step adds nothing to the workflow state.
    """
    page_title = "Please confirm before continuing"
    message = "Wait for confirmation from an operator that the physical fiber has been moved."

    class ProvisioningResultPage(SubmitFormPage):
        model_config = ConfigDict(title=page_title)

        info_label: Label = message

    # The submitted form carries no data of interest, so it is discarded.
    _ = yield ProvisioningResultPage
    return {}

migrate_edge_port_services()

Migrate all services from an EdgePort to a new EdgePort.

Source code in gso/workflows/edge_port/migrate_edge_port_services.py
@modify_workflow("Migrate All Services To a New Edge Port", initial_input_form=initial_input_form_generator)
def migrate_edge_port_services() -> StepList:
    """Migrate all services from an EdgePort to a new EdgePort.

    The step list runs in this order: eligibility check, dry and real deactivation of the old configuration,
    two operator confirmations (traffic check and fiber move), generation of the scoped models and start of
    Moodi, the bulk L3 migrations, an operator confirmation of the Moodi graphs, the bulk L2 migrations, and
    finally stopping Moodi.
    """
    return (
        begin
        >> resync
        # Fail early when any linked L2/L3 service cannot start its own migration workflow.
        >> validate_edge_port_migration_eligibility
        >> unsync
        # Deactivate the configuration on the old router: first as a dry run, then for real.
        >> lso_interaction(disable_old_config_dry)
        >> lso_interaction(disable_old_config_real)
        # Operator checkpoints: verify traffic, then confirm that the fiber has been moved.
        >> inform_operator_traffic_check
        >> confirm_continue_move_fiber
        # Build the previewed subscription models and hand them to Moodi as monitored objects.
        >> generate_scoped_subscription_models
        >> start_moodi(monitored_objects_key=_MONITORED_OBJECTS_KEY)
        # NOTE(review): presumably resynced here so the child migration workflows can run - confirm.
        >> resync
        # Dispatch the bulk L3 migrations and evaluate their results once the callback arrives.
        >> callback_step(
            name="Start running L3 core service migration workflows",
            action_step=migrate_l3_core_services_to_new_node,
            validate_step=evaluate_l3_wfs_results,
        )
        >> confirm_graphs_looks_good_in_moodi
        # Dispatch the bulk L2 circuit migrations and evaluate their results once the callback arrives.
        >> callback_step(
            name="Start running L2 core service migration workflows",
            action_step=migrate_l2_circuits_to_new_node,
            validate_step=evaluate_l2_wfs_results,
        )
        >> stop_moodi()
    )