Skip to content

Modify ias

Modification workflow for a IAS subscription.

modify_ias_input_form_generator(subscription_id)

Initial form generator for modifying the custom attributes of an existing IAS subscription.

Source code in gso/workflows/l3_core_service/ias/modify_ias.py
def modify_ias_input_form_generator(subscription_id: UUIDstr) -> FormGenerator:
    """Initial form generator for modifying the custom attributes of an existing IAS subscription."""
    initial_generator = initial_input_form_generator(subscription_id)
    initial_user_input = yield from initial_generator

    subscription = IAS.from_subscription(subscription_id)

    # Additional IAS step
    class IASExtraForm(FormPage):
        # TODO: remove type hint workaround
        ias_flavor: IASFlavor | str = subscription.ias.ias_flavor

    ias_extra = yield IASExtraForm
    return initial_user_input | ias_extra.model_dump()

modify_ias()

Modify IAS subscription.

Source code in gso/workflows/l3_core_service/ias/modify_ias.py
@workflow(
    "Modify IAS",
    initial_input_form=wrap_modify_initial_input_form(modify_ias_input_form_generator),
    target=Target.MODIFY,
)
def modify_ias() -> StepList:
    """Modify IAS subscription."""
    access_port_is_added = conditional(lambda state: state["operation"] == Operation.ADD)
    access_port_is_removed = conditional(lambda state: state["operation"] == Operation.REMOVE)
    access_port_is_modified = conditional(lambda state: state["operation"] == Operation.EDIT)

    return (
        begin
        >> store_process_subscription(Target.MODIFY)
        >> unsync
        >> update_ias_subscription_model
        >> access_port_is_added(create_new_sbp)
        >> access_port_is_removed(remove_old_sbp)
        >> access_port_is_modified(modify_existing_sbp)
        >> resync
        >> done
    )