Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Validation for `workers` count. Worker count should be a positive integer.

### Changed

- Rename module `Rapidflow` to `RapidFlow`.
- Move custom error classes from `RapidFlow::Batch` class under to `RapidFlow` module.

## [0.1.0] - 2025.11.01

Expand Down
1 change: 1 addition & 0 deletions lib/rapidflow.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require_relative "rapidflow/version"
require_relative "rapidflow/errors"
require_relative "rapidflow/counter"
require_relative "rapidflow/pipeline"
require_relative "rapidflow/work_item"
Expand Down
9 changes: 3 additions & 6 deletions lib/rapidflow/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

module RapidFlow
class Batch
class ConfigError < RuntimeError; end
class RunError < RuntimeError; end

# DSL entrypoint
def self.build(&block)
builder = BatchBuilder.new
Expand Down Expand Up @@ -33,7 +30,7 @@ def initialize(*stage_configs)
end

def start
raise ConfigError, "Unable to start the batch without any stages" if @stages.empty?
raise RapidFlow::ConfigError, "Unable to start the batch without any stages" if @stages.empty?

@stages.each(&:start)
mark_run!
Expand Down Expand Up @@ -86,13 +83,13 @@ def finalize!

def ensure_not_finalized!
@locked_mutex.synchronize do
raise RunError, "Cannot push to a locked batch when results are requested" if @locked
raise RapidFlow::RunError, "Cannot push to a locked batch when results are requested" if @locked
end
end

def ensure_running!
@running_mutex.synchronize do
raise RunError, "Batch has not started" unless @running
raise RapidFlow::RunError, "Batch has not started" unless @running
end
end

Expand Down
7 changes: 7 additions & 0 deletions lib/rapidflow/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

module RapidFlow
# Error class to use when the client setup code is invalid.
class ConfigError < RuntimeError; end
class RunError < RuntimeError; end
end
8 changes: 8 additions & 0 deletions lib/rapidflow/stage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module RapidFlow
# Represents a processing stage in the pipeline
class Stage
def initialize(stage_index:, lambda_fn:, workers:, is_final:, pipeline:)
validate_worker!(workers)

@stage_index = stage_index
@lambda_fn = lambda_fn
@workers = workers
Expand Down Expand Up @@ -56,5 +58,11 @@ def forward_item(work_item)
@pipeline.enqueue(@stage_index + 1, work_item)
@pipeline.decrement_active_workers if @is_final
end

def validate_worker!(workers)
return if workers.kind_of?(Integer) && workers.positive?

raise RapidFlow::ConfigError, "Worker count should be a positive number for stage"
end
end
end
43 changes: 43 additions & 0 deletions test/rapidflow/batch/config_error_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true
require "test_helper"

module RapidFlow
class BatchConfigErrorTest < Minitest::Test
def test_no_stages_with_build
error = assert_raises(RapidFlow::ConfigError) do
Batch.build do
# no stages
end
end

assert_equal "Unable to start the batch without any stages", error.message
end

def test_no_stages_batch_start
error = assert_raises(RapidFlow::ConfigError) do
batch = Batch.new
batch.start
end

assert_equal "Unable to start the batch without any stages", error.message
end

def test_invalid_worker_count
[
-3,
0,
1.5,
'foo',
:bar
].each do |invalid_worker_count|
error = assert_raises(RapidFlow::ConfigError, "Expected to raise exception for '#{invalid_worker_count}'") do
Batch.new({ fn: ->(data) { data.upcase }, workers: invalid_worker_count })
end

assert_equal "Worker count should be a positive number for stage", error.message
end
end
end
end


143 changes: 143 additions & 0 deletions test/rapidflow/batch/error_handling_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# frozen_string_literal: true
require "test_helper"

module RapidFlow
class BatchErrorHandlingTest < Minitest::Test
def test_error_handling_captures_exceptions
batch = Batch.build do
stage ->(data) {
raise "Error in stage 1" if data == "bad"
data
}
stage ->(data) { data.upcase }
end

batch.push("good")
batch.push("bad")

results = batch.results

assert_equal 2, results.length

# Good result should complete both stages
assert_equal "GOOD", results[0][0]
assert_nil results[0][1]

# Bad result should have error from stage 1 and not be processed by stage 2
assert_equal "bad", results[1][0] # Original data preserved
assert_instance_of RuntimeError, results[1][1]
assert_equal "Error in stage 1", results[1][1].message
end

def test_error_in_middle_stage
batch = Batch.build do
stage ->(data) { data.upcase }
stage ->(data) {
raise "Error in stage 2" if data == "BAD"
data
}
stage ->(data) { data + "!" }
end

batch.push("good")
batch.push("bad")
batch.push("also_good")

results = batch.results

assert_equal 3, results.length
assert_equal ["GOOD!", nil], results[0]
assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]]
assert_equal "Error in stage 2", results[1][1].message
assert_equal ["ALSO_GOOD!", nil], results[2]
end

def test_error_in_last_stage
batch = Batch.build do
stage ->(data) { data.upcase }
stage ->(data) {
raise "Error in final stage" if data == "BAD"
data
}
end

batch.push("good")
batch.push("bad")

results = batch.results

assert_equal 2, results.length
assert_equal ["GOOD", nil], results[0]
assert_equal ["BAD", results[1][1]], [results[1][0], results[1][1]]
assert_equal "Error in final stage", results[1][1].message
end

def test_multiple_errors_in_sequence
batch = Batch.build do
stage ->(data) {
raise "Error at #{data}" if data.start_with?("bad")
data
}
end

batch.push("good1")
batch.push("bad1")
batch.push("bad2")
batch.push("good2")

results = batch.results

assert_equal 4, results.length
assert_equal ["good1", nil], results[0]
assert_instance_of RuntimeError, results[1][1]
assert_instance_of RuntimeError, results[2][1]
assert_equal ["good2", nil], results[3]
end

def test_exception_types_preserved
batch = Batch.build do
stage ->(data) {
case data
when "argument_error"
raise ArgumentError, "Bad argument"
when "runtime_error"
raise "Runtime problem"
when "custom_error"
raise StandardError, "Custom error"
else
data
end
}
end

batch.push("good")
batch.push("argument_error")
batch.push("runtime_error")
batch.push("custom_error")

results = batch.results

assert_equal 4, results.length
assert_equal ["good", nil], results[0]
assert_instance_of ArgumentError, results[1][1]
assert_instance_of RuntimeError, results[2][1]
assert_instance_of StandardError, results[3][1]
end

def test_all_items_fail
batch = Batch.build do
stage ->(data) { raise "Always fails" }
end

5.times { |i| batch.push(i) }

results = batch.results

assert_equal 5, results.length
results.each do |result, error|
assert_instance_of RuntimeError, error
assert_equal "Always fails", error.message
end
end
end
end
Loading