Skip to content

Import edge port

Module for process-related API endpoints.

import_workflows_bulk(json_data=Body(...)) async

Import multiple subscriptions using a specified workflow.

Source code in gso/api/v1/import_edge_port.py
@router.post(
    "/import/import_edge_port",
    response_model=dict[str, list[dict]],
    status_code=HTTPStatus.OK,
    dependencies=[Depends(check_global_lock, use_cache=False)],
)
async def import_workflows_bulk(
    json_data: list[dict[str, Any]] | None = Body(...),  # noqa: B008
) -> dict[str, list[dict]]:
    """Import multiple subscriptions using a specified workflow."""
    items = json_data or []

    logger.info(
        "Starting bulk import tasks",
        import_wf="import_edge_port",
        create_imported_wf="create_imported_edge_port",
        item_count=len(items),
    )

    results: list[dict] = []
    for idx, item in enumerate(items):
        try:
            # Step 1: create an imported subscription
            node = get_active_router_subscription_id(item["node"])
            if node is None:
                raise HTTPException(  # noqa: TRY301
                    HTTPStatus.BAD_REQUEST,
                    "There is no node with the given FQDN in the request item.",
                )
            item["node"] = node
            initial_data = EdgePortImportModel(**item)
            create_imported_process = _execute_workflow_and_wait(
                workflow_key="create_imported_edge_port",
                user_inputs=[initial_data.model_dump()],
                check_interval=1,
                max_retries=300,
            )
            subscription = _extract_subscription_or_raise(create_imported_process)

            # Step 2: convert it into a normal subscription
            import_process = _execute_workflow_and_wait(
                workflow_key="import_edge_port",
                user_inputs=[subscription],
                check_interval=1,
                max_retries=300,
            )
            final_subscription = get_subscription_by_process_id(import_process.process_id)
            if not final_subscription:
                raise HTTPException(  # noqa: TRY301
                    HTTPStatus.NOT_FOUND,
                    f"Subscription not found for process ID: {import_process.process_id}",
                )

            results.append({
                "index": idx,
                "item": item,
                "id": str(final_subscription.subscription_id),
            })

        except FormValidationError as ve:
            # Split multi line validation errors into list
            detail = str(ve)
            lines = [line.strip() for line in detail.split("\n") if line.strip()]
            results.append({
                "index": idx,
                "item": item,
                "errors": lines,
            })
        except HTTPException as he:
            detail = str(he.detail)
            lines = [line.strip() for line in detail.split("\n") if line.strip()]
            results.append({
                "index": idx,
                "item": item,
                "errors": lines,
            })
        except ValidationError as ve:
            detail = str(ve)
            lines = [line.strip() for line in detail.split("\n") if line.strip()]
            results.append({
                "index": idx,
                "item": item,
                "errors": lines,
            })
        except Exception as e:  # noqa: BLE001
            results.append({
                "index": idx,
                "item": item,
                "errors": [f"Unexpected error: {e}"],
            })

    return {
        "successes": [r for r in results if "errors" not in r],
        "failures": [r for r in results if "errors" in r],
    }