From f6b48668ff0bedde0eb13619f58a5a48c6d7d78a Mon Sep 17 00:00:00 2001 From: sykwer Date: Mon, 22 Dec 2025 18:15:59 +0900 Subject: [PATCH 1/4] deal with reentrant Signed-off-by: sykwer --- callback_isolated_executor/CMakeLists.txt | 4 +- .../callback_isolated_executor.hpp | 14 ++ .../multi_threaded_executor_internal.hpp | 42 ++++++ .../src/callback_isolated_executor.cpp | 74 ++++++++--- .../src/multi_threaded_executor_internal.cpp | 122 ++++++++++++++++++ cie_sample_application/CMakeLists.txt | 14 +- .../cie_sample_application/reentrant_node.hpp | 24 ++++ cie_sample_application/src/reentrant_node.cpp | 30 +++++ .../src/reentrant_node_main.cpp | 16 +++ 9 files changed, 315 insertions(+), 25 deletions(-) create mode 100644 callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp create mode 100644 callback_isolated_executor/src/multi_threaded_executor_internal.cpp create mode 100644 cie_sample_application/include/cie_sample_application/reentrant_node.hpp create mode 100644 cie_sample_application/src/reentrant_node.cpp create mode 100644 cie_sample_application/src/reentrant_node_main.cpp diff --git a/callback_isolated_executor/CMakeLists.txt b/callback_isolated_executor/CMakeLists.txt index 295cfeb..bf31569 100644 --- a/callback_isolated_executor/CMakeLists.txt +++ b/callback_isolated_executor/CMakeLists.txt @@ -30,8 +30,8 @@ ament_target_dependencies(component_container_callback_isolated rclcpp rclcpp_co add_executable(component_container_single src/component_container_single.cpp) ament_target_dependencies(component_container_single rclcpp rclcpp_components cie_thread_configurator) -add_library(callback_isolated_executor SHARED src/callback_isolated_executor.cpp) -ament_target_dependencies(callback_isolated_executor rclcpp cie_thread_configurator) +add_library(callback_isolated_executor SHARED src/callback_isolated_executor.cpp src/multi_threaded_executor_internal.cpp) +ament_target_dependencies(callback_isolated_executor rclcpp cie_thread_configurator cie_config_msgs) target_include_directories(callback_isolated_executor PUBLIC $ $ diff --git a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp index c243669..f0a5d0f 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp @@ -1,9 +1,15 @@ #pragma once + #include "rclcpp/rclcpp.hpp" +#include "cie_config_msgs/msg/callback_group_info.hpp" class CallbackIsolatedExecutor : public rclcpp::Executor { RCLCPP_DISABLE_COPY(CallbackIsolatedExecutor) + std::mutex client_publisher_mutex_; + rclcpp::Publisher::SharedPtr client_publisher_; + size_t reentrant_parallelism_{4}; + // Nodes associated with this CallbackIsolatedExecutor, appended by add_node() // and removed by remove_node() std::set threads_; + std::vector tids_; // guarded by mtx_ + size_t ready_count_{0}; // guarded by mtx_ (the number of threads that saved their TID) + bool start_allowed_{false}; // guarded by mtx_ (spin() allowed run() to proceed or not) + + // Synchronization in start phase + std::mutex mtx_; + std::condition_variable cv_all_ready_; + std::condition_variable cv_start_; + + std::mutex wait_mutex_; // to guard get_next_executable in run() + std::atomic_bool pre_spinning_{false}; + + void run(); + +public: + explicit MultiThreadedExecutorInternal(size_t number_of_threads) + : rclcpp::Executor(rclcpp::ExecutorOptions()), number_of_threads_(number_of_threads) + { + // hardcode for now + yield_before_execute_ = false; + next_exec_timeout_ = std::chrono::nanoseconds(-1); + } + + void pre_spin(); + + void spin() override; + + std::vector get_thread_ids(); +}; \ No newline at end of file diff --git a/callback_isolated_executor/src/callback_isolated_executor.cpp b/callback_isolated_executor/src/callback_isolated_executor.cpp index c238c15..636f756 100644 --- a/callback_isolated_executor/src/callback_isolated_executor.cpp +++ b/callback_isolated_executor/src/callback_isolated_executor.cpp @@ -8,10 +8,13 @@ #include "rclcpp/rclcpp.hpp" #include "callback_isolated_executor/callback_isolated_executor.hpp" +#include "callback_isolated_executor/multi_threaded_executor_internal.hpp" CallbackIsolatedExecutor::CallbackIsolatedExecutor( const rclcpp::ExecutorOptions &options) - : rclcpp::Executor(options) {} + : rclcpp::Executor(options) { + client_publisher_ = cie_thread_configurator::create_client_publisher(); + } void CallbackIsolatedExecutor::spin() { std::vector threads; @@ -52,33 +55,62 @@ void CallbackIsolatedExecutor::spin() { } } // guard mutex_ - std::mutex client_publisher_mutex; - auto client_publisher = cie_thread_configurator::create_client_publisher(); - for (auto [group, node] : groups_and_nodes) { - auto executor = - std::make_shared(); - executor->add_callback_group(group, node); - auto callback_group_id = - cie_thread_configurator::create_callback_group_id(group, node); + if (group->type() == rclcpp::CallbackGroupType::Reentrant && reentrant_parallelism_ >= 2) { + threads.emplace_back( + &CallbackIsolatedExecutor::spin_reentrant_callback_group, this, + group, node); + } else { + threads.emplace_back( + &CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group, + this, group, node); + } + } - threads.emplace_back([executor, callback_group_id, &client_publisher, - &client_publisher_mutex]() { - auto tid = static_cast(syscall(SYS_gettid)); + for (auto &t : threads) { + t.join(); + } +} - { - std::lock_guard lock{client_publisher_mutex}; - cie_thread_configurator::publish_callback_group_info( - client_publisher, tid, callback_group_id); - } +void CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group( + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { + auto executor = std::make_shared(); + executor->add_callback_group(group, node); + auto callback_group_id = + cie_thread_configurator::create_callback_group_id(group, node); + auto tid = static_cast(syscall(SYS_gettid)); - executor->spin(); - }); + { + std::lock_guard lock{client_publisher_mutex_}; + cie_thread_configurator::publish_callback_group_info( + client_publisher_, tid, callback_group_id); } - for (auto &t : threads) { - t.join(); + executor->spin(); +} + +void CallbackIsolatedExecutor::spin_reentrant_callback_group( + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { + auto executor = + std::make_shared(reentrant_parallelism_); + executor->add_callback_group(group, node); + auto callback_group_id = + cie_thread_configurator::create_callback_group_id(group, node); + + executor->pre_spin(); + auto tids = executor->get_thread_ids(); + + { + std::lock_guard lock{client_publisher_mutex_}; + for (auto tid : tids) { + cie_thread_configurator::publish_callback_group_info( + client_publisher_, tid, callback_group_id); + } } + + executor->spin(); } void CallbackIsolatedExecutor::add_callback_group( diff --git a/callback_isolated_executor/src/multi_threaded_executor_internal.cpp b/callback_isolated_executor/src/multi_threaded_executor_internal.cpp new file mode 100644 index 0000000..7e51e62 --- /dev/null +++ b/callback_isolated_executor/src/multi_threaded_executor_internal.cpp @@ -0,0 +1,122 @@ +#include + +#include "callback_isolated_executor/multi_threaded_executor_internal.hpp" + +void MultiThreadedExecutorInternal::pre_spin() +{ + if (pre_spinning_.exchange(true)) { + throw std::runtime_error("pre_spin() called while already pre-spinning"); + } + + { + std::lock_guard lock{mtx_}; + assert(threads_.empty() && tids_.empty()); + ready_count_ = 0; + start_allowed_ = false; + } + + threads_.reserve(number_of_threads_); + for (size_t i = 0; i < number_of_threads_; i++) { + threads_.emplace_back(&MultiThreadedExecutorInternal::run, this); + } +} + +void MultiThreadedExecutorInternal::run() +{ + auto tid = static_cast(syscall(SYS_gettid)); + bool will_notify = false; + + { + std::lock_guard lock{wait_mutex_}; + tids_.push_back(tid); + ready_count_++; + if (ready_count_ == number_of_threads_) { + will_notify = true; + } + } + + if (will_notify) { + cv_all_ready_.notify_one(); + } + + { + std::unique_lock lock{mtx_}; + cv_start_.wait(lock, [this]() { return start_allowed_; }); + } + + + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_exec; + + { + std::lock_guard wait_lock{wait_mutex_}; + + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; + } + + if (!get_next_executable(any_exec, next_exec_timeout_)) { + continue; + } + } + + if (yield_before_execute_) { + std::this_thread::yield(); + } + + execute_any_executable(any_exec); + + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); + } +} + +void MultiThreadedExecutorInternal::spin() +{ + if (!pre_spinning_.load()) { + throw std::runtime_error("spin() called without pre_spin()"); + } + + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + + RCPPUTILS_SCOPE_EXIT(pre_spinning_.store(false); ); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + { + std::unique_lock lock{mtx_}; + cv_all_ready_.wait(lock, [this]() { + return ready_count_ == number_of_threads_; + }); + } + + { + std::lock_guard lock{mtx_}; + start_allowed_ = true; + } + + cv_start_.notify_all(); + + for (auto &thread : threads_) { + thread.join(); + } +} + +std::vector MultiThreadedExecutorInternal::get_thread_ids() +{ + if (!pre_spinning_.load()) { + throw std::runtime_error("get_thread_ids() called without pre_spin()"); + } + + { + std::unique_lock lock{mtx_}; + cv_all_ready_.wait(lock, [this]() { + return ready_count_ == number_of_threads_; + }); + } + + std::lock_guard lock{mtx_}; + return tids_; +} diff --git a/cie_sample_application/CMakeLists.txt b/cie_sample_application/CMakeLists.txt index 771b0b3..8a1a39b 100644 --- a/cie_sample_application/CMakeLists.txt +++ b/cie_sample_application/CMakeLists.txt @@ -33,8 +33,18 @@ add_executable(sample_node_main src/sample_node_main.cpp) target_link_libraries(sample_node_main sample_node) ament_target_dependencies(sample_node_main rclcpp callback_isolated_executor) +# New reentrant node component and executable +add_library(reentrant_node SHARED src/reentrant_node.cpp) +ament_target_dependencies(reentrant_node rclcpp rclcpp_components std_msgs) +rclcpp_components_register_nodes(reentrant_node "ReentrantNode") + +add_executable(reentrant_node_main src/reentrant_node_main.cpp) +target_link_libraries(reentrant_node_main reentrant_node) +ament_target_dependencies(reentrant_node_main rclcpp callback_isolated_executor) + install(TARGETS sample_node_main + reentrant_node_main DESTINATION lib/${PROJECT_NAME} ) @@ -42,7 +52,7 @@ install(DIRECTORY include/ DESTINATION include ) -install(TARGETS sample_node +install(TARGETS sample_node reentrant_node EXPORT export_${PROJECT_NAME} DESTINATION lib ) @@ -50,6 +60,6 @@ install(TARGETS sample_node install(DIRECTORY launch/ DESTINATION share/${PROJECT_NAME}/launch) ament_export_include_directories(include) -ament_export_libraries(sample_node) +ament_export_libraries(sample_node reentrant_node) ament_package() diff --git a/cie_sample_application/include/cie_sample_application/reentrant_node.hpp b/cie_sample_application/include/cie_sample_application/reentrant_node.hpp new file mode 100644 index 0000000..2b7f841 --- /dev/null +++ b/cie_sample_application/include/cie_sample_application/reentrant_node.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/int32.hpp" + +class ReentrantNode : public rclcpp::Node { +public: + explicit ReentrantNode(const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); + +private: + void timer_callback_1(); + void timer_callback_2(); + void subscription_callback(const std_msgs::msg::Int32::SharedPtr msg); + + rclcpp::TimerBase::SharedPtr timer1_; + rclcpp::TimerBase::SharedPtr timer2_; + rclcpp::Publisher::SharedPtr pub_; + rclcpp::Subscription::SharedPtr sub_; + + // Single reentrant group to allow parallel callbacks + rclcpp::CallbackGroup::SharedPtr reentrant_group_; + + std::atomic count_{0}; +}; diff --git a/cie_sample_application/src/reentrant_node.cpp b/cie_sample_application/src/reentrant_node.cpp new file mode 100644 index 0000000..f2dfe97 --- /dev/null +++ b/cie_sample_application/src/reentrant_node.cpp @@ -0,0 +1,30 @@ +#include +#include +#include + +#include "cie_sample_application/reentrant_node.hpp" +#include "rclcpp_components/register_node_macro.hpp" + +using namespace std::chrono_literals; + +ReentrantNode::ReentrantNode(const rclcpp::NodeOptions &options) + : Node("reentrant_node", options) { + // Create a single reentrant callback group + reentrant_group_ = create_callback_group(rclcpp::CallbackGroupType::Reentrant); + + timer1_ = this->create_wall_timer(100ms, std::bind(&ReentrantNode::timer_callback_1, this), reentrant_group_); + timer2_ = this->create_wall_timer(1000ms, std::bind(&ReentrantNode::timer_callback_2, this), reentrant_group_); +} + +void ReentrantNode::timer_callback_1() { + long tid = syscall(SYS_gettid); + RCLCPP_INFO(this->get_logger(), "Timer1 (T=100ms) (tid=%ld)", tid); +} + +void ReentrantNode::timer_callback_2() { + std::this_thread::sleep_for(900ms); + long tid = syscall(SYS_gettid); + RCLCPP_INFO(this->get_logger(), "Timer2 (T=1000ms) (tid=%ld)", tid); +} + +RCLCPP_COMPONENTS_REGISTER_NODE(ReentrantNode) diff --git a/cie_sample_application/src/reentrant_node_main.cpp b/cie_sample_application/src/reentrant_node_main.cpp new file mode 100644 index 0000000..13863b3 --- /dev/null +++ b/cie_sample_application/src/reentrant_node_main.cpp @@ -0,0 +1,16 @@ +#include "callback_isolated_executor/callback_isolated_executor.hpp" +#include "cie_sample_application/reentrant_node.hpp" + +int main(int argc, char *argv[]) { + rclcpp::init(argc, argv); + + auto node = std::make_shared(); + auto executor = std::make_shared(); + // executor->set_reentrant_parallelism(4); + + executor->add_node(node); + executor->spin(); + + rclcpp::shutdown(); + return 0; +} From f747b3cf0574c97aa237df86ccb0c8cbd233f2af Mon Sep 17 00:00:00 2001 From: sykwer Date: Mon, 22 Dec 2025 18:19:36 +0900 Subject: [PATCH 2/4] fix Signed-off-by: sykwer --- .../callback_isolated_executor.hpp | 5 +- .../multi_threaded_executor_internal.hpp | 59 ++++---- .../src/callback_isolated_executor.cpp | 19 +-- .../src/multi_threaded_executor_internal.cpp | 135 +++++++++--------- .../cie_sample_application/reentrant_node.hpp | 3 +- cie_sample_application/src/reentrant_node.cpp | 11 +- 6 files changed, 118 insertions(+), 114 deletions(-) diff --git a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp index f0a5d0f..226b601 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp @@ -1,13 +1,14 @@ #pragma once -#include "rclcpp/rclcpp.hpp" #include "cie_config_msgs/msg/callback_group_info.hpp" +#include "rclcpp/rclcpp.hpp" class CallbackIsolatedExecutor : public rclcpp::Executor { RCLCPP_DISABLE_COPY(CallbackIsolatedExecutor) std::mutex client_publisher_mutex_; - rclcpp::Publisher::SharedPtr client_publisher_; + rclcpp::Publisher::SharedPtr + client_publisher_; size_t reentrant_parallelism_{4}; // Nodes associated with this CallbackIsolatedExecutor, appended by add_node() diff --git a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp index c2c24b2..442cb2b 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp @@ -1,42 +1,45 @@ #include "rclcpp/rclcpp.hpp" -// To implement parallelism for reentrant callback groups in CallbackIsolatedExecutor +// To implement parallelism for reentrant callback groups in +// CallbackIsolatedExecutor class MultiThreadedExecutorInternal : public rclcpp::Executor { - RCLCPP_DISABLE_COPY(MultiThreadedExecutorInternal) + RCLCPP_DISABLE_COPY(MultiThreadedExecutorInternal) - // Configuration - size_t number_of_threads_; - bool yield_before_execute_; - std::chrono::nanoseconds next_exec_timeout_; + // Configuration + size_t number_of_threads_; + bool yield_before_execute_; + std::chrono::nanoseconds next_exec_timeout_; - // Thread management - std::vector threads_; - std::vector tids_; // guarded by mtx_ - size_t ready_count_{0}; // guarded by mtx_ (the number of threads that saved their TID) - bool start_allowed_{false}; // guarded by mtx_ (spin() allowed run() to proceed or not) + // Thread management + std::vector threads_; + std::vector tids_; // guarded by mtx_ + size_t ready_count_{ + 0}; // guarded by mtx_ (the number of threads that saved their TID) + bool start_allowed_{ + false}; // guarded by mtx_ (spin() allowed run() to proceed or not) - // Synchronization in start phase - std::mutex mtx_; - std::condition_variable cv_all_ready_; - std::condition_variable cv_start_; + // Synchronization in start phase + std::mutex mtx_; + std::condition_variable cv_all_ready_; + std::condition_variable cv_start_; - std::mutex wait_mutex_; // to guard get_next_executable in run() - std::atomic_bool pre_spinning_{false}; + std::mutex wait_mutex_; // to guard get_next_executable in run() + std::atomic_bool pre_spinning_{false}; - void run(); + void run(); public: - explicit MultiThreadedExecutorInternal(size_t number_of_threads) - : rclcpp::Executor(rclcpp::ExecutorOptions()), number_of_threads_(number_of_threads) - { - // hardcode for now - yield_before_execute_ = false; - next_exec_timeout_ = std::chrono::nanoseconds(-1); - } + explicit MultiThreadedExecutorInternal(size_t number_of_threads) + : rclcpp::Executor(rclcpp::ExecutorOptions()), + number_of_threads_(number_of_threads) { + // hardcode for now + yield_before_execute_ = false; + next_exec_timeout_ = std::chrono::nanoseconds(-1); + } - void pre_spin(); + void pre_spin(); - void spin() override; + void spin() override; - std::vector get_thread_ids(); + std::vector get_thread_ids(); }; \ No newline at end of file diff --git a/callback_isolated_executor/src/callback_isolated_executor.cpp b/callback_isolated_executor/src/callback_isolated_executor.cpp index 636f756..0cd31c5 100644 --- a/callback_isolated_executor/src/callback_isolated_executor.cpp +++ b/callback_isolated_executor/src/callback_isolated_executor.cpp @@ -13,8 +13,8 @@ CallbackIsolatedExecutor::CallbackIsolatedExecutor( const rclcpp::ExecutorOptions &options) : rclcpp::Executor(options) { - client_publisher_ = cie_thread_configurator::create_client_publisher(); - } + client_publisher_ = cie_thread_configurator::create_client_publisher(); +} void CallbackIsolatedExecutor::spin() { std::vector threads; @@ -56,10 +56,11 @@ void CallbackIsolatedExecutor::spin() { } // guard mutex_ for (auto [group, node] : groups_and_nodes) { - if (group->type() == rclcpp::CallbackGroupType::Reentrant && reentrant_parallelism_ >= 2) { + if (group->type() == rclcpp::CallbackGroupType::Reentrant && + reentrant_parallelism_ >= 2) { threads.emplace_back( - &CallbackIsolatedExecutor::spin_reentrant_callback_group, this, - group, node); + &CallbackIsolatedExecutor::spin_reentrant_callback_group, this, group, + node); } else { threads.emplace_back( &CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group, @@ -78,13 +79,13 @@ void CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group( auto executor = std::make_shared(); executor->add_callback_group(group, node); auto callback_group_id = - cie_thread_configurator::create_callback_group_id(group, node); + cie_thread_configurator::create_callback_group_id(group, node); auto tid = static_cast(syscall(SYS_gettid)); { std::lock_guard lock{client_publisher_mutex_}; - cie_thread_configurator::publish_callback_group_info( - client_publisher_, tid, callback_group_id); + cie_thread_configurator::publish_callback_group_info(client_publisher_, tid, + callback_group_id); } executor->spin(); @@ -97,7 +98,7 @@ void CallbackIsolatedExecutor::spin_reentrant_callback_group( std::make_shared(reentrant_parallelism_); executor->add_callback_group(group, node); auto callback_group_id = - cie_thread_configurator::create_callback_group_id(group, node); + cie_thread_configurator::create_callback_group_id(group, node); executor->pre_spin(); auto tids = executor->get_thread_ids(); diff --git a/callback_isolated_executor/src/multi_threaded_executor_internal.cpp b/callback_isolated_executor/src/multi_threaded_executor_internal.cpp index 7e51e62..e79a932 100644 --- a/callback_isolated_executor/src/multi_threaded_executor_internal.cpp +++ b/callback_isolated_executor/src/multi_threaded_executor_internal.cpp @@ -2,48 +2,45 @@ #include "callback_isolated_executor/multi_threaded_executor_internal.hpp" -void MultiThreadedExecutorInternal::pre_spin() -{ - if (pre_spinning_.exchange(true)) { - throw std::runtime_error("pre_spin() called while already pre-spinning"); - } +void MultiThreadedExecutorInternal::pre_spin() { + if (pre_spinning_.exchange(true)) { + throw std::runtime_error("pre_spin() called while already pre-spinning"); + } - { - std::lock_guard lock{mtx_}; - assert(threads_.empty() && tids_.empty()); - ready_count_ = 0; - start_allowed_ = false; - } + { + std::lock_guard lock{mtx_}; + assert(threads_.empty() && tids_.empty()); + ready_count_ = 0; + start_allowed_ = false; + } - threads_.reserve(number_of_threads_); - for (size_t i = 0; i < number_of_threads_; i++) { - threads_.emplace_back(&MultiThreadedExecutorInternal::run, this); - } + threads_.reserve(number_of_threads_); + for (size_t i = 0; i < number_of_threads_; i++) { + threads_.emplace_back(&MultiThreadedExecutorInternal::run, this); + } } -void MultiThreadedExecutorInternal::run() -{ - auto tid = static_cast(syscall(SYS_gettid)); - bool will_notify = false; - - { - std::lock_guard lock{wait_mutex_}; - tids_.push_back(tid); - ready_count_++; - if (ready_count_ == number_of_threads_) { - will_notify = true; - } - } +void MultiThreadedExecutorInternal::run() { + auto tid = static_cast(syscall(SYS_gettid)); + bool will_notify = false; - if (will_notify) { - cv_all_ready_.notify_one(); + { + std::lock_guard lock{wait_mutex_}; + tids_.push_back(tid); + ready_count_++; + if (ready_count_ == number_of_threads_) { + will_notify = true; } + } - { - std::unique_lock lock{mtx_}; - cv_start_.wait(lock, [this]() { return start_allowed_; }); - } + if (will_notify) { + cv_all_ready_.notify_one(); + } + { + std::unique_lock lock{mtx_}; + cv_start_.wait(lock, [this]() { return start_allowed_; }); + } while (rclcpp::ok(this->context_) && spinning.load()) { rclcpp::AnyExecutable any_exec; @@ -72,51 +69,47 @@ void MultiThreadedExecutorInternal::run() } } -void MultiThreadedExecutorInternal::spin() -{ - if (!pre_spinning_.load()) { - throw std::runtime_error("spin() called without pre_spin()"); - } +void MultiThreadedExecutorInternal::spin() { + if (!pre_spinning_.load()) { + throw std::runtime_error("spin() called without pre_spin()"); + } - if (spinning.exchange(true)) { - throw std::runtime_error("spin() called while already spinning"); - } + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } - RCPPUTILS_SCOPE_EXIT(pre_spinning_.store(false); ); - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT(pre_spinning_.store(false);); + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false);); - { - std::unique_lock lock{mtx_}; - cv_all_ready_.wait(lock, [this]() { - return ready_count_ == number_of_threads_; - }); - } + { + std::unique_lock lock{mtx_}; + cv_all_ready_.wait(lock, + [this]() { return ready_count_ == number_of_threads_; }); + } - { - std::lock_guard lock{mtx_}; - start_allowed_ = true; - } + { + std::lock_guard lock{mtx_}; + start_allowed_ = true; + } - cv_start_.notify_all(); + cv_start_.notify_all(); - for (auto &thread : threads_) { - thread.join(); - } + for (auto &thread : threads_) { + thread.join(); + } } -std::vector MultiThreadedExecutorInternal::get_thread_ids() -{ - if (!pre_spinning_.load()) { - throw std::runtime_error("get_thread_ids() called without pre_spin()"); - } +std::vector MultiThreadedExecutorInternal::get_thread_ids() { + if (!pre_spinning_.load()) { + throw std::runtime_error("get_thread_ids() called without pre_spin()"); + } - { - std::unique_lock lock{mtx_}; - cv_all_ready_.wait(lock, [this]() { - return ready_count_ == number_of_threads_; - }); - } + { + std::unique_lock lock{mtx_}; + cv_all_ready_.wait(lock, + [this]() { return ready_count_ == number_of_threads_; }); + } - std::lock_guard lock{mtx_}; - return tids_; + std::lock_guard lock{mtx_}; + return tids_; } diff --git a/cie_sample_application/include/cie_sample_application/reentrant_node.hpp b/cie_sample_application/include/cie_sample_application/reentrant_node.hpp index 2b7f841..4d35c57 100644 --- a/cie_sample_application/include/cie_sample_application/reentrant_node.hpp +++ b/cie_sample_application/include/cie_sample_application/reentrant_node.hpp @@ -5,7 +5,8 @@ class ReentrantNode : public rclcpp::Node { public: - explicit ReentrantNode(const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); + explicit ReentrantNode( + const rclcpp::NodeOptions &options = rclcpp::NodeOptions()); private: void timer_callback_1(); diff --git a/cie_sample_application/src/reentrant_node.cpp b/cie_sample_application/src/reentrant_node.cpp index f2dfe97..3425e10 100644 --- a/cie_sample_application/src/reentrant_node.cpp +++ b/cie_sample_application/src/reentrant_node.cpp @@ -10,10 +10,15 @@ using namespace std::chrono_literals; ReentrantNode::ReentrantNode(const rclcpp::NodeOptions &options) : Node("reentrant_node", options) { // Create a single reentrant callback group - reentrant_group_ = create_callback_group(rclcpp::CallbackGroupType::Reentrant); + reentrant_group_ = + create_callback_group(rclcpp::CallbackGroupType::Reentrant); - timer1_ = this->create_wall_timer(100ms, std::bind(&ReentrantNode::timer_callback_1, this), reentrant_group_); - timer2_ = this->create_wall_timer(1000ms, std::bind(&ReentrantNode::timer_callback_2, this), reentrant_group_); + timer1_ = this->create_wall_timer( + 100ms, std::bind(&ReentrantNode::timer_callback_1, this), + reentrant_group_); + timer2_ = this->create_wall_timer( + 1000ms, std::bind(&ReentrantNode::timer_callback_2, this), + reentrant_group_); } void ReentrantNode::timer_callback_1() { From 43e22079c27a144179f3888d6981358b8f1cf27c Mon Sep 17 00:00:00 2001 From: sykwer Date: Mon, 22 Dec 2025 18:46:00 +0900 Subject: [PATCH 3/4] fix Signed-off-by: sykwer --- .../multi_threaded_executor_internal.hpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp index 442cb2b..7472491 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp @@ -13,10 +13,8 @@ class MultiThreadedExecutorInternal : public rclcpp::Executor { // Thread management std::vector threads_; std::vector tids_; // guarded by mtx_ - size_t ready_count_{ - 0}; // guarded by mtx_ (the number of threads that saved their TID) - bool start_allowed_{ - false}; // guarded by mtx_ (spin() allowed run() to proceed or not) + size_t ready_count_{0}; // guarded by mtx_ (# of threads that saved TID) + bool start_allowed_{false}; // guarded by mtx_ (run() allowed to proceed) // Synchronization in start phase std::mutex mtx_; @@ -42,4 +40,4 @@ class MultiThreadedExecutorInternal : public rclcpp::Executor { void spin() override; std::vector get_thread_ids(); -}; \ No newline at end of file +}; From 16eca5ae9a2d9f8c4815f7945e5b3a4b4b2f01eb Mon Sep 17 00:00:00 2001 From: sykwer Date: Mon, 22 Dec 2025 19:14:51 +0900 Subject: [PATCH 4/4] fix Signed-off-by: sykwer --- .../multi_threaded_executor_internal.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp index 7472491..efa947c 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp @@ -12,8 +12,8 @@ class MultiThreadedExecutorInternal : public rclcpp::Executor { // Thread management std::vector threads_; - std::vector tids_; // guarded by mtx_ - size_t ready_count_{0}; // guarded by mtx_ (# of threads that saved TID) + std::vector tids_; // guarded by mtx_ + size_t ready_count_{0}; // guarded by mtx_ (# of threads that saved TID) bool start_allowed_{false}; // guarded by mtx_ (run() allowed to proceed) // Synchronization in start phase