diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 28bcfcb2e..2b38f779a 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -43,6 +43,7 @@ jobs: diagnostic_common_diagnostics diagnostic_remote_logging diagnostic_updater + diagnostic_topic_monitor self_test check_licenses: diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index d92ce5f4c..1a6ab123f 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -18,6 +18,7 @@ jobs: diagnostic_aggregator, diagnostic_common_diagnostics, diagnostic_remote_logging, + diagnostic_topic_monitor, diagnostic_updater, self_test, ] diff --git a/diagnostic_topic_monitor/.gitignore b/diagnostic_topic_monitor/.gitignore new file mode 100644 index 000000000..cc4087c20 --- /dev/null +++ b/diagnostic_topic_monitor/.gitignore @@ -0,0 +1,2 @@ +.pytest_cache +**/__pycache__ \ No newline at end of file diff --git a/diagnostic_topic_monitor/CMakeLists.txt b/diagnostic_topic_monitor/CMakeLists.txt new file mode 100644 index 000000000..18b8f33b3 --- /dev/null +++ b/diagnostic_topic_monitor/CMakeLists.txt @@ -0,0 +1,100 @@ +cmake_minimum_required(VERSION 3.8) +project(diagnostic_topic_monitor) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake REQUIRED) +# uncomment the following section in order to fill in +# further dependencies manually. +# find_package( REQUIRED) + +find_package(ament_cmake_python REQUIRED) + +find_package(rclpy REQUIRED) +find_package(diagnostic_msgs REQUIRED) +find_package(diagnostic_updater REQUIRED) +find_package(rclcpp_lifecycle REQUIRED) +find_package(rclcpp_components REQUIRED) +find_package(lifecycle_msgs REQUIRED) +find_package(rclcpp REQUIRED) +find_package(diagnostic_updater REQUIRED) +find_package(std_msgs REQUIRED) + +add_library(topic_frequency_monitor_component SHARED src/topic_frequency_monitor_component.cpp) +rclcpp_components_register_node(topic_frequency_monitor_component + PLUGIN "diagnostic_topic_monitor::TopicFrequencyMonitor" + EXECUTABLE topic_frequency_monitor +) +target_include_directories(topic_frequency_monitor_component PRIVATE include) +ament_target_dependencies(topic_frequency_monitor_component + "lifecycle_msgs" + "rclcpp" + "rclcpp_lifecycle" + "rclcpp_components" + "diagnostic_msgs" + "diagnostic_updater" +) +ament_export_targets(export_topic_frequency_monitor_component) +install(TARGETS topic_frequency_monitor_component + EXPORT export_topic_frequency_monitor_component + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) + +add_library(topic_age_monitor_component SHARED src/topic_age_monitor_component.cpp) +rclcpp_components_register_node(topic_age_monitor_component + PLUGIN "diagnostic_topic_monitor::TopicAgeMonitor" + EXECUTABLE topic_age_monitor +) +target_include_directories(topic_age_monitor_component PRIVATE include) +ament_target_dependencies(topic_age_monitor_component + "lifecycle_msgs" + "rclcpp" + "rclcpp_lifecycle" + "rclcpp_components" + "diagnostic_msgs" + "diagnostic_updater" +) +ament_export_targets(export_topic_age_monitor_component) +install(TARGETS topic_age_monitor_component + EXPORT export_topic_age_monitor_component + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin +) + +install( + DIRECTORY include/ + DESTINATION include +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + find_package(launch_testing_ament_cmake REQUIRED) + + # Install the test configs and dummy publisher used for testing + install( + DIRECTORY test + DESTINATION share/${PROJECT_NAME}/ + ) + install( + PROGRAMS test/dummy_publishers.py + DESTINATION lib/${PROJECT_NAME} + ) + + add_launch_test( + test/test_topic_frequency_monitor.py + TARGET test_topic_frequency_monitor + TIMEOUT 20) + + add_launch_test( + test/test_topic_age_monitor.py + TARGET test_topic_age_monitor + TIMEOUT 20) +endif() + +ament_package() diff --git a/diagnostic_topic_monitor/README.md b/diagnostic_topic_monitor/README.md new file mode 100644 index 000000000..4a4070d9e --- /dev/null +++ b/diagnostic_topic_monitor/README.md @@ -0,0 +1,107 @@ +# Diagnostic Topic Monitor + +Offers diagnostic tools which monitor topics for health, such as frequency and age of messages. The output can be further processed with the [`diagnostic_aggregator`](https://github.com/ros/diagnostics/tree/ros2/diagnostic_aggregator) for cleaner observation. + +## Monitors + +There are two monitors currently available, which can be used as components or standalone nodes: + * topic_frequency_monitor -- this supports _any_ kind of topic and checks for frequency based on receive time + * topic_age_monitor -- this supports only topics with a Header, and it checks that message age is within a given range + +It is totally possible to run both of these at the same time, for different or the same topics, to check for both frequency +and max message age at the same time. + +## Usage + +You typically add the monitor(s) you want to a launch file in your package and configure the parameters to your liking. However, the nodes and components are ROS 2 life-cycle based, which means you need to configure and activate them manually. + +See the launch tests in the `test` folder for examples of how start the standalone monitor nodes, and example configs in `test/config`. + +> TIP: If you like the brevity and don't mind the added dependency, you can use the [nav2_lifecycle_manager](https://docs.nav2.org/configuration/packages/configuring-lifecycle.html) with parameter `autostart` set to `true` and target your monitor nodes to autostart them. + +> Example launch snippet: +```python +# Include this node in your LaunchConfiguration +autostart_monitors_node = LifecycleNode( + package="nav2_lifecycle_manager", + executable="lifecycle_manager", + name="autostart_monitors", + parameters=[ + { + "autostart": True, + "node_names": [], + "bond_timeout": 0.0, + } + ], + ) +``` + +> For this, you need to have the `nav2_lifecycle_manager` package available (binary or source code installation), and ideally add it as an `exec_depend` in your package. + +### Limiting which topics to monitor + +By default, the following internal topic names will be ignored by the monitors, as they are usually irrelevant: + +* ^/rosout$ +* .*/parameter_events +* ^/diagnostics$ +* .*/transition_event + +ROS2 parameter `monitor_configured_only` for both monitors: + +If false (the default), the topic monitor will monitor everything but the internal topics mentioned above. +To instead only monitor those topics which have been explicitly configured, set this to "true" + +### Influencing the output + +ROS2 parameter `diag_prefix` for both the monitors: + +This string will be prefixed to the name of the diagnostic for use within the aggregator. + +## `topic_frequency_monitor` + +Without any parameters, this node (component also available) will subscribe to all normal topics in a system and minimally monitor that there is activity on them (defined as "at least one message per second"). For more specific configuration, the following parameters are available. + +### Frequency Monitoring + +Usually, we want to make sure that the frequency of a topic is within certain bounds. + +For example, to monitor that the frequency of the "talker" topic is about 25Hz and the frequency of the "cmd_vel" topic is about 10Hz, use the following three parameters and pass them lists in the same order. This means, the second element in "min_values" applies to the topic given by the second element of the "topics" parameter. + + topics: [ "talker", "cmd_vel" ] + min_values: [ 23.0, 8.0 ] + max_values: [ 26.5, 11.0 ] + +Please note that the frequencies will be computed purely based on _arrival_ timing, the header is currently ignored. + +## `topic_age_monitor` + +Without any parameters, this node (component also available) will subscribe to all normal topics in a system and minimally monitor that there is activity on them (defined as "at least one message per second"). For more specific configuration, the following parameters are available. + +### Message Age Monitoring + +Usually, we want to make sure that message is not much older than expected (allowing for some jitter due to network transmission), +but also not newer than expected (which would indicate issues with timestamp or system time between machines). + +Since this relies on a sender-side timestamp, this monitor only works with message that include a Header. + +NOTE: You can configure it for any topic and it _will_ interpret the data in there as a timestamp, but unless it +really is a header, the results will be rather random ;-) + +Configuration is similar to the frequency monitor, but giving a range for delays instead: + + topics: [ "talker", "cmd_vel" ] + min_values: [ 80, 8.0 ] + max_values: [ 120, 11.0 ] + monitor_configured_only: True + +> It is highly prudent to only monitor configured topics in the age monitor. If you set it to monitor all, +and you have topics without header fields, you will get a lot of warning messages!! + +## Testing + +To run all tests in this package: + +```bash +colcon test --event-handlers console_direct+ --packages-select diagnostic_topic_monitor && colcon test-result +``` diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/activity_diagnostic_task.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/activity_diagnostic_task.hpp new file mode 100644 index 000000000..e88dc2ba5 --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/activity_diagnostic_task.hpp @@ -0,0 +1,62 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef ACTIVITY_DIAGNOSTIC_TASK_HPP +#define ACTIVITY_DIAGNOSTIC_TASK_HPP + +#include // NOLINT: upstream + +using namespace std::chrono_literals; + +namespace diagnostic_topic_monitor +{ + +/** + * @brief Very simply DiagnosticTask that just checks whether there has been _any_ + * kind of activity during the last period + */ +class ActivityDiagnosticTask : public diagnostic_updater::DiagnosticTask +{ +public: + ActivityDiagnosticTask( + const std::string & name, rclcpp::Clock::SharedPtr clock, + std::chrono::duration warning_threshold = 1.1s) + : diagnostic_updater::DiagnosticTask(name), + clock_(clock), + last_tick_(0), + warning_threshold_(warning_threshold) + { + } + virtual void tick() {last_tick_ = clock_->now();} + void run(diagnostic_updater::DiagnosticStatusWrapper & stat) override + { + stat.name = getName(); + if (last_tick_.seconds() == 0) { + stat.level = diagnostic_msgs::msg::DiagnosticStatus::WARN; + stat.message = "No data received, yet"; + stat.addf("period", "%f", std::numeric_limits::max()); + } else { + const auto currentTime = clock_->now(); + // more than 1s without tick is considered warning + const auto elapsed = currentTime - last_tick_; + stat.addf("period", "%f", elapsed.seconds()); + if (elapsed <= rclcpp::Duration(warning_threshold_)) { + stat.level = diagnostic_msgs::msg::DiagnosticStatus::OK; + stat.message = "OK"; + } else { + stat.level = diagnostic_msgs::msg::DiagnosticStatus::WARN; + stat.message = "Last data received more than 1s ago"; + } + } + } + +private: + rclcpp::Clock::SharedPtr clock_; + rclcpp::Time last_tick_; + std::chrono::duration warning_threshold_; +}; + +} // namespace diagnostic_topic_monitor + +#endif // ACTIVITY_DIAGNOSTIC_TASK_HPP diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/generic_topic_monitor.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/generic_topic_monitor.hpp new file mode 100644 index 000000000..3a55d7c73 --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/generic_topic_monitor.hpp @@ -0,0 +1,226 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef GENERIC_TOPIC_MONITOR_HPP +#define GENERIC_TOPIC_MONITOR_HPP + +#include + +#include +#include +#include +#include +#include // NOLINT: upstream +#include // NOLINT: upstream +#include +#include +#include +#include +#include + +#include "diagnostic_msgs/msg/diagnostic_array.hpp" // NOLINT: upstream +#include "diagnostic_topic_monitor/activity_diagnostic_task.hpp" +#include "diagnostic_updater/publisher.hpp" // NOLINT: upstream +#include "diagnostic_updater/update_functions.hpp" // NOLINT: upstream + +namespace diagnostic_topic_monitor +{ +constexpr const char * TOPICS_PARAM_NAME = "topics"; +constexpr const char * MIN_VALUES_PARAM_NAME = "min_values"; +constexpr const char * MAX_VALUES_PARAM_NAME = "max_values"; +constexpr const char * MONITOR_CONFIGURED_ONLY_PARAM_NAME = "monitor_configured_only"; +constexpr const char * DIAG_PREFIX_PARAM_NAME = "diag_prefix"; +typedef rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn LCCBReturn; + +/** + * @brief Base abstract class for creating Topic Monitors. + */ +template +class GenericTopicMonitor : public rclcpp_lifecycle::LifecycleNode +{ +public: + explicit GenericTopicMonitor(const std::string & node_name, rclcpp::NodeOptions options); + ~GenericTopicMonitor() {} + LCCBReturn on_configure(const rclcpp_lifecycle::State &) override; + LCCBReturn on_activate(const rclcpp_lifecycle::State & state) override; + LCCBReturn on_deactivate(const rclcpp_lifecycle::State & state) override; + LCCBReturn on_cleanup(const rclcpp_lifecycle::State &) override {return LCCBReturn::SUCCESS;} + virtual CallbackReturn on_shutdown(const rclcpp_lifecycle::State &) override + { + return LCCBReturn::SUCCESS; + } + +protected: + std::string get_prefixed_name(const std::string & topic_name) const; + bool skip_topic(const std::string & topic_name); + virtual void topic_cb( + const std::string & topic_name, const std::shared_ptr & msg) = 0; + virtual void update_topic_subscriptions(); + virtual StatusParamType parse_params(const int index) = 0; + + std::vector hidden_topics_{ + std::regex("^/rosout$"), std::regex(".*/parameter_events"), std::regex("^/diagnostics$"), + std::regex(".*/transition_event"), std::regex("^/clock")}; + std::shared_ptr timer_; + std::shared_ptr updater_; + std::unordered_map> subscribed_topics_; + std::unordered_map> topic_diag_map_; + std::vector topics_; + std::set known_topics_; + std::vector min_values_; + std::vector max_values_; + bool monitor_configured_only_{true}; + std::string diag_prefix_; +}; + +// Implementation of GenericTopicMonitor methods +template +inline GenericTopicMonitor::GenericTopicMonitor( + const std::string & node_name, rclcpp::NodeOptions options) +: rclcpp_lifecycle::LifecycleNode(node_name, options.allow_undeclared_parameters(true)) +{ + auto desc = rcl_interfaces::msg::ParameterDescriptor{}; + desc.description = "Topics to specify expectations for"; + desc.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY; + declare_parameter(TOPICS_PARAM_NAME, std::vector(), desc); + auto desc_min = rcl_interfaces::msg::ParameterDescriptor{}; + desc_min.description = "Minimum values for topic from 'topics'"; + desc_min.type = rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE_ARRAY; + declare_parameter(MIN_VALUES_PARAM_NAME, std::vector(), desc_min); + auto desc_max = rcl_interfaces::msg::ParameterDescriptor{}; + desc_max.description = "Maximum values for topic from 'topics'"; + desc_max.type = rcl_interfaces::msg::ParameterType::PARAMETER_DOUBLE_ARRAY; + declare_parameter(MAX_VALUES_PARAM_NAME, std::vector(), desc_max); + // by default, we include all topics + declare_parameter(MONITOR_CONFIGURED_ONLY_PARAM_NAME, false); + // prefix for diagnostics (for use with modules) + declare_parameter(DIAG_PREFIX_PARAM_NAME, ""); +} + +template +inline std::string +diagnostic_topic_monitor::GenericTopicMonitor::get_prefixed_name( + const std::string & topic_name) const +{ + if (topic_name.find(diag_prefix_) == 0) { + return topic_name; + } + return diag_prefix_ + std::string("/") + topic_name; +} + +template +inline bool diagnostic_topic_monitor::GenericTopicMonitor::skip_topic( + const std::string & topic_name) +{ + // skip if we already subscribed + if (subscribed_topics_.find(topic_name) != subscribed_topics_.end()) { + RCLCPP_DEBUG(get_logger(), "Already subscribed to %s, skipping", topic_name.c_str()); + return true; + } + // if it's configured, we monitor it + if (std::find(topics_.begin(), topics_.end(), topic_name) != topics_.end()) { + RCLCPP_DEBUG(get_logger(), "Topic %s is configured, no skip", topic_name.c_str()); + return false; + } + // skip if we have seen this before and ignored it + if (known_topics_.find(topic_name) != known_topics_.end()) { + return true; + } + // check for matches against internal topic names + for (const auto & re : hidden_topics_) { + if (std::regex_match(topic_name, re)) { + known_topics_.insert(topic_name); + RCLCPP_DEBUG(get_logger(), "Topic %s matches ignore list, skip it", topic_name.c_str()); + return true; + } + } + return monitor_configured_only_; +} + +template +inline void diagnostic_topic_monitor::GenericTopicMonitor< + StatusType, StatusParamType>::update_topic_subscriptions() +{ + RCLCPP_DEBUG( + get_logger(), "Examining topic list for changes, currently monitoring %ld topics", + subscribed_topics_.size()); + const auto topics = this->get_topic_names_and_types(); + for (const auto & e : topics) { + const std::string topic_name = e.first; + if (skip_topic(topic_name)) { + continue; + } + // subscribe with statistics enabled + RCLCPP_DEBUG(get_logger(), "Starting to monitor topic %s", topic_name.c_str()); + std::shared_ptr sub = this->create_generic_subscription( + e.first, *e.second.begin(), rclcpp::QoS(10), + [this, topic_name](std::shared_ptr msg) { + this->topic_cb(topic_name, msg); + }); + subscribed_topics_[topic_name] = sub; + } +} + +template +inline LCCBReturn +diagnostic_topic_monitor::GenericTopicMonitor::on_configure( + const rclcpp_lifecycle::State &) +{ + RCLCPP_DEBUG(get_logger(), "Configuring"); + // configure the diagnostics + topics_ = get_parameter(TOPICS_PARAM_NAME).as_string_array(); + min_values_ = get_parameter(MIN_VALUES_PARAM_NAME).as_double_array(); + max_values_ = get_parameter(MAX_VALUES_PARAM_NAME).as_double_array(); + if (topics_.size() != min_values_.size() || topics_.size() != max_values_.size()) { + throw std::invalid_argument("Topics and min/max_values must have same number of arguments"); + } + monitor_configured_only_ = get_parameter(MONITOR_CONFIGURED_ONLY_PARAM_NAME).as_bool(); + diag_prefix_ = get_parameter(DIAG_PREFIX_PARAM_NAME).as_string(); + + RCLCPP_DEBUG( + get_logger(), "Done configuring for %ld topics, config only: %d", topic_diag_map_.size(), + monitor_configured_only_); + return LCCBReturn::SUCCESS; +} + +template +inline LCCBReturn +diagnostic_topic_monitor::GenericTopicMonitor::on_activate( + const rclcpp_lifecycle::State &) +{ + RCLCPP_DEBUG(get_logger(), "Activating"); + updater_ = std::make_shared(this); + char HOSTNAME[1000]; + gethostname(HOSTNAME, 1000); + updater_->setHardwareID(std::string(HOSTNAME)); + for (size_t i = 0; i < topics_.size(); ++i) { + auto param = parse_params(i); + auto diag = std::make_shared(param, get_prefixed_name(topics_[i]), get_clock()); + topic_diag_map_[topics_[i]] = diag; + updater_->add(*diag); + } + // check existing topics + try { + update_topic_subscriptions(); + } catch (const std::exception & ex) { + RCLCPP_ERROR(this->get_logger(), "Failure to update subscriptions: %s", ex.what()); + return rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn::ERROR; + } + timer_ = create_wall_timer(1s, [this]() {this->update_topic_subscriptions();}); + return LCCBReturn::SUCCESS; +} + +template +inline LCCBReturn +diagnostic_topic_monitor::GenericTopicMonitor::on_deactivate( + const rclcpp_lifecycle::State & state) +{ + (void)state; + RCLCPP_INFO_STREAM(get_logger(), "Deactivating GenericTopicMonitor"); + return LCCBReturn::SUCCESS; +} + +} // namespace diagnostic_topic_monitor + +#endif // GENERIC_TOPIC_MONITOR_HPP diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_age_monitor.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_age_monitor.hpp new file mode 100644 index 000000000..d210717fd --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_age_monitor.hpp @@ -0,0 +1,33 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef DIAGNOSTIC_TOPIC_MONITOR_TOPIC_FREQUENCY_MONITOR_HPP +#define DIAGNOSTIC_TOPIC_MONITOR_TOPIC_FREQUENCY_MONITOR_HPP + +#include "diagnostic_topic_monitor/generic_topic_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +class TopicAgeMonitor final + : public GenericTopicMonitor< + diagnostic_updater::TimeStampStatus, diagnostic_updater::TimeStampStatusParam> +{ +public: + TopicAgeMonitor(const std::string & node_name, rclcpp::NodeOptions options); + TopicAgeMonitor(rclcpp::NodeOptions options = rclcpp::NodeOptions()); + ~TopicAgeMonitor() override = default; + + void topic_cb( + const std::string & topic_name, + const std::shared_ptr & msg) override; + diagnostic_updater::TimeStampStatusParam parse_params(const int index) override; + +private: + rclcpp::Serialization _header_serializer; +}; + +} // namespace diagnostic_topic_monitor + +#endif // DIAGNOSTIC_TOPIC_MONITOR_TOPIC_FREQUENCY_MONITOR_HPP diff --git a/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_frequency_monitor.hpp b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_frequency_monitor.hpp new file mode 100644 index 000000000..17db075ed --- /dev/null +++ b/diagnostic_topic_monitor/include/diagnostic_topic_monitor/topic_frequency_monitor.hpp @@ -0,0 +1,34 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#ifndef DIAGNOSTIC_TOPIC_MONITOR_TOPIC_FREQUENCY_MONITOR_HPP +#define DIAGNOSTIC_TOPIC_MONITOR_TOPIC_FREQUENCY_MONITOR_HPP + +#include "diagnostic_topic_monitor/generic_topic_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +class TopicFrequencyMonitor final + : public GenericTopicMonitor< + diagnostic_updater::FrequencyStatus, diagnostic_updater::FrequencyStatusParam> +{ +public: + TopicFrequencyMonitor(const std::string & node_name, rclcpp::NodeOptions options); + TopicFrequencyMonitor(rclcpp::NodeOptions options = rclcpp::NodeOptions()); + ~TopicFrequencyMonitor() override = default; + + void topic_cb( + const std::string & topic_name, + const std::shared_ptr & msg) override; + diagnostic_updater::FrequencyStatusParam parse_params(const int index) override; + void update_topic_subscriptions() override; + +private: + std::unordered_map> fallback_topic_diag_map_; +}; + +} // namespace diagnostic_topic_monitor + +#endif // DIAGNOSTIC_TOPIC_MONITOR_TOPIC_FREQUENCY_MONITOR_HPP diff --git a/diagnostic_topic_monitor/package.xml b/diagnostic_topic_monitor/package.xml new file mode 100644 index 000000000..811ca5301 --- /dev/null +++ b/diagnostic_topic_monitor/package.xml @@ -0,0 +1,33 @@ + + + + diagnostic_topic_monitor + 1.0.0 + Generic Topic Monitor + Ingo Lütkebohle + Ingo Lütkebohle + Tejas Kumar Shastha + BSD-3-Clause + + ament_cmake + + diagnostic_msgs + rclpy + rclcpp + rclcpp_lifecycle + rclcpp_components + statistics_msgs + diagnostic_updater + std_msgs + + python3-ntplib + + ament_lint_auto + ament_lint_common + ament_cmake_pytest + launch_testing + + + ament_cmake + + diff --git a/diagnostic_topic_monitor/src/topic_age_monitor_component.cpp b/diagnostic_topic_monitor/src/topic_age_monitor_component.cpp new file mode 100644 index 000000000..35119c5f7 --- /dev/null +++ b/diagnostic_topic_monitor/src/topic_age_monitor_component.cpp @@ -0,0 +1,45 @@ +#include "diagnostic_topic_monitor/topic_age_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +TopicAgeMonitor::TopicAgeMonitor(const std::string & node_name, rclcpp::NodeOptions options) +: GenericTopicMonitor(node_name, options) +{ +} + +TopicAgeMonitor::TopicAgeMonitor(rclcpp::NodeOptions options) +: GenericTopicMonitor("topic_age_monitor", options) +{ +} + +diagnostic_updater::TimeStampStatusParam diagnostic_topic_monitor::TopicAgeMonitor::parse_params( + const int index) +{ + return diagnostic_updater::TimeStampStatusParam(min_values_[index], max_values_[index]); +} + +void TopicAgeMonitor::topic_cb( + const std::string & topic_name, const std::shared_ptr & msg) +{ + std_msgs::msg::Header header; + const auto diag = topic_diag_map_.find(topic_name); + try { + _header_serializer.deserialize_message(msg.get(), &header); + } catch (const std::exception & e) { + RCLCPP_WARN( + get_logger(), "Failed to deserialize message from topic %s, skipping it", topic_name.c_str()); + return; + } + + if (diag != topic_diag_map_.end()) { + diag->second->tick(header.stamp); + } else { + RCLCPP_WARN(get_logger(), "No diagnostic found for topic %s", topic_name.c_str()); + } +} + +} // namespace diagnostic_topic_monitor + +#include // NOLINT: upstream +RCLCPP_COMPONENTS_REGISTER_NODE(diagnostic_topic_monitor::TopicAgeMonitor) diff --git a/diagnostic_topic_monitor/src/topic_frequency_monitor_component.cpp b/diagnostic_topic_monitor/src/topic_frequency_monitor_component.cpp new file mode 100644 index 000000000..4295c7725 --- /dev/null +++ b/diagnostic_topic_monitor/src/topic_frequency_monitor_component.cpp @@ -0,0 +1,78 @@ +// Copyright (c) 2024, 2025 Robert Bosch GmbH +// +// See the top-level LICENSE file for licensing terms. + +#include "diagnostic_topic_monitor/topic_frequency_monitor.hpp" + +namespace diagnostic_topic_monitor +{ + +TopicFrequencyMonitor::TopicFrequencyMonitor( + const std::string & node_name, rclcpp::NodeOptions options) +: GenericTopicMonitor(node_name, options) +{ +} + +TopicFrequencyMonitor::TopicFrequencyMonitor(rclcpp::NodeOptions options) +: GenericTopicMonitor("topic_frequency_monitor", options) +{ +} + +diagnostic_updater::FrequencyStatusParam +diagnostic_topic_monitor::TopicFrequencyMonitor::parse_params(const int index) +{ + return diagnostic_updater::FrequencyStatusParam(&min_values_[index], &max_values_[index]); +} + +void TopicFrequencyMonitor::topic_cb( + const std::string & topic_name, const std::shared_ptr &) +{ + const auto diag = topic_diag_map_.find(topic_name); + if (diag != topic_diag_map_.end()) { + diag->second->tick(); + } else { + const auto fallback_diag = fallback_topic_diag_map_.find(topic_name); + if (fallback_diag != fallback_topic_diag_map_.end()) { + fallback_diag->second->tick(); + } else { + RCLCPP_WARN(get_logger(), "No diagnostic found for topic %s", topic_name.c_str()); + } + } +} + +void diagnostic_topic_monitor::TopicFrequencyMonitor::update_topic_subscriptions() +{ + RCLCPP_DEBUG( + get_logger(), "Examining topic list for changes, currently monitoring %ld topics", + subscribed_topics_.size()); + const auto topics = this->get_topic_names_and_types(); + for (const auto & e : topics) { + const std::string topic_name = e.first; + if (skip_topic(topic_name)) { + continue; + } + // subscribe with statistics enabled + RCLCPP_DEBUG(get_logger(), "Starting to monitor topic %s", topic_name.c_str()); + std::shared_ptr sub = this->create_generic_subscription( + e.first, *e.second.begin(), rclcpp::QoS(10), + [this, topic_name](std::shared_ptr msg) { + this->topic_cb(topic_name, msg); + }); + subscribed_topics_[topic_name] = sub; + // add a default diagnostic if none is configured + if (topic_diag_map_.find(topic_name) == topic_diag_map_.end()) { + auto fallback_diag = + std::make_shared(get_prefixed_name(topic_name), get_clock()); + fallback_topic_diag_map_[topic_name] = fallback_diag; + updater_->add(*fallback_diag); + } + } + RCLCPP_DEBUG( + get_logger(), "Done updating topic list. Found %ld, hidden %ld, now monitoring %ld", + topics.size(), known_topics_.size(), subscribed_topics_.size()); +} + +} // namespace diagnostic_topic_monitor + +#include // NOLINT: upstream +RCLCPP_COMPONENTS_REGISTER_NODE(diagnostic_topic_monitor::TopicFrequencyMonitor) diff --git a/diagnostic_topic_monitor/test/config/topic_age_monitor.yaml b/diagnostic_topic_monitor/test/config/topic_age_monitor.yaml new file mode 100644 index 000000000..850c1fbc0 --- /dev/null +++ b/diagnostic_topic_monitor/test/config/topic_age_monitor.yaml @@ -0,0 +1,9 @@ +/**: + ros__parameters: + monitor_configured_only: True + topics: + - /dummy_header_topic + min_values: + - -0.01 + max_values: + - 0.2 diff --git a/diagnostic_topic_monitor/test/config/topic_frequency_monitor.yaml b/diagnostic_topic_monitor/test/config/topic_frequency_monitor.yaml new file mode 100644 index 000000000..37236c4f3 --- /dev/null +++ b/diagnostic_topic_monitor/test/config/topic_frequency_monitor.yaml @@ -0,0 +1,12 @@ +/**: + ros__parameters: + monitor_configured_only: True + topics: + - /dummy_string_topic1 + - /dummy_string_topic2 + min_values: + - 5.0 + - 5.0 + max_values: + - 15.0 + - 15.0 diff --git a/diagnostic_topic_monitor/test/dummy_publishers.py b/diagnostic_topic_monitor/test/dummy_publishers.py new file mode 100755 index 000000000..8ac1a833d --- /dev/null +++ b/diagnostic_topic_monitor/test/dummy_publishers.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 + +# Copyright (c) 2024, 2025 Robert Bosch GmbH +# +# See the top-level LICENSE file for licensing terms. + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String +from sensor_msgs.msg import CameraInfo + +class DummyPublisherNode(Node): + def __init__(self): + super().__init__('dummy_publisher_node') + + # Publishers + self.string_pub1 = self.create_publisher(String, '/dummy_string_topic1', 10) + self.string_pub2 = self.create_publisher(String, '/dummy_string_topic2', 10) + self.string_pub2 = self.create_publisher(String, '/dummy_string_topic3', 10) + self.header_pub = self.create_publisher(CameraInfo, "/dummy_header_topic", 10) + + # Timer to publish at 10Hz + self.timer = self.create_timer(0.1, self.publish_messages) + + self.get_logger().info('DummyPublisherNode has been started.') + + def publish_messages(self): + # Publish to /string_topic1 + msg1 = String() + msg1.data = 'Hello from string_topic1' + self.string_pub1.publish(msg1) + + # Publish to /string_topic2 + msg2 = String() + msg2.data = 'Hello from string_topic2' + self.string_pub2.publish(msg2) + + # Publish to /string_topic3 + msg3 = String() + msg3.data = 'Hello from string_topic3' + self.string_pub2.publish(msg3) + + # Publish to /header_topic + header_msg = CameraInfo() + header_msg.header.stamp = self.get_clock().now().to_msg() + header_msg.header.frame_id = "dummy_frame" + self.header_pub.publish(header_msg) + + self.get_logger().debug('Published dummy messages to all topics.') + + +def main(args=None): + rclpy.init(args=args) + node = DummyPublisherNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + node.get_logger().info('Shutting down DummyPublisherNode...') + + +if __name__ == '__main__': + main() diff --git a/diagnostic_topic_monitor/test/test_topic_age_monitor.py b/diagnostic_topic_monitor/test/test_topic_age_monitor.py new file mode 100755 index 000000000..f3bdb35bd --- /dev/null +++ b/diagnostic_topic_monitor/test/test_topic_age_monitor.py @@ -0,0 +1,205 @@ +#! /usr/bin/env python3 + +# Copyright (c) 2024, 2025 Robert Bosch GmbH +# +# See the top-level LICENSE file for licensing terms. + +import time +import unittest +import pytest + +import launch +from launch.actions import EmitEvent, RegisterEventHandler, LogInfo, SetEnvironmentVariable +import launch.event_handlers.on_process_start + +from launch_ros.actions import LifecycleNode, Node +from launch_ros.events.lifecycle import ChangeState +import launch_ros.events.lifecycle + +import launch_testing +import launch_testing.actions +import launch_testing.asserts + +from lifecycle_msgs.msg import Transition +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus + +from launch.substitutions import PathJoinSubstitution +from launch_ros.substitutions import FindPackageShare + +import rclpy +from rclpy.time import Time, Duration + +# Helper functions to activate the lifecycle monitor nodes +def create_change_state(target, target_state): + return EmitEvent( + event=ChangeState( + lifecycle_node_matcher=launch.events.matches_action(target), + transition_id=target_state, + ) + ) + + +def create_register_configure(target_action): + return RegisterEventHandler( + launch.event_handlers.on_process_start.OnProcessStart( + target_action=target_action, + on_start=[ + LogInfo(msg=f"Emitting configure event for {target_action}"), + create_change_state( + target_action, Transition.TRANSITION_CONFIGURE + ) + ], + ) + ) + + +def create_register_activate(target_action): + return RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=target_action, + start_state="configuring", + goal_state="inactive", + entities=[ + LogInfo(msg=f"Emitting activate event for {target_action}"), + create_change_state( + target_action, Transition.TRANSITION_ACTIVATE + ) + ], + ) + ) + + +CONFIG_MONITOR_NAME = "monitor_configured_topics_node" + + +@pytest.mark.launch_test +def generate_test_description(): + # Node that publishes the topics we want to monitor + talker_node = Node( + package="diagnostic_topic_monitor", + executable="dummy_publishers.py", + output="log", + name="talker", + arguments=["--ros-args", "--log-level", "talker:=warn"], + ) + # Monitor with configuration file + monitor_config_node = LifecycleNode( + package="diagnostic_topic_monitor", + executable="topic_age_monitor", + name=CONFIG_MONITOR_NAME, + output="both", + namespace="", + parameters=[ + PathJoinSubstitution( + [ + FindPackageShare("diagnostic_topic_monitor"), + "test", + "config", + "topic_age_monitor.yaml", + ] + ), + ], + arguments=["--ros-args", "--log-level", "monitor_configured_topics_node:=INFO"], + ) + + return launch.LaunchDescription( + [ + SetEnvironmentVariable("RCUTILS_LOGGING_BUFFERED_STREAM", "1"), + talker_node, + monitor_config_node, + # Right after the monitor starts, make it take the 'configure' transition. + create_register_configure(monitor_config_node), + # When the monitor reaches the 'inactive' state, 'activate'. + create_register_activate(monitor_config_node), + # When the monitor node reaches the 'active' state, we're ready for testing + RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=monitor_config_node, + start_state="activating", + goal_state="active", + entities=[ + LogInfo(msg="Monitor reached active state"), + launch_testing.actions.ReadyToTest(), + ], + ) + ), + ] + ) + + +class TestMonitor(unittest.TestCase): + TIMEOUT = 30 + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node("test_topic_age_monitor_node") + self.log = self.node.get_logger() + self.sub = self.node.create_subscription( + DiagnosticArray, "/diagnostics", self.stat_cb, 10 + ) + self.pub_count = self.node.count_publishers("/diagnostics") + self.log.info( + f"Number of publishers for /diagnostics: {self.pub_count}. Listening for messages..." + ) + self.age_messages = [] # Store messages from configured age monitor + + start_time = time.time() + while len(self.age_messages) < 3: + rclpy.spin_once(self.node, timeout_sec=1.0) + self.log.debug(f"Got {len(self.age_messages)} age msgs msgs") + if (time.time() - start_time) > self.TIMEOUT: + self.fail("Timed out waiting for message in /diagnostics topic") + self.log.info(f"Stored {len(self.age_messages)} age msgs") + + def tearDown(self): + self.node.destroy_node() + del self.node + + def stat_cb(self, msg): + """Store message for future processing.""" + if len(msg.status) == 0: + return + if CONFIG_MONITOR_NAME in msg.status[0].name: + self.age_messages.append(msg) + + def test_diag_msg(self): + """Check that diagnostics messages contain the right content.""" + last_msg = self.age_messages.pop() + # header + current_time = self.node.get_clock().now() + header_time = Time.from_msg(last_msg.header.stamp) + self.assertLess(current_time - header_time, Duration(seconds=0.1)) + last_status = last_msg.status[0] + # status should be OK + self.assertEqual(last_status.level, DiagnosticStatus.OK) + + def test_age_diag_msg(self): + """Check that the age diagnostic works.""" + last_msg = self.age_messages.pop() + self.assertTrue(len(last_msg.status) > 0) + status = last_msg.status[0] + # check some fields for present/content + self.assertTrue(CONFIG_MONITOR_NAME in status.name) + keys = [kv.key for kv in status.values] + self.assertTrue("Earliest timestamp delay:" in keys) + + def test_ignore_unconfigured(self): + """Check that we ignore the topic we don't monitor.""" + last_msg = self.age_messages.pop() + self.assertEqual(len(last_msg.status), 1) # We monitor 1 topic only + names = [status.name for status in last_msg.status] + # This topic should be monitored + self.assertIn(f"{CONFIG_MONITOR_NAME}: /dummy_header_topic", names, f"{names}") + # This topic should not be monitored + self.assertNotIn( + f"{CONFIG_MONITOR_NAME}: /dummy_string_topic1", + names, + f"{names}", + ) diff --git a/diagnostic_topic_monitor/test/test_topic_frequency_monitor.py b/diagnostic_topic_monitor/test/test_topic_frequency_monitor.py new file mode 100755 index 000000000..0072d4405 --- /dev/null +++ b/diagnostic_topic_monitor/test/test_topic_frequency_monitor.py @@ -0,0 +1,219 @@ +#! /usr/bin/env python3 + +# Copyright (c) 2024, 2025 Robert Bosch GmbH +# +# See the top-level LICENSE file for licensing terms. + +import time +import unittest +import pytest + +import launch +from launch.actions import EmitEvent, RegisterEventHandler, LogInfo, SetEnvironmentVariable +import launch.event_handlers.on_process_start + +from launch_ros.actions import LifecycleNode, Node +from launch_ros.events.lifecycle import ChangeState +import launch_ros.events.lifecycle + +import launch_testing +import launch_testing.actions +import launch_testing.asserts + +from lifecycle_msgs.msg import Transition +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus + +from launch.substitutions import PathJoinSubstitution +from launch_ros.substitutions import FindPackageShare + +import rclpy +from rclpy.time import Time, Duration + +# Helper functions to activate the lifecycle monitor nodes +def create_change_state(target, target_state): + return EmitEvent( + event=ChangeState( + lifecycle_node_matcher=launch.events.matches_action(target), + transition_id=target_state, + ) + ) + + +def create_register_configure(target_action): + return RegisterEventHandler( + launch.event_handlers.on_process_start.OnProcessStart( + target_action=target_action, + on_start=[ + LogInfo(msg=f"Emitting configure event for {target_action}"), + create_change_state( + target_action, Transition.TRANSITION_CONFIGURE + ) + ], + ) + ) + + +def create_register_activate(target_action): + return RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=target_action, + start_state="configuring", + goal_state="inactive", + entities=[ + LogInfo(msg=f"Emitting activate event for {target_action}"), + create_change_state( + target_action, Transition.TRANSITION_ACTIVATE + ) + ], + ) + ) + + +ALL_MONITOR_NAME = "monitor_all_topics_node" +CONFIG_MONITOR_NAME = "monitor_configured_topics_node" + + +@pytest.mark.launch_test +def generate_test_description(): + # Node that publishes the topics we want to monitor + talker_node = Node( + package="diagnostic_topic_monitor", + executable="dummy_publishers.py", + output="log", + name="talker", + arguments=["--ros-args", "--log-level", "talker:=warn"], + ) + # Un-configured frequency monitor that checks all topics + monitor_all_node = LifecycleNode( + package="diagnostic_topic_monitor", + executable="topic_frequency_monitor", + name=ALL_MONITOR_NAME, + output="both", + namespace="", + arguments=["--ros-args", "--log-level", "all_monitor:=INFO"], + ) + # Frequency monitor with configuration file + monitor_config_node = LifecycleNode( + package="diagnostic_topic_monitor", + executable="topic_frequency_monitor", + name=CONFIG_MONITOR_NAME, + output="both", + namespace="", + parameters=[ + PathJoinSubstitution( + [ + FindPackageShare("diagnostic_topic_monitor"), + "test", + "config", + "topic_frequency_monitor.yaml", + ] + ), + ], + arguments=["--ros-args", "--log-level", "monitor_configured_topics_node:=INFO"], + ) + + return launch.LaunchDescription( + [ + SetEnvironmentVariable("RCUTILS_LOGGING_BUFFERED_STREAM", "1"), + talker_node, + monitor_all_node, + monitor_config_node, + # Right after the monitor starts, make it take the 'configure' transition. + create_register_configure(monitor_all_node), + create_register_configure(monitor_config_node), + # When the monitor reaches the 'inactive' state, 'activate'. + create_register_activate(monitor_all_node), + create_register_activate(monitor_config_node), + # When the monitor node reaches the 'active' state, we're ready for testing + RegisterEventHandler( + launch_ros.event_handlers.OnStateTransition( + target_lifecycle_node=monitor_all_node, + start_state="activating", + goal_state="active", + entities=[ + LogInfo(msg="Monitor reached active state"), + launch_testing.actions.ReadyToTest(), + ], + ) + ), + ] + ) + + +class TestMonitor(unittest.TestCase): + TIMEOUT = 30 + + @classmethod + def setUpClass(cls): + rclpy.init() + + @classmethod + def tearDownClass(cls): + rclpy.shutdown() + + def setUp(self): + self.node = rclpy.create_node("test_topic_frequency_monitor_node") + self.log = self.node.get_logger() + self.sub = self.node.create_subscription( + DiagnosticArray, "/diagnostics", self.stat_cb, 10 + ) + self.pub_count = self.node.count_publishers("/diagnostics") + self.log.info( + f"Number of publishers for /diagnostics: {self.pub_count}. Listening for messages..." + ) + self.messages = [] + self.freq_messages = [] + + start_time = time.time() + while len(self.messages) < 3 or len(self.freq_messages) < 3: + rclpy.spin_once(self.node, timeout_sec=1.0) + self.log.debug(f"Got {len(self.messages)} and {len(self.freq_messages)}") + if (time.time() - start_time) > self.TIMEOUT: + self.fail("Timed out waiting for message in /diagnostics topic") + self.log.debug(f"Got {len(self.messages)} and {len(self.freq_messages)}") + + def tearDown(self): + self.node.destroy_node() + del self.node + + def stat_cb(self, msg): + """Store message for future processing.""" + if len(msg.status) == 0: + return + if CONFIG_MONITOR_NAME in msg.status[0].name: + self.freq_messages.append(msg) + else: + self.messages.append(msg) + + def test_diag_msg(self): + """Check that diagnostics messages contain the right content.""" + last_msg = self.messages.pop() + # header + current_time = self.node.get_clock().now() + header_time = Time.from_msg(last_msg.header.stamp) + self.assertLess(current_time - header_time, Duration(seconds=0.1)) + last_status = last_msg.status[0] + # status should be OK + self.assertEqual(last_status.level, DiagnosticStatus.OK) + keys = [value.key for value in last_status.values] + self.assertTrue("period" in keys) + names = [status.name for status in last_msg.status] + # The all_topics monitor should have all topics + self.assertIn(f"{ALL_MONITOR_NAME}: /dummy_header_topic", names, f"{names}") + self.assertIn(f"{ALL_MONITOR_NAME}: /dummy_string_topic1", names, f"{names}") + + def test_frequency_diag_msg(self): + """Check that the frequency diagnostic works.""" + last_msg = self.freq_messages.pop() + self.log.debug(f"{last_msg}") + self.assertTrue(len(last_msg.status) > 0) + status = last_msg.status[0] + # check some fields for present/content + self.assertTrue(CONFIG_MONITOR_NAME in status.name) + keys = [kv.key for kv in status.values] + self.assertIn("Actual frequency (Hz)", keys) + + def test_ignore_unconfigured(self): + """Check that we ignore the topic we don't monitor.""" + last_msg = self.freq_messages.pop() + self.assertEqual(len(last_msg.status), 2) # We monitor 2 topics