Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/core/include/rtbot/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class Buffer : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);

while (!input_queue.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion libs/core/include/rtbot/Demultiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Demultiplexer : public Operator {

protected:

void process_data() override {
void process_data(bool debug=false) override {
while(true) {

bool is_any_control_empty;
Expand Down
54 changes: 0 additions & 54 deletions libs/core/include/rtbot/FilterByValue.h

This file was deleted.

2 changes: 1 addition & 1 deletion libs/core/include/rtbot/Input.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Input : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
// Process each port independently to allow concurrent timestamps
for (int port_index = 0; port_index < num_data_ports(); port_index++) {
const auto& input_queue = get_data_queue(port_index);
Expand Down
2 changes: 1 addition & 1 deletion libs/core/include/rtbot/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class Join : public Operator {
protected:
// Performs synchronization of input messages

void process_data() override {
void process_data(bool debug=false) override {

while(true) {

Expand Down
2 changes: 1 addition & 1 deletion libs/core/include/rtbot/Multiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Multiplexer : public Operator {

protected:

void process_data() override {
void process_data(bool debug=false) override {
while (true) {

int num_empty_data_ports = 0;
Expand Down
8 changes: 4 additions & 4 deletions libs/core/include/rtbot/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ class Operator {
// Process control messages first
if (num_control_ports() > 0) {
SpanScope control_scope{"process_control"};
process_control();
process_control(debug);
}

// Then process data
if (num_data_ports() > 0) {
SpanScope data_scope{"process_data"};
process_data();
process_data(debug);
}

#ifdef RTBOT_INSTRUMENTATION
Expand Down Expand Up @@ -410,8 +410,8 @@ class Operator {
}

protected:
virtual void process_data() = 0;
virtual void process_control() {}
virtual void process_data(bool debug) = 0;
virtual void process_control(bool debug=false) {};

bool sync_data_inputs() {

Expand Down
2 changes: 1 addition & 1 deletion libs/core/include/rtbot/Output.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Output : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
// Forward all messages from inputs to corresponding outputs
for (size_t i = 0; i < num_data_ports(); ++i) {
auto& input_queue = get_data_queue(i);
Expand Down
23 changes: 14 additions & 9 deletions libs/core/include/rtbot/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,17 @@ class Pipeline : public Operator {
// API for configuring the pipeline
void register_operator(std::shared_ptr<Operator> op) { operators_[op->id()] = std::move(op); }

void set_entry(const std::string& op_id, size_t port = 0) {
void set_entry(const std::string& op_id) {
auto it = operators_.find(op_id);
if (it == operators_.end()) {
throw std::runtime_error("Entry operator not found: " + op_id);
}
entry_operator_ = it->second;
entry_port_ = port;
RTBOT_LOG_DEBUG("Setting entry operator: ", op_id, " -> ", port);
if (it->second->num_data_ports() >= num_data_ports()) {
entry_operator_ = it->second;
RTBOT_LOG_DEBUG("Setting entry operator: ", op_id);
} else {
throw std::runtime_error("Entry operator has less data ports that the pipeline: " + op_id);
}
}

void add_output_mapping(const std::string& op_id, size_t op_port, size_t pipeline_port) {
Expand Down Expand Up @@ -117,8 +120,11 @@ class Pipeline : public Operator {
if (input_port_types_ != other.input_port_types_) return false;
if (output_port_types_!= other.output_port_types_) return false;
if (output_mappings_ != other.output_mappings_) return false;
if (entry_operator_ != other.entry_operator_) return false;
if (entry_port_ != other.entry_port_) return false;
if ((bool)entry_operator_ != (bool)other.entry_operator_) return false;
if (entry_operator_ && other.entry_operator_) {
if (*entry_operator_ != *other.entry_operator_)
return false;
}
if (operators_.size() != other.operators_.size()) return false;

for (const auto& [key, op1] : operators_) {
Expand All @@ -141,7 +147,7 @@ class Pipeline : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
// Check if we have an entry point configured
if (!entry_operator_) {
throw std::runtime_error("Pipeline entry point not configured");
Expand All @@ -153,7 +159,7 @@ class Pipeline : public Operator {
while (!input_queue.empty()) {
auto& msg = input_queue.front();
entry_operator_->receive_data(msg->clone(), i);
entry_operator_->execute();
entry_operator_->execute(debug);
input_queue.pop_front();
// Process output mappings
bool was_reset = false;
Expand Down Expand Up @@ -195,7 +201,6 @@ class Pipeline : public Operator {
std::vector<PipelineConnection> pipeline_connections_;
std::map<std::string, std::shared_ptr<Operator>> operators_;
std::shared_ptr<Operator> entry_operator_;
size_t entry_port_;
std::map<std::string, std::vector<std::pair<size_t, size_t>>> output_mappings_;
};

Expand Down
2 changes: 1 addition & 1 deletion libs/core/include/rtbot/ReduceJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ReduceJoin : public Join {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {

while(true) {

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/ArithmeticScalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ArithmeticScalar : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/Constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Constant : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/Count.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Count : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input = get_data_queue(0);
auto& output = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/CumulativeSum.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class CumulativeSum : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/FilterScalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class FilterScalar : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/Function.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Function : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/Identity.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Identity : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/InfiniteImpulseResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class InfiniteImpulseResponse : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/Linear.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Linear : public Join {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {

while(true) {

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/Replace.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Replace : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/ResamplerConstant.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class ResamplerConstant : public Operator {
std::optional<timestamp_t> get_t0() const { return t0_; }

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
2 changes: 1 addition & 1 deletion libs/std/include/rtbot/std/TimeShift.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TimeShift : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {
auto& input_queue = get_data_queue(0);
auto& output_queue = get_output_queue(0);

Expand Down
4 changes: 2 additions & 2 deletions libs/std/include/rtbot/std/Variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ class Variable : public Operator {
}

protected:
void process_data() override {
void process_data(bool debug=false) override {

if (!get_data_queue(0).empty() && !get_control_queue(0).empty())
process_pending_queries();

}

void process_control() override {
void process_control(bool debug=false) override {

if (!get_data_queue(0).empty() && !get_control_queue(0).empty())
process_pending_queries();
Expand Down
2 changes: 2 additions & 0 deletions libs/std/test/test_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ SCENARIO("Pipeline handles state serialization correctly", "[pipeline][State]")
pipeline->receive_data(create_message<NumberData>(1, NumberData{1.0}), 0);
restored->receive_data(create_message<NumberData>(1, NumberData{1.0}), 0);

REQUIRE(*pipeline==*restored);

pipeline->execute();
restored->execute();

Expand Down
Loading