diff --git a/CHANGELOG.md b/CHANGELOG.md index bd456e3..c33d86e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Validation for `workers` count. Worker count should be a positive integer. + +### Changed + - Rename module `Rapidflow` to `RapidFlow`. +- Move custom error classes from `RapidFlow::Batch` class under to `RapidFlow` module. ## [0.1.0] - 2025.11.01 diff --git a/lib/rapidflow.rb b/lib/rapidflow.rb index b5185f5..51c01cb 100644 --- a/lib/rapidflow.rb +++ b/lib/rapidflow.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require_relative "rapidflow/version" +require_relative "rapidflow/errors" require_relative "rapidflow/counter" require_relative "rapidflow/pipeline" require_relative "rapidflow/work_item" diff --git a/lib/rapidflow/batch.rb b/lib/rapidflow/batch.rb index 83e04c9..656e7ca 100644 --- a/lib/rapidflow/batch.rb +++ b/lib/rapidflow/batch.rb @@ -2,9 +2,6 @@ module RapidFlow class Batch - class ConfigError < RuntimeError; end - class RunError < RuntimeError; end - # DSL entrypoint def self.build(&block) builder = BatchBuilder.new @@ -33,7 +30,7 @@ def initialize(*stage_configs) end def start - raise ConfigError, "Unable to start the batch without any stages" if @stages.empty? + raise RapidFlow::ConfigError, "Unable to start the batch without any stages" if @stages.empty? @stages.each(&:start) mark_run! @@ -86,13 +83,13 @@ def finalize! def ensure_not_finalized! @locked_mutex.synchronize do - raise RunError, "Cannot push to a locked batch when results are requested" if @locked + raise RapidFlow::RunError, "Cannot push to a locked batch when results are requested" if @locked end end def ensure_running! @running_mutex.synchronize do - raise RunError, "Batch has not started" unless @running + raise RapidFlow::RunError, "Batch has not started" unless @running end end diff --git a/lib/rapidflow/errors.rb b/lib/rapidflow/errors.rb new file mode 100644 index 0000000..61ae7d4 --- /dev/null +++ b/lib/rapidflow/errors.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +module RapidFlow + # Error class to use when the client setup code is invalid. + class ConfigError < RuntimeError; end + class RunError < RuntimeError; end +end diff --git a/lib/rapidflow/stage.rb b/lib/rapidflow/stage.rb index cf209d1..24cee18 100644 --- a/lib/rapidflow/stage.rb +++ b/lib/rapidflow/stage.rb @@ -4,6 +4,8 @@ module RapidFlow # Represents a processing stage in the pipeline class Stage def initialize(stage_index:, lambda_fn:, workers:, is_final:, pipeline:) + validate_worker!(workers) + @stage_index = stage_index @lambda_fn = lambda_fn @workers = workers @@ -56,5 +58,11 @@ def forward_item(work_item) @pipeline.enqueue(@stage_index + 1, work_item) @pipeline.decrement_active_workers if @is_final end + + def validate_worker!(workers) + return if workers.kind_of?(Integer) && workers.positive? + + raise RapidFlow::ConfigError, "Worker count should be a positive number for stage" + end end end diff --git a/test/rapidflow/batch/config_error_test.rb b/test/rapidflow/batch/config_error_test.rb new file mode 100644 index 0000000..9d052fe --- /dev/null +++ b/test/rapidflow/batch/config_error_test.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true +require "test_helper" + +module RapidFlow + class BatchConfigErrorTest < Minitest::Test + def test_no_stages_with_build + error = assert_raises(RapidFlow::ConfigError) do + Batch.build do + # no stages + end + end + + assert_equal "Unable to start the batch without any stages", error.message + end + + def test_no_stages_batch_start + error = assert_raises(RapidFlow::ConfigError) do + batch = Batch.new + batch.start + end + + assert_equal "Unable to start the batch without any stages", error.message + end + + def test_invalid_worker_count + [ + -3, + 0, + 1.5, + 'foo', + :bar + ].each do |invalid_worker_count| + error = assert_raises(RapidFlow::ConfigError, "Expected to raise exception for '#{invalid_worker_count}'") do + Batch.new({ fn: ->(data) { data.upcase }, workers: invalid_worker_count }) + end + + assert_equal "Worker count should be a positive number for stage", error.message + end + end + end +end + + diff --git a/test/rapidflow/batch/error_handling_test.rb b/test/rapidflow/batch/error_handling_test.rb new file mode 100644 index 0000000..af50a77 --- /dev/null +++ b/test/rapidflow/batch/error_handling_test.rb @@ -0,0 +1,143 @@ +# frozen_string_literal: true +require "test_helper" + +module RapidFlow + class BatchErrorHandlingTest < Minitest::Test + def test_error_handling_captures_exceptions + batch = Batch.build do + stage ->(data) { + raise "Error in stage 1" if data == "bad" + data + } + stage ->(data) { data.upcase } + end + + batch.push("good") + batch.push("bad") + + results = batch.results + + assert_equal 2, results.length + + # Good result should complete both stages + assert_equal "GOOD", results[0][0] + assert_nil results[0][1] + + # Bad result should have error from stage 1 and not be processed by stage 2 + assert_equal "bad", results[1][0] # Original data preserved + assert_instance_of RuntimeError, results[1][1] + assert_equal "Error in stage 1", results[1][1].message + end + + def test_error_in_middle_stage + batch = Batch.build do + stage ->(data) { data.upcase } + stage ->(data) { + raise "Error in stage 2" if data == "BAD" + data + } + stage ->(data) { data + "!" } + end + + batch.push("good") + batch.push("bad") + batch.push("also_good") + + results = batch.results + + assert_equal 3, results.length + assert_equal ["GOOD!", nil], results[0] + assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] + assert_equal "Error in stage 2", results[1][1].message + assert_equal ["ALSO_GOOD!", nil], results[2] + end + + def test_error_in_last_stage + batch = Batch.build do + stage ->(data) { data.upcase } + stage ->(data) { + raise "Error in final stage" if data == "BAD" + data + } + end + + batch.push("good") + batch.push("bad") + + results = batch.results + + assert_equal 2, results.length + assert_equal ["GOOD", nil], results[0] + assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] + assert_equal "Error in final stage", results[1][1].message + end + + def test_multiple_errors_in_sequence + batch = Batch.build do + stage ->(data) { + raise "Error at #{data}" if data.start_with?("bad") + data + } + end + + batch.push("good1") + batch.push("bad1") + batch.push("bad2") + batch.push("good2") + + results = batch.results + + assert_equal 4, results.length + assert_equal ["good1", nil], results[0] + assert_instance_of RuntimeError, results[1][1] + assert_instance_of RuntimeError, results[2][1] + assert_equal ["good2", nil], results[3] + end + + def test_exception_types_preserved + batch = Batch.build do + stage ->(data) { + case data + when "argument_error" + raise ArgumentError, "Bad argument" + when "runtime_error" + raise "Runtime problem" + when "custom_error" + raise StandardError, "Custom error" + else + data + end + } + end + + batch.push("good") + batch.push("argument_error") + batch.push("runtime_error") + batch.push("custom_error") + + results = batch.results + + assert_equal 4, results.length + assert_equal ["good", nil], results[0] + assert_instance_of ArgumentError, results[1][1] + assert_instance_of RuntimeError, results[2][1] + assert_instance_of StandardError, results[3][1] + end + + def test_all_items_fail + batch = Batch.build do + stage ->(data) { raise "Always fails" } + end + + 5.times { |i| batch.push(i) } + + results = batch.results + + assert_equal 5, results.length + results.each do |result, error| + assert_instance_of RuntimeError, error + assert_equal "Always fails", error.message + end + end + end +end diff --git a/test/rapidflow/batch_test.rb b/test/rapidflow/batch_test.rb index e3ce7fc..dee8227 100644 --- a/test/rapidflow/batch_test.rb +++ b/test/rapidflow/batch_test.rb @@ -1,3 +1,4 @@ +# frozen_string_literal: true require "test_helper" module RapidFlow @@ -38,25 +39,6 @@ def test_basic_functionality_with_build assert_equal ["WORLD!", nil], results[1] end - def test_no_stages_with_build - error = assert_raises(Batch::ConfigError) do - Batch.build do - # no stages - end - end - - assert_equal "Unable to start the batch without any stages", error.message - end - - def test_no_stages_batch_start - error = assert_raises(Batch::ConfigError) do - batch = Batch.new - batch.start - end - - assert_equal "Unable to start the batch without any stages", error.message - end - def test_concurrent_execution_is_faster_than_sequential # Each lambda sleeps for 0.5 seconds # With 4 items and 2 stages: @@ -166,32 +148,6 @@ def test_pipeline_stages_process_independently assert stage1_start_b, "B should have started in stage1" end - def test_error_handling_captures_exceptions - batch = Batch.build do - stage ->(data) { - raise "Error in stage 1" if data == "bad" - data - } - stage ->(data) { data.upcase } - end - - batch.push("good") - batch.push("bad") - - results = batch.results - - assert_equal 2, results.length - - # Good result should complete both stages - assert_equal "GOOD", results[0][0] - assert_nil results[0][1] - - # Bad result should have error from stage 1 and not be processed by stage 2 - assert_equal "bad", results[1][0] # Original data preserved - assert_instance_of RuntimeError, results[1][1] - assert_equal "Error in stage 1", results[1][1].message - end - def test_cannot_push_after_results_called batch = Batch.build do stage ->(data) { data } @@ -200,7 +156,7 @@ def test_cannot_push_after_results_called batch.push("item1") batch.results - error = assert_raises(Batch::RunError) { batch.push("item2") } + error = assert_raises(RapidFlow::RunError) { batch.push("item2") } assert_equal "Cannot push to a locked batch when results are requested", error.message end @@ -343,71 +299,6 @@ def test_empty_pipeline assert_equal 0, results.length end - def test_error_in_middle_stage - batch = Batch.build do - stage ->(data) { data.upcase } - stage ->(data) { - raise "Error in stage 2" if data == "BAD" - data - } - stage ->(data) { data + "!" } - end - - batch.push("good") - batch.push("bad") - batch.push("also_good") - - results = batch.results - - assert_equal 3, results.length - assert_equal ["GOOD!", nil], results[0] - assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] - assert_equal "Error in stage 2", results[1][1].message - assert_equal ["ALSO_GOOD!", nil], results[2] - end - - def test_error_in_last_stage - batch = Batch.build do - stage ->(data) { data.upcase } - stage ->(data) { - raise "Error in final stage" if data == "BAD" - data - } - end - - batch.push("good") - batch.push("bad") - - results = batch.results - - assert_equal 2, results.length - assert_equal ["GOOD", nil], results[0] - assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]] - assert_equal "Error in final stage", results[1][1].message - end - - def test_multiple_errors_in_sequence - batch = Batch.build do - stage ->(data) { - raise "Error at #{data}" if data.start_with?("bad") - data - } - end - - batch.push("good1") - batch.push("bad1") - batch.push("bad2") - batch.push("good2") - - results = batch.results - - assert_equal 4, results.length - assert_equal ["good1", nil], results[0] - assert_instance_of RuntimeError, results[1][1] - assert_instance_of RuntimeError, results[2][1] - assert_equal ["good2", nil], results[3] - end - def test_different_worker_counts # Test with 1 worker per stage (sequential at each stage) j1 = Batch.build do @@ -515,52 +406,6 @@ def test_varying_processing_times end end - def test_exception_types_preserved - batch = Batch.build do - stage ->(data) { - case data - when "argument_error" - raise ArgumentError, "Bad argument" - when "runtime_error" - raise "Runtime problem" - when "custom_error" - raise StandardError, "Custom error" - else - data - end - } - end - - batch.push("good") - batch.push("argument_error") - batch.push("runtime_error") - batch.push("custom_error") - - results = batch.results - - assert_equal 4, results.length - assert_equal ["good", nil], results[0] - assert_instance_of ArgumentError, results[1][1] - assert_instance_of RuntimeError, results[2][1] - assert_instance_of StandardError, results[3][1] - end - - def test_all_items_fail - batch = Batch.build do - stage ->(data) { raise "Always fails" } - end - - 5.times { |i| batch.push(i) } - - results = batch.results - - assert_equal 5, results.length - results.each do |result, error| - assert_instance_of RuntimeError, error - assert_equal "Always fails", error.message - end - end - def test_push_many_items_quickly batch = Batch.build do stage ->(data) { data }