Similar to a `modify_workflow`, but the only required user input is a TT number.
This decorator is based on the `reconcile_workflow` in `orchestrator-core`, but adds a default input form instead of starting the workflow right away.
# Example usage: the decorated function takes no arguments and returns a StepList.
@redeploy_workflow("Redeploy Edge Port")
def redeploy_edge_port() -> StepList:
    """Chain the two `lso_interaction` steps of the Edge Port redeploy after `begin`."""
    return (
        begin
        >> lso_interaction(redeploy_edge_port_dry)
        >> lso_interaction(redeploy_edge_port_real)
    )
Source code in gso/utils/workflow.py
def redeploy_workflow(
    description: str,
    initial_input_form: InputStepFunc | None = None,
    authorize_callback: Authorizer | None = None,
    retry_auth_callback: Authorizer | None = None,
) -> Callable[[Callable[[], StepList]], Workflow]:
    """Build a decorator that turns a step-producing function into a redeploy workflow.

    Comparable to a `modify_workflow`, except that the only thing the user has to supply is a TT number.
    The decorator is modelled on the `reconcile_workflow` from `orchestrator-core`; unlike that one, it presents
    a default input form first instead of starting the workflow right away.

    The steps returned by the decorated function are placed between
    `init >> store_process_subscription() >> unsync` and `resync >> refresh_subscription_search_index >> done`,
    and the workflow is registered with `Target.MODIFY`.

    ```py
    @redeploy_workflow("Redeploy Edge Port")
    def redeploy_edge_port() -> StepList:
        return (
            begin
            >> lso_interaction(redeploy_edge_port_dry)
            >> lso_interaction(redeploy_edge_port_real)
        )
    ```

    Args:
        description: Description handed to `make_workflow`.
        initial_input_form: Optional replacement for the default redeploy form. When omitted (or falsy), the
            default form asking for a TT number is used.
        authorize_callback: Forwarded unchanged to `make_workflow`.
        retry_auth_callback: Forwarded unchanged to `make_workflow`.

    Returns:
        A decorator that accepts a zero-argument function returning a `StepList` and produces a `Workflow`.
    """

    def _redeploy_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
        # Default form: shows the subscription description and partner read-only, and asks for a TT number.
        subscription = SubscriptionModel.from_subscription(subscription_id)
        partner_name = get_partner_by_id(subscription.customer_id).name
        form_title = f"Redeploy {subscription.product.name} for {partner_name}"

        class RedeployForm(SubmitFormPage):
            model_config = ConfigDict(title=form_title)

            tt_number: TTNumber

            # The divider is declared with exclude=True, so it is left out of the serialised form data.
            divider: Divider = Field(None, exclude=True)
            subscription_description: read_only_field(subscription.description)  # type: ignore[valid-type]
            partner: read_only_field(partner_name)  # type: ignore[valid-type]

        submitted = yield RedeployForm

        return {
            "subscription": subscription,
            "partner_name": partner_name,
            "tt_number": submitted.tt_number,
        }

    # Wrap the form once, when the decorator is created; a caller-supplied form takes precedence over the default.
    wrapped_form = wrap_modify_initial_input_form(initial_input_form or _redeploy_input_form_generator)

    def _redeploy_workflow(func: Callable[[], StepList]) -> Workflow:
        # Common prologue, then the workflow-specific steps, then the common epilogue.
        steps = init >> store_process_subscription() >> unsync
        steps = steps >> func() >> resync >> refresh_subscription_search_index >> done

        return make_workflow(
            func,
            description,
            wrapped_form,
            Target.MODIFY,
            steps,
            authorize_callback=authorize_callback,
            retry_auth_callback=retry_auth_callback,
        )

    return _redeploy_workflow
|