diff --git a/.github/workflows/preview.yaml b/.github/workflows/preview.yaml index a384e85b..bd84ecad 100644 --- a/.github/workflows/preview.yaml +++ b/.github/workflows/preview.yaml @@ -1,4 +1,5 @@ name: Preview Changes + on: pull_request: branches: @@ -19,43 +20,51 @@ jobs: name: Test core runs-on: ubuntu-latest steps: + # Checkout repo - uses: actions/checkout@v3 - - uses: pnpm/action-setup@v2.2.4 - with: - version: 8 - run_install: | - - args: [--no-frozen-lockfile] + + # Setup Node.js - uses: actions/setup-node@v3 with: node-version: 18.x - cache: pnpm + + # Force Install pnpm + - uses: pnpm/action-setup@v2 + with: + version: 8 + run_install: true + run_install_args: "--no-frozen-lockfile" + + # Setup Python - uses: actions/setup-python@v4 with: - python-version: "3.11" + python-version: 3.11 + + # Mount Bazel cache - name: Mount bazel caches uses: actions/cache@v3 with: path: | - "~/.cache/bazel" + ~/.cache/bazel key: bazel-cache-${{ hashFiles('**/BUILD.bazel', '**/*.bzl', 'WORKSPACE', '**/*.cpp', '**/*.h', '**/*.rs') }} restore-keys: bazel-cache- + + # Setup Bazelisk - uses: bazelbuild/setup-bazelisk@v2 - - name: Test lib - run: | - bazelisk test //libs/core/test + + # Run Bazel tests + - name: Test core + run: bazelisk test //libs/core/test + - name: Test std - run: | - bazelisk test //libs/std/test + run: bazelisk test //libs/std/test + - name: Test api - run: | - bazelisk test //libs/api/test - - name: Test finance - run: | - bazelisk test //libs/finance/test + run: bazelisk test //libs/api/test + - name: Test python run: | - # use legacy python toolchain until rules_python is adopted bazel test --incompatible_use_python_toolchains=false --python_path=$(which python) //libs/wrappers/python:rtbot_test + - name: Test javascript - run: | - bazelisk test //libs/wrappers/javascript:test + run: bazelisk test //libs/wrappers/javascript:test diff --git a/examples/data/program-test-5.json b/examples/data/program-test-5.json index f912ee6f..69749348 100644 --- a/examples/data/program-test-5.json +++ b/examples/data/program-test-5.json @@ -1,602 +1,19 @@ { - "title": "RSI", - "description": "This program is generated by the compiler, it represents the rsi operator", - "date": "now", - "apiVersion": "v1", - "author": "Eduardo", - "entryOperator": "754", - "output": { - "467": [ - "o1" - ] - }, - "operators": [ - { - "id": "467", - "type": "Output", - "numPorts": 1 - }, - { - "id": "911", - "type": "Add", - "value": 100 - }, - { - "id": "216", - "type": "Scale", - "value": -100 - }, - { - "id": "232", - "type": "Power", - "value": -1 - }, - { - "id": "420", - "type": "Add", - "value": 1 - }, - { - "id": "287", - "type": "Division" - }, - { - "id": "130", - "type": "Linear", - "coeff": [ - 0.92857142857, - 0.07142857142 - ] - }, - { - "id": "746", - "type": "Linear", - "coeff": [ - 0.92857142857, - 0.07142857142 - ] - }, - { - "id": "837", - "type": "TimeShift", - "dt": 1, - "times": 1 - }, - { - "id": "717", - "type": "TimeShift", - "dt": 1, - "times": 1 - }, - { - "id": "855", - "type": "Scale", - "value": -1 - }, - { - "id": "177", - "type": "Constant", - "value": 0 - }, - { - "id": "136", - "type": "Constant", - "value": 0 - }, - { - "id": "572", - "type": "Constant", - "value": 0 - }, - { - "id": "342", - "type": "LessThan", - "value": 0 - }, - { - "id": "764", - "type": "EqualTo", - "value": 0 - }, - { - "id": "794", - "type": "GreaterThan", - "value": 0 - }, - { - "id": "887", - "type": "Difference" - }, - { - "id": "876", - "type": "Variable", - "value": 0 - }, - { - "id": "946", - "type": "Variable", - "value": 0 - }, - { - "id": "649", - "type": "Scale", - "value": 0.07142857142 - }, - { - "id": "654", - "type": "Scale", - "value": 0.07142857142 - }, - { - "id": "138", - "type": "CumulativeSum" - }, - { - "id": "416", - "type": "Scale", - "value": -1 - }, - { - "id": "58", - "type": "CumulativeSum" - }, - { - "id": "180", - "type": "LessThan", - "value": 0 - }, - { - "id": "28", - "type": "EqualTo", - "value": 0 - }, - { - "id": "806", - "type": "GreaterThan", - "value": 0 - }, - { - "id": "34", - "type": "Difference" - }, - { - "id": "144", - "type": "Demultiplexer", - "numPorts": 2 - }, - { - "id": "98", - "type": "Constant", - "value": 1 - }, - { - "id": "434", - "type": "Constant", - "value": 0 - }, - { - "id": "283", - "type": "Constant", - "value": 1 - }, - { - "id": "37", - "type": "Constant", - "value": 0 - }, - { - "id": "495", - "type": "Constant", - "value": 1 - }, - { - "id": "861", - "type": "EqualTo", - "value": 16 - }, - { - "id": "996", - "type": "EqualTo", - "value": 15 - }, - { - "id": "865", - "type": "GreaterThan", - "value": 15 - }, - { - "id": "510", - "type": "LessThan", - "value": 15 - }, - { - "id": "262", - "type": "Count" - }, - { - "id": "754", - "type": "Input", - "numPorts": 1 - } - ], - "connections": [ - { - "from": "911", - "to": "467", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "216", - "to": "911", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "232", - "to": "216", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "420", - "to": "232", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "287", - "to": "420", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "130", - "to": "287", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "876", - "to": "287", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "746", - "to": "287", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "946", - "to": "287", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "130", - "to": "837", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "177", - "to": "130", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "855", - "to": "130", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "572", - "to": "130", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "837", - "to": "130", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "746", - "to": "717", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "177", - "to": "746", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "136", - "to": "746", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "794", - "to": "746", - "fromPort": "o1", - "toPort": "i2" - }, - { - "from": "717", - "to": "746", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "876", - "to": "837", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "946", - "to": "717", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "342", - "to": "855", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "764", - "to": "177", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "342", - "to": "136", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "794", - "to": "572", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "887", - "to": "342", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "887", - "to": "764", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "887", - "to": "794", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "144", - "to": "887", - "fromPort": "o2", - "toPort": "i1" - }, - { - "from": "996", - "to": "876", - "fromPort": "o1", - "toPort": "c1" - }, - { - "from": "861", - "to": "876", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "649", - "to": "876", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "996", - "to": "946", - "fromPort": "o1", - "toPort": "c1" - }, - { - "from": "861", - "to": "946", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "654", - "to": "946", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "138", - "to": "649", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "58", - "to": "654", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "28", - "to": "138", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "416", - "to": "138", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "180", - "to": "416", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "28", - "to": "58", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "806", - "to": "58", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "34", - "to": "180", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "34", - "to": "28", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "34", - "to": "806", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "144", - "to": "34", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "98", - "to": "144", - "fromPort": "o1", - "toPort": "c2" - }, - { - "from": "434", - "to": "144", - "fromPort": "o1", - "toPort": "c2" - }, - { - "from": "37", - "to": "144", - "fromPort": "o1", - "toPort": "c1" - }, - { - "from": "283", - "to": "144", - "fromPort": "o1", - "toPort": "c1" - }, - { - "from": "754", - "to": "144", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "996", - "to": "98", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "495", - "to": "98", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "510", - "to": "434", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "996", - "to": "283", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "510", - "to": "283", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "865", - "to": "37", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "865", - "to": "495", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "262", - "to": "861", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "262", - "to": "996", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "262", - "to": "865", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "262", - "to": "510", - "fromPort": "o1", - "toPort": "i1" - }, - { - "from": "754", - "to": "262", - "fromPort": "o1", - "toPort": "i1" - } - ] -} \ No newline at end of file + "title": "Passthru after ignore 10", + "description": "This is a test program", + "apiVersion": "v1", + "author": "Eduardo", + "entryOperator": "input", + "output": { "output": ["o1"] }, + "operators": [ + { "id": "output", "type": "Output", "portTypes": ["number"] }, + { "id": "identity", "type": "Identity" }, + { "id": "ignore", "type": "Ignore", "count": 10 }, + { "id": "input", "type": "Input", "portTypes": ["number"] } + ], + "connections": [ + { "from": "input", "to": "identity", "fromPort": "o1", "toPort": "i1" }, + { "from": "identity", "to": "ignore", "fromPort": "o1", "toPort": "i1" }, + { "from": "ignore", "to": "output", "fromPort": "o1", "toPort": "i1" } + ] +} diff --git a/libs/api/include/rtbot/OperatorJson.h b/libs/api/include/rtbot/OperatorJson.h index e9d186cb..8936156c 100644 --- a/libs/api/include/rtbot/OperatorJson.h +++ b/libs/api/include/rtbot/OperatorJson.h @@ -28,6 +28,7 @@ #include "rtbot/std/FiniteImpulseResponse.h" #include "rtbot/std/Function.h" #include "rtbot/std/Identity.h" +#include "rtbot/std/Ignore.h" #include "rtbot/std/InfiniteImpulseResponse.h" #include "rtbot/std/Linear.h" #include "rtbot/std/MovingAverage.h" @@ -83,8 +84,12 @@ class OperatorJson { return make_join(id, parsed["portTypes"].get>()); } else if (type == "Difference") { return make_difference(id); + } else if (type == "Identity") { + return make_identity(id); } else if (type == "PeakDetector") { return make_peak_detector(id, parsed["window_size"].get()); + } else if (type == "Ignore") { + return make_ignore(id, parsed["count"].get()); } else if (type == "Linear") { return make_linear(id, parsed["coefficients"].get>()); } else if (type == "Subtraction") { @@ -279,6 +284,8 @@ class OperatorJson { j["value"] = std::dynamic_pointer_cast(op)->get_value(); } else if (type == "Add") { j["value"] = std::dynamic_pointer_cast(op)->get_value(); + } else if (type == "Ignore") { + j["value"] = std::dynamic_pointer_cast(op)->get_count(); } else if (type == "Linear") { j["coefficients"] = std::dynamic_pointer_cast(op)->get_coefficients(); } else if (type == "PeakDetector") { @@ -369,6 +376,7 @@ class OperatorJson { } else { throw std::runtime_error("Unknown operator type: " + type); } + return j.dump(); } }; diff --git a/libs/api/test/integration_test_basic_flow_program.cpp b/libs/api/test/integration_test_basic_flow_program.cpp new file mode 100644 index 00000000..439b8792 --- /dev/null +++ b/libs/api/test/integration_test_basic_flow_program.cpp @@ -0,0 +1,67 @@ +#include +#include +#include + +#include "rtbot/Program.h" +#include "rtbot/bindings.h" + +using namespace rtbot; +using json = nlohmann::json; + +TEST_CASE("Basic Flow Pipeline Test", "[basic_pipeline]") { + SECTION("Test Flow Pipeline") { + // Load program JSON + json program_json; + { + std::ifstream in("examples/data/program-test-5.json"); + REQUIRE(in.good()); + in >> program_json; + } + + // Create program + auto& manager = ProgramManager::instance(); + manager.clear_all_programs(); + + std::string validate_result = validate_program(program_json.dump()); + std::cout << "Validate result: " << pretty_print_validation_error(validate_result) << std::endl; + + std::string create_result = manager.create_program("test_prog", program_json.dump()); + std::cout << "Create result: " << create_result << std::endl; + REQUIRE(create_result.empty()); + + std::vector times; + std::vector values; + std::vector ports; + + const uint64_t start = 1; + const uint64_t end = 20; + + for (uint64_t i = start; i <= end; i++) { + times.push_back(i); + values.push_back(i * i); + ports.push_back("i1"); + } + + // Process data in batches and verify outputs + std::string returned = process_batch("test_prog", times, values, ports); + auto result = json::parse(returned); + + if (result.contains("output")) { + const auto& op_outputs = result["output"]; + + if (op_outputs.contains("o1")) { + int time = 11; + for (const auto& msg : op_outputs["o1"]) { + uint64_t current_time = msg["time"]; + double current_value = msg["value"]; + + REQUIRE(current_time == time); + REQUIRE(current_value == time * time); + time++; + } + } else + REQUIRE(1 == 0); + } else + REQUIRE(1 == 0); + } +} \ No newline at end of file diff --git a/libs/std/include/rtbot/std/Ignore.h b/libs/std/include/rtbot/std/Ignore.h new file mode 100644 index 00000000..9a377845 --- /dev/null +++ b/libs/std/include/rtbot/std/Ignore.h @@ -0,0 +1,99 @@ +#ifndef IGNORE_H +#define IGNORE_H + +#include "rtbot/Message.h" +#include "rtbot/Operator.h" +#include "rtbot/PortType.h" + +namespace rtbot { + +class Ignore : public Operator { + public: + Ignore(std::string id, size_t count = 0) : Operator(std::move(id)), count_(count), ignored_(0) { + // Single input and output port + add_data_port(); + add_control_port(); + add_output_port(); + } + + void reset() override { + Operator::reset(); + ignored_ = 0; + } + + std::string type_name() const override { return "Ignore"; } + + size_t get_count() const { return count_; } + + Bytes collect() override { + Bytes bytes = Operator::collect(); + + // Serialize count_ + bytes.insert(bytes.end(), reinterpret_cast(&count_), + reinterpret_cast(&count_) + sizeof(count_)); + + // Serialize ignored_ + bytes.insert(bytes.end(), reinterpret_cast(&ignored_), + reinterpret_cast(&ignored_) + sizeof(ignored_)); + + return bytes; + } + + void restore(Bytes::const_iterator& it) override { + Operator::restore(it); + + // Restore count_ + count_ = *reinterpret_cast(&(*it)); + it += sizeof(size_t); + + // Restore ignored_ + ignored_ = *reinterpret_cast(&(*it)); + it += sizeof(size_t); + } + + protected: + void process_data() override { + auto& input_queue = get_data_queue(0); + auto& output_queue = get_output_queue(0); + + while (!input_queue.empty()) { + const auto* msg = dynamic_cast*>(input_queue.front().get()); + if (!msg) { + throw std::runtime_error("Invalid message type in Ignore"); + } + + if (ignored_ >= count_) { + // Forward message by cloning + output_queue.push_back(input_queue.front()->clone()); + } else + ignored_++; + input_queue.pop_front(); + } + } + + void process_control() override { + auto& control_queue = get_control_queue(0); + + while (!control_queue.empty()) { + const auto* query = dynamic_cast*>(control_queue.front().get()); + if (!query) { + throw std::runtime_error("Invalid control message type in Ignore"); + } + ignored_ = 0; + control_queue.pop_front(); + } + } + + private: + size_t count_; + size_t ignored_; +}; + +// Factory function +inline std::shared_ptr make_ignore(std::string id, size_t count = 0) { + return std::make_shared(std::move(id), count); +} + +} // namespace rtbot + +#endif // IDENTITY_H \ No newline at end of file diff --git a/libs/std/include/rtbot/std/Ignore.md b/libs/std/include/rtbot/std/Ignore.md new file mode 100644 index 00000000..7d736e7f --- /dev/null +++ b/libs/std/include/rtbot/std/Ignore.md @@ -0,0 +1,68 @@ +--- +behavior: + buffered: false + throughput: constant +view: + shape: circle + latex: + template: | + \mathbb{1} if count have been ignored +jsonschema: + type: object + properties: + id: + type: string + description: The id of the operator + examples: ["id1"] + count: + type: number + description: The count to ignore + examples: [40] + required: ["id", "count"] +--- + +# Ignore + +The Ignore operator forwards all input messages without modification once it reaches the indicated count to ignore, maintaining their timestamp and value. Each message received on input port 0 is immediately forwarded to output port 0. + +## Configuration + +Requires an operator ID. +Requires a count to ignore. + +## Ports + +- Input Port 0: Accepts NumberData messages +- Output Port 0: Emits NumberData messages +- Control Port 0: To reset the amount ignore back to zero + +## Example Operation + +| Time | Input Port 0 | Output Port 0 | Ignored | Count | +| ---- | ------------ | ------------- | ------- | ----- | +| 1 | 42.0 | - | 1 | 2 | +| 2 | - | - | 1 | 2 | +| 3 | 15.7 | - | 2 | 2 | +| 4 | - | - | 2 | 2 | +| 5 | 33.1 | 33.1 | 2 | 2 | +| 6 | 80.1 | 80.1 | 2 | 2 | + +## Behavior + +The Identity operator: + +- Maintains message order +- Preserves timestamps +- Does not buffer messages +- Has constant 1:1 throughput when the amount driven by count has been ignored +- Performs no data transformation + +Mathematical representation: +$$y(t_n) = x(t_n)$$ when the amount driven by count has been ignored + +## Error Handling + +Throws std::runtime_error if: + +- Receiving message with incorrect type +- Receiving message on invalid port diff --git a/libs/std/test/test_ignore.cpp b/libs/std/test/test_ignore.cpp new file mode 100644 index 00000000..8c270f1c --- /dev/null +++ b/libs/std/test/test_ignore.cpp @@ -0,0 +1,160 @@ +#include + +#include "rtbot/std/Ignore.h" + +using namespace rtbot; + +SCENARIO("Ignored operator handles basic message forwarding", "[ignore]") { + GIVEN("An ignored operator") { + auto ignored = make_ignore("id1", 0); + + WHEN("Receiving a single message") { + ignored->receive_data(create_message(1, NumberData{42.0}), 0); + ignored->execute(); + + THEN("Message is forwarded unchanged") { + const auto& output = ignored->get_output_queue(0); + REQUIRE(output.size() == 1); + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == 1); + REQUIRE(msg->data.value == 42.0); + } + } + + WHEN("Receiving multiple messages") { + ignored->receive_data(create_message(1, NumberData{1.0}), 0); + ignored->receive_data(create_message(3, NumberData{3.0}), 0); + ignored->receive_data(create_message(5, NumberData{5.0}), 0); + ignored->execute(); + + THEN("All messages are forwarded in order") { + const auto& output = ignored->get_output_queue(0); + REQUIRE(output.size() == 3); + + auto it = output.begin(); + std::vector> expected = {{1, 1.0}, {3, 3.0}, {5, 5.0}}; + + for (const auto& [exp_time, exp_value] : expected) { + const auto* msg = dynamic_cast*>((*it).get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == exp_time); + REQUIRE(msg->data.value == exp_value); + ++it; + } + } + } + } +} + +SCENARIO("Ignored operator handles basic message forwarding when ignoring 3 messages", "[ignore]") { + GIVEN("An ignored operator") { + auto ignored = make_ignore("id1", 3); + + WHEN("Receiving a single message") { + ignored->receive_data(create_message(1, NumberData{42.0}), 0); + ignored->execute(); + + THEN("No message is found") { + const auto& output = ignored->get_output_queue(0); + REQUIRE(output.size() == 0); + } + } + + WHEN("Receiving 4 messages only 1 forwarded beacuse we are ignoring 3") { + ignored->receive_data(create_message(1, NumberData{1.0}), 0); + ignored->receive_data(create_message(3, NumberData{3.0}), 0); + ignored->receive_data(create_message(5, NumberData{5.0}), 0); + ignored->receive_data(create_message(6, NumberData{6.0}), 0); + ignored->execute(); + + THEN("Only 1 message is forwarded") { + const auto& output = ignored->get_output_queue(0); + REQUIRE(output.size() == 1); + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == 6); + REQUIRE(msg->data.value == 6.0); + } + } + + WHEN( + "Receiving 5 messages only 1 forwarded beacuse we are ignoring 3 and then receiving a control message to reset " + "back to 0 ignored") { + ignored->receive_data(create_message(1, NumberData{1.0}), 0); + ignored->receive_data(create_message(3, NumberData{3.0}), 0); + ignored->receive_data(create_message(5, NumberData{5.0}), 0); + ignored->receive_data(create_message(6, NumberData{6.0}), 0); + ignored->receive_data(create_message(7, NumberData{7.0}), 0); + ignored->execute(); + + THEN( + "Only 2 message is forwarded, after that we received a control message and 4 data messages and only 1 is " + "forwarded") { + const auto& output = ignored->get_output_queue(0); + REQUIRE(output.size() == 2); + ignored->clear_all_output_ports(); + ignored->receive_control(create_message(9, NumberData{1}), 0); + ignored->receive_data(create_message(9, NumberData{1.0}), 0); + ignored->receive_data(create_message(10, NumberData{3.0}), 0); + ignored->receive_data(create_message(11, NumberData{5.0}), 0); + ignored->receive_data(create_message(12, NumberData{6.0}), 0); + ignored->execute(); + const auto& output1 = ignored->get_output_queue(0); + REQUIRE(output1.size() == 1); + const auto* msg = dynamic_cast*>(output1.front().get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == 12); + REQUIRE(msg->data.value == 6.0); + } + } + } +} + +SCENARIO("Ignore operator handles state serialization", "[ignore]") { + GIVEN("An ignore operator with processed messages") { + auto ignore = make_ignore("id1", 3); + + // Add some messages + ignore->receive_data(create_message(1, NumberData{10.0}), 0); + ignore->receive_data(create_message(3, NumberData{30.0}), 0); + ignore->execute(); + + WHEN("State is serialized and restored") { + // Serialize state + Bytes state = ignore->collect(); + + // Create new operator + auto restored = make_ignore("id1", 0); + + // Restore state + auto it = state.cbegin(); + restored->restore(it); + + THEN("Behavior is preserved") { + restored->clear_all_output_ports(); + restored->receive_data(create_message(5, NumberData{50.0}), 0); + restored->receive_data(create_message(6, NumberData{60.0}), 0); + restored->execute(); + + const auto& output = restored->get_output_queue(0); + REQUIRE(output.size() == 1); + + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == 6); + REQUIRE(msg->data.value == 60.0); + } + } + } +} + +SCENARIO("Ignored operator validates message types", "[ignore]") { + GIVEN("An ignored operator") { + auto ignore = make_ignore("id1", 0); + + THEN("It rejects invalid message types") { + REQUIRE_THROWS_AS(ignore->receive_data(create_message(1, BooleanData{true}), 0), std::runtime_error); + } + } +} \ No newline at end of file