@bradhe
Contributor

@bradhe bradhe commented May 10, 2025

This new function allows users to wait for multiple runs that have been started at the same time. Currently, it just serially waits for each run to complete. In the future, we'll want to provide an async interface into this functionality.

@bradhe bradhe requested review from Copilot and datancoffee May 10, 2025 20:01
@bradhe bradhe force-pushed the feature/tow-402-update-wait_for_runs-to-take-a-list-of-runs-or-a-single-run branch from 7f990c8 to f302cca Compare May 10, 2025 20:02
Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds a new helper function, wait_for_runs, to allow waiting for multiple runs and updates tests accordingly.

  • Renames one of the tests for clarity and adds a new test for waiting on multiple runs.
  • Introduces the wait_for_runs function in _client.py and updates the __init__.py exports.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| tests/tower/test_client.py | Renamed a test and added a new test for multiple runs. |
| src/tower/_client.py | Added wait_for_runs which sequentially waits for runs. |
| src/tower/__init__.py | Exposed the new wait_for_runs function. |

Comments suppressed due to low confidence (1)

tests/tower/test_client.py:46

  • [nitpick] The function name 'test_waiting_for_a_run' may be misleading since it tests wait_for_run instead of the new wait_for_runs functionality. Consider renaming it to 'test_wait_for_run' for consistency.
def test_waiting_for_a_run(httpx_mock):

time.sleep(WAIT_TIMEOUT)


def wait_for_runs(runs: List[Run]) -> None:
Copilot AI May 10, 2025

The docstring for wait_for_runs indicates it returns when any run reaches a terminal state, but the implementation sequentially waits for each run. Consider updating the docstring to accurately reflect this behavior.

    of the runs fail.
    """
    for run in runs:
        wait_for_run(run)
Contributor

@datancoffee datancoffee May 10, 2025

@bradhe This implementation is fine for the success case (all runs that we are waiting for are successful).

But in control flows that I am familiar with (e.g. Databricks or Google Dataflow), when people are waiting for 3 child apps, if one of them ends with a failure, the entire parent pipeline fails too.

I think this implementation is OK for v1, but we should think about:

  • Returning the run status from wait_for_run().
  • In wait_for_runs(), checking the run status of the children and failing the function if ANY of the child runs fails.

Contributor Author

This is a really good point. What do you think the default behavior should be? I was considering adding a flag that controls the failure behavior.

def wait_for_runs(runs: List[Run], raise_on_failure: bool = True) -> None:

And if any of these fail, we'd raise an exception (and stop the parent pipeline). Perhaps that should be the default behavior? Likewise, perhaps it should return a tuple of (successful_runs, failed_runs) instead of None? What do you think?

Contributor

@bradhe If we look at how other orchestrators do it, they have a pretty complex framework for defining what to do in case of task failures, for a good reason (to allow users to reuse partial calculations and avoid rerunning the entire pipeline).

Airflow:
https://medium.com/@jaywang.recsys/apache-airflow-recovery-from-a-tasks-e66234b6d671
Dagster:
https://docs.dagster.io/guides/deploy/execution/run-retries

I think it would be too early to introduce these retry mechanisms, but at least we should tell users if the child app was successful or not. Then they can decide in the orchestrator app what to do: retry or abort.

The raise_on_failure parameter, where we would raise an exception, would be helpful in use cases where the user wants to stop executing the parent app without dealing with child app statuses. But if we add this parameter, it needs to default to False; otherwise users won't be able to deal with failures themselves.

So, I think this means,

Option 1: the user deals with retries themselves and also decides whether to abort the parent app themselves.
def wait_for_run(run: Run) -> bool:
    """Returns success or failure."""

def wait_for_runs(runs: List[Run]) -> tuple[List[Run], List[Run]]:
    """Returns (successful_runs, failed_runs)."""

Option 2: the user still has to deal with retries themselves, but can tell us whether to abort the parent app or not.
def wait_for_run(run: Run, raise_on_failure: bool = False) -> bool:
    """Returns success or failure."""

def wait_for_runs(runs: List[Run], raise_on_failure: bool = False) -> tuple[List[Run], List[Run]]:
    """Returns (successful_runs, failed_runs), or raises an exception on the first failure among the children if the user opted in."""
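A minimal sketch of how Option 2 might look in practice. It is not the merged implementation: the `Run` dataclass, its `status` values ("succeeded" / "failed"), and `RunFailedError` are hypothetical stand-ins, and the polling step is omitted by assuming each run has already reached a terminal state.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Run:
    """Hypothetical stand-in for the Tower Run model."""
    number: int
    status: str  # assumed values: "succeeded" or "failed"


class RunFailedError(Exception):
    """Hypothetical exception raised when a child run fails."""


def wait_for_run(run: Run, raise_on_failure: bool = False) -> bool:
    # A real client would poll the API here; this sketch assumes the
    # run is already in a terminal state.
    ok = run.status == "succeeded"
    if not ok and raise_on_failure:
        raise RunFailedError(f"run {run.number} failed")
    return ok


def wait_for_runs(
    runs: List[Run], raise_on_failure: bool = False
) -> Tuple[List[Run], List[Run]]:
    successful_runs: List[Run] = []
    failed_runs: List[Run] = []
    for run in runs:
        # With raise_on_failure=True the first failure propagates and
        # stops the parent; otherwise failures are collected.
        if wait_for_run(run, raise_on_failure=raise_on_failure):
            successful_runs.append(run)
        else:
            failed_runs.append(run)
    return successful_runs, failed_runs


runs = [Run(1, "succeeded"), Run(2, "failed"), Run(3, "succeeded")]
ok, failed = wait_for_runs(runs)
```

With the default `raise_on_failure=False`, the caller receives both lists and can decide in the orchestrator app whether to retry the failed runs or abort.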

@datancoffee
Contributor

datancoffee commented May 10, 2025

@bradhe I updated the docstrings

Contributor

@datancoffee datancoffee left a comment

@bradhe small nit.
Now that the new timeout limits the overall lifetime of any child run to 1 day, perhaps WAIT_TIMEOUT should be renamed to something like POLL_FREQUENCY.

# WAIT_TIMEOUT is the amount of time to wait between requests when polling the
# Tower API.
WAIT_TIMEOUT = 2
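The naming concern above comes down to two different durations: the pause between status checks and the overall deadline for the wait. A rough sketch of keeping them separate follows. `wait_until_done`, `WaitTimeout`, `POLL_INTERVAL`, and the `is_done` callback are hypothetical names used only for illustration, and the interval is shortened from the PR's 2 seconds so the example runs quickly.

```python
import time
from typing import Callable, Optional

POLL_INTERVAL = 0.01  # seconds between status checks (the PR uses 2)


class WaitTimeout(Exception):
    """Hypothetical error raised when the overall deadline passes."""


def wait_until_done(
    is_done: Callable[[], bool],
    timeout: Optional[float] = 86_400.0,  # overall limit: one day
    poll_interval: float = POLL_INTERVAL,
) -> None:
    # time.monotonic() is unaffected by system clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while not is_done():
        if deadline is not None and time.monotonic() >= deadline:
            raise WaitTimeout(f"gave up after {timeout} seconds")
        time.sleep(poll_interval)


# Usage: the check reports "done" on the third poll.
checks = iter([False, False, True])
wait_until_done(lambda: next(checks), timeout=1.0)
```

Passing `timeout=None` in this sketch waits indefinitely, which matches the `Optional[float]` type used for the timeout parameter later in the review.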

    """
    for run in runs:
        wait_for_run(run)
        wait_for_run(
Contributor

@bradhe Wait, aren't we supposed to accumulate lists of successful and failed runs and return tuple[List[Run], List[Run]]?

Contributor Author

Yes, this was an incomplete implementation. Test coverage now validates this, too.


def wait_for_runs(runs: List[Run]) -> None:
        elif _is_run_awaiting_completion(desc):
            time.sleep(WAIT_TIMEOUT)
Contributor

@bradhe Sleeping inside the single-run function is fine when the parent app is waiting for just one child app.

But when the parent is waiting for two or more apps, sleeping here blocks the status checks for the entire run group.

This is fine for v1, where we assume the best case: users want all children to succeed before proceeding to the next step of the parent app.

But in future versions I would recommend implementing a 2-second heartbeat in which we check the status of all children at the same time.

Then, if one of the children fails and raise_on_failure is true, the parent will terminate promptly.

In the v1 implementation, the parent keeps waiting on the first child even if the second child has already failed.

Not a big deal right now.
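The heartbeat idea described above can be sketched roughly as follows. This is an illustration under assumptions, not the code that was merged: `wait_for_all`, `ChildRunFailed`, the `get_status` callback, and the status strings are all hypothetical, and the heartbeat is shortened from 2 seconds so the example runs quickly.

```python
import time
from typing import Callable, List, Tuple

HEARTBEAT = 0.01  # seconds between rounds (the comment above suggests 2)


class ChildRunFailed(Exception):
    """Hypothetical exception for a failed child run."""


def wait_for_all(
    run_ids: List[str],
    get_status: Callable[[str], str],
    raise_on_failure: bool = False,
    heartbeat: float = HEARTBEAT,
) -> Tuple[List[str], List[str]]:
    pending = list(run_ids)
    succeeded: List[str] = []
    failed: List[str] = []
    while pending:
        still_pending: List[str] = []
        # Check every pending child once per round, so a failure in any
        # child is seen within one heartbeat regardless of list order.
        for run_id in pending:
            status = get_status(run_id)
            if status == "succeeded":
                succeeded.append(run_id)
            elif status == "failed":
                if raise_on_failure:
                    raise ChildRunFailed(run_id)
                failed.append(run_id)
            else:
                still_pending.append(run_id)
        pending = still_pending
        if pending:
            time.sleep(heartbeat)
    return succeeded, failed


# Usage with scripted statuses: "b" fails while "a" is still running.
script = {
    "a": iter(["running", "running", "succeeded"]),
    "b": iter(["running", "failed"]),
}
ok, bad = wait_for_all(["a", "b"], lambda rid: next(script[rid]))
```

Building a fresh `still_pending` list each round also avoids removing items from a list while iterating over it.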

Contributor Author

I've addressed this in the updated wait_for_runs implementation.

    run: Run,
    timeout: Optional[float] = 86_400.0,  # one day
    raise_on_failure: bool = False,
) -> Run:
Contributor

@bradhe See my comment about sleeping inside the wait_for_run vs wait_for_runs

Perhaps add a parameter "wait_for_completion: bool = True", which should be True when the user waits for one run. But when this function is called by wait_for_runs, the parameter should be False, and the waiting should be done in wait_for_runs.

Contributor Author

I've separated wait_for_run and wait_for_runs entirely to deal with timeout semantics separately, so no worries here.

Updated implementations ensure that we equally check runs to detect failures
part way through executions. Likewise, we add timeouts while talking to the
Tower API in case there are some operational problems on that side of things.
@bradhe bradhe requested review from Copilot and sankroh May 13, 2025 11:55
Contributor

Copilot AI left a comment

Pull Request Overview

This PR introduces a new wait_for_runs function for waiting on multiple Tower runs and enhances the wait_for_run function with additional parameters and error handling. Key changes include:

  • Updates to test cases to verify the new wait_for_runs behavior.
  • Enhancements to the Tower client with added timeout, error raising, and improved status-check logic.
  • New exports and documentation updates in __init__.py to expose the new functionality.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| tests/tower/test_client.py | Added/updated tests to cover waiting for single and multiple run scenarios |
| src/tower/exceptions.py | Introduced additional exceptions for run handling errors |
| src/tower/_client.py | Enhanced run waiting functions and helper functions with improved errors |
| src/tower/__init__.py | Exported the new wait_for_runs function |

Comments suppressed due to low confidence (5)

src/tower/_client.py:141

  • The variable 't' is undefined; consider replacing it with the 'timeout' value or a descriptive timeout message.
raise TimeoutException(t)

src/tower/_client.py:218

  • The variable 't' is undefined; replace it with the intended timeout value or message.
raise TimeoutException(t)

src/tower/_client.py:157

  • The constant 'WAIT_TIMEOUT' is used without a definition; ensure it is defined or passed as a parameter.
time.sleep(WAIT_TIMEOUT)

src/tower/_client.py:166

  • There is a naming mismatch: the constant is defined as 'DEFAULT_RETIRES_ON_FAILURE' but used as 'DEFAULT_RETRIES_ON_FAILURE'. Please correct the spelling.
if retries >= DEFAULT_RETRIES_ON_FAILURE:

src/tower/_client.py:343

  • The 'httpx' module is not imported in this file yet is used for exception handling; add an import for httpx.
except httpx.TimeoutException:

@bradhe bradhe requested a review from Copilot May 13, 2025 12:00
Contributor

Copilot AI left a comment

Pull Request Overview

This pull request adds a new function, wait_for_runs, to allow waiting for multiple Tower runs until they reach terminal states. It also includes tests to validate waiting behavior and refines error handling and documentation in related modules.

  • Adds wait_for_runs in src/tower/_client.py.
  • Introduces tests in tests/tower/test_client.py to verify both successful and failed run scenarios.
  • Updates exception messages and init methods in src/tower/exceptions.py.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| tests/tower/test_client.py | New and updated tests for waiting on runs with multiple responses |
| src/tower/exceptions.py | Added new exception classes and improved exception message text |
| src/tower/_client.py | Introduced wait_for_runs along with minor refactors in wait_for_run and helper functions |
| src/tower/__init__.py | Exported the new wait_for_runs function |


if _is_successful_run(desc):
    successful_runs.append(desc)
    awaiting_runs.remove(run)
Copilot AI May 13, 2025

Modifying the 'awaiting_runs' list while iterating over it can lead to unexpected behavior. Consider iterating over a copy of the list (e.g., for run in awaiting_runs[:]) to safely remove items.
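A small standalone illustration of the pitfall described above, using plain strings rather than Run objects. The list contents are made up for the example.

```python
awaiting = ["run-1", "run-2", "run-3", "run-4"]
finished = {"run-1", "run-2", "run-3", "run-4"}  # pretend every run is done

# Buggy: removing an item shifts the later items left, so the iterator
# skips whichever element slides into the slot it just visited.
buggy = list(awaiting)
for run in buggy:
    if run in finished:
        buggy.remove(run)
# buggy is now ["run-2", "run-4"], even though every run had finished.

# Safe: iterate over a shallow copy and remove from the original.
safe = list(awaiting)
for run in safe[:]:
    if run in finished:
        safe.remove(run)
# safe is now [].
```

An alternative with the same effect is to build a new list of the runs that are still pending on each pass instead of removing items in place.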

bradhe and others added 2 commits May 13, 2025 14:27
Thanks @copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@bradhe bradhe requested review from Copilot and datancoffee May 13, 2025 12:35
Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds a simple wait_for_runs function to wait for multiple Tower app runs to complete serially while retaining the existing wait_for_run functionality. Key changes include new tests for waiting on a single run, waiting on multiple runs, and handling partial failures, plus enhancements to the API client with improved error handling and timeout management.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| tests/tower/test_client.py | Added fixtures and tests for validating wait_for_run and wait_for_runs |
| src/tower/exceptions.py | New exception classes for various error conditions |
| src/tower/_client.py | Added wait_for_run and wait_for_runs functions with updated polling logic and error handling |
| src/tower/__init__.py | Updated to export wait_for_runs |

Comments suppressed due to low confidence (2)

src/tower/_client.py:158

  • The identifier 'WAIT_TIMEOUT' is used but not defined in this module. Consider defining it as a constant (e.g. WAIT_TIMEOUT = 2) or passing it in via configuration to avoid runtime NameError.
time.sleep(WAIT_TIMEOUT)

src/tower/_client.py:235

  • The identifier 'WAIT_TIMEOUT' is used but not defined in this module. Ensure that WAIT_TIMEOUT is declared (or imported) appropriately to guarantee consistent timeout behavior in wait_for_runs.
time.sleep(WAIT_TIMEOUT)

@datancoffee
Contributor

Solid piece of German engineering. Approved.

Contributor

@datancoffee datancoffee left a comment

Solid work.

@bradhe bradhe merged commit 94abe82 into develop May 13, 2025
3 checks passed
@bradhe bradhe deleted the feature/tow-402-update-wait_for_runs-to-take-a-list-of-runs-or-a-single-run branch May 13, 2025 13:06
3 participants