From 9a3be9edc92b2f4e01d273b1adfbdf4649a9b080 Mon Sep 17 00:00:00 2001 From: Joshua Flanagan Date: Mon, 26 Oct 2015 18:16:45 -0500 Subject: [PATCH] Throttle calls to a custom configuration loader Many custom configuration loaders will retrieve the configuration from an external resource. `Resque::Pool` asks the loader for the latest configuration roughly once per second. You may want to reduce load on your external resource by caching the value, and only really fetching the latest configuration after a specific amount of time has passed. Instead of forcing each configuration loader author to re-write (and test) this logic, we provide `Resque::Pool::ConfigThrottle`. See the spec for full details of its behavior. --- README.md | 18 +++++ lib/resque/pool/config_throttle.rb | 35 +++++++++ spec/config_throttle_spec.rb | 121 +++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+) create mode 100644 lib/resque/pool/config_throttle.rb create mode 100644 spec/config_throttle_spec.rb diff --git a/README.md b/README.md index cd29078..a1d14ad 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,24 @@ returns. It can optionally implement a `#reset!` method, which will be invoked when the HUP signal is received, allowing the loader to flush its cache, or perform any other re-initialization. +You can reduce the frequency that your configuration loader is called by +wrapping it with `Resque::Pool::ConfigThrottle` and specifying a time (in seconds) +to cache the previous value (see [the spec](spec/config_throttle_spec.rb) for +more details): + +```ruby +task "resque:pool:setup" do + redis_loader = lambda do |env| + worker_count = Redis.current.get("pool_workers_#{env}").to_i + {"queueA,queueB" => worker_count } + end + + # calls through to redis_loader at most once per 10 seconds + Resque::Pool.config_loader = Resque::Pool::ConfigThrottle(10, redis_loader) +end +``` + + Zero-downtime code deploys -------------------------- diff --git a/lib/resque/pool/config_throttle.rb b/lib/resque/pool/config_throttle.rb new file mode 100644 index 0000000..77c958a --- /dev/null +++ b/lib/resque/pool/config_throttle.rb @@ -0,0 +1,35 @@ +module Resque + class Pool + # Throttle the frequency of loading pool configuration + class ConfigThrottle + def initialize(period, config_loader, time_source: Time) + @period = period + @config_loader = config_loader + @resettable = config_loader.respond_to?(:reset!) + @last_check = 0 + @time_source = time_source + end + + def call(env) + # We do not need to cache per `env`, since the value of `env` will not + # change during the life of the process. + if (now > @last_check + @period) + @cache = @config_loader.call(env) + @last_check = now + end + @cache + end + + def reset! + @last_check = 0 + if @resettable + @config_loader.reset! + end + end + + def now + @time_source.now.to_f + end + end + end +end diff --git a/spec/config_throttle_spec.rb b/spec/config_throttle_spec.rb new file mode 100644 index 0000000..374c63d --- /dev/null +++ b/spec/config_throttle_spec.rb @@ -0,0 +1,121 @@ +require 'spec_helper' +require 'resque/pool/config_throttle' + +describe Resque::Pool::ConfigThrottle do + let(:fake_time) { FakeTime.new 1445898807 } + + it "returns the config returned by the wrapped config loader for given env" do + wrapped_config = { + "dev" => {"qA,qB" => 1}, + "prd" => {"qA,qB" => 4} + } + wrapped_loader = lambda {|env| wrapped_config[env] } + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + + throttle.call("prd").should eq({"qA,qB" => 4}) + end + + it "does not call wrapped loader again until the specified period of time has elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # config changed, but not enough time elapsed + + second_call = throttle.call("prd") + + second_call.should eq(first_call) + wrapped_loader.times_called.should == 1 + + fake_time.advance_time 6 + # now, enough time has elapsed to retrieve latest config + + third_call = throttle.call("prd") + + third_call.should_not eq(first_call) + third_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + + # further calls continue to use cached value + throttle.call("prd") + throttle.call("prd") + throttle.call("prd") + wrapped_loader.times_called.should == 2 + end + + it "forces a call to the wrapperd loader after reset! called, even if required time hasn't elapsed" do + wrapped_loader = TestConfigLoader.new + wrapped_loader.configuration = {"qA,qB" => 1} + + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader, time_source: fake_time) + first_call = throttle.call("prd") + + new_config = {"qA,qB" => 22} + wrapped_loader.configuration = new_config + fake_time.advance_time 6 + # the 10 second period has *not* elapsed + + throttle.reset! + + second_call = throttle.call("prd") + + second_call.should eq(new_config) + wrapped_loader.times_called.should == 2 + end + + it "delegates reset! to the wrapped_loader, when supported" do + wrapped_loader = TestConfigLoader.new + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + + wrapped_loader.times_reset.should == 0 + throttle.reset! + wrapped_loader.times_reset.should == 1 + end + + it "does not delegate reset! to the wrapped_loader when it is not supported" do + wrapped_loader = lambda {|env| Hash.new } + throttle = Resque::Pool::ConfigThrottle.new(10, wrapped_loader) + + expect { + throttle.reset! + }.to_not raise_error + end + + + class TestConfigLoader + attr_accessor :configuration + attr_reader :times_called + attr_reader :times_reset + + def initialize + @times_called = 0 + @times_reset = 0 + end + + def call(env) + @times_called += 1 + configuration + end + + def reset! + @times_reset += 1 + end + end + + class FakeTime + attr_reader :now + + def initialize(start_time) + @now = start_time + end + + def advance_time(seconds) + @now += seconds + end + end +end