From fa061a2be39e2ef39e32aba339f2d7189692d37f Mon Sep 17 00:00:00 2001 From: daniela Date: Wed, 18 Jun 2025 00:22:04 -0700 Subject: [PATCH 1/2] IWF-936: fix FAIL_WORKFLOW_ON_FAILURE state option not working --- iwf/tests/__init__.py | 4 + iwf/tests/test_workflow_state_options.py | 74 +++++++++++++++++- iwf/tests/workflows/state_options_workflow.py | 78 +++++++++++++++++++ iwf/workflow_state_options.py | 11 --- 4 files changed, 154 insertions(+), 13 deletions(-) create mode 100644 iwf/tests/workflows/state_options_workflow.py diff --git a/iwf/tests/__init__.py b/iwf/tests/__init__.py index 0b733af..ee1e083 100644 --- a/iwf/tests/__init__.py +++ b/iwf/tests/__init__.py @@ -28,6 +28,8 @@ from iwf.tests.workflows.state_options_override_workflow import ( StateOptionsOverrideWorkflow, ) +from iwf.tests.workflows.state_options_workflow import (StateOptionsWorkflow1, + StateOptionsWorkflow2) from iwf.tests.workflows.timer_workflow import TimerWorkflow from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import ( WaitForStateWithStateExecutionIdWorkflow, @@ -57,6 +59,8 @@ registry.add_workflow(RpcMemoWorkflow()) registry.add_workflow(RPCWorkflow()) registry.add_workflow(StateOptionsOverrideWorkflow()) +registry.add_workflow(StateOptionsWorkflow1()) +registry.add_workflow(StateOptionsWorkflow2()) registry.add_workflow(TimerWorkflow()) registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow()) registry.add_workflow(WaitForStateWithWaitForKeyWorkflow()) diff --git a/iwf/tests/test_workflow_state_options.py b/iwf/tests/test_workflow_state_options.py index 0a6f23d..5892750 100644 --- a/iwf/tests/test_workflow_state_options.py +++ b/iwf/tests/test_workflow_state_options.py @@ -1,15 +1,28 @@ +import inspect +import time import unittest +from iwf.client import Client +from iwf.iwf_api.models import IDReusePolicy from iwf.iwf_api.models import ( PersistenceLoadingPolicy, PersistenceLoadingType, - WorkflowStateOptions as IdlWorkflowStateOptions, + WorkflowStateOptions as IdlWorkflowStateOptions, RetryPolicy, + WaitUntilApiFailurePolicy, ) - +from iwf.tests.worker_server import registry +from iwf.tests.workflows.state_options_workflow import (StateOptionsWorkflow1, + StateOptionsWorkflow2) +from iwf.workflow_options import WorkflowOptions from iwf.workflow_state_options import WorkflowStateOptions, _to_idl_state_options +from ..errors import WorkflowFailed class TestWorkflowStateOptions(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.client = Client(registry) + def test_convert_to_idl(self): empty_idl = IdlWorkflowStateOptions() assert empty_idl == _to_idl_state_options(False, None, {}) @@ -29,3 +42,60 @@ def test_convert_to_idl(self): assert non_empty_idl == _to_idl_state_options(True, non_empty, {}) non_empty.state_id = "state-id-2" assert non_empty.state_id == "state-id-2" + + """Test that proceed_to_execute_when_wait_until_retry_exhausted correctly handles both enum values.""" + def test_proceed_to_execute_when_wait_until_retry_exhausted(self): + retry_policy = RetryPolicy(maximum_attempts=1) + + # Test PROCEED_ON_FAILURE + options_proceed = WorkflowStateOptions( + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE, + wait_until_api_retry_policy=retry_policy, + ) + result_proceed = _to_idl_state_options(False, options_proceed, {}) + assert result_proceed.wait_until_api_failure_policy == WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE + + # Test FAIL_WORKFLOW_ON_FAILURE + options_fail = WorkflowStateOptions( + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE, + wait_until_api_retry_policy=retry_policy, + ) + result_fail = _to_idl_state_options(False, options_fail, {}) + assert result_fail.wait_until_api_failure_policy == WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE + + # Test with None/unset value + options = WorkflowStateOptions() + result = _to_idl_state_options(False, options, {}) + # By default, wait_until_api_failure_policy should not be set when proceed_to_execute_when_wait_until_retry_exhausted is None + # The IWF service will use FAIL_WORKFLOW_ON_FAILURE by default + from iwf.iwf_api.types import Unset + self.assertTrue(isinstance(result.wait_until_api_failure_policy, Unset)) + + def test_proceed_on_failure(self): + wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + self.client.start_workflow( + StateOptionsWorkflow1, + wf_id, + 10, + "input", + WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE), + ) + output = self.client.wait_for_workflow_completion(wf_id) + + assert ( + output + == "InitState1_execute_completed" + ) + + def test_fail_workflow_on_failure(self): + wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" + self.client.start_workflow( + StateOptionsWorkflow2, + wf_id, + 10, + "input", + WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE), + ) + + with self.assertRaises(WorkflowFailed): + self.client.wait_for_workflow_completion(wf_id, str) diff --git a/iwf/tests/workflows/state_options_workflow.py b/iwf/tests/workflows/state_options_workflow.py new file mode 100644 index 0000000..32e969a --- /dev/null +++ b/iwf/tests/workflows/state_options_workflow.py @@ -0,0 +1,78 @@ +from iwf.command_request import CommandRequest +from iwf.command_results import CommandResults +from iwf.communication import Communication + +from iwf.iwf_api.models import RetryPolicy, WaitUntilApiFailurePolicy +from iwf.persistence import Persistence +from iwf.state_decision import StateDecision +from iwf.state_schema import StateSchema +from iwf.workflow import ObjectWorkflow +from iwf.workflow_context import WorkflowContext +from iwf.workflow_state import T, WorkflowState +from iwf.workflow_state_options import WorkflowStateOptions + + +class InitState1(WorkflowState[str]): + def get_state_options(self) -> WorkflowStateOptions: + return WorkflowStateOptions( + wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1), + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE, + ) + + def wait_until( + self, + ctx: WorkflowContext, + input: T, + persistence: Persistence, + communication: Communication, + ) -> CommandRequest: + raise RuntimeError("test failure") + + def execute( + self, + ctx: WorkflowContext, + input: T, + command_results: CommandResults, + persistence: Persistence, + communication: Communication, + ) -> StateDecision: + data = "InitState1_execute_completed" + return StateDecision.graceful_complete_workflow(data) + + +class InitState2(WorkflowState[str]): + def get_state_options(self) -> WorkflowStateOptions: + return WorkflowStateOptions( + wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1), + proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE, + ) + + def wait_until( + self, + ctx: WorkflowContext, + input: T, + persistence: Persistence, + communication: Communication, + ) -> CommandRequest: + raise RuntimeError("test failure") + + def execute( + self, + ctx: WorkflowContext, + input: T, + command_results: CommandResults, + persistence: Persistence, + communication: Communication, + ) -> StateDecision: + data = "InitState2_execute_completed" + return StateDecision.graceful_complete_workflow(data) + + +class StateOptionsWorkflow1(ObjectWorkflow): + def get_workflow_states(self) -> StateSchema: + return StateSchema.with_starting_state(InitState1()) + +class StateOptionsWorkflow2(ObjectWorkflow): + def get_workflow_states(self) -> StateSchema: + return StateSchema.with_starting_state(InitState2()) + diff --git a/iwf/workflow_state_options.py b/iwf/workflow_state_options.py index 4340dab..66ce679 100644 --- a/iwf/workflow_state_options.py +++ b/iwf/workflow_state_options.py @@ -108,17 +108,6 @@ def _to_idl_state_options( ) if options.wait_until_api_retry_policy is not None: res.wait_until_api_retry_policy = options.wait_until_api_retry_policy - if options.proceed_to_execute_when_wait_until_retry_exhausted is not None: - if options.proceed_to_execute_when_wait_until_retry_exhausted: - res.wait_until_api_failure_policy = ( - WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE - ) - else: - res.wait_until_api_failure_policy = ( - WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE - ) - - pass if options.wait_until_api_timeout_seconds is not None: res.wait_until_api_timeout_seconds = options.wait_until_api_timeout_seconds if options.execute_api_retry_policy is not None: From 28d1217370b9d8507597d03d9d9fee0f19277c41 Mon Sep 17 00:00:00 2001 From: daniela Date: Wed, 18 Jun 2025 00:29:18 -0700 Subject: [PATCH 2/2] IWF-963: fix lint issues --- iwf/tests/__init__.py | 6 +++-- iwf/tests/test_workflow_state_options.py | 26 ++++++++++++------- iwf/tests/workflows/state_options_workflow.py | 2 +- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/iwf/tests/__init__.py b/iwf/tests/__init__.py index ee1e083..d2b2b65 100644 --- a/iwf/tests/__init__.py +++ b/iwf/tests/__init__.py @@ -28,8 +28,10 @@ from iwf.tests.workflows.state_options_override_workflow import ( StateOptionsOverrideWorkflow, ) -from iwf.tests.workflows.state_options_workflow import (StateOptionsWorkflow1, - StateOptionsWorkflow2) +from iwf.tests.workflows.state_options_workflow import ( + StateOptionsWorkflow1, + StateOptionsWorkflow2, +) from iwf.tests.workflows.timer_workflow import TimerWorkflow from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import ( WaitForStateWithStateExecutionIdWorkflow, diff --git a/iwf/tests/test_workflow_state_options.py b/iwf/tests/test_workflow_state_options.py index 5892750..d024b50 100644 --- a/iwf/tests/test_workflow_state_options.py +++ b/iwf/tests/test_workflow_state_options.py @@ -7,12 +7,15 @@ from iwf.iwf_api.models import ( PersistenceLoadingPolicy, PersistenceLoadingType, - WorkflowStateOptions as IdlWorkflowStateOptions, RetryPolicy, + WorkflowStateOptions as IdlWorkflowStateOptions, + RetryPolicy, WaitUntilApiFailurePolicy, ) from iwf.tests.worker_server import registry -from iwf.tests.workflows.state_options_workflow import (StateOptionsWorkflow1, - StateOptionsWorkflow2) +from iwf.tests.workflows.state_options_workflow import ( + StateOptionsWorkflow1, + StateOptionsWorkflow2, +) from iwf.workflow_options import WorkflowOptions from iwf.workflow_state_options import WorkflowStateOptions, _to_idl_state_options from ..errors import WorkflowFailed @@ -44,6 +47,7 @@ def test_convert_to_idl(self): assert non_empty.state_id == "state-id-2" """Test that proceed_to_execute_when_wait_until_retry_exhausted correctly handles both enum values.""" + def test_proceed_to_execute_when_wait_until_retry_exhausted(self): retry_policy = RetryPolicy(maximum_attempts=1) @@ -53,7 +57,10 @@ def test_proceed_to_execute_when_wait_until_retry_exhausted(self): wait_until_api_retry_policy=retry_policy, ) result_proceed = _to_idl_state_options(False, options_proceed, {}) - assert result_proceed.wait_until_api_failure_policy == WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE + assert ( + result_proceed.wait_until_api_failure_policy + == WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE + ) # Test FAIL_WORKFLOW_ON_FAILURE options_fail = WorkflowStateOptions( @@ -61,7 +68,10 @@ def test_proceed_to_execute_when_wait_until_retry_exhausted(self): wait_until_api_retry_policy=retry_policy, ) result_fail = _to_idl_state_options(False, options_fail, {}) - assert result_fail.wait_until_api_failure_policy == WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE + assert ( + result_fail.wait_until_api_failure_policy + == WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE + ) # Test with None/unset value options = WorkflowStateOptions() @@ -69,6 +79,7 @@ def test_proceed_to_execute_when_wait_until_retry_exhausted(self): # By default, wait_until_api_failure_policy should not be set when proceed_to_execute_when_wait_until_retry_exhausted is None # The IWF service will use FAIL_WORKFLOW_ON_FAILURE by default from iwf.iwf_api.types import Unset + self.assertTrue(isinstance(result.wait_until_api_failure_policy, Unset)) def test_proceed_on_failure(self): @@ -82,10 +93,7 @@ def test_proceed_on_failure(self): ) output = self.client.wait_for_workflow_completion(wf_id) - assert ( - output - == "InitState1_execute_completed" - ) + assert output == "InitState1_execute_completed" def test_fail_workflow_on_failure(self): wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}" diff --git a/iwf/tests/workflows/state_options_workflow.py b/iwf/tests/workflows/state_options_workflow.py index 32e969a..1381217 100644 --- a/iwf/tests/workflows/state_options_workflow.py +++ b/iwf/tests/workflows/state_options_workflow.py @@ -72,7 +72,7 @@ class StateOptionsWorkflow1(ObjectWorkflow): def get_workflow_states(self) -> StateSchema: return StateSchema.with_starting_state(InitState1()) + class StateOptionsWorkflow2(ObjectWorkflow): def get_workflow_states(self) -> StateSchema: return StateSchema.with_starting_state(InitState2()) -