Skip to content
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,24 @@ returns. It can optionally implement a `#reset!` method, which will be invoked
when the HUP signal is received, allowing the loader to flush its cache, or
perform any other re-initialization.

You can reduce the frequency that your configuration loader is called by
wrapping it with `Resque::Pool::ConfigThrottle` and specifying a time (in seconds)
to cache the previous value (see [the spec](spec/config_throttle_spec.rb) for
more details):

```ruby
task "resque:pool:setup" do
redis_loader = lambda do |env|
worker_count = Redis.current.get("pool_workers_#{env}").to_i
{"queueA,queueB" => worker_count }
end

# calls through to redis_loader at most once per 10 seconds
Resque::Pool.config_loader = Resque::Pool::ConfigThrottle(10, redis_loader)
end
```


Zero-downtime code deploys
--------------------------

Expand Down
2 changes: 1 addition & 1 deletion bin/resque-pool
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__)

require 'resque/pool/cli'
require 'resque/pool'
Resque::Pool::CLI.run
32 changes: 23 additions & 9 deletions lib/resque/pool.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# -*- encoding: utf-8 -*-
require 'resque'
require 'resque/worker'
require 'resque/pool/version'
require 'resque/pool/logging'
require 'resque/pool/pooled_worker'
require 'resque/pool/file_or_hash_loader'
require 'erb'
require 'fcntl'
require 'yaml'
require 'socket'

module Resque
class Pool
autoload :CLI, "resque/pool/cli"
autoload :ConfigLoaders, "resque/pool/config_loaders"
autoload :Killer, "resque/pool/killer"
autoload :Logging, "resque/pool/logging"
autoload :PooledWorker, "resque/pool/pooled_worker"
autoload :VERSION, "resque/pool/version"

SIG_QUEUE_MAX_SIZE = 5
DEFAULT_WORKER_INTERVAL = 5
QUEUE_SIGS = [ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ]
Expand All @@ -23,6 +25,7 @@ class Pool
attr_reader :workers

def initialize(config_loader=nil)
PooledWorker.monkey_patch_resque_worker!
init_config(config_loader)
@workers = Hash.new { |workers, queues| workers[queues] = {} }
procline "(initialized)"
Expand Down Expand Up @@ -75,21 +78,32 @@ def call_#{name}!(*args)
# }}}
# Config: class methods to start up the pool using the config loader {{{

class << self; attr_accessor :config_loader, :app_name, :spawn_delay; end
class << self
attr_accessor :config_loader, :app_name, :pool_name, :spawn_delay
end

# Intended to represent the running application/codebase. Should be shared
# from one deploy to the next and across hosts.
def self.app_name
@app_name ||= File.basename(Dir.pwd)
end

# Represents a single running pool. Usually unique per host, so it defaults
# to hostname, but you can set it e.g. to something unique for running
# multiple pools per host.
def self.pool_name
@pool_name ||= Socket.gethostname
end

def self.handle_winch?
@handle_winch ||= false
end

def self.handle_winch=(bool)
@handle_winch = bool
end

def self.kill_other_pools!
require 'resque/pool/killer'
Resque::Pool::Killer.run
end

Expand Down Expand Up @@ -119,7 +133,7 @@ def self.create_configured
def init_config(loader)
case loader
when String, Hash, nil
@config_loader = FileOrHashLoader.new(loader)
@config_loader = ConfigLoaders::FileOrHashLoader.new(loader)
else
@config_loader = loader
end
Expand Down
3 changes: 1 addition & 2 deletions lib/resque/pool/cli.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require 'optparse'
require 'resque/pool'
require 'resque/pool/logging'
require 'optparse'
require 'fileutils'

module Resque
Expand Down
13 changes: 13 additions & 0 deletions lib/resque/pool/config_loaders.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module Resque
class Pool

# Namespace for various pre-packaged config loaders or loader decorators.
module ConfigLoaders

autoload :FileOrHashLoader, "resque/pool/config_loaders/file_or_hash_loader"
autoload :Redis, "resque/pool/config_loaders/redis"
autoload :Throttled, "resque/pool/config_loaders/throttled"

end
end
end
66 changes: 66 additions & 0 deletions lib/resque/pool/config_loaders/file_or_hash_loader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
require 'erb'
require 'yaml'

module Resque
class Pool
module ConfigLoaders

class FileOrHashLoader
def initialize(filename_or_hash=nil)
case filename_or_hash
when String, nil
@filename = filename_or_hash
when Hash
@static_config = filename_or_hash.dup
else
raise "#{self.class} cannot be initialized with #{filename_or_hash.inspect}"
end
end

def call(environment)
@config ||= load_config_from_file(environment)
end

def reset!
@config = nil
end

private

def load_config_from_file(environment)
if @static_config
new_config = @static_config
else
filename = config_filename
new_config = load_config filename
end
apply_environment new_config, environment
end

def apply_environment(config, environment)
environment and config[environment] and config.merge!(config[environment])
config.delete_if {|key, value| value.is_a? Hash }
end

def config_filename
@filename || choose_config_file
end

def load_config(filename)
return {} unless filename
YAML.load(ERB.new(IO.read(filename)).result)
end

CONFIG_FILES = ["resque-pool.yml", "config/resque-pool.yml"]
def choose_config_file
if ENV["RESQUE_POOL_CONFIG"]
ENV["RESQUE_POOL_CONFIG"]
else
CONFIG_FILES.detect { |f| File.exist?(f) }
end
end
end

end
end
end
65 changes: 65 additions & 0 deletions lib/resque/pool/config_loaders/redis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require "resque"
require "resque/pool"

module Resque
class Pool
module ConfigLoaders

# Read/write pool config from redis.
# Should be wrapped in +ConfigLoaders::Throttled+.
#
# n.b. The environment needs to be passed in up-front, and will be ignored
# during +call+.
class Redis
attr_reader :redis
attr_reader :app, :pool, :env, :name

def initialize(app_name: Pool.app_name,
pool_name: Pool.pool_name,
environment: "unknown",
config_name: "config",
redis: Resque.redis)
@app = app_name
@pool = pool_name
@env = environment
@name = config_name
@redis = redis
end

# n.b. environment must be set up-front and will be ignored here.
def call(_)
redis.hgetall(key).tap do |h|
h.each do |k,v|
h[k] = v.to_i
end
end
end

# read individual worker config
def [](worker)
redis.hget(key, worker).to_i
end

# write individual worker config
def []=(worker, count)
redis.hset(key, worker, count.to_i)
end

# remove worker config
def delete(worker)
redis.multi do
redis.hget(key, worker)
redis.hdel(key, worker)
end.first.to_i
end

# n.b. this is probably namespaced under +resque+
def key
@key ||= ["pool", "config", app, pool, env, name].join(":")
end

end

end
end
end
44 changes: 44 additions & 0 deletions lib/resque/pool/config_loaders/throttled.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require "delegate"

module Resque
class Pool

module ConfigLoaders

# Throttle the frequency of loading pool configuration
# Defaults to call only once per 10 seconds.
class Throttled < SimpleDelegator

def initialize(config_loader, period: 10, time_source: Time)
super(config_loader)
@period = period
@resettable = config_loader.respond_to?(:reset!)
@last_check = 0
@time_source = time_source
end

def call(env)
# We do not need to cache per `env`, since the value of `env` will not
# change during the life of the process.
if (now > @last_check + @period)
@cache = super
@last_check = now
end
@cache
end

def reset!
@last_check = 0
super if @resettable
end

private

def now
@time_source.now.to_f
end
end

end
end
end
59 changes: 0 additions & 59 deletions lib/resque/pool/file_or_hash_loader.rb

This file was deleted.

12 changes: 8 additions & 4 deletions lib/resque/pool/pooled_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ def self.included(base)
end
end

end
end
def self.monkey_patch_resque_worker!
return if @patched_once
Resque::Worker.class_eval do
include Resque::Pool::PooledWorker
end
@patched_once = true
end

Resque::Worker.class_eval do
include Resque::Pool::PooledWorker
end
end
Loading