Task for redeploying base config on multiple routers at once.
This task spawns multiple instances of the redeploy_base_config workflow, based on a list of Nokia routers given as
input by the operator. The operator can then
start_redeploy_workflows(tt_number, selected_routers, callback_route)
Start the massive redeploy_base_config_task with the selected routers.
Source code in gso/workflows/tasks/redeploy_base_config.py
@step("Start worker to redeploy base config on selected routers")
def start_redeploy_workflows(tt_number: TTNumber, selected_routers: list[UUID], callback_route: str) -> State:
    """Queue one ``redeploy_base_config`` run per selected router through ``bulk_wf_task``.

    :param tt_number: Value forwarded as ``tt_number`` in the user inputs of every payload.
    :param selected_routers: Subscription IDs; one payload is built for each of them.
    :param callback_route: Route set on every payload and also passed to the bulk task itself.
    :return: State with an empty dictionary under both the failure key and the success key.
    """

    def _payload_for(subscription_id: UUID) -> dict:
        # The router is loaded only to build a human-readable identifier for this run.
        router = Router.from_subscription(subscription_id)
        identifier = f"{router.description}: {generate_subscription_link(router.subscription_id)}"
        payload = BulkWfPayload(
            workflow_key="redeploy_base_config",
            user_inputs=[
                {"subscription_id": str(subscription_id)},
                {"tt_number": tt_number, "is_massive_redeploy": True},
            ],
            callback_route=callback_route,
            identifier=identifier,
        )
        return payload.model_dump()

    payloads = [_payload_for(subscription_id) for subscription_id in selected_routers]

    # A single task receives the complete list of payloads; it is scheduled with a 5 second countdown.
    bulk_wf_task.apply_async(  # type: ignore[attr-defined]
        args=[payloads, callback_route, _SUCCESS_KEY, _FAILURE_KEY],
        countdown=5,
    )

    initial_state = {_FAILURE_KEY: {}, _SUCCESS_KEY: {}}
    return initial_state
|
evaluate_results(callback_result)
Evaluate the result of the provisioning proxy callback.
Source code in gso/workflows/tasks/redeploy_base_config.py
@step("Evaluate provisioning proxy result")
def evaluate_results(callback_result: dict) -> State:
    """Split the failure and success entries out of the callback result.

    Both entries are removed from ``callback_result`` in place; an entry that is absent is replaced by an empty
    dictionary.

    :param callback_result: Dictionary received from the callback; it is modified by this step.
    :return: State holding the remaining callback result plus the extracted failure and success entries.
    """
    failures = callback_result.pop(_FAILURE_KEY, {})
    successes = callback_result.pop(_SUCCESS_KEY, {})

    new_state = {"callback_result": callback_result}
    new_state[_FAILURE_KEY] = failures
    new_state[_SUCCESS_KEY] = successes
    return new_state
|
workflows_failed_to_prompt(failed_wfs, successful_wfs)
Prompt the operator that some workflows have failed to start.
Source code in gso/workflows/tasks/redeploy_base_config.py
@inputstep("Some workflows have failed", assignee=Assignee.SYSTEM)
def workflows_failed_to_prompt(failed_wfs: dict, successful_wfs: dict) -> FormGenerator:
    """Present the failed and successful workflows to the operator as JSON text.

    :param failed_wfs: Dictionary rendered into the ``failed_workflows`` form field.
    :param successful_wfs: Dictionary rendered into the ``successful_workflows`` form field.
    :return: An empty dictionary once the form has been yielded and the generator is resumed.
    """
    # Render both dictionaries up front so the form class only carries plain strings as defaults.
    failed_json = json.dumps(failed_wfs, indent=4)
    successful_json = json.dumps(successful_wfs, indent=4)

    class WFFailurePrompt(SubmitFormPage):
        model_config = ConfigDict(title="Some redeploy workflows have failed, please inspect the list below")

        failed_workflows: LongText = failed_json
        successful_workflows: LongText = successful_json

    yield WFFailurePrompt

    return {}
|
task_redeploy_base_config()
Gather a list of routers from the operator to redeploy base config onto.
Source code in gso/workflows/tasks/redeploy_base_config.py
@workflow("Redeploy base config on multiple routers", initial_input_form=_input_form_generator, target=Target.SYSTEM)
def task_redeploy_base_config() -> StepList:
    """Build the step list that redeploys base config on a set of routers.

    The list consists of a callback step that starts the redeploy workflows and validates their result, followed by
    an operator prompt that is wrapped in a condition on the failure entry of the state.
    """

    def _any_failed(state: State) -> bool:
        # True when the failure entry of the state contains at least one item.
        return len(state.get(_FAILURE_KEY, {})) > 0

    only_if_any_failed = conditional(_any_failed)

    run_redeploys = callback_step(
        name="Start running redeploy workflows on selected routers",
        action_step=start_redeploy_workflows,
        validate_step=evaluate_results,
    )

    return begin >> run_redeploys >> only_if_any_failed(workflows_failed_to_prompt) >> done
|