diff --git a/README.md b/README.md index cd29078..a8b5279 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,26 @@ returns. It can optionally implement a `#reset!` method, which will be invoked when the HUP signal is received, allowing the loader to flush its cache, or perform any other re-initialization. +You can reduce the frequency that your configuration loader is called by +wrapping it with `Resque::Pool::ConfigLoaders::Throttled` and specifying a time +(in seconds) to cache the previous value: + +```ruby +task "resque:pool:setup" do + redis_loader = lambda do |env| + worker_count = Redis.current.get("pool_workers_#{env}").to_i + { "queueA,queueB" => worker_count } + end + + # calls through to redis_loader at most once per 10 seconds + Resque::Pool.config_loader = Resque::Pool::ConfigLoaders::Throttled.new( + redis_loader, period: 10 + ) +end +``` + +See [the spec](spec/config_loaders/throttled_spec.rb) for more details. + Zero-downtime code deploys -------------------------- diff --git a/lib/resque/pool.rb b/lib/resque/pool.rb index 5c99b47..9a76509 100644 --- a/lib/resque/pool.rb +++ b/lib/resque/pool.rb @@ -4,7 +4,7 @@ require 'resque/pool/version' require 'resque/pool/logging' require 'resque/pool/pooled_worker' -require 'resque/pool/file_or_hash_loader' +require 'resque/pool/config_loaders/file_or_hash_loader' require 'erb' require 'fcntl' require 'yaml' @@ -119,7 +119,7 @@ def self.create_configured def init_config(loader) case loader when String, Hash, nil - @config_loader = FileOrHashLoader.new(loader) + @config_loader = ConfigLoaders::FileOrHashLoader.new(loader) else @config_loader = loader end diff --git a/lib/resque/pool/config_loaders/file_or_hash_loader.rb b/lib/resque/pool/config_loaders/file_or_hash_loader.rb new file mode 100644 index 0000000..5d6b108 --- /dev/null +++ b/lib/resque/pool/config_loaders/file_or_hash_loader.rb @@ -0,0 +1,63 @@ +module Resque + class Pool + module ConfigLoaders + + class FileOrHashLoader + def initialize(filename_or_hash=nil) + case filename_or_hash + when String, nil + @filename = filename_or_hash + when Hash + @static_config = filename_or_hash.dup + else + raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}" + end + end + + def call(environment) + @config ||= load_config_from_file(environment) + end + + def reset! + @config = nil + end + + private + + def load_config_from_file(environment) + if @static_config + new_config = @static_config + else + filename = config_filename + new_config = load_config filename + end + apply_environment new_config, environment + end + + def apply_environment(config, environment) + environment and config[environment] and config.merge!(config[environment]) + config.delete_if {|key, value| value.is_a? Hash } + end + + def config_filename + @filename || choose_config_file + end + + def load_config(filename) + return {} unless filename + YAML.load(ERB.new(IO.read(filename)).result) + end + + CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"] + def choose_config_file + if ENV["RESQUE_POOL_CONFIG"] + ENV["RESQUE_POOL_CONFIG"] + else + CONFIG_FILES.detect { |f| File.exist?(f) } + end + end + end + + end + end +end diff --git a/lib/resque/pool/config_loaders/throttled.rb b/lib/resque/pool/config_loaders/throttled.rb new file mode 100644 index 0000000..4e535e4 --- /dev/null +++ b/lib/resque/pool/config_loaders/throttled.rb @@ -0,0 +1,44 @@ +require "delegate" + +module Resque + class Pool + + module ConfigLoaders + + # Throttle the frequency of loading pool configuration + # Defaults to call only once per 10 seconds. + class Throttled < SimpleDelegator + + def initialize(config_loader, period: 10, time_source: Time) + super(config_loader) + @period = period + @resettable = config_loader.respond_to?(:reset!) + @last_check = 0 + @time_source = time_source + end + + def call(env) + # We do not need to cache per `env`, since the value of `env` will not + # change during the life of the process. + if (now > @last_check + @period) + @cache = super + @last_check = now + end + @cache + end + + def reset! + @last_check = 0 + super if @resettable + end + + private + + def now + @time_source.now.to_f + end + end + + end + end +end diff --git a/lib/resque/pool/file_or_hash_loader.rb b/lib/resque/pool/file_or_hash_loader.rb deleted file mode 100644 index f6c55b6..0000000 --- a/lib/resque/pool/file_or_hash_loader.rb +++ /dev/null @@ -1,59 +0,0 @@ -module Resque - class Pool - class FileOrHashLoader - def initialize(filename_or_hash=nil) - case filename_or_hash - when String, nil - @filename = filename_or_hash - when Hash - @static_config = filename_or_hash.dup - else - raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}" - end - end - - def call(environment) - @config ||= load_config_from_file(environment) - end - - def reset! - @config = nil - end - - private - - def load_config_from_file(environment) - if @static_config - new_config = @static_config - else - filename = config_filename - new_config = load_config filename - end - apply_environment new_config, environment - end - - def apply_environment(config, environment) - environment and config[environment] and config.merge!(config[environment]) - config.delete_if {|key, value| value.is_a? Hash } - end - - def config_filename - @filename || choose_config_file - end - - def load_config(filename) - return {} unless filename - YAML.load(ERB.new(IO.read(filename)).result) - end - - CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"] - def choose_config_file - if ENV["RESQUE_POOL_CONFIG"] - ENV["RESQUE_POOL_CONFIG"] - else - CONFIG_FILES.detect { |f| File.exist?(f) } - end - end - end - end -end diff --git a/spec/config_loaders/throttled_spec.rb b/spec/config_loaders/throttled_spec.rb new file mode 100644 index 0000000..2f2bf12 --- /dev/null +++ b/spec/config_loaders/throttled_spec.rb @@ -0,0 +1,147 @@ +require 'spec_helper' +require 'resque/pool/config_loaders/throttled' + +module Resque::Pool::ConfigLoaders + + describe Throttled do + let(:fake_time) { FakeTime.new 1445898807 } + + it "returns the config returned by the wrapped config loader for given env" do + wrapped_config = { + "dev" => {"qA,qB" => 1}, + "prd" => {"qA,qB" => 4} + } + wrapped_loader = lambda {|env| wrapped_config[env] } + throttle = Throttled.new(wrapped_loader) + + throttle.call("prd").should eq({"qA,qB" => 4}) + end + + it "does not call wrapped loader again until the default period of time has elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Throttled.new(wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # config changed, but not enough time elapsed + + second_call = throttle.call("prd") + + second_call.should eq(first_call) + wrapped_loader.times_called.should == 1 + + fake_time.advance_time 6 + # now, enough time has elapsed to retrieve latest config + + third_call = throttle.call("prd") + + third_call.should_not eq(first_call) + third_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + + # further calls continue to use cached value + throttle.call("prd") + throttle.call("prd") + throttle.call("prd") + wrapped_loader.times_called.should == 2 + end + + it "can specify an alternate cache period" do + config0 = {foo: 2, bar: 1} + config1 = {bar: 3, baz: 9} + config2 = {foo: 4, quux: 1} + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = config0 + throttle = Throttled.new( + wrapped_loader, period: 60, time_source: fake_time + ) + throttle.call("prd").should eq(config0) + wrapped_loader.configuration = config1 + fake_time.advance_time 59 + throttle.call("prd").should eq(config0) + fake_time.advance_time 5 + throttle.call("prd").should eq(config1) + wrapped_loader.configuration = config2 + fake_time.advance_time 59 + throttle.call("prd").should eq(config1) + fake_time.advance_time 2 + throttle.call("prd").should eq(config2) + end + + it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Throttled.new(wrapped_loader, time_source: fake_time) + throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # the 10 second period has *not* elapsed + + throttle.reset! + + second_call = throttle.call("prd") + + second_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + end + + it "delegates reset! to the wrapped_loader, when supported" do + wrapped_loader = TestConfigLoader.new + throttle = Throttled.new(wrapped_loader) + + wrapped_loader.times_reset.should == 0 + throttle.reset! + wrapped_loader.times_reset.should == 1 + end + + it "does not delegate reset! to the wrapped_loader when it is not supported" do + wrapped_loader = lambda {|env| Hash.new } + throttle = Throttled.new(wrapped_loader) + + expect { + throttle.reset! + }.to_not raise_error + end + + class TestConfigLoader + attr_accessor :configuration + attr_reader :times_called + attr_reader :times_reset + + def initialize + @times_called = 0 + @times_reset = 0 + end + + def call(env) + @times_called += 1 + configuration + end + + def reset! + @times_reset += 1 + end + end + + class FakeTime + attr_reader :now + + def initialize(start_time) + @now = start_time + end + + def advance_time(seconds) + @now += seconds + end + end + + end + +end diff --git a/spec/resque_pool_spec.rb b/spec/resque_pool_spec.rb index 405c345..1c1bd56 100644 --- a/spec/resque_pool_spec.rb +++ b/spec/resque_pool_spec.rb @@ -248,7 +248,9 @@ module Rails; end subject { Resque::Pool.create_configured } it "created pools use config file and hash loading logic" do - subject.config_loader.should be_instance_of Resque::Pool::FileOrHashLoader + subject.config_loader.should be_instance_of( + Resque::Pool::ConfigLoaders::FileOrHashLoader + ) end end