diff --git a/iwf/tests/__init__.py b/iwf/tests/__init__.py index 0b733af..d2b2b65 100644 --- a/iwf/tests/__init__.py +++ b/iwf/tests/__init__.py @@ -28,6 +28,10 @@ from iwf.tests.workflows.state_options_override_workflow import ( StateOptionsOverrideWorkflow, ) +from iwf.tests.workflows.state_options_workflow import ( + StateOptionsWorkflow1, + StateOptionsWorkflow2, +) from iwf.tests.workflows.timer_workflow import TimerWorkflow from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import ( WaitForStateWithStateExecutionIdWorkflow, @@ -57,6 +61,8 @@ registry.add_workflow(RpcMemoWorkflow()) registry.add_workflow(RPCWorkflow()) registry.add_workflow(StateOptionsOverrideWorkflow()) +registry.add_workflow(StateOptionsWorkflow1()) +registry.add_workflow(StateOptionsWorkflow2()) registry.add_workflow(TimerWorkflow()) registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow()) registry.add_workflow(WaitForStateWithWaitForKeyWorkflow()) diff --git a/iwf/tests/test_workflow_state_options.py b/iwf/tests/test_workflow_state_options.py index 0a6f23d..d024b50 100644 --- a/iwf/tests/test_workflow_state_options.py +++ b/iwf/tests/test_workflow_state_options.py @@ -1,15 +1,31 @@ +import inspect +import time import unittest +from iwf.client import Client +from iwf.iwf_api.models import IDReusePolicy from iwf.iwf_api.models import ( PersistenceLoadingPolicy, PersistenceLoadingType, WorkflowStateOptions as IdlWorkflowStateOptions, + RetryPolicy, + WaitUntilApiFailurePolicy, ) - +from iwf.tests.worker_server import registry +from iwf.tests.workflows.state_options_workflow import ( + StateOptionsWorkflow1, + StateOptionsWorkflow2, +) +from iwf.workflow_options import WorkflowOptions from iwf.workflow_state_options import WorkflowStateOptions, _to_idl_state_options +from ..errors import WorkflowFailed class TestWorkflowStateOptions(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.client = Client(registry) + def test_convert_to_idl(self): empty_idl = IdlWorkflowStateOptions() assert empty_idl == _to_idl_state_options(False, None, {}) @@ -29,3 +45,65 @@ def test_convert_to_idl(self): assert non_empty_idl == _to_idl_state_options(True, non_empty, {}) non_empty.state_id = "state-id-2" assert non_empty.state_id == "state-id-2" + + """Test that proceed_to_execute_when_wait_until_retry_exhausted correctly handles both enum values.""" + + def test_proceed_to_execute_when_wait_until_retry_exhausted(self): + retry_policy = RetryPolicy(maximum_attempts=1) + + # Test PROCEED_ON_FAILURE + options_proceed = WorkflowStateOptions( + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE, + wait_until_api_retry_policy=retry_policy, + ) + result_proceed = _to_idl_state_options(False, options_proceed, {}) + assert ( + result_proceed.wait_until_api_failure_policy + == WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE + ) + + # Test FAIL_WORKFLOW_ON_FAILURE + options_fail = WorkflowStateOptions( + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE, + wait_until_api_retry_policy=retry_policy, + ) + result_fail = _to_idl_state_options(False, options_fail, {}) + assert ( + result_fail.wait_until_api_failure_policy + == WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE + ) + + # Test with None/unset value + options = WorkflowStateOptions() + result = _to_idl_state_options(False, options, {}) + # By default, wait_until_api_failure_policy should not be set when proceed_to_execute_when_wait_until_retry_exhausted is None + # The IWF service will use FAIL_WORKFLOW_ON_FAILURE by default + from iwf.iwf_api.types import Unset + + self.assertTrue(isinstance(result.wait_until_api_failure_policy, Unset)) + + def test_proceed_on_failure(self): + wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + self.client.start_workflow( + StateOptionsWorkflow1, + wf_id, + 10, + "input", + WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE), + ) + output = self.client.wait_for_workflow_completion(wf_id) + + assert output == "InitState1_execute_completed" + + def test_fail_workflow_on_failure(self): + wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + self.client.start_workflow( + StateOptionsWorkflow2, + wf_id, + 10, + "input", + WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE), + ) + + with self.assertRaises(WorkflowFailed): + self.client.wait_for_workflow_completion(wf_id, str) diff --git a/iwf/tests/workflows/state_options_workflow.py b/iwf/tests/workflows/state_options_workflow.py new file mode 100644 index 0000000..1381217 --- /dev/null +++ b/iwf/tests/workflows/state_options_workflow.py @@ -0,0 +1,78 @@ +from iwf.command_request import CommandRequest +from iwf.command_results import CommandResults +from iwf.communication import Communication + +from iwf.iwf_api.models import RetryPolicy, WaitUntilApiFailurePolicy +from iwf.persistence import Persistence +from iwf.state_decision import StateDecision +from iwf.state_schema import StateSchema +from iwf.workflow import ObjectWorkflow +from iwf.workflow_context import WorkflowContext +from iwf.workflow_state import T, WorkflowState +from iwf.workflow_state_options import WorkflowStateOptions + + +class InitState1(WorkflowState[str]): + def get_state_options(self) -> WorkflowStateOptions: + return WorkflowStateOptions( + wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1), + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE, + ) + + def wait_until( + self, + ctx: WorkflowContext, + input: T, + persistence: Persistence, + communication: Communication, + ) -> CommandRequest: + raise RuntimeError("test failure") + + def execute( + self, + ctx: WorkflowContext, + input: T, + command_results: CommandResults, + persistence: Persistence, + communication: Communication, + ) -> StateDecision: + data = "InitState1_execute_completed" + return StateDecision.graceful_complete_workflow(data) + + +class InitState2(WorkflowState[str]): + def get_state_options(self) -> WorkflowStateOptions: + return WorkflowStateOptions( + wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1), + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE, + ) + + def wait_until( + self, + ctx: WorkflowContext, + input: T, + persistence: Persistence, + communication: Communication, + ) -> CommandRequest: + raise RuntimeError("test failure") + + def execute( + self, + ctx: WorkflowContext, + input: T, + command_results: CommandResults, + persistence: Persistence, + communication: Communication, + ) -> StateDecision: + data = "InitState2_execute_completed" + return StateDecision.graceful_complete_workflow(data) + + +class StateOptionsWorkflow1(ObjectWorkflow): + def get_workflow_states(self) -> StateSchema: + return StateSchema.with_starting_state(InitState1()) + + +class StateOptionsWorkflow2(ObjectWorkflow): + def get_workflow_states(self) -> StateSchema: + return StateSchema.with_starting_state(InitState2()) diff --git a/iwf/workflow_state_options.py b/iwf/workflow_state_options.py index 4340dab..66ce679 100644 --- a/iwf/workflow_state_options.py +++ b/iwf/workflow_state_options.py @@ -108,17 +108,6 @@ def _to_idl_state_options( ) if options.wait_until_api_retry_policy is not None: res.wait_until_api_retry_policy = options.wait_until_api_retry_policy - if options.proceed_to_execute_when_wait_until_retry_exhausted is not None: - if options.proceed_to_execute_when_wait_until_retry_exhausted: - res.wait_until_api_failure_policy = ( - WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE - ) - else: - res.wait_until_api_failure_policy = ( - WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE - ) - - pass if options.wait_until_api_timeout_seconds is not None: res.wait_until_api_timeout_seconds = options.wait_until_api_timeout_seconds if options.execute_api_retry_policy is not None: