Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions iwf/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
from iwf.tests.workflows.state_options_override_workflow import (
StateOptionsOverrideWorkflow,
)
from iwf.tests.workflows.state_options_workflow import (
StateOptionsWorkflow1,
StateOptionsWorkflow2,
)
from iwf.tests.workflows.timer_workflow import TimerWorkflow
from iwf.tests.workflows.wait_for_state_with_state_execution_id_workflow import (
WaitForStateWithStateExecutionIdWorkflow,
Expand Down Expand Up @@ -57,6 +61,8 @@
registry.add_workflow(RpcMemoWorkflow())
registry.add_workflow(RPCWorkflow())
registry.add_workflow(StateOptionsOverrideWorkflow())
registry.add_workflow(StateOptionsWorkflow1())
registry.add_workflow(StateOptionsWorkflow2())
registry.add_workflow(TimerWorkflow())
registry.add_workflow(WaitForStateWithStateExecutionIdWorkflow())
registry.add_workflow(WaitForStateWithWaitForKeyWorkflow())
Expand Down
80 changes: 79 additions & 1 deletion iwf/tests/test_workflow_state_options.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
import inspect
import time
import unittest

from iwf.client import Client
from iwf.iwf_api.models import IDReusePolicy
from iwf.iwf_api.models import (
PersistenceLoadingPolicy,
PersistenceLoadingType,
WorkflowStateOptions as IdlWorkflowStateOptions,
RetryPolicy,
WaitUntilApiFailurePolicy,
)

from iwf.tests.worker_server import registry
from iwf.tests.workflows.state_options_workflow import (
StateOptionsWorkflow1,
StateOptionsWorkflow2,
)
from iwf.workflow_options import WorkflowOptions
from iwf.workflow_state_options import WorkflowStateOptions, _to_idl_state_options
from ..errors import WorkflowFailed


class TestWorkflowStateOptions(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.client = Client(registry)

def test_convert_to_idl(self):
empty_idl = IdlWorkflowStateOptions()
assert empty_idl == _to_idl_state_options(False, None, {})
Expand All @@ -29,3 +45,65 @@ def test_convert_to_idl(self):
assert non_empty_idl == _to_idl_state_options(True, non_empty, {})
non_empty.state_id = "state-id-2"
assert non_empty.state_id == "state-id-2"

"""Test that proceed_to_execute_when_wait_until_retry_exhausted correctly handles both enum values."""

def test_proceed_to_execute_when_wait_until_retry_exhausted(self):
retry_policy = RetryPolicy(maximum_attempts=1)

# Test PROCEED_ON_FAILURE
options_proceed = WorkflowStateOptions(
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
wait_until_api_retry_policy=retry_policy,
)
result_proceed = _to_idl_state_options(False, options_proceed, {})
assert (
result_proceed.wait_until_api_failure_policy
== WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
)

# Test FAIL_WORKFLOW_ON_FAILURE
options_fail = WorkflowStateOptions(
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE,
wait_until_api_retry_policy=retry_policy,
)
result_fail = _to_idl_state_options(False, options_fail, {})
assert (
result_fail.wait_until_api_failure_policy
== WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
)

# Test with None/unset value
options = WorkflowStateOptions()
result = _to_idl_state_options(False, options, {})
# By default, wait_until_api_failure_policy should not be set when proceed_to_execute_when_wait_until_retry_exhausted is None
# The IWF service will use FAIL_WORKFLOW_ON_FAILURE by default
from iwf.iwf_api.types import Unset

self.assertTrue(isinstance(result.wait_until_api_failure_policy, Unset))

def test_proceed_on_failure(self):
Copy link
Contributor Author

@dlukiantsev dlukiantsev Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below are integration tests, actually testing the behavior with running workflow.

wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
self.client.start_workflow(
StateOptionsWorkflow1,
wf_id,
10,
"input",
WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE),
)
output = self.client.wait_for_workflow_completion(wf_id)

assert output == "InitState1_execute_completed"

def test_fail_workflow_on_failure(self):
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
self.client.start_workflow(
StateOptionsWorkflow2,
wf_id,
10,
"input",
WorkflowOptions(workflow_id_reuse_policy=IDReusePolicy.DISALLOW_REUSE),
)

with self.assertRaises(WorkflowFailed):
self.client.wait_for_workflow_completion(wf_id, str)
78 changes: 78 additions & 0 deletions iwf/tests/workflows/state_options_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from iwf.command_request import CommandRequest
from iwf.command_results import CommandResults
from iwf.communication import Communication

from iwf.iwf_api.models import RetryPolicy, WaitUntilApiFailurePolicy
from iwf.persistence import Persistence
from iwf.state_decision import StateDecision
from iwf.state_schema import StateSchema
from iwf.workflow import ObjectWorkflow
from iwf.workflow_context import WorkflowContext
from iwf.workflow_state import T, WorkflowState
from iwf.workflow_state_options import WorkflowStateOptions


class InitState1(WorkflowState[str]):
Copy link
Contributor Author

@dlukiantsev dlukiantsev Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added two workflows with a single state. One tests that upon waitUntil fails, it proceeds to execute() and completes the workflow. The other, tests that upon waitUntil fails, it fails the workflow (does not proceed to execute).

def get_state_options(self) -> WorkflowStateOptions:
return WorkflowStateOptions(
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
)

def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
raise RuntimeError("test failure")

def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
data = "InitState1_execute_completed"
return StateDecision.graceful_complete_workflow(data)


class InitState2(WorkflowState[str]):
def get_state_options(self) -> WorkflowStateOptions:
return WorkflowStateOptions(
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE,
)

def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
raise RuntimeError("test failure")

def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
data = "InitState2_execute_completed"
return StateDecision.graceful_complete_workflow(data)


class StateOptionsWorkflow1(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(InitState1())


class StateOptionsWorkflow2(ObjectWorkflow):
def get_workflow_states(self) -> StateSchema:
return StateSchema.with_starting_state(InitState2())
11 changes: 0 additions & 11 deletions iwf/workflow_state_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,6 @@ def _to_idl_state_options(
)
if options.wait_until_api_retry_policy is not None:
res.wait_until_api_retry_policy = options.wait_until_api_retry_policy
if options.proceed_to_execute_when_wait_until_retry_exhausted is not None:
if options.proceed_to_execute_when_wait_until_retry_exhausted:
Copy link
Member

@samuel27m samuel27m Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is the correct fix here. In the Java SDKs, the property passed in this option is boolean. I believe we should instead change the specification to take a bool value, and then use it here.

I'll PM you with a couple links of some of our internal examples, so that you can see how it is currently working in the Java SDK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why in the python sdk Long decided to use enum instead of boolean, but the result should be the same.

Let me explain why I am removing this code block. First, the correct behavior is already implemented in line 95, so this code block was unnecessarily reassigning the value again. In addition to that, this duplicate code block had a bug. It was using the enum as conditional which was always evaluating to true, so the else: was never reached.

Since line 95 already assigns this value correctly, the best approach is to delete this duplicate code block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍

res.wait_until_api_failure_policy = (
WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
)
else:
res.wait_until_api_failure_policy = (
WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
)

pass
if options.wait_until_api_timeout_seconds is not None:
res.wait_until_api_timeout_seconds = options.wait_until_api_timeout_seconds
if options.execute_api_retry_policy is not None:
Expand Down