From a7dbf88ebe606eebebc2d15cf1412ac8a0230dd9 Mon Sep 17 00:00:00 2001 From: Eduardo Perez Verdecia Date: Fri, 21 Nov 2025 18:03:37 -0700 Subject: [PATCH 1/2] fix(rtbot): debug mode to all runtime functions --- libs/core/include/rtbot/Buffer.h | 2 +- libs/core/include/rtbot/Demultiplexer.h | 2 +- libs/core/include/rtbot/FilterByValue.h | 2 +- libs/core/include/rtbot/Input.h | 2 +- libs/core/include/rtbot/Join.h | 2 +- libs/core/include/rtbot/Multiplexer.h | 2 +- libs/core/include/rtbot/Operator.h | 8 +++---- libs/core/include/rtbot/Output.h | 2 +- libs/core/include/rtbot/Pipeline.h | 23 +++++++++++-------- libs/core/include/rtbot/ReduceJoin.h | 2 +- libs/std/include/rtbot/std/ArithmeticScalar.h | 2 +- libs/std/include/rtbot/std/Constant.h | 2 +- libs/std/include/rtbot/std/Count.h | 2 +- libs/std/include/rtbot/std/CumulativeSum.h | 2 +- libs/std/include/rtbot/std/FilterScalar.h | 2 +- libs/std/include/rtbot/std/Function.h | 2 +- libs/std/include/rtbot/std/Identity.h | 2 +- .../rtbot/std/InfiniteImpulseResponse.h | 2 +- libs/std/include/rtbot/std/Linear.h | 2 +- libs/std/include/rtbot/std/Replace.h | 2 +- .../std/include/rtbot/std/ResamplerConstant.h | 2 +- libs/std/include/rtbot/std/TimeShift.h | 2 +- libs/std/include/rtbot/std/Variable.h | 4 ++-- libs/std/test/test_pipeline.cpp | 2 ++ 24 files changed, 42 insertions(+), 35 deletions(-) diff --git a/libs/core/include/rtbot/Buffer.h b/libs/core/include/rtbot/Buffer.h index 7f3c3d8b..dc62420c 100644 --- a/libs/core/include/rtbot/Buffer.h +++ b/libs/core/include/rtbot/Buffer.h @@ -183,7 +183,7 @@ class Buffer : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); while (!input_queue.empty()) { diff --git a/libs/core/include/rtbot/Demultiplexer.h b/libs/core/include/rtbot/Demultiplexer.h index c008daa5..ac78f741 100644 --- a/libs/core/include/rtbot/Demultiplexer.h +++ b/libs/core/include/rtbot/Demultiplexer.h @@ -51,7 +51,7 @@ class Demultiplexer : public Operator { protected: - void process_data() override { + void process_data(bool debug=false) override { while(true) { bool is_any_control_empty; diff --git a/libs/core/include/rtbot/FilterByValue.h b/libs/core/include/rtbot/FilterByValue.h index 25df3e70..6d7a518c 100644 --- a/libs/core/include/rtbot/FilterByValue.h +++ b/libs/core/include/rtbot/FilterByValue.h @@ -26,7 +26,7 @@ class FilterByValue : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/core/include/rtbot/Input.h b/libs/core/include/rtbot/Input.h index c75d666c..b6152859 100644 --- a/libs/core/include/rtbot/Input.h +++ b/libs/core/include/rtbot/Input.h @@ -53,7 +53,7 @@ class Input : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { // Process each port independently to allow concurrent timestamps for (int port_index = 0; port_index < num_data_ports(); port_index++) { const auto& input_queue = get_data_queue(port_index); diff --git a/libs/core/include/rtbot/Join.h b/libs/core/include/rtbot/Join.h index 53282a2e..ce69aae1 100644 --- a/libs/core/include/rtbot/Join.h +++ b/libs/core/include/rtbot/Join.h @@ -93,7 +93,7 @@ class Join : public Operator { protected: // Performs synchronization of input messages - void process_data() override { + void process_data(bool debug=false) override { while(true) { diff --git a/libs/core/include/rtbot/Multiplexer.h b/libs/core/include/rtbot/Multiplexer.h index ff16cc3e..b014b9dd 100644 --- a/libs/core/include/rtbot/Multiplexer.h +++ b/libs/core/include/rtbot/Multiplexer.h @@ -77,7 +77,7 @@ class Multiplexer : public Operator { protected: - void process_data() override { + void process_data(bool debug=false) override { while (true) { int num_empty_data_ports = 0; diff --git a/libs/core/include/rtbot/Operator.h b/libs/core/include/rtbot/Operator.h index ac146773..5c5de0cd 100644 --- a/libs/core/include/rtbot/Operator.h +++ b/libs/core/include/rtbot/Operator.h @@ -202,13 +202,13 @@ class Operator { // Process control messages first if (num_control_ports() > 0) { SpanScope control_scope{"process_control"}; - process_control(); + process_control(debug); } // Then process data if (num_data_ports() > 0) { SpanScope data_scope{"process_data"}; - process_data(); + process_data(debug); } #ifdef RTBOT_INSTRUMENTATION @@ -410,8 +410,8 @@ class Operator { } protected: - virtual void process_data() = 0; - virtual void process_control() {} + virtual void process_data(bool debug) = 0; + virtual void process_control(bool debug=false) {}; bool sync_data_inputs() { diff --git a/libs/core/include/rtbot/Output.h b/libs/core/include/rtbot/Output.h index 84119c4e..bf48176c 100644 --- a/libs/core/include/rtbot/Output.h +++ b/libs/core/include/rtbot/Output.h @@ -48,7 +48,7 @@ class Output : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { // Forward all messages from inputs to corresponding outputs for (size_t i = 0; i < num_data_ports(); ++i) { auto& input_queue = get_data_queue(i); diff --git a/libs/core/include/rtbot/Pipeline.h b/libs/core/include/rtbot/Pipeline.h index 70bf9426..72eb4401 100644 --- a/libs/core/include/rtbot/Pipeline.h +++ b/libs/core/include/rtbot/Pipeline.h @@ -60,14 +60,17 @@ class Pipeline : public Operator { // API for configuring the pipeline void register_operator(std::shared_ptr op) { operators_[op->id()] = std::move(op); } - void set_entry(const std::string& op_id, size_t port = 0) { + void set_entry(const std::string& op_id) { auto it = operators_.find(op_id); if (it == operators_.end()) { throw std::runtime_error("Entry operator not found: " + op_id); } - entry_operator_ = it->second; - entry_port_ = port; - RTBOT_LOG_DEBUG("Setting entry operator: ", op_id, " -> ", port); + if (it->second->num_data_ports() >= num_data_ports()) { + entry_operator_ = it->second; + RTBOT_LOG_DEBUG("Setting entry operator: ", op_id); + } else { + throw std::runtime_error("Entry operator has less data ports that the pipeline: " + op_id); + } } void add_output_mapping(const std::string& op_id, size_t op_port, size_t pipeline_port) { @@ -117,8 +120,11 @@ class Pipeline : public Operator { if (input_port_types_ != other.input_port_types_) return false; if (output_port_types_!= other.output_port_types_) return false; if (output_mappings_ != other.output_mappings_) return false; - if (entry_operator_ != other.entry_operator_) return false; - if (entry_port_ != other.entry_port_) return false; + if ((bool)entry_operator_ != (bool)other.entry_operator_) return false; + if (entry_operator_ && other.entry_operator_) { + if (*entry_operator_ != *other.entry_operator_) + return false; + } if (operators_.size() != other.operators_.size()) return false; for (const auto& [key, op1] : operators_) { @@ -141,7 +147,7 @@ class Pipeline : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { // Check if we have an entry point configured if (!entry_operator_) { throw std::runtime_error("Pipeline entry point not configured"); @@ -153,7 +159,7 @@ class Pipeline : public Operator { while (!input_queue.empty()) { auto& msg = input_queue.front(); entry_operator_->receive_data(msg->clone(), i); - entry_operator_->execute(); + entry_operator_->execute(debug); input_queue.pop_front(); // Process output mappings bool was_reset = false; @@ -195,7 +201,6 @@ class Pipeline : public Operator { std::vector pipeline_connections_; std::map> operators_; std::shared_ptr entry_operator_; - size_t entry_port_; std::map>> output_mappings_; }; diff --git a/libs/core/include/rtbot/ReduceJoin.h b/libs/core/include/rtbot/ReduceJoin.h index 5a094519..33871b7f 100644 --- a/libs/core/include/rtbot/ReduceJoin.h +++ b/libs/core/include/rtbot/ReduceJoin.h @@ -41,7 +41,7 @@ class ReduceJoin : public Join { } protected: - void process_data() override { + void process_data(bool debug=false) override { while(true) { diff --git a/libs/std/include/rtbot/std/ArithmeticScalar.h b/libs/std/include/rtbot/std/ArithmeticScalar.h index b24a1c2a..561438af 100644 --- a/libs/std/include/rtbot/std/ArithmeticScalar.h +++ b/libs/std/include/rtbot/std/ArithmeticScalar.h @@ -29,7 +29,7 @@ class ArithmeticScalar : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/Constant.h b/libs/std/include/rtbot/std/Constant.h index 1097246f..27fdeb76 100644 --- a/libs/std/include/rtbot/std/Constant.h +++ b/libs/std/include/rtbot/std/Constant.h @@ -34,7 +34,7 @@ class Constant : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/Count.h b/libs/std/include/rtbot/std/Count.h index f907acb8..468539e0 100644 --- a/libs/std/include/rtbot/std/Count.h +++ b/libs/std/include/rtbot/std/Count.h @@ -55,7 +55,7 @@ class Count : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input = get_data_queue(0); auto& output = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/CumulativeSum.h b/libs/std/include/rtbot/std/CumulativeSum.h index e3a760f9..d894214a 100644 --- a/libs/std/include/rtbot/std/CumulativeSum.h +++ b/libs/std/include/rtbot/std/CumulativeSum.h @@ -57,7 +57,7 @@ class CumulativeSum : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/FilterScalar.h b/libs/std/include/rtbot/std/FilterScalar.h index 4b660e98..a25c9ee9 100644 --- a/libs/std/include/rtbot/std/FilterScalar.h +++ b/libs/std/include/rtbot/std/FilterScalar.h @@ -28,7 +28,7 @@ class FilterScalar : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/Function.h b/libs/std/include/rtbot/std/Function.h index b3bfade1..e2f3ed57 100644 --- a/libs/std/include/rtbot/std/Function.h +++ b/libs/std/include/rtbot/std/Function.h @@ -48,7 +48,7 @@ class Function : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/Identity.h b/libs/std/include/rtbot/std/Identity.h index edd0b866..ae312730 100644 --- a/libs/std/include/rtbot/std/Identity.h +++ b/libs/std/include/rtbot/std/Identity.h @@ -30,7 +30,7 @@ class Identity : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/InfiniteImpulseResponse.h b/libs/std/include/rtbot/std/InfiniteImpulseResponse.h index 5435da7e..334877c3 100644 --- a/libs/std/include/rtbot/std/InfiniteImpulseResponse.h +++ b/libs/std/include/rtbot/std/InfiniteImpulseResponse.h @@ -111,7 +111,7 @@ class InfiniteImpulseResponse : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/Linear.h b/libs/std/include/rtbot/std/Linear.h index cd4635d8..a3c9e514 100644 --- a/libs/std/include/rtbot/std/Linear.h +++ b/libs/std/include/rtbot/std/Linear.h @@ -35,7 +35,7 @@ class Linear : public Join { } protected: - void process_data() override { + void process_data(bool debug=false) override { while(true) { diff --git a/libs/std/include/rtbot/std/Replace.h b/libs/std/include/rtbot/std/Replace.h index 3f82697a..f567d0c3 100644 --- a/libs/std/include/rtbot/std/Replace.h +++ b/libs/std/include/rtbot/std/Replace.h @@ -28,7 +28,7 @@ class Replace : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/ResamplerConstant.h b/libs/std/include/rtbot/std/ResamplerConstant.h index 917fe054..b1b25db3 100644 --- a/libs/std/include/rtbot/std/ResamplerConstant.h +++ b/libs/std/include/rtbot/std/ResamplerConstant.h @@ -92,7 +92,7 @@ class ResamplerConstant : public Operator { std::optional get_t0() const { return t0_; } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/TimeShift.h b/libs/std/include/rtbot/std/TimeShift.h index b36334fa..d58525aa 100644 --- a/libs/std/include/rtbot/std/TimeShift.h +++ b/libs/std/include/rtbot/std/TimeShift.h @@ -29,7 +29,7 @@ class TimeShift : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { auto& input_queue = get_data_queue(0); auto& output_queue = get_output_queue(0); diff --git a/libs/std/include/rtbot/std/Variable.h b/libs/std/include/rtbot/std/Variable.h index 75ae1e9c..39b2950c 100644 --- a/libs/std/include/rtbot/std/Variable.h +++ b/libs/std/include/rtbot/std/Variable.h @@ -38,14 +38,14 @@ class Variable : public Operator { } protected: - void process_data() override { + void process_data(bool debug=false) override { if (!get_data_queue(0).empty() && !get_control_queue(0).empty()) process_pending_queries(); } - void process_control() override { + void process_control(bool debug=false) override { if (!get_data_queue(0).empty() && !get_control_queue(0).empty()) process_pending_queries(); diff --git a/libs/std/test/test_pipeline.cpp b/libs/std/test/test_pipeline.cpp index 53ef974b..f2a8cd61 100644 --- a/libs/std/test/test_pipeline.cpp +++ b/libs/std/test/test_pipeline.cpp @@ -220,6 +220,8 @@ SCENARIO("Pipeline handles state serialization correctly", "[pipeline][State]") pipeline->receive_data(create_message(1, NumberData{1.0}), 0); restored->receive_data(create_message(1, NumberData{1.0}), 0); + REQUIRE(*pipeline==*restored); + pipeline->execute(); restored->execute(); From e5294e7a366bc79f48ddd5b768f8422cea454985 Mon Sep 17 00:00:00 2001 From: Eduardo Perez Verdecia Date: Sun, 23 Nov 2025 10:10:17 -0700 Subject: [PATCH 2/2] fix(rtbot): deleting unused definiton --- libs/core/include/rtbot/FilterByValue.h | 54 ------------------------- 1 file changed, 54 deletions(-) delete mode 100644 libs/core/include/rtbot/FilterByValue.h diff --git a/libs/core/include/rtbot/FilterByValue.h b/libs/core/include/rtbot/FilterByValue.h deleted file mode 100644 index 6d7a518c..00000000 --- a/libs/core/include/rtbot/FilterByValue.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef FILTER_BY_VALUE_H -#define FILTER_BY_VALUE_H - -#include - -#include "rtbot/Operator.h" -#include "rtbot/PortType.h" - -namespace rtbot { - -template -class FilterByValue : public Operator { - public: - using FilterFunction = std::function; - - FilterByValue(std::string id, FilterFunction filter) : Operator(std::move(id)), filter_(std::move(filter)) { - // Add single data input port - add_data_port(); - - // Add single output port - add_output_port(); - } - - bool equals(const FilterByValue& other) const { - return Operator::equals(other); - } - - protected: - void process_data(bool debug=false) override { - auto& input_queue = get_data_queue(0); - auto& output_queue = get_output_queue(0); - - while (!input_queue.empty()) { - const auto* msg = dynamic_cast*>(input_queue.front().get()); - if (!msg) { - throw std::runtime_error("Invalid message type in FilterByValue"); - } - - // Apply filter - if (filter_(msg->data)) { - output_queue.push_back(input_queue.front()->clone()); - } - - input_queue.pop_front(); - } - } - - private: - FilterFunction filter_; -}; - -} // namespace rtbot - -#endif // FILTER_BY_VALUE_H \ No newline at end of file