Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ returns. It can optionally implement a `#reset!` method, which will be invoked
when the HUP signal is received, allowing the loader to flush its cache, or
perform any other re-initialization.

You can reduce the frequency that your configuration loader is called by
wrapping it with `Resque::Pool::ConfigLoaders::Throttled` and specifying a time
(in seconds) to cache the previous value:

```ruby
task "resque:pool:setup" do
redis_loader = lambda do |env|
worker_count = Redis.current.get("pool_workers_#{env}").to_i
{ "queueA,queueB" => worker_count }
end

# calls through to redis_loader at most once per 10 seconds
Resque::Pool.config_loader = Resque::Pool::ConfigLoaders::Throttled.new(
redis_loader, period: 10
)
end
```

See [the spec](spec/config_loaders/throttled_spec.rb) for more details.

Zero-downtime code deploys
--------------------------

Expand Down
4 changes: 2 additions & 2 deletions lib/resque/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require 'resque/pool/version'
require 'resque/pool/logging'
require 'resque/pool/pooled_worker'
require 'resque/pool/file_or_hash_loader'
require 'resque/pool/config_loaders/file_or_hash_loader'
require 'erb'
require 'fcntl'
require 'yaml'
Expand Down Expand Up @@ -119,7 +119,7 @@ def self.create_configured
def init_config(loader)
case loader
when String, Hash, nil
@config_loader = FileOrHashLoader.new(loader)
@config_loader = ConfigLoaders::FileOrHashLoader.new(loader)
else
@config_loader = loader
end
Expand Down
63 changes: 63 additions & 0 deletions lib/resque/pool/config_loaders/file_or_hash_loader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
module Resque
class Pool
module ConfigLoaders

class FileOrHashLoader
def initialize(filename_or_hash=nil)
case filename_or_hash
when String, nil
@filename = filename_or_hash
when Hash
@static_config = filename_or_hash.dup
else
raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}"
end
end

def call(environment)
@config ||= load_config_from_file(environment)
end

def reset!
@config = nil
end

private

def load_config_from_file(environment)
if @static_config
new_config = @static_config
else
filename = config_filename
new_config = load_config filename
end
apply_environment new_config, environment
end

def apply_environment(config, environment)
environment and config[environment] and config.merge!(config[environment])
config.delete_if {|key, value| value.is_a? Hash }
end

def config_filename
@filename || choose_config_file
end

def load_config(filename)
return {} unless filename
YAML.load(ERB.new(IO.read(filename)).result)
end

CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"]
def choose_config_file
if ENV["RESQUE_POOL_CONFIG"]
ENV["RESQUE_POOL_CONFIG"]
else
CONFIG_FILES.detect { |f| File.exist?(f) }
end
end
end

end
end
end
44 changes: 44 additions & 0 deletions lib/resque/pool/config_loaders/throttled.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require "delegate"

module Resque
class Pool

module ConfigLoaders

# Throttle the frequency of loading pool configuration
# Defaults to call only once per 10 seconds.
class Throttled < SimpleDelegator

def initialize(config_loader, period: 10, time_source: Time)
super(config_loader)
@period = period
@resettable = config_loader.respond_to?(:reset!)
@last_check = 0
@time_source = time_source
end

def call(env)
# We do not need to cache per `env`, since the value of `env` will not
# change during the life of the process.
if (now > @last_check + @period)
@cache = super
@last_check = now
end
@cache
end

def reset!
@last_check = 0
super if @resettable
end

private

def now
@time_source.now.to_f
end
end

end
end
end
59 changes: 0 additions & 59 deletions lib/resque/pool/file_or_hash_loader.rb

This file was deleted.

147 changes: 147 additions & 0 deletions spec/config_loaders/throttled_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
require 'spec_helper'
require 'resque/pool/config_loaders/throttled'

module Resque::Pool::ConfigLoaders

describe Throttled do
let(:fake_time) { FakeTime.new 1445898807 }

it "returns the config returned by the wrapped config loader for given env" do
wrapped_config = {
"dev" => {"qA,qB" => 1},
"prd" => {"qA,qB" => 4}
}
wrapped_loader = lambda {|env| wrapped_config[env] }
throttle = Throttled.new(wrapped_loader)

throttle.call("prd").should eq({"qA,qB" => 4})
end

it "does not call wrapped loader again until the default period of time has elapsed" do
wrapped_loader = TestConfigLoader.new
wrapped_loader.configuration = {"qA,qB" => 1}

throttle = Throttled.new(wrapped_loader, time_source: fake_time)
first_call = throttle.call("prd")

new_config = {"qA,qB" => 22}
wrapped_loader.configuration = new_config
fake_time.advance_time 6
# config changed, but not enough time elapsed

second_call = throttle.call("prd")

second_call.should eq(first_call)
wrapped_loader.times_called.should == 1

fake_time.advance_time 6
# now, enough time has elapsed to retrieve latest config

third_call = throttle.call("prd")

third_call.should_not eq(first_call)
third_call.should eq(new_config)
wrapped_loader.times_called.should == 2

# further calls continue to use cached value
throttle.call("prd")
throttle.call("prd")
throttle.call("prd")
wrapped_loader.times_called.should == 2
end

it "can specify an alternate cache period" do
config0 = {foo: 2, bar: 1}
config1 = {bar: 3, baz: 9}
config2 = {foo: 4, quux: 1}
wrapped_loader = TestConfigLoader.new
wrapped_loader.configuration = config0
throttle = Throttled.new(
wrapped_loader, period: 60, time_source: fake_time
)
throttle.call("prd").should eq(config0)
wrapped_loader.configuration = config1
fake_time.advance_time 59
throttle.call("prd").should eq(config0)
fake_time.advance_time 5
throttle.call("prd").should eq(config1)
wrapped_loader.configuration = config2
fake_time.advance_time 59
throttle.call("prd").should eq(config1)
fake_time.advance_time 2
throttle.call("prd").should eq(config2)
end

it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do
wrapped_loader = TestConfigLoader.new
wrapped_loader.configuration = {"qA,qB" => 1}

throttle = Throttled.new(wrapped_loader, time_source: fake_time)
throttle.call("prd")

new_config = {"qA,qB" => 22}
wrapped_loader.configuration = new_config
fake_time.advance_time 6
# the 10 second period has *not* elapsed

throttle.reset!

second_call = throttle.call("prd")

second_call.should eq(new_config)
wrapped_loader.times_called.should == 2
end

it "delegates reset! to the wrapped_loader, when supported" do
wrapped_loader = TestConfigLoader.new
throttle = Throttled.new(wrapped_loader)

wrapped_loader.times_reset.should == 0
throttle.reset!
wrapped_loader.times_reset.should == 1
end

it "does not delegate reset! to the wrapped_loader when it is not supported" do
wrapped_loader = lambda {|env| Hash.new }
throttle = Throttled.new(wrapped_loader)

expect {
throttle.reset!
}.to_not raise_error
end

class TestConfigLoader
attr_accessor :configuration
attr_reader :times_called
attr_reader :times_reset

def initialize
@times_called = 0
@times_reset = 0
end

def call(env)
@times_called += 1
configuration
end

def reset!
@times_reset += 1
end
end

class FakeTime
attr_reader :now

def initialize(start_time)
@now = start_time
end

def advance_time(seconds)
@now += seconds
end
end

end

end
4 changes: 3 additions & 1 deletion spec/resque_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ module Rails; end
subject { Resque::Pool.create_configured }

it "created pools use config file and hash loading logic" do
subject.config_loader.should be_instance_of Resque::Pool::FileOrHashLoader
subject.config_loader.should be_instance_of(
Resque::Pool::ConfigLoaders::FileOrHashLoader
)
end
end

Expand Down