Skip to content

Base create l3 core service

Base workflow for creating a new L3 Core Service.

initial_input_form_generator(product_name)

Gather input from the operator to build a new subscription object.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
def initial_input_form_generator(product_name: str) -> FormGenerator:
    """Gather input from the operator to build a new subscription object."""

    class CreateL3CoreServiceForm(FormPage):
        model_config = ConfigDict(title=f"{product_name} - Select partner")

        tt_number: TTNumber
        label_a: Label = Field(f"Please select the partner for this {product_name}.", exclude=True)
        partner: partner_choice()  # type: ignore[valid-type]
        label_b: Label = Field(
            f"Please select the partner who owns the Edge Port this {product_name} will be deployed on.", exclude=True
        )
        edge_port_partner: partner_choice()  # type: ignore[valid-type]

        @model_validator(mode="after")
        def validate_no_duplicate_subscription(self) -> Self:
            """Ensure that NO active subscription exists for this partner + product combination.

            For creation workflows, we must prevent creating a second subscription.
            The target model is: ONE subscription per (partner, product) with multiple Access Ports.
            """
            product_name_enum = ProductName(product_name)

            # Check for any existing subscriptions
            existing_subscriptions = get_active_l3_subscriptions_by_partner_and_product(self.partner, product_name_enum)

            if len(existing_subscriptions) >= 1:
                partner_name = get_partner_by_id(self.partner).name
                subscription_ids = [str(s.subscription_id) for s in existing_subscriptions]
                msg = (
                    f"Cannot create new {product_name_enum} subscription: an active subscription already exists "
                    f"for partner {partner_name} ({self.partner}). "
                    f"Found {len(existing_subscriptions)} active subscription(s): {', '.join(subscription_ids)}. "
                    "\n\nThe target model requires ONE subscription per partner with multiple Access Ports. "
                    "Please use the 'Modify' workflow to add Access Ports to the existing subscription, "
                    "or use the 'Merge L3 Core Subscriptions' workflow if you need to consolidate multiple "
                    "subscriptions."
                )
                raise ValueError(msg)

            return self

    initial_user_input = yield CreateL3CoreServiceForm

    class EdgePortSelection(BaseModel):
        edge_port: active_edge_port_selector(partner_id=initial_user_input.edge_port_partner)  # type: ignore[valid-type]
        ap_type: APType
        custom_service_name: str | None = None

    class EdgePortSelectionForm(FormPage):
        model_config = ConfigDict(title=f"{product_name} - Select Edge Ports")
        info_label: Label = Field(
            f"Please select the Edge Ports where this {product_name} service will terminate", exclude=True
        )

        edge_port: EdgePortSelection

    selected_edge_port = yield EdgePortSelectionForm

    class BFDSettingsForm(BaseModel):
        bfd_enabled: bool = False
        bfd_interval_rx: int | None = Field(default=None, examples=["BFD RX defaults"])
        bfd_interval_tx: int | None = None
        bfd_multiplier: int | None = None

    class BindingPortInputForm(SubmitFormPage):
        model_config = ConfigDict(title=f"{product_name} - Configure Edge Port")
        info_label: Label = Field("Please configure the Service Binding Ports for the Edge Port.", exclude=True)
        current_ep_label: Label = Field(
            f"Currently configuring on {EdgePort.from_subscription(selected_edge_port.edge_port.edge_port).description}"
            f" (Access Port type: {selected_edge_port.edge_port.ap_type})",
            exclude=True,
        )

        generate_gs_id: bool = True
        gs_id: IMPORTED_GS_ID | None = None
        is_tagged: bool = False
        vlan_id: VLAN_ID
        custom_firewall_filters: bool = False
        divider: Divider = Field(None, exclude=True)
        v4_label: Label = Field("IPV4 SBP interface params", exclude=True)
        ipv4_address: IPv4AddressType | None = None
        ipv4_mask: IPv4Netmask | None = None
        v4_bfd_settings: BFDSettingsForm
        v4_bgp_peer: IPv4BGPPeer | None = None
        divider2: Divider = Field(None, exclude=True)
        v6_label: Label = Field("IPV6 SBP interface params", exclude=True)
        ipv6_address: IPv6AddressType | None = None
        ipv6_mask: IPv6Netmask | None = None
        v6_bfd_settings: BFDSettingsForm
        v6_bgp_peer: IPv6BGPPeer | None = None

        @model_validator(mode="before")
        def validate_gs_id(cls, input_data: dict[str, Any]) -> dict[str, Any]:
            gs_id = input_data.get("gs_id")
            generate_gs_id = input_data.get("generate_gs_id", True)

            if generate_gs_id and gs_id:
                error_message = (
                    "You cannot provide a GS ID manually while the 'Auto-generate GS ID' option is enabled."
                    "Please either uncheck 'Auto-generate GS ID' or remove the manual GS ID."
                )
                raise ValueError(error_message)
            return input_data

        @model_validator(mode="after")
        def validate_ip_addresses(self) -> Self:
            """Validate IP address/mask combinations for the Service Binding Port."""
            # IPv4: address, mask must come together
            if self.ipv4_address is not None and self.ipv4_mask is None:
                msg = (
                    "An IPv4 netmask must be provided when an IPv4 address is configured for the Service Binding Port."
                )
                raise ValueError(msg)
            if self.ipv4_address is None and self.ipv4_mask is not None:
                msg = (
                    "An IPv4 address must be provided when an IPv4 netmask is configured for the Service Binding Port."
                )
                raise ValueError(msg)

            # IPv6: address mask must come together
            if self.ipv6_address is not None and self.ipv6_mask is None:
                msg = (
                    "An IPv6 netmask must be provided when an IPv6 address is configured for the Service Binding Port."
                )
                raise ValueError(msg)
            if self.ipv6_address is None and self.ipv6_mask is not None:
                msg = (
                    "An IPv6 address must be provided when an IPv6 netmask is configured for the Service Binding Port."
                )
                raise ValueError(msg)

            # At least one of IPv4 / IPv6 must be present
            if self.ipv4_address is None and self.ipv6_address is None:
                msg = "Either an IPv4 or an IPv6 address must be provided for the Service Binding Port."
                raise ValueError(msg)

            return self

    binding_port_input_form = yield BindingPortInputForm
    bgp_peers = []
    if binding_port_input_form.v4_bgp_peer:
        bgp_peers.append(binding_port_input_form.v4_bgp_peer)
    if binding_port_input_form.v6_bgp_peer:
        bgp_peers.append(binding_port_input_form.v6_bgp_peer)

    binding_port_input = binding_port_input_form.model_dump() | {"bgp_peers": bgp_peers}

    return (
        initial_user_input.model_dump()
        | selected_edge_port.model_dump()
        | {"binding_port_input": binding_port_input, "product_name": product_name, "verb": "deploy"}
    )

initialize_subscription(subscription, edge_port, binding_port_input, product_name)

Initialize a service binding port for a given service type in the subscription model.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("Initialize subscription")
def initialize_subscription(
    subscription: SubscriptionModel,
    edge_port: dict,
    binding_port_input: dict,
    product_name: str,
) -> dict:
    """Initialize a service binding port for a given service type in the subscription model."""
    edge_port_subscription = EdgePort.from_subscription(edge_port["edge_port"])

    sbp_bgp_session_list = [
        BGPSession.new(subscription_id=uuid4(), rtbh_enabled=True, is_multi_hop=True, **session)
        for session in binding_port_input["bgp_peers"]
    ]

    sbp_gs_id = (
        generate_unique_id(prefix="GS")
        if binding_port_input.pop("generate_gs_id", False)
        else binding_port_input.pop("gs_id", None)
    )

    binding_port_input.pop("gs_id", None)

    service_binding_port = ServiceBindingPortInactive.new(
        subscription_id=uuid4(),
        v4_bfd_settings=BFDSettings.new(subscription_id=uuid4(), **(binding_port_input.pop("v4_bfd_settings"))),
        v6_bfd_settings=BFDSettings.new(subscription_id=uuid4(), **(binding_port_input.pop("v6_bfd_settings"))),
        **binding_port_input,
        bgp_session_list=sbp_bgp_session_list,
        sbp_type=SBPType.L3,
        edge_port=edge_port_subscription.edge_port,
        gs_id=sbp_gs_id,
    )

    subscription.l3_core = L3CoreServiceBlockInactive.new(  # type: ignore[attr-defined]
        subscription_id=uuid4(),
        ap_list=[
            AccessPortInactive.new(
                subscription_id=uuid4(),
                ap_type=edge_port["ap_type"],
                sbp=service_binding_port,
                custom_service_name=edge_port.get("custom_service_name"),
            )
        ],
    )
    partner_name = get_partner_by_id(subscription.customer_id).name
    subscription.description = f"{product_name} service for {partner_name}"
    scoped_subscription = json.loads(json_dumps(subscription))
    scoped_subscription["l3_core"] = scoped_subscription[subscription.service_name_attribute]["l3_core"]  # type: ignore[attr-defined]

    return {
        "subscription": subscription,
        "scoped_subscription": scoped_subscription,
        "edge_port_fqdn_list": [edge_port_subscription.edge_port.node.router_fqdn],
        "partner_name": partner_name,
    }

provision_sbp_dry(workflow_name, scoped_subscription, process_id, tt_number, edge_port_fqdn_list, partner_name, verb)

Perform a dry run of deploying Service Binding Ports.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("[DRY RUN] Deploy service binding port")
def provision_sbp_dry(
    workflow_name: str,
    scoped_subscription: dict[str, Any],
    process_id: UUIDstr,
    tt_number: str,
    edge_port_fqdn_list: list[str],
    partner_name: str,
    verb: str,
) -> LSOState:
    """Perform a dry run of deploying Service Binding Ports."""
    extra_vars = {
        "subscription": scoped_subscription,
        "partner_name": partner_name,
        "dry_run": True,
        "verb": verb,
        "is_redeploy_workflow": workflow_name.startswith("redeploy_"),
        "object": "sbp",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploy config for {scoped_subscription['description']}",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }

provision_sbp_real(workflow_name, scoped_subscription, process_id, tt_number, edge_port_fqdn_list, partner_name, verb)

Deploy Service Binding Ports.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("[FOR REAL] Deploy service binding port")
def provision_sbp_real(
    workflow_name: str,
    scoped_subscription: dict[str, Any],
    process_id: UUIDstr,
    tt_number: str,
    edge_port_fqdn_list: list[str],
    partner_name: str,
    verb: str,
) -> LSOState:
    """Deploy Service Binding Ports."""
    extra_vars = {
        "subscription": scoped_subscription,
        "partner_name": partner_name,
        "dry_run": False,
        "verb": verb,
        "is_redeploy_workflow": workflow_name.startswith("redeploy_"),
        "object": "sbp",
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploy config for {scoped_subscription['description']}",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }

check_sbp_functionality(scoped_subscription, edge_port_fqdn_list)

Check functionality of deployed Service Binding Ports.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("Check service binding port functionality")
def check_sbp_functionality(scoped_subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
    """Check functionality of deployed Service Binding Ports."""
    extra_vars = {"subscription": scoped_subscription, "verb": "check", "object": "sbp"}

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }

deploy_bgp_peers_dry(workflow_name, scoped_subscription, edge_port_fqdn_list, tt_number, process_id, partner_name, verb)

Perform a dry run of deploying BGP peers.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("[DRY RUN] Deploy BGP peers")
def deploy_bgp_peers_dry(
    workflow_name: str,
    scoped_subscription: dict[str, Any],
    edge_port_fqdn_list: list[str],
    tt_number: str,
    process_id: UUIDstr,
    partner_name: str,
    verb: str,
) -> LSOState:
    """Perform a dry run of deploying BGP peers."""
    extra_vars = {
        "subscription": scoped_subscription,
        "partner_name": partner_name,
        "verb": verb,
        "is_redeploy_workflow": workflow_name.startswith("redeploy_"),
        "object": "bgp",
        "dry_run": True,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploying BGP peers for {scoped_subscription['description']}",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }

deploy_bgp_peers_real(workflow_name, scoped_subscription, edge_port_fqdn_list, tt_number, process_id, partner_name, verb)

Deploy BGP peers.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("[FOR REAL] Deploy BGP peers")
def deploy_bgp_peers_real(
    workflow_name: str,
    scoped_subscription: dict[str, Any],
    edge_port_fqdn_list: list[str],
    tt_number: str,
    process_id: UUIDstr,
    partner_name: str,
    verb: str,
) -> LSOState:
    """Deploy BGP peers."""
    extra_vars = {
        "subscription": scoped_subscription,
        "partner_name": partner_name,
        "verb": verb,
        "is_redeploy_workflow": workflow_name.startswith("redeploy_"),
        "object": "bgp",
        "dry_run": False,
        "commit_comment": f"GSO_PROCESS_ID: {process_id} - TT_NUMBER: {tt_number} - "
        f"Deploying BGP peers for {scoped_subscription['description']}",
    }

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }

check_bgp_peers(scoped_subscription, edge_port_fqdn_list)

Check the correct deployment of BGP peers.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("Check BGP peers")
def check_bgp_peers(scoped_subscription: dict[str, Any], edge_port_fqdn_list: list[str]) -> LSOState:
    """Check the correct deployment of BGP peers."""
    extra_vars = {"subscription": scoped_subscription, "verb": "check", "object": "bgp"}

    return {
        "playbook_name": "gap_ansible/playbooks/l3_core_service.yaml",
        "inventory": {"all": {"hosts": dict.fromkeys(edge_port_fqdn_list)}},
        "extra_vars": extra_vars,
    }

update_dns_records(subscription)

Update DNS records in Infoblox.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("Update Infoblox")
def update_dns_records(subscription: SubscriptionModel) -> State:
    """Update DNS records in Infoblox."""
    #  TODO: implement
    return {"subscription": subscription}

create_new_sharepoint_checklist(subscription, tt_number, process_id)

Create a new checklist item in SharePoint for approving this L3 Core Service.

Source code in gso/workflows/l3_core_service/base_create_l3_core_service.py
@step("Create a new SharePoint checklist item")
def create_new_sharepoint_checklist(
    subscription: SubscriptionModel,
    tt_number: TTNumber,
    process_id: UUIDstr,
) -> State:
    """Create a new checklist item in SharePoint for approving this L3 Core Service."""
    new_ep = subscription.l3_core.ap_list[0].sbp.edge_port  # type: ignore[attr-defined]
    new_list_item_url = SharePointClient().add_list_item(
        list_name="l3_core_service",
        fields={
            "Title": f"{subscription.description}",
            "TT_NUMBER": tt_number,
            "ACTIVITY_TYPE": "Creation",
            "PRODUCT_TYPE": subscription.product.name,
            "LOCATION": f"{new_ep.edge_port_name} {new_ep.edge_port_description} on {new_ep.node.router_fqdn}",
            "GAP_PROCESS_URL": f"{load_oss_params().GENERAL.public_hostname}/workflows/{process_id}",
        },
    )

    return {"checklist_url": new_list_item_url}