-
Notifications
You must be signed in to change notification settings - Fork 9
feat: Add job cancellation for early stop of the job execution. #137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
## Walkthrough
This update adds job and task cancellation features, including schema changes, storage API extensions, worker cancellation checks, and related tests. It also introduces thread-safe `ExecutorHandle` management and updates task executor state methods.
## Changes
| File(s) | Summary |
|-----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| src/spider/CMakeLists.txt, src/spider/worker/ExecutorHandle.cpp, src/spider/worker/ExecutorHandle.hpp | Adds `ExecutorHandle` class and integrates it into the build process. |
| src/spider/client/Job.hpp | Implements `cancel()` and `get_error()` methods for job cancellation and error retrieval. |
| src/spider/client/TaskContext.cpp | Adds `abort()` method to cancel a job from within a task, with immediate process termination. |
| src/spider/storage/MetadataStorage.hpp, src/spider/storage/mysql/MySqlStorage.hpp | Extends storage API with job/task cancellation and error message retrieval methods. |
| src/spider/storage/mysql/MySqlStorage.cpp | Implements job/task cancellation and error retrieval in MySQL storage; updates `task_fail` logic. |
| src/spider/storage/mysql/mysql_stmt.hpp, tools/scripts/storage/init_db.sql | Adds new `job_errors` table and corresponding SQL schema for storing job error messages. |
| src/spider/worker/TaskExecutor.cpp, src/spider/worker/TaskExecutor.hpp | Renames task state query methods; adds `cancelled()` method. |
| src/spider/worker/worker.cpp | Implements cooperative task cancellation in the worker, integrates with heartbeat, and updates control flow. |
| tests/CMakeLists.txt, tests/client/cancel-test.cpp, tests/integration/test_cancel.py | Adds new client and integration tests for cancellation; updates test build configuration. |
| tests/storage/test-MetadataStorage.cpp | Adds unit tests for job cancellation and cancellation by task. |
| tests/worker/worker-test.cpp, tests/worker/worker-test.hpp | Adds test tasks for sleep and abort scenarios; updates test task declarations. |
| tests/worker/test-TaskExecutor.cpp | Updates test assertions to use new task executor state method names. |
## Sequence Diagram(s)
```mermaid
sequenceDiagram
participant Client
participant Driver
participant MetadataStorage
participant Worker
participant ExecutorHandle
participant TaskExecutor
Client->>Driver: submit_job()
Driver->>MetadataStorage: create_job()
MetadataStorage-->>Driver: job_id
Driver-->>Client: Job
Client->>Job: cancel()
Job->>MetadataStorage: cancel_job(job_id, message)
MetadataStorage-->>Job: result
loop Heartbeat
Worker->>ExecutorHandle: get_task_id()
alt Task running
Worker->>MetadataStorage: get_task_state(task_id)
MetadataStorage-->>Worker: state
alt state == Cancelled
Worker->>ExecutorHandle: get_executor()
Worker->>TaskExecutor: cancel()
end
end
end
Worker->>TaskExecutor: run()
alt Task cancelled
TaskExecutor-->>Worker: is_cancelled() == true
Worker-->>MetadataStorage: update task/job state to Cancelled
else Task completes
Worker-->>MetadataStorage: update task/job state to Succeeded/Failed
endSuggested reviewers
|
davidlion
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this PR and future PRs, we need to try and do a better job of explaining what and why in PR descriptions and titles. This is important as we want other engineers in YScope to reference these PRs and understand them without needing to read all the code.
Currently, both the title and description just list the names of things that were added with little to no explanation on their motivation, usage, or design.
| return; | ||
| } | ||
|
|
||
| if (spider::core::TaskState::Canceled != task_state) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function can fail so we should return an error code.
Co-authored-by: davidlion <davidlion2@protonmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (6)
src/spider/worker/ExecutorHandle.hpp (2)
27-28: Move implementation comment to class documentation.The raw pointer comment should be moved to the class documentation as suggested in previous reviews, since it explains an important design decision.
/** * This class acts as a handle for thread-safe access to the task executor and task id. * It maintains a weak reference to the task executor to prevent multiple destructor calls and - * ensures that access remains valid only while the executor itself is valid. + * ensures that access remains valid only while the executor itself is valid. + * + * Note: Uses a raw pointer instead of std::shared_ptr to avoid calling the TaskExecutor + * destructor multiple times. */ class ExecutorHandle { public: [[nodiscard]] auto get_task_id() -> std::optional<boost::uuids::uuid>; auto executor_cancel() -> void; auto set(boost::uuids::uuid task_id, TaskExecutor* executor) -> void; auto clear() -> void; private: boost::uuids::uuid m_task_id; - // Do not use std::shared_ptr to avoid calling destructor twice. TaskExecutor* m_executor = nullptr; std::mutex m_mutex;
25-30: Follow coding guidelines for member variable ordering.According to the coding guidelines referenced in past reviews, consider reordering member variables to follow the established pattern.
private: + std::mutex m_mutex; + boost::uuids::uuid m_task_id; - - // Do not use std::shared_ptr to avoid calling destructor twice. TaskExecutor* m_executor = nullptr; - - std::mutex m_mutex;src/spider/client/Job.hpp (2)
87-100: Refactor duplicated storage connection pattern.This method contains the same storage connection acquisition pattern that was flagged in previous reviews for refactoring into a helper method.
The connection acquisition logic is repeated across multiple methods and should be extracted into a helper as discussed in previous reviews and tracked in issue #158.
173-197: Simplify branching logic and address code duplication.The method has two issues:
- Contains the same duplicated storage connection pattern mentioned in previous reviews
- The branching logic for handling connection vs no-connection cases can be simplified
Consider refactoring to eliminate code duplication:
auto get_error() -> std::pair<std::string, std::string> { - if (nullptr == m_conn) { - std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result - = m_storage_factory->provide_storage_connection(); - if (std::holds_alternative<core::StorageErr>(conn_result)) { - throw ConnectionException(std::get<core::StorageErr>(conn_result).description); - } - auto conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result)); - - std::pair<std::string, std::string> res; - core::StorageErr const err - = m_metadata_storage->get_error_message(*conn, m_id, &res.first, &res.second); - if (false == err.success()) { - throw ConnectionException{err.description}; - } - return res; - } - + auto* connection = m_conn.get(); + std::unique_ptr<core::StorageConnection> temp_conn; + + if (nullptr == connection) { + // Use helper method once available (issue #158) + std::variant<std::unique_ptr<core::StorageConnection>, core::StorageErr> conn_result + = m_storage_factory->provide_storage_connection(); + if (std::holds_alternative<core::StorageErr>(conn_result)) { + throw ConnectionException(std::get<core::StorageErr>(conn_result).description); + } + temp_conn = std::move(std::get<std::unique_ptr<core::StorageConnection>>(conn_result)); + connection = temp_conn.get(); + } + std::pair<std::string, std::string> res; core::StorageErr const err - = m_metadata_storage->get_error_message(*m_conn, m_id, &res.first, &res.second); + = m_metadata_storage->get_error_message(*connection, m_id, &res.first, &res.second); if (false == err.success()) { throw ConnectionException{err.description}; } return res;src/spider/storage/mysql/MySqlStorage.cpp (2)
1079-1120: Address the validation concerns from previous reviewsThis method still has the issues identified in previous reviews:
No validation of affected rows: The
UPDATEstatements on lines 1094 and 1102 don't check if any rows were actually updated, meaning the method could silently "succeed" even for non-existent jobs.Potential duplicate error rows: The
INSERTon line 1106 could create multiple identical error messages if called repeatedly for the same job.Apply the suggested fixes from the previous review:
- task_statement->executeUpdate(); + int32_t updated = task_statement->executeUpdate(); + if (updated == 0) { + static_cast<MySqlConnection&>(conn)->rollback(); + return StorageErr{StorageErrType::KeyNotFoundErr, + fmt::format("No running job with id {}", boost::uuids::to_string(id))}; + } ... - job_statement->executeUpdate(); + updated = job_statement->executeUpdate(); + if (updated == 0) { + static_cast<MySqlConnection&>(conn)->rollback(); + return StorageErr{StorageErrType::KeyNotFoundErr, + fmt::format("No job with id {}", boost::uuids::to_string(id))}; + } - "INSERT INTO `job_errors` (`job_id`, `func_name`, `message`) VALUES (?, ?, ?)" + "INSERT IGNORE INTO `job_errors` (`job_id`, `func_name`, `message`) VALUES (?, ?, ?)"
1122-1179: Apply the same validation fixes as suggested for cancel_job_by_userThis method has the same validation issues as
cancel_job_by_user:
- Lines 1153 and 1161: The
UPDATEstatements don't verify affected row counts- Line 1165: Uses regular
INSERTwhich could create duplicatesApply the same defensive checks as suggested in the previous review for consistency.
🧹 Nitpick comments (1)
src/spider/storage/mysql/MySqlStorage.cpp (1)
1181-1209: Consider handling multiple error messagesThe method only retrieves the first error message if multiple exist for the same job. Consider whether this is the intended behaviour or if you should:
- Return the most recent error message (ORDER BY a timestamp)
- Concatenate multiple messages
- Document that only the first message is returned
If you want the most recent message, modify the query:
- "SELECT `func_name`, `message` FROM `job_errors` WHERE `job_id` = ?" + "SELECT `func_name`, `message` FROM `job_errors` WHERE `job_id` = ? ORDER BY `id` DESC LIMIT 1"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
src/spider/client/Job.hpp(2 hunks)src/spider/storage/MetadataStorage.hpp(2 hunks)src/spider/storage/mysql/MySqlStorage.cpp(3 hunks)src/spider/storage/mysql/MySqlStorage.hpp(2 hunks)src/spider/worker/ExecutorHandle.cpp(1 hunks)src/spider/worker/ExecutorHandle.hpp(1 hunks)tests/client/cancel-test.cpp(1 hunks)tests/storage/test-MetadataStorage.cpp(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- tests/client/cancel-test.cpp
- src/spider/storage/mysql/MySqlStorage.hpp
- src/spider/worker/ExecutorHandle.cpp
- src/spider/storage/MetadataStorage.hpp
- tests/storage/test-MetadataStorage.cpp
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/spider/client/Job.hpp (2)
src/spider/client/Exception.hpp (2)
ConnectionException(14-15)ConnectionException(14-14)src/spider/storage/MetadataStorage.hpp (16)
conn(27-27)conn(29-29)conn(30-30)conn(32-33)conn(35-36)conn(40-43)conn(45-51)conn(53-54)conn(56-57)conn(59-60)conn(62-66)conn(69-70)conn(72-76)conn(87-88)conn(99-100)conn(110-115)
src/spider/storage/mysql/MySqlStorage.cpp (1)
src/spider/storage/mysql/MySqlStorage.hpp (16)
conn(38-38)conn(39-39)conn(40-40)conn(41-42)conn(43-44)conn(45-50)conn(51-57)conn(58-59)conn(60-61)conn(62-63)conn(64-68)conn(69-70)conn(71-75)conn(77-78)conn(80-81)conn(82-87)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: lint
- GitHub Check: non-storage-unit-tests (ubuntu-24.04)
- GitHub Check: non-storage-unit-tests (ubuntu-22.04)
🔇 Additional comments (4)
src/spider/worker/ExecutorHandle.hpp (1)
12-16: Good improvement on class documentation.The class documentation now properly explains the weak reference design rationale, addressing the previous feedback about moving important design comments to the class docstring.
src/spider/client/Job.hpp (1)
162-169: Documentation clearly explains the new error retrieval functionality.The updated documentation properly explains the return value structure and the different sources of cancellation (user vs task).
src/spider/storage/mysql/MySqlStorage.cpp (2)
1557-1588: LGTM!This method correctly implements task state retrieval with proper error handling and follows the established patterns in the codebase.
1873-1882: Excellent improvement to handle race conditionsThe modification to only update tasks in 'running' state and check affected row counts is a great improvement that:
- Prevents invalid state transitions: Only fails tasks that are actually running
- Handles cancellation race conditions: Gracefully handles cases where tasks are cancelled before failing
- Optimizes unnecessary operations: Skips job state updates when no task was affected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/spider/worker/ExecutorHandle.cpp (1)
19-24: Avoid holding the mutex while executing user code – potential deadlockThe method locks
m_mutexand directly callsm_executor->cancel(). Ifcancel()(or any callback it triggers) tries to access theExecutorHandleagain, the thread will deadlock onm_mutex.Refactor to follow the "lock-copy-unlock-act" idiom:
auto ExecutorHandle::cancel_executor() -> void { - std::lock_guard const lock_guard{m_mutex}; - if (nullptr != m_executor) { - m_executor->cancel(); - } + TaskExecutor* exec = nullptr; + { + std::lock_guard const lock_guard{m_mutex}; + exec = m_executor; + } + if (exec != nullptr) { + exec->cancel(); // outside lock + } }
🧹 Nitpick comments (1)
tests/storage/test-MetadataStorage.cpp (1)
452-459: Enhance the documentation to describe task relationshipsThe current documentation could be clearer about the task graph structure and relationships.
Consider this more descriptive documentation:
/** - * Create a common job cancel test setup. Create a job with a task graph that consists of two - * parent tasks and one child task. Set the state of parent 1 to succeed. Parent 2 state remains - * ready and child state remains pending. + * Creates a test job with a task dependency graph for cancellation testing. + * + * Task graph structure: + * parent_1 (p1) ──┐ + * ├──> child_task + * parent_2 (p2) ──┘ + * + * Initial states after setup: + * - parent_1: Succeeded (with output "1.1") + * - parent_2: Ready + * - child_task: Pending (waiting for parent_2 to complete) + * * @param storage * @param conn - * @return A tuple containing the job_id, parent_1_id, parent_2_id, and child_task_id. + * @return A tuple containing (job_id, parent_1_id, parent_2_id, child_task_id) */
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
src/spider/storage/MetadataStorage.hpp(2 hunks)src/spider/storage/mysql/MySqlStorage.cpp(3 hunks)src/spider/storage/mysql/MySqlStorage.hpp(2 hunks)src/spider/storage/mysql/mysql_stmt.hpp(2 hunks)src/spider/worker/ExecutorHandle.cpp(1 hunks)src/spider/worker/ExecutorHandle.hpp(1 hunks)src/spider/worker/TaskExecutor.cpp(4 hunks)src/spider/worker/TaskExecutor.hpp(4 hunks)src/spider/worker/worker.cpp(7 hunks)tests/storage/test-MetadataStorage.cpp(2 hunks)tests/worker/worker-test.cpp(3 hunks)tools/scripts/storage/init_db.sql(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
- tests/worker/worker-test.cpp
- tools/scripts/storage/init_db.sql
- src/spider/storage/mysql/mysql_stmt.hpp
- src/spider/worker/TaskExecutor.hpp
- src/spider/worker/TaskExecutor.cpp
- src/spider/storage/mysql/MySqlStorage.hpp
- src/spider/worker/worker.cpp
- src/spider/storage/MetadataStorage.hpp
- src/spider/storage/mysql/MySqlStorage.cpp
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/spider/worker/ExecutorHandle.cpp (2)
src/spider/worker/TaskExecutor.cpp (2)
get_task_id(21-23)get_task_id(21-21)src/spider/worker/ExecutorHandle.hpp (1)
executor(21-21)
tests/storage/test-MetadataStorage.cpp (3)
src/spider/storage/MetadataStorage.hpp (16)
conn(27-27)conn(29-29)conn(30-30)conn(32-33)conn(35-36)conn(40-43)conn(45-51)conn(53-54)conn(56-57)conn(59-60)conn(62-66)conn(69-70)conn(72-76)conn(87-88)conn(100-101)conn(112-117)src/spider/core/Data.hpp (1)
gen(40-43)src/spider/client/Job.hpp (1)
conn_result(87-100)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: non-storage-unit-tests (ubuntu-24.04)
- GitHub Check: non-storage-unit-tests (ubuntu-22.04)
- GitHub Check: lint
🔇 Additional comments (2)
src/spider/worker/ExecutorHandle.hpp (1)
12-40: Well-designed thread-safe singleton implementationThe class documentation clearly explains the design rationale, and the implementation properly enforces singleton semantics by deleting all constructors and assignment operators. The explanation for using raw pointers instead of
std::shared_ptris helpful.tests/storage/test-MetadataStorage.cpp (1)
506-578: Well-structured cancellation testsBoth test cases comprehensively verify the cancellation functionality:
- Proper state transitions for all tasks
- Correct error message and offender recording
- Appropriate cleanup after tests
The use of the helper function reduces duplication effectively.
Description
User might want to stop a job from executing if the task fails and has no hope of recovery or because of other reason, so
Spiderneed to support job cancellation both from inside inside the task throughTaskContext::abortand from user throughJob::cancel. The cancelled job will be inJobStatus::Cancelledstate.User also need to get the reason why a job is cancelled. This PR adds
job_errorstable to store the cancellation messages. The messages are set insideTaskContext::abortas a user argument and set toJob cancelled by user.insideJob::cancel. The table also stores the function name of the task callingTaskContext::abort, oruserif the job is cancelled usingJob::cancel. User could retrieve both information usingJob::get_error.Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Summary by CodeRabbit
job_errorstable to store cancellation details.