Skip to content

Shared

Shared functions for the workflows.

summary_form(product_name, summary_data)

Generate a summary form for the product.

Source code in gso/workflows/shared.py
def summary_form(product_name: str, summary_data: dict) -> Generator:
    """Generate a summary form for the product."""

    class SummaryForm(SubmitFormPage):
        model_config = ConfigDict(title=f"{product_name} summary")

        product_summary: cast(type[MigrationSummary], migration_summary(summary_data))  # type: ignore[valid-type]

    yield SummaryForm

create_summary_form(user_input, product_name, fields)

Create a summary form for the product.

Source code in gso/workflows/shared.py
def create_summary_form(user_input: dict, product_name: str, fields: list[str]) -> Generator:
    """Create a summary form for the product."""
    columns = [[str(user_input[nm]) for nm in fields]]
    yield from summary_form(product_name, {"labels": fields, "columns": columns})

modify_summary_form(user_input, block, fields)

Modify the summary form for the product.

Source code in gso/workflows/shared.py
def modify_summary_form(user_input: dict, block: ProductBlockModel, fields: list[str]) -> Generator:
    """Modify the summary form for the product."""
    before = [str(getattr(block, nm)) for nm in fields]
    after = [str(user_input[nm]) for nm in fields]
    yield from summary_form(
        block.subscription.product.name,  # type: ignore[union-attr]
        {
            "labels": fields,
            "headers": ["Before", "After"],
            "columns": [before, after],
        },
    )

get_expected_bgp_values(bgp_sessions)

Get expected BGP values based on the BGP peer.

Source code in gso/workflows/shared.py
def get_expected_bgp_values(bgp_sessions: list[dict]) -> dict:
    """Get expected BGP values based on the BGP peer."""
    ipv4_received_routes = 0
    ipv4_advertised_routes = 0
    ipv6_received_routes = 0
    ipv6_advertised_routes = 0

    for bgp_session in bgp_sessions:
        peer_address = str(bgp_session["peer_address"])
        bgp_stats = get_bgp_status_pre_check(peer_address=peer_address)

        if is_ipv4_address(peer_address):
            ipv4_received_routes = bgp_stats.get("data", {}).get("received_routes", 0)
            ipv4_advertised_routes = bgp_stats.get("data", {}).get("advertised_routes", 0)
        else:
            ipv6_received_routes = bgp_stats.get("data", {}).get("received_routes", 0)
            ipv6_advertised_routes = bgp_stats.get("data", {}).get("advertised_routes", 0)

    return {
        "expected_number_of_ipv4_received_routes": ipv4_received_routes,
        "expected_number_of_ipv4_advertised_routes": ipv4_advertised_routes,
        "expected_number_of_ipv6_received_routes": ipv6_received_routes,
        "expected_number_of_ipv6_advertised_routes": ipv6_advertised_routes,
    }

generate_scoped_subscription_for_l2_service(service, source_edge_port, destination_edge_port)

Generate a scoped subscription for a Layer 2 Circuit service.

Source code in gso/workflows/shared.py
def generate_scoped_subscription_for_l2_service(
    service: Layer2Circuit, source_edge_port: EdgePort, destination_edge_port: EdgePort
) -> dict:
    """Generate a scoped subscription for a Layer 2 Circuit service."""
    scoped_service = json.loads(json_dumps(service))
    replace_index = (
        0
        if service.layer_2_circuit.layer_2_circuit_sides[0].sbp.edge_port.owner_subscription_id
        == source_edge_port.subscription_id
        else 1
    )
    #  We have found the SBP that is to be replaced, we can return all the necessary information to the state.
    #  First, remove the SBP for the side not being migrated.
    scoped_service["layer_2_circuit"]["layer_2_circuit_sides"] = [
        scoped_service["layer_2_circuit"]["layer_2_circuit_sides"][replace_index]
    ]
    #  Then replace the SBP that is migrated such that it includes the destination EP instead of the source one.
    scoped_service["layer_2_circuit"]["layer_2_circuit_sides"][0]["sbp"]["edge_port"] = json.loads(
        json_dumps(destination_edge_port.edge_port)
    )
    return scoped_service

validate_subscription_workflow_eligibility(subscription_id, workflow_target, workflow_name)

Raise if the subscription cannot run the requested workflow; return the id if eligible.

Source code in gso/workflows/shared.py
def validate_subscription_workflow_eligibility(
    subscription_id: UUID, workflow_target: Target, workflow_name: str
) -> UUID:
    """Raise if the subscription cannot run the requested workflow; return the id if eligible."""
    subscription = db.session.get(SubscriptionTable, subscription_id)
    if subscription is None:
        msg = "Subscription not found"
        raise ValueError(msg)

    # Check if there is no reason to prevent this workflow
    workflows = subscriptions.subscription_workflows(subscription)
    current_workflow = first_true(workflows[workflow_target.lower()], None, lambda wf: wf["name"] == workflow_name)

    if not current_workflow:
        msg = "This workflow is not valid for this subscription"
        raise ValueError(msg)

    if "reason" in current_workflow:
        translation = TRANSLATIONS.get(current_workflow["reason"], current_workflow["reason"])
        msg = f"This workflow cannot be started: {translation}"
        raise ValueError(msg)

    return subscription_id