From 6e0031e1e6dc9edb702b8ad34cee017fcb6d0c10 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 09:56:39 +0000 Subject: [PATCH 1/4] Move Batch tests related to error handling to own test file --- test/rapidflow/batch/error_handling_test.rb | 143 ++++++++++++++++++++ test/rapidflow/batch_test.rb | 138 +------------------ 2 files changed, 144 insertions(+), 137 deletions(-) create mode 100644 test/rapidflow/batch/error_handling_test.rb diff --git a/test/rapidflow/batch/error_handling_test.rb b/test/rapidflow/batch/error_handling_test.rb new file mode 100644 index 0000000..af50a77 --- /dev/null +++ b/test/rapidflow/batch/error_handling_test.rb @@ -0,0 +1,143 @@ +# frozen_string_literal: true +require "test_helper" + +module RapidFlow + class BatchErrorHandlingTest < Minitest::Test + def test_error_handling_captures_exceptions + batch = Batch.build do + stage ->(data) { + raise "Error in stage 1" if data == "bad" + data + } + stage ->(data) { data.upcase } + end + + batch.push("good") + batch.push("bad") + + results = batch.results + + assert_equal 2, results.length + + # Good result should complete both stages + assert_equal "GOOD", results[0][0] + assert_nil results[0][1] + + # Bad result should have error from stage 1 and not be processed by stage 2 + assert_equal "bad", results[1][0] # Original data preserved + assert_instance_of RuntimeError, results[1][1] + assert_equal "Error in stage 1", results[1][1].message + end + + def test_error_in_middle_stage + batch = Batch.build do + stage ->(data) { data.upcase } + stage ->(data) { + raise "Error in stage 2" if data == "BAD" + data + } + stage ->(data) { data + "!" } + end + + batch.push("good") + batch.push("bad") + batch.push("also_good") + + results = batch.results + + assert_equal 3, results.length + assert_equal ["GOOD!", nil], results[0] + assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] + assert_equal "Error in stage 2", results[1][1].message + assert_equal ["ALSO_GOOD!", nil], results[2] + end + + def test_error_in_last_stage + batch = Batch.build do + stage ->(data) { data.upcase } + stage ->(data) { + raise "Error in final stage" if data == "BAD" + data + } + end + + batch.push("good") + batch.push("bad") + + results = batch.results + + assert_equal 2, results.length + assert_equal ["GOOD", nil], results[0] + assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] + assert_equal "Error in final stage", results[1][1].message + end + + def test_multiple_errors_in_sequence + batch = Batch.build do + stage ->(data) { + raise "Error at #{data}" if data.start_with?("bad") + data + } + end + + batch.push("good1") + batch.push("bad1") + batch.push("bad2") + batch.push("good2") + + results = batch.results + + assert_equal 4, results.length + assert_equal ["good1", nil], results[0] + assert_instance_of RuntimeError, results[1][1] + assert_instance_of RuntimeError, results[2][1] + assert_equal ["good2", nil], results[3] + end + + def test_exception_types_preserved + batch = Batch.build do + stage ->(data) { + case data + when "argument_error" + raise ArgumentError, "Bad argument" + when "runtime_error" + raise "Runtime problem" + when "custom_error" + raise StandardError, "Custom error" + else + data + end + } + end + + batch.push("good") + batch.push("argument_error") + batch.push("runtime_error") + batch.push("custom_error") + + results = batch.results + + assert_equal 4, results.length + assert_equal ["good", nil], results[0] + assert_instance_of ArgumentError, results[1][1] + assert_instance_of RuntimeError, results[2][1] + assert_instance_of StandardError, results[3][1] + end + + def test_all_items_fail + batch = Batch.build do + stage ->(data) { raise "Always fails" } + end + + 5.times { |i| batch.push(i) } + + results = batch.results + + assert_equal 5, results.length + results.each do |result, error| + assert_instance_of RuntimeError, error + assert_equal "Always fails", error.message + end + end + end +end diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index e3ce7fc..970e8f3 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -1,3 +1,4 @@ +# frozen_string_literal: true require "test_helper" module RapidFlow @@ -166,32 +167,6 @@ def test_pipeline_stages_process_independently assert stage1_start_b, "B should have started in stage1" end - def test_error_handling_captures_exceptions - batch = Batch.build do - stage ->(data) { - raise "Error in stage 1" if data == "bad" - data - } - stage ->(data) { data.upcase } - end - - batch.push("good") - batch.push("bad") - - results = batch.results - - assert_equal 2, results.length - - # Good result should complete both stages - assert_equal "GOOD", results[0][0] - assert_nil results[0][1] - - # Bad result should have error from stage 1 and not be processed by stage 2 - assert_equal "bad", results[1][0] # Original data preserved - assert_instance_of RuntimeError, results[1][1] - assert_equal "Error in stage 1", results[1][1].message - end - def test_cannot_push_after_results_called batch = Batch.build do stage ->(data) { data } @@ -343,71 +318,6 @@ def test_empty_pipeline assert_equal 0, results.length end - def test_error_in_middle_stage - batch = Batch.build do - stage ->(data) { data.upcase } - stage ->(data) { - raise "Error in stage 2" if data == "BAD" - data - } - stage ->(data) { data + "!" } - end - - batch.push("good") - batch.push("bad") - batch.push("also_good") - - results = batch.results - - assert_equal 3, results.length - assert_equal ["GOOD!", nil], results[0] - assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] - assert_equal "Error in stage 2", results[1][1].message - assert_equal ["ALSO_GOOD!", nil], results[2] - end - - def test_error_in_last_stage - batch = Batch.build do - stage ->(data) { data.upcase } - stage ->(data) { - raise "Error in final stage" if data == "BAD" - data - } - end - - batch.push("good") - batch.push("bad") - - results = batch.results - - assert_equal 2, results.length - assert_equal ["GOOD", nil], results[0] - assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] - assert_equal "Error in final stage", results[1][1].message - end - - def test_multiple_errors_in_sequence - batch = Batch.build do - stage ->(data) { - raise "Error at #{data}" if data.start_with?("bad") - data - } - end - - batch.push("good1") - batch.push("bad1") - batch.push("bad2") - batch.push("good2") - - results = batch.results - - assert_equal 4, results.length - assert_equal ["good1", nil], results[0] - assert_instance_of RuntimeError, results[1][1] - assert_instance_of RuntimeError, results[2][1] - assert_equal ["good2", nil], results[3] - end - def test_different_worker_counts # Test with 1 worker per stage (sequential at each stage) j1 = Batch.build do @@ -515,52 +425,6 @@ def test_varying_processing_times end end - def test_exception_types_preserved - batch = Batch.build do - stage ->(data) { - case data - when "argument_error" - raise ArgumentError, "Bad argument" - when "runtime_error" - raise "Runtime problem" - when "custom_error" - raise StandardError, "Custom error" - else - data - end - } - end - - batch.push("good") - batch.push("argument_error") - batch.push("runtime_error") - batch.push("custom_error") - - results = batch.results - - assert_equal 4, results.length - assert_equal ["good", nil], results[0] - assert_instance_of ArgumentError, results[1][1] - assert_instance_of RuntimeError, results[2][1] - assert_instance_of StandardError, results[3][1] - end - - def test_all_items_fail - batch = Batch.build do - stage ->(data) { raise "Always fails" } - end - - 5.times { |i| batch.push(i) } - - results = batch.results - - assert_equal 5, results.length - results.each do |result, error| - assert_instance_of RuntimeError, error - assert_equal "Always fails", error.message - end - end - def test_push_many_items_quickly batch = Batch.build do stage ->(data) { data } From 7db97127e860d40847d3b70ccb23a0acc4c64cdb Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 10:09:14 +0000 Subject: [PATCH 2/4] Move Batch tests related to configuration errors to own test file --- test/rapidflow/batch/config_error_test.rb | 27 +++++++++++++++++++++++ test/rapidflow/batch_test.rb | 19 ---------------- 2 files changed, 27 insertions(+), 19 deletions(-) create mode 100644 test/rapidflow/batch/config_error_test.rb diff --git a/test/rapidflow/batch/config_error_test.rb b/test/rapidflow/batch/config_error_test.rb new file mode 100644 index 0000000..1a0b15f --- /dev/null +++ b/test/rapidflow/batch/config_error_test.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true +require "test_helper" + +module RapidFlow + class BatchConfigErrorTest < Minitest::Test + def test_no_stages_with_build + error = assert_raises(Batch::ConfigError) do + Batch.build do + # no stages + end + end + + assert_equal "Unable to start the batch without any stages", error.message + end + + def test_no_stages_batch_start + error = assert_raises(Batch::ConfigError) do + batch = Batch.new + batch.start + end + + assert_equal "Unable to start the batch without any stages", error.message + end + end +end + + diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index 970e8f3..0078abf 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -39,25 +39,6 @@ def test_basic_functionality_with_build assert_equal ["WORLD!", nil], results[1] end - def test_no_stages_with_build - error = assert_raises(Batch::ConfigError) do - Batch.build do - # no stages - end - end - - assert_equal "Unable to start the batch without any stages", error.message - end - - def test_no_stages_batch_start - error = assert_raises(Batch::ConfigError) do - batch = Batch.new - batch.start - end - - assert_equal "Unable to start the batch without any stages", error.message - end - def test_concurrent_execution_is_faster_than_sequential # Each lambda sleeps for 0.5 seconds # With 4 items and 2 stages: From d76c99e7136daf3bbad690d16da05432f98776ca Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 10:21:42 +0000 Subject: [PATCH 3/4] Add validation for worker count --- CHANGELOG.md | 6 ++++++ lib/rapidflow/stage.rb | 8 ++++++++ test/rapidflow/batch/config_error_test.rb | 16 ++++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd456e3..a25abe3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Validation for `workers` count. Worker count should be a positive integer. + +### Changed + - Rename module `Rapidflow` to `RapidFlow`. ## [0.1.0] - 2025.11.01 diff --git a/lib/rapidflow/stage.rb b/lib/rapidflow/stage.rb index cf209d1..4e8ae0a 100644 --- a/lib/rapidflow/stage.rb +++ b/lib/rapidflow/stage.rb @@ -4,6 +4,8 @@ module RapidFlow # Represents a processing stage in the pipeline class Stage def initialize(stage_index:, lambda_fn:, workers:, is_final:, pipeline:) + validate_worker!(workers) + @stage_index = stage_index @lambda_fn = lambda_fn @workers = workers @@ -56,5 +58,11 @@ def forward_item(work_item) @pipeline.enqueue(@stage_index + 1, work_item) @pipeline.decrement_active_workers if @is_final end + + def validate_worker!(workers) + return if workers.kind_of?(Integer) && workers.positive? + + raise RapidFlow::Batch::ConfigError, "Worker count should be a positive number for stage" + end end end diff --git a/test/rapidflow/batch/config_error_test.rb b/test/rapidflow/batch/config_error_test.rb index 1a0b15f..6d2a24b 100644 --- a/test/rapidflow/batch/config_error_test.rb +++ b/test/rapidflow/batch/config_error_test.rb @@ -21,6 +21,22 @@ def test_no_stages_batch_start assert_equal "Unable to start the batch without any stages", error.message end + + def test_invalid_worker_count + [ + -3, + 0, + 1.5, + 'foo', + :bar + ].each do |invalid_worker_count| + error = assert_raises(Batch::ConfigError, "Expected to raise exception for '#{invalid_worker_count}'") do + Batch.new({ fn: ->(data) { data.upcase }, workers: invalid_worker_count }) + end + + assert_equal "Worker count should be a positive number for stage", error.message + end + end end end From 9283afa93b2f28bef4aa8bd0d69d391d21a1c598 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 10:37:05 +0000 Subject: [PATCH 4/4] Move custom error classes under to RapidFlow module --- CHANGELOG.md | 1 + lib/rapidflow.rb | 1 + lib/rapidflow/batch.rb | 9 +++------ lib/rapidflow/errors.rb | 7 +++++++ lib/rapidflow/stage.rb | 2 +- test/rapidflow/batch/config_error_test.rb | 6 +++--- test/rapidflow/batch_test.rb | 2 +- 7 files changed, 17 insertions(+), 11 deletions(-) create mode 100644 lib/rapidflow/errors.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index a25abe3..c33d86e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Rename module `Rapidflow` to `RapidFlow`. +- Move custom error classes from `RapidFlow::Batch` class under to `RapidFlow` module. ## [0.1.0] - 2025.11.01 diff --git a/lib/rapidflow.rb b/lib/rapidflow.rb index b5185f5..51c01cb 100644 --- a/lib/rapidflow.rb +++ b/lib/rapidflow.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require_relative "rapidflow/version" +require_relative "rapidflow/errors" require_relative "rapidflow/counter" require_relative "rapidflow/pipeline" require_relative "rapidflow/work_item" diff --git a/lib/rapidflow/batch.rb b/lib/rapidflow/batch.rb index 83e04c9..656e7ca 100644 --- a/lib/rapidflow/batch.rb +++ b/lib/rapidflow/batch.rb @@ -2,9 +2,6 @@ module RapidFlow class Batch - class ConfigError < RuntimeError; end - class RunError < RuntimeError; end - # DSL entrypoint def self.build(&block) builder = BatchBuilder.new @@ -33,7 +30,7 @@ def initialize(*stage_configs) end def start - raise ConfigError, "Unable to start the batch without any stages" if @stages.empty? + raise RapidFlow::ConfigError, "Unable to start the batch without any stages" if @stages.empty? @stages.each(&:start) mark_run! @@ -86,13 +83,13 @@ def finalize! def ensure_not_finalized! @locked_mutex.synchronize do - raise RunError, "Cannot push to a locked batch when results are requested" if @locked + raise RapidFlow::RunError, "Cannot push to a locked batch when results are requested" if @locked end end def ensure_running! @running_mutex.synchronize do - raise RunError, "Batch has not started" unless @running + raise RapidFlow::RunError, "Batch has not started" unless @running end end diff --git a/lib/rapidflow/errors.rb b/lib/rapidflow/errors.rb new file mode 100644 index 0000000..61ae7d4 --- /dev/null +++ b/lib/rapidflow/errors.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +module RapidFlow + # Error class to use when the client setup code is invalid. + class ConfigError < RuntimeError; end + class RunError < RuntimeError; end +end diff --git a/lib/rapidflow/stage.rb b/lib/rapidflow/stage.rb index 4e8ae0a..24cee18 100644 --- a/lib/rapidflow/stage.rb +++ b/lib/rapidflow/stage.rb @@ -62,7 +62,7 @@ def forward_item(work_item) def validate_worker!(workers) return if workers.kind_of?(Integer) && workers.positive? - raise RapidFlow::Batch::ConfigError, "Worker count should be a positive number for stage" + raise RapidFlow::ConfigError, "Worker count should be a positive number for stage" end end end diff --git a/test/rapidflow/batch/config_error_test.rb b/test/rapidflow/batch/config_error_test.rb index 6d2a24b..9d052fe 100644 --- a/test/rapidflow/batch/config_error_test.rb +++ b/test/rapidflow/batch/config_error_test.rb @@ -4,7 +4,7 @@ module RapidFlow class BatchConfigErrorTest < Minitest::Test def test_no_stages_with_build - error = assert_raises(Batch::ConfigError) do + error = assert_raises(RapidFlow::ConfigError) do Batch.build do # no stages end @@ -14,7 +14,7 @@ def test_no_stages_with_build end def test_no_stages_batch_start - error = assert_raises(Batch::ConfigError) do + error = assert_raises(RapidFlow::ConfigError) do batch = Batch.new batch.start end @@ -30,7 +30,7 @@ def test_invalid_worker_count 'foo', :bar ].each do |invalid_worker_count| - error = assert_raises(Batch::ConfigError, "Expected to raise exception for '#{invalid_worker_count}'") do + error = assert_raises(RapidFlow::ConfigError, "Expected to raise exception for '#{invalid_worker_count}'") do Batch.new({ fn: ->(data) { data.upcase }, workers: invalid_worker_count }) end diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index 0078abf..dee8227 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -156,7 +156,7 @@ def test_cannot_push_after_results_called batch.push("item1") batch.results - error = assert_raises(Batch::RunError) { batch.push("item2") } + error = assert_raises(RapidFlow::RunError) { batch.push("item2") } assert_equal "Cannot push to a locked batch when results are requested", error.message end