Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cie_config_msgs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ if(BUILD_TESTING)
ament_lint_auto_find_test_dependencies()
endif()

rosidl_generate_interfaces(${PROJECT_NAME} "msg/CallbackGroupInfo.msg")
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/CallbackGroupInfo.msg"
"msg/NonRosThreadInfo.msg"
)

ament_package()
2 changes: 2 additions & 0 deletions cie_config_msgs/msg/NonRosThreadInfo.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
string thread_name
int64 thread_id
7 changes: 6 additions & 1 deletion cie_sample_application/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ find_package(rclcpp_components REQUIRED)
find_package(std_msgs REQUIRED)

find_package(callback_isolated_executor REQUIRED)
find_package(cie_thread_configurator REQUIRED)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand All @@ -26,15 +27,19 @@ endif()
include_directories(include)

add_library(sample_node SHARED src/sample_node.cpp)
ament_target_dependencies(sample_node rclcpp rclcpp_components std_msgs)
ament_target_dependencies(sample_node rclcpp rclcpp_components std_msgs cie_thread_configurator)
rclcpp_components_register_nodes(sample_node "SampleNode")

add_executable(sample_node_main src/sample_node_main.cpp)
target_link_libraries(sample_node_main sample_node)
ament_target_dependencies(sample_node_main rclcpp callback_isolated_executor)

add_executable(sample_non_ros_process src/sample_non_ros_process.cpp)
ament_target_dependencies(sample_non_ros_process cie_thread_configurator)

install(TARGETS
sample_node_main
sample_non_ros_process
DESTINATION lib/${PROJECT_NAME}
)

Expand Down
7 changes: 7 additions & 0 deletions cie_sample_application/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ Or, load the node to the exsiting component container.
$ ros2 run callback_isolated_executor component_container_callback_isolated --ros-args --remap __node:=sample_container
$ ros2 launch cie_sample_application load_sample_node.launch.xml
```

## Standalone Non-ROS Process
```bash
$ ros2 run cie_sample_application sample_non_ros_process
```

This demonstrates using `spawn_cie_thread` in a standalone process without any ROS2 node.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ class SampleNode : public rclcpp::Node {
public:
explicit SampleNode(
const rclcpp::NodeOptions &options = rclcpp::NodeOptions());
~SampleNode();

private:
void timer_callback();
void timer_callback2();
void subscription_callback(const std_msgs::msg::Int32::SharedPtr msg);
void non_ros_thread_func(int value);

rclcpp::TimerBase::SharedPtr timer_;
rclcpp::TimerBase::SharedPtr timer2_;
Expand All @@ -26,4 +28,6 @@ class SampleNode : public rclcpp::Node {

size_t count_;
size_t count2_;

std::thread non_ros_thread_;
};
1 change: 1 addition & 0 deletions cie_sample_application/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<depend>std_msgs</depend>

<depend>callback_isolated_executor</depend>
<depend>cie_thread_configurator</depend>

<export>
<build_type>ament_cmake</build_type>
Expand Down
19 changes: 19 additions & 0 deletions cie_sample_application/src/sample_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sys/syscall.h>

#include "cie_sample_application/sample_node.hpp"
#include "cie_thread_configurator/cie_thread_configurator.hpp"
#include "rclcpp_components/register_node_macro.hpp"

using namespace std::chrono_literals;
Expand Down Expand Up @@ -31,6 +32,15 @@ SampleNode::SampleNode(const rclcpp::NodeOptions &options)
std::bind(&SampleNode::subscription_callback, this,
std::placeholders::_1),
sub_options);

non_ros_thread_ = cie_thread_configurator::spawn_cie_thread(
"sample_non_ros_thread", &SampleNode::non_ros_thread_func, this, 42);
}

SampleNode::~SampleNode() {
if (non_ros_thread_.joinable()) {
non_ros_thread_.join();
}
}

void SampleNode::timer_callback() {
Expand Down Expand Up @@ -61,4 +71,13 @@ void SampleNode::subscription_callback(
msg->data);
}

void SampleNode::non_ros_thread_func(int value) {
long tid = syscall(SYS_gettid);
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(2s);
RCLCPP_INFO(this->get_logger(), "Test thread running (tid=%ld), value: %d",
tid, value);
}
}

RCLCPP_COMPONENTS_REGISTER_NODE(SampleNode)
12 changes: 12 additions & 0 deletions cie_sample_application/src/sample_non_ros_process.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include <iostream>

#include "cie_thread_configurator/cie_thread_configurator.hpp"

void worker_function() { std::cout << "Worker thread running" << std::endl; }

int main(int /*argc*/, char ** /*argv*/) {
auto thread = cie_thread_configurator::spawn_cie_thread("standalone_worker",
worker_function);
thread.join();
return 0;
}
30 changes: 29 additions & 1 deletion cie_thread_configurator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ For instructions on how to use this tool, please refer to https://github.com/tie
## YAML Configuration File Format
For each ROS 2 application, prepare a single YAML configuration file.
The format of the YAML configuration file is as follows.
There is a top-level entry called `callback_groups`, under which there are arrays representing each callback group.
There are two top-level entries: `callback_groups` for ROS 2 callback groups, and `non_ros_threads` for non-ROS worker threads.
The IDs for the callback groups are automatically generated by this tool according to the rules described in the next section.

```yaml
Expand All @@ -25,6 +25,13 @@ callback_groups:
policy: SCHED_FIFO
priority: 50

non_ros_threads:
- id: zzzzz
affinity:
- 4
policy: SCHED_OTHER
priority: 0

...
```

Expand Down Expand Up @@ -133,3 +140,24 @@ Timers with the same period cannot be distinguished from each other, so if diffe
For `rclcpp::Waitable`, no distinction is made between instances.

Note: Future updates may exclude `rclcpp::Waitable` from being included in the CallbackGroup ID.

## Non-ROS Worker Thread Management

The `spawn_cie_thread` function enables thread scheduling management for non-ROS2 worker threads. Threads created with this function automatically publish their information to the `cie_thread_configurator` without requiring a ROS node.

### Usage

```cpp
#include "cie_thread_configurator/cie_thread_configurator.hpp"

// Spawn a managed worker thread
auto thread = cie_thread_configurator::spawn_cie_thread(
"worker_thread_name", // Unique thread name (used as ID in YAML)
worker_function, // Thread function
arg1, arg2, ... // Optional arguments
);

thread.join();
```

The thread name specified as the first argument must match the `id` field in the `non_ros_threads` section of the YAML configuration file. This allows the thread's scheduling policy, priority, and CPU affinity to be configured through the same YAML file used for ROS 2 callback groups.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#include <map>
#include <memory>
#include <string>
#include <sys/syscall.h>

#include "cie_config_msgs/msg/callback_group_info.hpp"
#include "cie_config_msgs/msg/non_ros_thread_info.hpp"

namespace cie_thread_configurator {

Expand All @@ -30,4 +32,44 @@ void publish_callback_group_info(
// Get hardware information from lscpu command
std::map<std::string, std::string> get_hardware_info();

/// Spawn a thread whose scheduling policy can be managed through
/// cie_thread_configurator.
/// Caution: the `thread_name` must be unique among threads managed by
/// cie_thread_configurator.
template <class F, class... Args>
std::thread spawn_cie_thread(const char *thread_name, F &&f, Args &&...args) {
std::thread t([thread_name, func = std::forward<F>(f),
captured_args =
std::make_tuple(std::forward<Args>(args)...)]() mutable {
// Create a unique rclcpp context in case `rclcpp::init()` is not called
rclcpp::InitOptions init_options;
init_options.shutdown_on_signal = false;
init_options.auto_initialize_logging(false);
auto context = std::make_shared<rclcpp::Context>();
context->init(0, nullptr, init_options);

rclcpp::NodeOptions options;
options.context(context);
auto node = std::make_shared<rclcpp::Node>(
"cie_thread_client", "/cie_thread_configurator", options);

auto publisher =
node->create_publisher<cie_config_msgs::msg::NonRosThreadInfo>(
"/cie_thread_configurator/non_ros_thread_info",
rclcpp::QoS(1000).keep_all());
auto tid = static_cast<pid_t>(syscall(SYS_gettid));

auto message = std::make_shared<cie_config_msgs::msg::NonRosThreadInfo>();
message->thread_id = tid;
message->thread_name = thread_name;
publisher->publish(*message);

context->shutdown("Publishing is finished.");

std::apply(std::move(func), std::move(captured_args));
});

return t;
}

} // namespace cie_thread_configurator
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
#include "yaml-cpp/yaml.h"

#include "cie_config_msgs/msg/callback_group_info.hpp"
#include "cie_config_msgs/msg/non_ros_thread_info.hpp"

class PrerunNode : public rclcpp::Node {
public:
PrerunNode();
void dump_yaml_config(std::filesystem::path path);

private:
void
topic_callback(const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg);
void callback_group_callback(
const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg);
void non_ros_thread_callback(
const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg);

rclcpp::Subscription<cie_config_msgs::msg::CallbackGroupInfo>::SharedPtr
subscription_;
rclcpp::Subscription<cie_config_msgs::msg::NonRosThreadInfo>::SharedPtr
non_ros_thread_subscription_;
std::set<std::string> callback_group_ids_;
std::set<std::string> non_ros_thread_names_;
};
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
#include "yaml-cpp/yaml.h"

#include "cie_config_msgs/msg/callback_group_info.hpp"
#include "cie_config_msgs/msg/non_ros_thread_info.hpp"

class ThreadConfiguratorNode : public rclcpp::Node {
struct CallbackGroupConfig {
std::string callback_group_id;
struct ThreadConfig {
std::string thread_str; // callback_group_id or thread_name
int64_t thread_id = -1;
std::vector<int> affinity;
std::string policy;
Expand All @@ -35,18 +36,21 @@ class ThreadConfiguratorNode : public rclcpp::Node {

private:
bool set_affinity_by_cgroup(int64_t thread_id, const std::vector<int> &cpus);
bool issue_syscalls(const CallbackGroupConfig &config);
void
topic_callback(const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg);
bool issue_syscalls(const ThreadConfig &config);
void callback_group_callback(
const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg);
void non_ros_thread_callback(
const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg);

rclcpp::Subscription<cie_config_msgs::msg::CallbackGroupInfo>::SharedPtr
subscription_;
rclcpp::Subscription<cie_config_msgs::msg::NonRosThreadInfo>::SharedPtr
non_ros_thread_subscription_;

std::vector<CallbackGroupConfig> callback_group_configs_;
std::unordered_map<std::string, CallbackGroupConfig *>
id_to_callback_group_config_;
std::vector<ThreadConfig> thread_configs_;
std::unordered_map<std::string, ThreadConfig *> id_to_thread_config_;
int unapplied_num_;
int cgroup_num_;

std::vector<CallbackGroupConfig *> deadline_configs_;
std::vector<ThreadConfig *> deadline_configs_;
};
3 changes: 3 additions & 0 deletions cie_thread_configurator/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ static void spin_thread_configurator_node(const std::string &config_filename) {
}

std::cout << config["callback_groups"] << std::endl;
if (config["non_ros_threads"]) {
std::cout << config["non_ros_threads"] << std::endl;
}

auto node = std::make_shared<ThreadConfiguratorNode>(config);
auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
Expand Down
50 changes: 47 additions & 3 deletions cie_thread_configurator/src/prerun_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string>

#include "cie_config_msgs/msg/callback_group_info.hpp"
#include "cie_config_msgs/msg/non_ros_thread_info.hpp"
#include "rclcpp/rclcpp.hpp"
#include "yaml-cpp/yaml.h"

Expand All @@ -14,21 +15,48 @@ PrerunNode::PrerunNode() : Node("prerun_node") {
subscription_ =
this->create_subscription<cie_config_msgs::msg::CallbackGroupInfo>(
"/cie_thread_configurator/callback_group_info", 100,
std::bind(&PrerunNode::topic_callback, this, std::placeholders::_1));
std::bind(&PrerunNode::callback_group_callback, this,
std::placeholders::_1));

non_ros_thread_subscription_ =
this->create_subscription<cie_config_msgs::msg::NonRosThreadInfo>(
"/cie_thread_configurator/non_ros_thread_info", 100,
std::bind(&PrerunNode::non_ros_thread_callback, this,
std::placeholders::_1));
}

void PrerunNode::topic_callback(
void PrerunNode::callback_group_callback(
const cie_config_msgs::msg::CallbackGroupInfo::SharedPtr msg) {
if (callback_group_ids_.find(msg->callback_group_id) !=
callback_group_ids_.end())
callback_group_ids_.end()) {
RCLCPP_ERROR(this->get_logger(),
"Duplicate callback_group_id received: tid=%ld | %s",
msg->thread_id, msg->callback_group_id.c_str());
return;
}

RCLCPP_INFO(this->get_logger(), "Received CallbackGroupInfo: tid=%ld | %s",
msg->thread_id, msg->callback_group_id.c_str());

callback_group_ids_.insert(msg->callback_group_id);
}

void PrerunNode::non_ros_thread_callback(
const cie_config_msgs::msg::NonRosThreadInfo::SharedPtr msg) {
if (non_ros_thread_names_.find(msg->thread_name) !=
non_ros_thread_names_.end()) {
RCLCPP_ERROR(this->get_logger(),
"Duplicate thread_name received: tid=%ld | %s", msg->thread_id,
msg->thread_name.c_str());
return;
}

RCLCPP_INFO(this->get_logger(), "Received NonRosThreadInfo: tid=%ld | %s",
msg->thread_id, msg->thread_name.c_str());

non_ros_thread_names_.insert(msg->thread_name);
}

void PrerunNode::dump_yaml_config(std::filesystem::path path) {
YAML::Emitter out;

Expand Down Expand Up @@ -60,6 +88,22 @@ void PrerunNode::dump_yaml_config(std::filesystem::path path) {
out << YAML::Newline;
}

out << YAML::EndSeq;

// Add non_ros_threads section
out << YAML::Key << "non_ros_threads";
out << YAML::Value << YAML::BeginSeq;

for (const std::string &thread_name : non_ros_thread_names_) {
out << YAML::BeginMap;
out << YAML::Key << "id" << YAML::Value << thread_name;
out << YAML::Key << "affinity" << YAML::Value << YAML::Null;
out << YAML::Key << "policy" << YAML::Value << "SCHED_OTHER";
out << YAML::Key << "priority" << YAML::Value << 0;
out << YAML::EndMap;
out << YAML::Newline;
}

out << YAML::EndSeq;
out << YAML::EndMap;

Expand Down
Loading