Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions lib/async/fork_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

module Async
# Private module that hooks into Process._fork to handle fork events.
#
# If `Scheduler#process_fork` hook is adopted in Ruby 4, this code can be removed after Ruby < 4 is no longer supported.
module ForkHandler
def _fork(&block)
result = super

if result.zero?
# Child process:
if Fiber.scheduler.respond_to?(:process_fork)
Fiber.scheduler.process_fork
end
end

return result
end
end

private_constant :ForkHandler

# Hook into Process._fork to handle fork events automatically:
unless (Fiber.const_get(:SCHEDULER_PROCESS_FORK) rescue false)
::Process.singleton_class.prepend(ForkHandler)
end
end
3 changes: 2 additions & 1 deletion lib/async/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ def parent=(parent)
end

protected def remove_child(child)
@children.remove(child)
# In the case of a fork, the children list may be nil:
@children&.remove(child)
child.set_parent(nil)
end

Expand Down
44 changes: 33 additions & 11 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require_relative "clock"
require_relative "task"
require_relative "timeout"
require_relative "fork_handler"

require "io/event"

Expand Down Expand Up @@ -146,24 +147,26 @@ def terminate
# Terminate all child tasks and close the scheduler.
# @public Since *Async v1*.
def close
self.run_loop do
until self.terminate
self.run_once!
unless @children.nil?
self.run_loop do
until self.terminate
self.run_once!
end
end
end

Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
ensure
# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
selector = @selector
@selector = nil

selector&.close

worker_pool = @worker_pool
@worker_pool = nil
if selector = @selector
@selector = nil
selector.close
end

worker_pool&.close
if worker_pool = @worker_pool
@worker_pool = nil
worker_pool.close
end

consume
end
Expand Down Expand Up @@ -642,5 +645,24 @@ def timeout_after(duration, exception, message, &block)
yield duration
end
end

# Handle fork in the child process. This method is called automatically when `Process.fork` is invoked.
#
# The child process starts with a clean slate - no scheduler is set. Users can create a new scheduler if needed.
#
# @public Since *Async v2.35*.
def process_fork
if profiler = @profiler
@profiler = nil
profiler.stop
end

@children = nil
@selector = nil
@timers = nil

# Close the scheduler:
Fiber.set_scheduler(nil)
end
end
end
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- `Process.fork` is now properly handled by the Async fiber scheduler, ensuring that the scheduler state is correctly reset in the child process after a fork. This prevents issues where the child process inherits the scheduler state from the parent, which could lead to unexpected behavior.

## v2.34.0

### `Kernel::Barrier` Convenience Interface
Expand Down
47 changes: 47 additions & 0 deletions test/process/fork.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

require "sus/fixtures/async"
require "async"

describe Process do
describe ".fork" do
it "can fork with block form" do
r, w = IO.pipe

Async do
pid = Process.fork do
# Child process:
w.write("hello")
end

# Parent process:
w.close
expect(r.read).to be == "hello"
ensure
Process.waitpid(pid) if pid
end
end

it "can fork with non-block form" do
r, w = IO.pipe

Async do
unless pid = Process.fork
# Child process:
w.write("hello")

exit!
end

# Parent process:
w.close
expect(r.read).to be == "hello"
ensure
Process.waitpid(pid) if pid
end
end
end
end
Loading