Skip to content

Import edge port

A modification workflow for migrating an ImportedEdgePort to an EdgePort subscription.

import_edge_port_subscription(subscription_id)

Take an ImportedEdgePort subscription, and turn it into an EdgePort subscription.

Source code in gso/workflows/edge_port/import_edge_port.py
@step("Create new Edge Port subscription")
def import_edge_port_subscription(subscription_id: UUIDstr) -> State:
    """Convert an imported Edge Port subscription into a regular Edge Port subscription.

    The ``ImportedEdgePort`` identified by ``subscription_id`` is loaded and passed to
    ``EdgePort.from_other_product`` together with the product ID looked up for
    ``ProductName.EDGE_PORT``.

    Args:
        subscription_id: ID of the ``ImportedEdgePort`` subscription to convert.

    Returns:
        A state update holding the resulting ``EdgePort`` under the ``subscription`` key.
    """
    imported_edge_port = ImportedEdgePort.from_subscription(subscription_id)
    # Despite being attached to a subscription, this is the ID of the target *product*.
    edge_port_product_id = get_product_id_by_name(ProductName.EDGE_PORT)
    edge_port = EdgePort.from_other_product(imported_edge_port, edge_port_product_id)  # type: ignore[arg-type]
    return {"subscription": edge_port}

import_edge_port()

Modify an ImportedEdgePort subscription into an EdgePort subscription to complete the import.

Source code in gso/workflows/edge_port/import_edge_port.py
@workflow("Import Edge Port", target=Target.MODIFY, initial_input_form=wrap_modify_initial_input_form(None))
def import_edge_port() -> StepList:
    """Complete an import by modifying an ImportedEdgePort subscription into an EdgePort subscription.

    Returns:
        The step list for this workflow: ``init``, ``store_process_subscription(Target.MODIFY)``,
        ``unsync``, ``import_edge_port_subscription``, ``resync`` and ``done``, in that order.
    """
    # Composed left to right in stages; the resulting order of steps is what matters.
    preparation = init >> store_process_subscription(Target.MODIFY) >> unsync
    conversion = preparation >> import_edge_port_subscription
    return conversion >> resync >> done