From d49598f2a6c5c38aaedc993ec5079b51c7473fa3 Mon Sep 17 00:00:00 2001 From: Alex Kira Date: Sun, 29 Oct 2017 11:05:19 -0700 Subject: [PATCH 01/15] Bump to 0.9.1 + README [skipci] --- README.md | 11 +++++++++-- mix.exs | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 75199ce6..b998628d 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ Add exq to your mix.exs deps (replace version with the latest hex.pm package ver defp deps do [ # ... other deps - {:exq, "~> 0.9.0"} + {:exq, "~> 0.9.1"} ] end ``` @@ -424,11 +424,18 @@ By default, Exq will register itself under the ```Elixir.Exq``` atom. You can c {:ok, exq} = Exq.start_link(name: Exq.Custom) ``` +## Donation + +To donate, send to: + +Bitcoin (BTC): `17j52Veb8qRmVKVvTDijVtmRXvTUpsAWHv` +Ethereum (ETH): `0xA0add27EBdB4394E15b7d1F84D4173aDE1b5fBB3` + + ## Questions? Issues? For issues, please submit a Github issue with steps on how to reproduce the problem. -For questions, stop in at the [#elixir-lang Slack group](https://elixir-slackin.herokuapp.com/) ## Contributions diff --git a/mix.exs b/mix.exs index 85d15f56..401497ba 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule Exq.Mixfile do def project do [ app: :exq, - version: "0.9.0", + version: "0.9.1", elixir: "~> 1.3", elixirc_paths: ["lib"], package: [ From fb5e98e25f8e982358bb1ff876eeb6d9b095eb9d Mon Sep 17 00:00:00 2001 From: Alex Kira Date: Tue, 28 Nov 2017 21:55:02 -0800 Subject: [PATCH 02/15] Remove password from error logging Fixes #293 --- lib/exq/manager/server.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index ab904641..8dbc6a8b 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -365,7 +365,7 @@ defmodule Exq.Manager.Server do {:ok, _} = Exq.Redis.Connection.q(opts[:redis], ~w(PING)) catch err, reason -> - opts = Exq.Support.Opts.redis_opts(opts) + opts = Exq.Support.Opts.redis_opts(opts) |> Enum.into(Map.new) |> Map.delete(:password) raise """ \n\n\n#{String.duplicate("=", 100)} ERROR! Could not connect to Redis! From 29aa3c936c786f101e45d3a4ea2ec76ed3da521d Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Tue, 19 Dec 2017 15:33:19 +0700 Subject: [PATCH 03/15] make redis module name and start_link args configurable --- lib/exq/redis/connection.ex | 8 ++++---- lib/exq/support/mode.ex | 7 +++---- lib/exq/support/opts.ex | 23 +++++++++++++++++++---- mix.exs | 1 + mix.lock | 9 ++++++--- test/config_test.exs | 19 ++++++++++++++----- test/exq_test.exs | 15 +++++++++++++++ test/test-sentinel.conf | 6 ++++++ test/test_helper.exs | 14 +++++++++++++- 9 files changed, 81 insertions(+), 21 deletions(-) create mode 100644 test/test-sentinel.conf diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index 42667132..d3006b24 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -6,6 +6,7 @@ defmodule Exq.Redis.Connection do require Logger alias Exq.Support.Config + import Exq.Support.Opts, only: [redis_worker_module: 0] def flushdb!(redis) do {:ok, res} = q(redis, ["flushdb"]) @@ -155,15 +156,14 @@ defmodule Exq.Redis.Connection do end def q(redis, command) do - Redix.command(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().command(redis, command, [timeout: Config.get(:redis_timeout)]) end def qp(redis, command) do - Redix.pipeline(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().pipeline(redis, command, [timeout: Config.get(:redis_timeout)]) end def qp!(redis, command) do - Redix.pipeline!(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().pipeline!(redis, command, [timeout: Config.get(:redis_timeout)]) end - end diff --git a/lib/exq/support/mode.ex b/lib/exq/support/mode.ex index 4c0342f5..dccb9f26 100644 --- a/lib/exq/support/mode.ex +++ b/lib/exq/support/mode.ex @@ -11,14 +11,13 @@ defmodule Exq.Support.Mode do Returns child list for the main Exq supervisor """ - import Exq.Support.Opts, only: [conform_opts: 1] + import Exq.Support.Opts, only: [redis_worker_opts: 1] import Supervisor.Spec def children(opts) do - {redis_opts, connection_opts, opts} = conform_opts(opts) - + {module, args, opts} = redis_worker_opts(opts) # make sure redis always first(start in order) - children = [worker(Redix, [redis_opts, connection_opts])] + children = [worker(module, args)] children = children ++ children(opts[:mode], opts) children end diff --git a/lib/exq/support/opts.ex b/lib/exq/support/opts.ex index 4ea04229..7c030fd2 100644 --- a/lib/exq/support/opts.ex +++ b/lib/exq/support/opts.ex @@ -11,10 +11,7 @@ defmodule Exq.Support.Opts do "#{name}.Sup" |> String.to_atom end - @doc """ - Return {redis_options, redis_connection_opts, gen_server_opts} - """ - def conform_opts(opts \\ []) do + defp conform_opts(opts) do mode = opts[:mode] || Config.get(:mode) redis = redis_client_name(opts[:name]) opts = [{:redis, redis}|opts] @@ -42,6 +39,24 @@ defmodule Exq.Support.Opts do end end + @doc """ + Return {redis_module, redis_args, gen_server_opts} + """ + def redis_worker_opts(opts) do + {redis_opts, connection_opts, opts} = conform_opts(opts) + case Config.get(:redis_worker) do + {module, args} -> {module, args, opts} + _ -> {Redix, [redis_opts, connection_opts], opts} + end + end + + def redis_worker_module() do + case Config.get(:redis_worker) do + {module, _args} -> module + _ -> Redix + end + end + def connection_opts(opts \\ []) do reconnect_on_sleep = opts[:reconnect_on_sleep] || Config.get(:reconnect_on_sleep) timeout = opts[:redis_timeout] || Config.get(:redis_timeout) diff --git a/mix.exs b/mix.exs index 401497ba..8d6997f1 100644 --- a/mix.exs +++ b/mix.exs @@ -38,6 +38,7 @@ defmodule Exq.Mixfile do { :redix, ">= 0.5.0"}, { :poison, ">= 1.2.0 or ~> 2.0"}, { :excoveralls, "~> 0.6", only: :test }, + { :redix_sentinel, "~> 0.5.0", only: :test }, { :flaky_connection, git: "https://github.com/hamiltop/flaky_connection.git", only: :test}, # docs diff --git a/mix.lock b/mix.lock index 8d7efa58..bd492b4b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ -%{"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, +%{ + "certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, "earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, @@ -13,7 +14,9 @@ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, "poison": {:hex, :poison, "2.1.0", "f583218ced822675e484648fa26c933d621373f01c6c76bd00005d7bd4b82e27", [:mix], []}, "ranch": {:hex, :ranch, "1.1.0", "f7ed6d97db8c2a27cca85cacbd543558001fc5a355e93a7bff1e9a9065a8545b", [:make], []}, - "redix": {:hex, :redix, "0.5.0", "46b81bdad24d4a330cd9f9eb26d6da6c40c8e5d1ce0c1607a82b4739f8f678c4", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}]}, + "redix": {:hex, :redix, "0.6.1", "20986b0e02f02b13e6f53c79a1ae70aa83147488c408f40275ec261f5bb0a6d0", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, + "redix_sentinel": {:hex, :redix_sentinel, "0.5.0", "cd15dc6ff0b676c974bb2694aeb57bc4c4dfc3c5335900fd7b9cee2b1208ab64", [:mix], [{:connection, "~> 1.0.3", [hex: :connection, repo: "hexpm", optional: false]}, {:redix, "~> 0.6", [hex: :redix, repo: "hexpm", optional: false]}], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}, "ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5", "2e73e068cd6393526f9fa6d399353d7c9477d6886ba005f323b592d389fb47be", [:make], []}, - "uuid": {:hex, :uuid, "1.0.1", "ebb032f2b429761540ca3e67436d1eb140206f139ddb7e1f2cc24b77cb4af45b", [:mix], []}} + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"}, +} diff --git a/test/config_test.exs b/test/config_test.exs index a67940c7..3fa7c8aa 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -1,6 +1,7 @@ defmodule Exq.ConfigTest do use ExUnit.Case require Mix.Config + import ExqTestUtil setup_all do ExqTestUtil.reset_config @@ -107,7 +108,7 @@ defmodule Exq.ConfigTest do assert client_name == nil end - test "default conform_opts" do + test "default redis_worker_opts" do Mix.Config.persist([ exq: [ queues: ["default"], @@ -120,7 +121,7 @@ defmodule Exq.ConfigTest do shutdown_timeout: 7000, ] ]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) [scheduler_enable: scheduler_enable, namespace: namespace, scheduler_poll_timeout: scheduler_poll_timeout, workers_sup: workers_sup, poll_timeout: poll_timeout, enqueuer: enqueuer, metadata: metadata, stats: stats, name: name, scheduler: scheduler, queues: queues, redis: redis, concurrency: concurrency, middleware: middleware, @@ -145,18 +146,26 @@ defmodule Exq.ConfigTest do assert mode == :default Mix.Config.persist([exq: [queues: [{"default", 1000}, {"test1", 2000}]]]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) assert server_opts[:queues] == ["default", "test1"] assert server_opts[:concurrency] == [{"default", 1000, 0}, {"test1", 2000, 0}] end - test "api conform_opts" do + test "api redis_worker_opts" do Mix.Config.persist([exq: []]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :api]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :api]) [name: name, namespace: namespace, redis: redis, mode: mode] = server_opts assert namespace == "test" assert name == nil assert redis == Exq.Redis.Client assert mode == :api end + + test "custom redis module" do + with_application_env(:exq, :redis_worker, {RedisWorker, [1, 2]}, fn -> + {module, args, server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) + assert module == RedisWorker + assert args == [1, 2] + end) + end end diff --git a/test/exq_test.exs b/test/exq_test.exs index 3799554a..d5713b8e 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -78,6 +78,21 @@ defmodule ExqTest do stop_process(sup) end + test "enqueue and run job via redis sentinel" do + sentinel_args = [ + [role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]], + [database: 0, password: nil], + [backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []] + ] + with_application_env(:exq, :redis_worker, {RedixSentinel, sentinel_args}, fn -> + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link + {:ok, _} = Exq.enqueue(Exq, "default", ExqTest.PerformWorker, []) + assert_receive {:worked} + stop_process(sup) + end) + end + test "run jobs from backup queue on boot" do host = elem(:inet.gethostname(), 1) Process.register(self(), :exqtest) diff --git a/test/test-sentinel.conf b/test/test-sentinel.conf new file mode 100644 index 00000000..250c50c5 --- /dev/null +++ b/test/test-sentinel.conf @@ -0,0 +1,6 @@ +port 6666 +daemonize yes +logfile stdout +pidfile /tmp/resquex-redis-sentinel.pid + +sentinel monitor exq 127.0.0.1 6555 1 diff --git a/test/test_helper.exs b/test/test_helper.exs index a6b1ac85..39650a9f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -85,6 +85,16 @@ defmodule ExqTestUtil do config = Mix.Config.read!(Path.join([Path.dirname(__DIR__), "config", "config.exs"])) Mix.Config.persist(config) end + + def with_application_env(app, key, new, context) do + old = Application.get_env(app, key) + Application.put_env(app, key, new) + try do + context.() + after + Application.put_env(app, key, old) + end + end end defmodule TestRedis do @@ -96,13 +106,15 @@ defmodule TestRedis do def start do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-server test/test-redis.conf') - :timer.sleep(100) + [] = :os.cmd('redis-server test/test-sentinel.conf --sentinel') + :timer.sleep(500) end end def stop do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-cli -p 6555 shutdown') + [] = :os.cmd('redis-cli -p 6666 shutdown') end end From f6464661b349c1ed9d04fd8eb1749af720732d0d Mon Sep 17 00:00:00 2001 From: Anantha Kumaran Date: Tue, 16 Jan 2018 11:55:25 +0530 Subject: [PATCH 04/15] add note about sentinel support --- README.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/README.md b/README.md index b998628d..926c6cbf 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,25 @@ config :exq, ] ``` +### Sentinel: + +Exq by default uses [Redix](https://github.com/whatyouhide/redix) +which doesn't support Redis Sentinel. To use Sentinel, add +[RedixSentinel](https://github.com/ananthakumaran/redix_sentinel) to +the list of dependencies. Configure `:redis_worker ({module, start_link_args})` appropriately. + + +```elixir +config :exq + redis_worker: {RedixSentinel, [ + [role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]], + [database: 0, password: nil], + [backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []] + ]} + ... +``` + + ## Using iex: If you'd like to try Exq out on the iex console, you can do this by typing ``` From 5df7dcff5ed587bfa403817b0c5757a5ddb73c97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Bartas?= Date: Tue, 10 Oct 2017 21:45:17 +0200 Subject: [PATCH 05/15] Update Manager and stats to correctly store information about processing jobs --- lib/exq/manager/server.ex | 50 +++++++++++++++++-- lib/exq/redis/connection.ex | 15 ++++++ lib/exq/redis/job_stat.ex | 69 ++++++++++++++++++++------ lib/exq/serializers/json_serializer.ex | 8 ++- lib/exq/stats/server.ex | 14 +++--- lib/exq/support/process.ex | 2 +- lib/exq/worker/server.ex | 1 - test/api_test.exs | 4 +- test/exq_test.exs | 3 +- test/job_stat_test.exs | 9 ++-- 10 files changed, 135 insertions(+), 40 deletions(-) diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 8dbc6a8b..18c70b87 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -113,14 +113,16 @@ defmodule Exq.Manager.Server do use GenServer alias Exq.Enqueuer alias Exq.Support.Config + alias Exq.Support.Time alias Exq.Redis.JobQueue + alias Exq.Redis.Connection @backoff_mult 10 defmodule State do defstruct redis: nil, stats: nil, enqueuer: nil, pid: nil, node_id: nil, namespace: nil, work_table: nil, queues: nil, poll_timeout: nil, scheduler_poll_timeout: nil, workers_sup: nil, - middleware: nil, metadata: nil + middleware: nil, metadata: nil, started_at: nil end def start_link(opts\\[]) do @@ -135,7 +137,6 @@ defmodule Exq.Manager.Server do def server_name(nil), do: Config.get(:name) def server_name(name), do: name - ##=========================================================== ## gen server callbacks ##=========================================================== @@ -160,13 +161,37 @@ defmodule Exq.Manager.Server do queues: opts[:queues], pid: self(), poll_timeout: opts[:poll_timeout], - scheduler_poll_timeout: opts[:scheduler_poll_timeout] + scheduler_poll_timeout: opts[:scheduler_poll_timeout], + started_at: Time.unix_seconds, } check_redis_connection(opts) + + name = redis_worker_name(state) + worker_init = [ + ["DEL", "#{name}:workers"], # remove old working processes + ["SADD", JobQueue.full_key(state.namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], + ] + Connection.qp!(state.redis, worker_init) + {:ok, state, 0} end + def cocurency_count(state) do + Enum.map(state.queues, fn(q) -> + [{_, concurrency, _}] = :ets.lookup(state.work_table, q) + cond do + concurrency == :infinite -> 1000000 + true -> concurrency + end + end) + |> Enum.sum + end + def handle_call({:enqueue, queue, worker, args, options}, from, state) do Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options) {:noreply, state, 10} @@ -214,7 +239,7 @@ defmodule Exq.Manager.Server do """ def handle_cast(:cleanup_host_stats, state) do rescue_timeout(fn -> - Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id) + Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid) end) {:noreply, state, 0} end @@ -240,6 +265,11 @@ defmodule Exq.Manager.Server do ##=========================================================== ## Internal Functions ##=========================================================== + + defp redis_worker_name(state) do + JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") + end + @doc """ Dequeue jobs and dispatch to workers """ @@ -251,6 +281,17 @@ defmodule Exq.Manager.Server do job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) + # Update worker info in redis that it is alive + name = redis_worker_name(state) + worker_init = [ + ["SADD", JobQueue.full_key(state.namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s + ] + Connection.qp!(state.redis, worker_init) + cond do Enum.any?(job_results, fn(status) -> elem(status, 1) == :dispatch end) -> {state, 0} @@ -297,7 +338,6 @@ defmodule Exq.Manager.Server do update_worker_count(state.work_table, queue, 1) end - # Setup queues from options / configs. # The following is done: diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index d3006b24..8afe7a20 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -28,6 +28,21 @@ defmodule Exq.Redis.Connection do val end + def hget!(redis, key, field) do + {:ok, val} = q(redis, ["HGET", key, field]) + val + end + + def hvals!(redis, key) do + {:ok, val} = q(redis, ["HVALS", key]) + val + end + + def hlen!(redis, key) do + {:ok, val} = q(redis, ["HLEN", key]) + val + end + def set!(redis, key, val \\ 0) do q(redis, ["SET", key, val]) end diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 042dbf34..9f78f778 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -38,9 +38,20 @@ defmodule Exq.Redis.JobStat do {:ok, count} end - def add_process_commands(namespace, process_info, serialized_process \\ nil) do - serialized = serialized_process || Exq.Support.Process.encode(process_info) - [["SADD", JobQueue.full_key(namespace, "processes"), serialized]] + def add_process_commands(namespace, process_info, _) do + name = supervisor_worker_name(namespace, process_info) + string_pid = :erlang.pid_to_list(process_info.pid) + [ + ["SADD", JobQueue.full_key(namespace, "processes"), name], # ensure supervisor worker is added to list + ["HINCRBY", name, "busy", "1"], + ["HSET", "#{name}:workers", string_pid, Poison.encode!(%{ + run_at: process_info.started_at, + pid: string_pid, + payload: serialize_processing_payload(process_info.job), + hostname: process_info.hostname, + queue: process_info.job && process_info.job.queue + })] + ] end def add_process(redis, namespace, process_info, serialized_process \\ nil) do instr = add_process_commands(namespace, process_info, serialized_process) @@ -48,9 +59,26 @@ defmodule Exq.Redis.JobStat do :ok end - def remove_process_commands(namespace, process_info, serialized_process \\ nil) do - serialized = serialized_process || Exq.Support.Process.encode(process_info) - [["SREM", JobQueue.full_key(namespace, "processes"), serialized]] + defp serialize_processing_payload(nil) do + %{} + end + defp serialize_processing_payload(job) do + %{ + queue: job.queue, + class: job.class, + args: job.args, + jid: job.jid, + created_at: job.enqueued_at, + enqueued_at: job.enqueued_at + } + end + + def remove_process_commands(namespace, process_info, _) do + name = supervisor_worker_name(namespace, process_info) + [ + ["HINCRBY", name, "busy", "-1"], + ["HDEL", "#{name}:workers", :erlang.pid_to_list(process_info.pid)], + ] end def remove_process(redis, namespace, process_info, serialized_process \\ nil) do instr = remove_process_commands(namespace, process_info, serialized_process) @@ -58,21 +86,30 @@ defmodule Exq.Redis.JobStat do :ok end - def cleanup_processes(redis, namespace, host) do - Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) - |> Enum.map(fn(serialized) -> {Process.decode(serialized), serialized} end) - |> Enum.filter(fn({process, _}) -> process.host == host end) - |> Enum.each(fn({process, serialized}) -> remove_process(redis, namespace, process, serialized) end) + def cleanup_processes(redis, namespace, hostname, master_pid) do + processes = JobQueue.full_key(namespace, "processes") + master_pid_string = "#{:erlang.pid_to_list(master_pid)}" + instr = Connection.smembers!(redis, processes) + |> Enum.filter(fn(key) -> key =~ "#{hostname}:" end) + |> Enum.filter(fn(key) -> ((Connection.hget!(redis, key, "info") || '{}') |> Poison.decode!)["pid"] != master_pid_string end) + |> Enum.flat_map(fn(key) -> [["SREM", processes, key], ["DEL", "#{processes}:workers"]] end) + + if Enum.count(instr) > 0 do + Connection.qp!(redis, instr) + end :ok end def busy(redis, namespace) do - Connection.scard!(redis, JobQueue.full_key(namespace, "processes")) + (Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []) + |> Enum.map(fn(key) -> Connection.hlen!(redis, "#{key}:workers") end) + |> Enum.sum end def processes(redis, namespace) do - list = Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [] - Enum.map(list, &Process.decode/1) + (Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []) + |> Enum.flat_map(fn(key) -> Connection.hvals!(redis, "#{key}:workers") end) + |> Enum.map(&Process.decode(&1)) end def find_failed(redis, namespace, jid) do @@ -120,6 +157,10 @@ defmodule Exq.Redis.JobStat do {:ok, failures, successes} end + defp supervisor_worker_name(namespace, process_info) do + JobQueue.full_key(namespace, "#{process_info.hostname}:elixir") + end + defp realtime_stats_formatter(redis, namespace) do fn(keys, ns) -> if Enum.empty?(keys) do diff --git a/lib/exq/serializers/json_serializer.ex b/lib/exq/serializers/json_serializer.ex index b3db3794..2ef91b8f 100644 --- a/lib/exq/serializers/json_serializer.ex +++ b/lib/exq/serializers/json_serializer.ex @@ -56,7 +56,9 @@ defmodule Exq.Serializers.JsonSerializer do deserialized = decode!(serialized) %Exq.Support.Process{ pid: Map.get(deserialized, "pid"), - host: Map.get(deserialized, "host"), + hostname: Map.get(deserialized, "hostname"), + queues: Map.get(deserialized, "queues"), + concurrency: Map.get(deserialized, "concurrency"), job: Map.get(deserialized, "job"), started_at: Map.get(deserialized, "started_at") } @@ -66,7 +68,9 @@ defmodule Exq.Serializers.JsonSerializer do formatted_pid = to_string(:io_lib.format("~p", [process.pid])) deserialized = Enum.into([ pid: formatted_pid, - host: process.host, + hostname: process.hostname, + queues: process.queues, + concurrency: process.concurrency, job: process.job, started_at: process.started_at], Map.new) diff --git a/lib/exq/stats/server.ex b/lib/exq/stats/server.ex index b2eabdb2..809b0ac4 100644 --- a/lib/exq/stats/server.ex +++ b/lib/exq/stats/server.ex @@ -24,9 +24,9 @@ defmodule Exq.Stats.Server do @doc """ Add in progress worker process """ - def add_process(stats, namespace, worker, host, job_serialized) do + def add_process(stats, namespace, worker, hostname, job_serialized) do process_info = %Process{pid: worker, - host: host, + hostname: hostname, job: Exq.Support.Config.serializer.decode_job(job_serialized), started_at: Time.unix_seconds} serialized = Exq.Support.Process.encode(process_info) @@ -62,8 +62,8 @@ defmodule Exq.Stats.Server do @doc """ Cleanup stats on boot. This includes cleaning up busy workers. """ - def cleanup_host_stats(stats, namespace, host) do - GenServer.call(stats, {:cleanup_host_stats, namespace, host}) + def cleanup_host_stats(stats, namespace, host, master_pid) do + GenServer.call(stats, {:cleanup_host_stats, namespace, host, master_pid}) :ok end @@ -76,7 +76,6 @@ defmodule Exq.Stats.Server do GenServer.call(stats, :force_flush) end - ##=========================================================== ## gen server callbacks ##=========================================================== @@ -102,9 +101,9 @@ defmodule Exq.Stats.Server do {:reply, :ok, state} end - def handle_call({:cleanup_host_stats, namespace, host}, _from, state) do + def handle_call({:cleanup_host_stats, namespace, host, master_pid}, _from, state) do try do - JobStat.cleanup_processes(state.redis, namespace, host) + JobStat.cleanup_processes(state.redis, namespace, host, master_pid) rescue e -> Logger.error("Error cleaning up processes - #{Kernel.inspect e}") end @@ -124,7 +123,6 @@ defmodule Exq.Stats.Server do :ok end - ##=========================================================== ## Methods ##=========================================================== diff --git a/lib/exq/support/process.ex b/lib/exq/support/process.ex index c40bb16d..e90ddd99 100644 --- a/lib/exq/support/process.ex +++ b/lib/exq/support/process.ex @@ -2,7 +2,7 @@ defmodule Exq.Support.Process do @moduledoc """ Struct for in progress worker """ - defstruct pid: nil, host: nil, job: nil, started_at: nil + defstruct pid: nil, hostname: nil, job: nil, started_at: nil, concurrency: nil, job: nil, queues: nil alias Exq.Support.Config diff --git a/lib/exq/worker/server.ex b/lib/exq/worker/server.ex index 85765976..261d2e77 100644 --- a/lib/exq/worker/server.ex +++ b/lib/exq/worker/server.ex @@ -73,7 +73,6 @@ defmodule Exq.Worker.Server do {:noreply, state} end - @doc """ Dispatch work to the target module (call :perform method of target) """ diff --git a/test/api_test.exs b/test/api_test.exs index 0b33e4a8..f5d3585e 100644 --- a/test/api_test.exs +++ b/test/api_test.exs @@ -67,8 +67,8 @@ defmodule ApiTest do test "processes with data" do JobStat.add_process(:testredis, "test", %Process{pid: self()}) assert {:ok, [processes]} = Exq.Api.processes(Exq.Api) - my_pid_str = to_string(:erlang.pid_to_list(self())) - assert %Process{pid: ^my_pid_str} = processes + pid = to_string(:erlang.pid_to_list(self())) + assert pid = processes.pid end test "jobs when empty" do diff --git a/test/exq_test.exs b/test/exq_test.exs index d5713b8e..ce65855b 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -281,7 +281,7 @@ defmodule ExqTest do # Clear processes for this node host = Exq.NodeIdentifier.HostnameIdentifier.node_id() - Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host) + Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host, self()) # Check that process has been cleared processes = Exq.Redis.JobStat.processes(state.redis, "test") @@ -320,7 +320,6 @@ defmodule ExqTest do {:ok, count} = TestStats.failed_count(state.redis, "test") assert count == "2" - {:ok, jid} = Exq.enqueue(Exq, "default", "ExqTest.FailWorker/failure_perform", []) # if we kill Exq too fast we dont record the failure because exq is gone diff --git a/test/job_stat_test.exs b/test/job_stat_test.exs index 77d141ef..4c91d11a 100644 --- a/test/job_stat_test.exs +++ b/test/job_stat_test.exs @@ -28,9 +28,9 @@ defmodule JobStatTest do {:ok, jid} end - def create_process_info(host) do + def create_process_info(hostname) do process_info = %Process{pid: self(), - host: host, + hostname: hostname, job: %Job{}, started_at: Time.unix_seconds} serialized = Exq.Support.Process.encode(process_info) @@ -122,11 +122,10 @@ defmodule JobStatTest do assert Enum.count(Exq.Redis.JobStat.processes(:testredis, namespace)) == 2 # Should cleanup only the host that is passed in - JobStat.cleanup_processes(:testredis, namespace, "host123") + JobStat.cleanup_processes(:testredis, namespace, "host123", self()) processes = Exq.Redis.JobStat.processes(:testredis, namespace) assert Enum.count(processes) == 1 - assert Enum.find(processes, fn(process) -> process.host == "host456" end) != nil + assert Enum.find(processes, fn(process) -> process.hostname == "host456" end) != nil end - end From 28da15d179d8add7974b2192fb07736747caf5b9 Mon Sep 17 00:00:00 2001 From: Antonin Date: Wed, 24 Jan 2018 14:35:22 +0100 Subject: [PATCH 06/15] Move heartbeat to separate genserver --- lib/exq/manager/heartbeat_server.ex | 61 +++++++++++++++++++++++++++++ lib/exq/manager/server.ex | 39 ++---------------- 2 files changed, 65 insertions(+), 35 deletions(-) create mode 100644 lib/exq/manager/heartbeat_server.ex diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex new file mode 100644 index 00000000..9cc28963 --- /dev/null +++ b/lib/exq/manager/heartbeat_server.ex @@ -0,0 +1,61 @@ +defmodule HeartbeatServer do + use GenServer + alias Exq.Redis.JobQueue + alias Exq.Redis.Connection + alias Exq.Support.Time + + def start_link(master_state) do + GenServer.start_link(__MODULE__, master_state) + end + + def init(master_state) do + worker_init = [ ["DEL", "#{redis_worker_name(master_state)}:workers"] ] + ++ getRedisCommands(master_state) + Connection.qp!(master_state.redis, worker_init) + + schedule_work() + + {:ok, master_state} + end + + def handle_info(:heartbeat, master_state) do + schedule_work() + + current_state = GenServer.call(Map.get(master_state, :pid), :get_state) + Connection.qp!(current_state.redis, getRedisCommands(current_state)) + + {:noreply, current_state} + end + + defp schedule_work() do + Process.send_after(self(), :heartbeat, 1000) + end + + defp redis_worker_name(state) do + JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") + end + + defp getRedisCommands(state) do + name = redis_worker_name(state) + [ + ["SADD", JobQueue.full_key(state.namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s + ] + end + + + defp cocurency_count(state) do + Enum.map(state.queues, fn(q) -> + [{_, concurrency, _}] = :ets.lookup(state.work_table, q) + cond do + concurrency == :infinite -> 1000000 + true -> concurrency + end + end) + |> Enum.sum + end + +end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 18c70b87..a4c7a0a0 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -115,7 +115,6 @@ defmodule Exq.Manager.Server do alias Exq.Support.Config alias Exq.Support.Time alias Exq.Redis.JobQueue - alias Exq.Redis.Connection @backoff_mult 10 @@ -167,29 +166,12 @@ defmodule Exq.Manager.Server do check_redis_connection(opts) - name = redis_worker_name(state) - worker_init = [ - ["DEL", "#{name}:workers"], # remove old working processes - ["SADD", JobQueue.full_key(state.namespace, "processes"), name], - ["HSET", name, "quiet", "false"], - ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], - ["HSET", name, "beat", Time.unix_seconds], - ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], - ] - Connection.qp!(state.redis, worker_init) - + HeartbeatServer.start_link(state) {:ok, state, 0} end - def cocurency_count(state) do - Enum.map(state.queues, fn(q) -> - [{_, concurrency, _}] = :ets.lookup(state.work_table, q) - cond do - concurrency == :infinite -> 1000000 - true -> concurrency - end - end) - |> Enum.sum + def handle_call(:get_state, _from, state) do + {:reply, state, state} end def handle_call({:enqueue, queue, worker, args, options}, from, state) do @@ -266,9 +248,7 @@ defmodule Exq.Manager.Server do ## Internal Functions ##=========================================================== - defp redis_worker_name(state) do - JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") - end + @doc """ Dequeue jobs and dispatch to workers @@ -281,17 +261,6 @@ defmodule Exq.Manager.Server do job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) - # Update worker info in redis that it is alive - name = redis_worker_name(state) - worker_init = [ - ["SADD", JobQueue.full_key(state.namespace, "processes"), name], - ["HSET", name, "quiet", "false"], - ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], - ["HSET", name, "beat", Time.unix_seconds], - ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s - ] - Connection.qp!(state.redis, worker_init) - cond do Enum.any?(job_results, fn(status) -> elem(status, 1) == :dispatch end) -> {state, 0} From e7c2058f697a68ad04377404432c492d8991985a Mon Sep 17 00:00:00 2001 From: Antonin Date: Wed, 24 Jan 2018 15:23:31 +0100 Subject: [PATCH 07/15] Clean up stats at init of server --- lib/exq/manager/server.ex | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index a4c7a0a0..17beed88 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -141,10 +141,6 @@ defmodule Exq.Manager.Server do ##=========================================================== def init(opts) do - - # Cleanup stale stats - GenServer.cast(self(), :cleanup_host_stats) - # Setup queues work_table = setup_queues(opts) @@ -166,6 +162,11 @@ defmodule Exq.Manager.Server do check_redis_connection(opts) + # Cleanup stale stats + rescue_timeout(fn -> + Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid) + end) + HeartbeatServer.start_link(state) {:ok, state, 0} end @@ -216,16 +217,6 @@ defmodule Exq.Manager.Server do {:noreply, state, 0} end - @doc """ - Cleanup host stats on boot - """ - def handle_cast(:cleanup_host_stats, state) do - rescue_timeout(fn -> - Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid) - end) - {:noreply, state, 0} - end - def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do update_worker_count(state.work_table, queue, -1) {:noreply, state, 0} From 82f5082d234aa951d7327331f37a84feea43828f Mon Sep 17 00:00:00 2001 From: Antonin Date: Wed, 14 Feb 2018 12:08:10 +0100 Subject: [PATCH 08/15] Hearbeat server fixes --- lib/exq/manager/heartbeat_server.ex | 78 +++++++++++++---------------- lib/exq/manager/server.ex | 8 +-- lib/exq/redis/job_stat.ex | 27 ++++++++++ lib/exq/support/mode.ex | 1 + 4 files changed, 68 insertions(+), 46 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index 9cc28963..b2c44b46 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -1,61 +1,55 @@ -defmodule HeartbeatServer do +defmodule Exq.Heartbeat.Server do use GenServer alias Exq.Redis.JobQueue alias Exq.Redis.Connection + alias Exq.Support.Config alias Exq.Support.Time + alias Exq.Redis.JobStat - def start_link(master_state) do - GenServer.start_link(__MODULE__, master_state) + defmodule State do + defstruct name: nil, node_id: nil, namespace: nil, started_at: nil, pid: nil, queues: nil, poll_timeout: nil, work_table: nil, redis: nil end - def init(master_state) do - worker_init = [ ["DEL", "#{redis_worker_name(master_state)}:workers"] ] - ++ getRedisCommands(master_state) - Connection.qp!(master_state.redis, worker_init) - - schedule_work() - - {:ok, master_state} + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) end - def handle_info(:heartbeat, master_state) do - schedule_work() - - current_state = GenServer.call(Map.get(master_state, :pid), :get_state) - Connection.qp!(current_state.redis, getRedisCommands(current_state)) + def init(opts) do + state = %State{ + name: Exq.Manager.Server.server_name(opts[:name]), + redis: opts[:redis], + node_id: Config.node_identifier.node_id(), + namespace: opts[:namespace], + queues: opts[:queues], + poll_timeout: opts[:poll_timeout] + } + schedule_work(state) + {:ok, state} + end + def handle_cast({:heartbeat, master_state}, state) do + schedule_work(state) + current_state = struct(State, Map.from_struct(master_state)) + Connection.qp!( + current_state.redis, + JobStat.get_redis_commands( + current_state.namespace, + current_state.node_id, + current_state.started_at, + current_state.pid, + current_state.queues, + current_state.work_table, + current_state.poll_timeout + ) + ) {:noreply, current_state} end - defp schedule_work() do - Process.send_after(self(), :heartbeat, 1000) + defp schedule_work(state) do + Process.send_after(state.name, {:get_state, self()}, 1000) end defp redis_worker_name(state) do JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") end - - defp getRedisCommands(state) do - name = redis_worker_name(state) - [ - ["SADD", JobQueue.full_key(state.namespace, "processes"), name], - ["HSET", name, "quiet", "false"], - ["HSET", name, "info", Poison.encode!(%{ hostname: state.node_id, started_at: state.started_at, pid: "#{:erlang.pid_to_list(state.pid)}", concurrency: cocurency_count(state), queues: state.queues})], - ["HSET", name, "beat", Time.unix_seconds], - ["EXPIRE", name, (state.poll_timeout / 1000 + 5)], # expire information about live worker in poll_interval + 5s - ] - end - - - defp cocurency_count(state) do - Enum.map(state.queues, fn(q) -> - [{_, concurrency, _}] = :ets.lookup(state.work_table, q) - cond do - concurrency == :infinite -> 1000000 - true -> concurrency - end - end) - |> Enum.sum - end - end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 17beed88..8c2de42b 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -157,7 +157,7 @@ defmodule Exq.Manager.Server do pid: self(), poll_timeout: opts[:poll_timeout], scheduler_poll_timeout: opts[:scheduler_poll_timeout], - started_at: Time.unix_seconds, + started_at: Time.unix_seconds } check_redis_connection(opts) @@ -167,12 +167,12 @@ defmodule Exq.Manager.Server do Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid) end) - HeartbeatServer.start_link(state) {:ok, state, 0} end - def handle_call(:get_state, _from, state) do - {:reply, state, state} + def handle_info({:get_state, pid}, state) do + GenServer.cast(pid, {:heartbeat, state}) + {:noreply, state} end def handle_call({:enqueue, queue, worker, args, options}, from, state) do diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 9f78f778..9f125a7e 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -186,4 +186,31 @@ defmodule Exq.Redis.JobStat do val end end + + def get_redis_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do + name = redis_worker_name(namespace, node_id) + [ + ["DEL", "#{name}:workers"], + ["SADD", JobQueue.full_key(namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: cocurency_count(queues, work_table), queues: queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (poll_timeout / 1000 + 5)] + ] + end + + defp redis_worker_name(namespace, node_id) do + JobQueue.full_key(namespace, "#{node_id}:elixir") + end + + defp cocurency_count(queues, work_table) do + Enum.map(queues, fn(q) -> + [{_, concurrency, _}] = :ets.lookup(work_table, q) + cond do + concurrency == :infinite -> 1000000 + true -> concurrency + end + end) + |> Enum.sum + end end diff --git a/lib/exq/support/mode.ex b/lib/exq/support/mode.ex index dccb9f26..92721aea 100644 --- a/lib/exq/support/mode.ex +++ b/lib/exq/support/mode.ex @@ -28,6 +28,7 @@ defmodule Exq.Support.Mode do worker(Exq.Stats.Server, [opts]), supervisor(Exq.Worker.Supervisor, [opts]), worker(Exq.Manager.Server, [opts]), + worker(Exq.Heartbeat.Server, [opts]), worker(Exq.WorkerDrainer.Server, [opts]), worker(Exq.Enqueuer.Server, [opts]), worker(Exq.Api.Server, [opts]) From 2919ebc4189d1ad5b2c5ba5553a0a6a00eae752d Mon Sep 17 00:00:00 2001 From: Antonin Date: Thu, 15 Feb 2018 14:33:37 +0100 Subject: [PATCH 09/15] Start Heartbeat server from Mode modul --- lib/exq/manager/heartbeat_server.ex | 39 +++++++++++++---------------- lib/exq/manager/server.ex | 10 ++++---- lib/exq/redis/job_stat.ex | 1 - 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index b2c44b46..a3ada43a 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -1,9 +1,7 @@ defmodule Exq.Heartbeat.Server do use GenServer - alias Exq.Redis.JobQueue alias Exq.Redis.Connection alias Exq.Support.Config - alias Exq.Support.Time alias Exq.Redis.JobStat defmodule State do @@ -23,33 +21,32 @@ defmodule Exq.Heartbeat.Server do queues: opts[:queues], poll_timeout: opts[:poll_timeout] } - schedule_work(state) + schedule_work(state, true) {:ok, state} end - def handle_cast({:heartbeat, master_state}, state) do + def handle_cast({:heartbeat, master_state, status}, state) do schedule_work(state) - current_state = struct(State, Map.from_struct(master_state)) + master_data = Map.from_struct(master_state) + current_state = %{struct(State, master_data) | name: state.name} + init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: [] + data = init_data ++ JobStat.get_redis_commands( + current_state.namespace, + current_state.node_id, + current_state.started_at, + current_state.pid, + current_state.queues, + current_state.work_table, + current_state.poll_timeout + ) Connection.qp!( current_state.redis, - JobStat.get_redis_commands( - current_state.namespace, - current_state.node_id, - current_state.started_at, - current_state.pid, - current_state.queues, - current_state.work_table, - current_state.poll_timeout - ) - ) + data + ) {:noreply, current_state} end - defp schedule_work(state) do - Process.send_after(state.name, {:get_state, self()}, 1000) - end - - defp redis_worker_name(state) do - JobQueue.full_key(state.namespace, "#{state.node_id}:elixir") + defp schedule_work(state, status \\ false) do + Process.send_after(state.name, {:get_state, self(), status}, 1000) end end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 8c2de42b..15c05baa 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -170,11 +170,6 @@ defmodule Exq.Manager.Server do {:ok, state, 0} end - def handle_info({:get_state, pid}, state) do - GenServer.cast(pid, {:heartbeat, state}) - {:noreply, state} - end - def handle_call({:enqueue, queue, worker, args, options}, from, state) do Enqueuer.enqueue(state.enqueuer, from, queue, worker, args, options) {:noreply, state, 10} @@ -227,6 +222,11 @@ defmodule Exq.Manager.Server do {:noreply, updated_state, timeout} end + def handle_info({:get_state, pid, status}, state) do + GenServer.cast(pid, {:heartbeat, state, status}) + {:noreply, state} + end + def handle_info(_info, state) do {:noreply, state, state.poll_timeout} end diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 9f125a7e..81174143 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -190,7 +190,6 @@ defmodule Exq.Redis.JobStat do def get_redis_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do name = redis_worker_name(namespace, node_id) [ - ["DEL", "#{name}:workers"], ["SADD", JobQueue.full_key(namespace, "processes"), name], ["HSET", name, "quiet", "false"], ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: cocurency_count(queues, work_table), queues: queues})], From 6c0f851b8a0e716fa8752489f1cceaee29f9eddd Mon Sep 17 00:00:00 2001 From: Antonin Date: Tue, 20 Feb 2018 14:54:14 +0100 Subject: [PATCH 10/15] First Heartbeat run without timeout --- lib/exq/manager/heartbeat_server.ex | 4 ++-- lib/exq/support/mode.ex | 4 ++-- test/exq_test.exs | 2 ++ test/test-redis.conf | 3 ++- test/test-sentinel.conf | 2 +- 5 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index a3ada43a..edc45efc 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -21,7 +21,7 @@ defmodule Exq.Heartbeat.Server do queues: opts[:queues], poll_timeout: opts[:poll_timeout] } - schedule_work(state, true) + schedule_work(state, true, 0) {:ok, state} end @@ -46,7 +46,7 @@ defmodule Exq.Heartbeat.Server do {:noreply, current_state} end - defp schedule_work(state, status \\ false) do + defp schedule_work(state, status \\ false, timeout \\ 1000) do Process.send_after(state.name, {:get_state, self(), status}, 1000) end end diff --git a/lib/exq/support/mode.ex b/lib/exq/support/mode.ex index 92721aea..7a5d85a6 100644 --- a/lib/exq/support/mode.ex +++ b/lib/exq/support/mode.ex @@ -28,10 +28,10 @@ defmodule Exq.Support.Mode do worker(Exq.Stats.Server, [opts]), supervisor(Exq.Worker.Supervisor, [opts]), worker(Exq.Manager.Server, [opts]), - worker(Exq.Heartbeat.Server, [opts]), worker(Exq.WorkerDrainer.Server, [opts]), worker(Exq.Enqueuer.Server, [opts]), - worker(Exq.Api.Server, [opts]) + worker(Exq.Api.Server, [opts]), + worker(Exq.Heartbeat.Server, [opts]) ] if opts[:scheduler_enable] do diff --git a/test/exq_test.exs b/test/exq_test.exs index ce65855b..26fd1e35 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -118,7 +118,9 @@ defmodule ExqTest do Process.register(self(), :exqtest) {:ok, sup} = Exq.start_link(scheduler_enable: true) {:ok, _} = Exq.enqueue_in(Exq, "default", 0, ExqTest.PerformWorker, []) + :erlang.process_info(sup, :messages) |> IO.inspect assert_receive {:worked} + stop_process(sup) end diff --git a/test/test-redis.conf b/test/test-redis.conf index 56aa2694..cd150476 100644 --- a/test/test-redis.conf +++ b/test/test-redis.conf @@ -1,4 +1,5 @@ -port 6555 +port 6379 +bind redis_db daemonize yes logfile stdout pidfile /tmp/resquex-redis.pid diff --git a/test/test-sentinel.conf b/test/test-sentinel.conf index 250c50c5..e0e03f70 100644 --- a/test/test-sentinel.conf +++ b/test/test-sentinel.conf @@ -3,4 +3,4 @@ daemonize yes logfile stdout pidfile /tmp/resquex-redis-sentinel.pid -sentinel monitor exq 127.0.0.1 6555 1 +sentinel monitor exq redis_db 6379 1 From 94fdc057f3fefd00d93401553ab067ce9b754367 Mon Sep 17 00:00:00 2001 From: Antonin Date: Tue, 20 Feb 2018 20:42:32 +0100 Subject: [PATCH 11/15] Fix naming --- lib/exq/manager/heartbeat_server.ex | 6 +++--- lib/exq/manager/server.ex | 3 --- lib/exq/redis/job_stat.ex | 6 +++--- test/exq_test.exs | 2 -- test/test-redis.conf | 3 +-- test/test-sentinel.conf | 2 +- 6 files changed, 8 insertions(+), 14 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index edc45efc..dd15a361 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -21,7 +21,7 @@ defmodule Exq.Heartbeat.Server do queues: opts[:queues], poll_timeout: opts[:poll_timeout] } - schedule_work(state, true, 0) + schedule_work(state, true) {:ok, state} end @@ -30,7 +30,7 @@ defmodule Exq.Heartbeat.Server do master_data = Map.from_struct(master_state) current_state = %{struct(State, master_data) | name: state.name} init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: [] - data = init_data ++ JobStat.get_redis_commands( + data = init_data ++ JobStat.status_process_commands( current_state.namespace, current_state.node_id, current_state.started_at, @@ -46,7 +46,7 @@ defmodule Exq.Heartbeat.Server do {:noreply, current_state} end - defp schedule_work(state, status \\ false, timeout \\ 1000) do + defp schedule_work(state, status \\ false) do Process.send_after(state.name, {:get_state, self(), status}, 1000) end end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index 15c05baa..b89eabed 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -238,9 +238,6 @@ defmodule Exq.Manager.Server do ##=========================================================== ## Internal Functions ##=========================================================== - - - @doc """ Dequeue jobs and dispatch to workers """ diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 81174143..c0666dca 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -187,12 +187,12 @@ defmodule Exq.Redis.JobStat do end end - def get_redis_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do + def status_process_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do name = redis_worker_name(namespace, node_id) [ ["SADD", JobQueue.full_key(namespace, "processes"), name], ["HSET", name, "quiet", "false"], - ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: cocurency_count(queues, work_table), queues: queues})], + ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: concurrency_count(queues, work_table), queues: queues})], ["HSET", name, "beat", Time.unix_seconds], ["EXPIRE", name, (poll_timeout / 1000 + 5)] ] @@ -202,7 +202,7 @@ defmodule Exq.Redis.JobStat do JobQueue.full_key(namespace, "#{node_id}:elixir") end - defp cocurency_count(queues, work_table) do + defp concurrency_count(queues, work_table) do Enum.map(queues, fn(q) -> [{_, concurrency, _}] = :ets.lookup(work_table, q) cond do diff --git a/test/exq_test.exs b/test/exq_test.exs index 26fd1e35..ce65855b 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -118,9 +118,7 @@ defmodule ExqTest do Process.register(self(), :exqtest) {:ok, sup} = Exq.start_link(scheduler_enable: true) {:ok, _} = Exq.enqueue_in(Exq, "default", 0, ExqTest.PerformWorker, []) - :erlang.process_info(sup, :messages) |> IO.inspect assert_receive {:worked} - stop_process(sup) end diff --git a/test/test-redis.conf b/test/test-redis.conf index cd150476..56aa2694 100644 --- a/test/test-redis.conf +++ b/test/test-redis.conf @@ -1,5 +1,4 @@ -port 6379 -bind redis_db +port 6555 daemonize yes logfile stdout pidfile /tmp/resquex-redis.pid diff --git a/test/test-sentinel.conf b/test/test-sentinel.conf index e0e03f70..250c50c5 100644 --- a/test/test-sentinel.conf +++ b/test/test-sentinel.conf @@ -3,4 +3,4 @@ daemonize yes logfile stdout pidfile /tmp/resquex-redis-sentinel.pid -sentinel monitor exq redis_db 6379 1 +sentinel monitor exq 127.0.0.1 6555 1 From f32d0b5763057b74735c4dfc9471b54da95fea10 Mon Sep 17 00:00:00 2001 From: Antonin Date: Thu, 22 Mar 2018 18:10:24 +0100 Subject: [PATCH 12/15] Fix expire to seconds --- lib/exq/redis/job_stat.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index c0666dca..89b8518d 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -194,7 +194,7 @@ defmodule Exq.Redis.JobStat do ["HSET", name, "quiet", "false"], ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: concurrency_count(queues, work_table), queues: queues})], ["HSET", name, "beat", Time.unix_seconds], - ["EXPIRE", name, (poll_timeout / 1000 + 5)] + ["EXPIRE", name, (round(poll_timeout / 1000) + 5)] ] end From a1be48ca50bf28815fbc6d8d5fc4f15754350481 Mon Sep 17 00:00:00 2001 From: Antonin Date: Tue, 3 Apr 2018 14:50:46 +0200 Subject: [PATCH 13/15] Simplify heartbeat & add timout to call --- lib/exq/manager/heartbeat_server.ex | 48 ++++++++++------------------- lib/exq/manager/server.ex | 13 ++++---- 2 files changed, 23 insertions(+), 38 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index dd15a361..41f2ecfd 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -1,52 +1,38 @@ defmodule Exq.Heartbeat.Server do use GenServer alias Exq.Redis.Connection - alias Exq.Support.Config alias Exq.Redis.JobStat - defmodule State do - defstruct name: nil, node_id: nil, namespace: nil, started_at: nil, pid: nil, queues: nil, poll_timeout: nil, work_table: nil, redis: nil - end - def start_link(opts) do GenServer.start_link(__MODULE__, opts) end def init(opts) do - state = %State{ - name: Exq.Manager.Server.server_name(opts[:name]), - redis: opts[:redis], - node_id: Config.node_identifier.node_id(), - namespace: opts[:namespace], - queues: opts[:queues], - poll_timeout: opts[:poll_timeout] - } - schedule_work(state, true) - {:ok, state} + schedule_new_work(Exq.Manager.Server.server_name(opts[:name])) + {:ok} end - def handle_cast({:heartbeat, master_state, status}, state) do - schedule_work(state) - master_data = Map.from_struct(master_state) - current_state = %{struct(State, master_data) | name: state.name} - init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: [] + def heartbeat(name, status) do + master_state = GenServer.call(name, :get_state) + init_data = if status, do: [["DEL", "#{master_state.name}:workers"]], else: [] data = init_data ++ JobStat.status_process_commands( - current_state.namespace, - current_state.node_id, - current_state.started_at, - current_state.pid, - current_state.queues, - current_state.work_table, - current_state.poll_timeout + master_state.namespace, + master_state.node_id, + master_state.started_at, + master_state.pid, + master_state.queues, + master_state.work_table, + master_state.poll_timeout ) Connection.qp!( - current_state.redis, + master_state.redis, data ) - {:noreply, current_state} end - defp schedule_work(state, status \\ false) do - Process.send_after(state.name, {:get_state, self(), status}, 1000) + defp schedule_new_work(name, status \\ false) do + :timer.sleep(1000) + heartbeat(name, status) + schedule_new_work(name) end end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index b89eabed..d7c262f0 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -144,7 +144,8 @@ defmodule Exq.Manager.Server do # Setup queues work_table = setup_queues(opts) - state = %State{work_table: work_table, + state = %State{ + work_table: work_table, redis: opts[:redis], stats: opts[:stats], workers_sup: opts[:workers_sup], @@ -205,6 +206,10 @@ defmodule Exq.Manager.Server do {:reply, :ok, updated_state, 0} end + def handle_call(:get_state, _from, state) do + {:reply, state, state, state.poll_timeout} + end + def handle_cast({:re_enqueue_backup, queue}, state) do rescue_timeout(fn -> JobQueue.re_enqueue_backup(state.redis, state.namespace, state.node_id, queue) @@ -222,11 +227,6 @@ defmodule Exq.Manager.Server do {:noreply, updated_state, timeout} end - def handle_info({:get_state, pid, status}, state) do - GenServer.cast(pid, {:heartbeat, state, status}) - {:noreply, state} - end - def handle_info(_info, state) do {:noreply, state, state.poll_timeout} end @@ -246,7 +246,6 @@ defmodule Exq.Manager.Server do def dequeue_and_dispatch(state, queues) do rescue_timeout({state, state.poll_timeout}, fn -> jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues) - job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) cond do From 1fc9f5c13e34d8f66d39da98256e55c87593b929 Mon Sep 17 00:00:00 2001 From: Antonin Date: Tue, 3 Apr 2018 16:44:13 +0200 Subject: [PATCH 14/15] Cast heartbeat --- lib/exq/manager/heartbeat_server.ex | 46 ++++++++++------------------- lib/exq/manager/server.ex | 6 ++-- 2 files changed, 19 insertions(+), 33 deletions(-) diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex index dd15a361..688b1be6 100644 --- a/lib/exq/manager/heartbeat_server.ex +++ b/lib/exq/manager/heartbeat_server.ex @@ -4,49 +4,35 @@ defmodule Exq.Heartbeat.Server do alias Exq.Support.Config alias Exq.Redis.JobStat - defmodule State do - defstruct name: nil, node_id: nil, namespace: nil, started_at: nil, pid: nil, queues: nil, poll_timeout: nil, work_table: nil, redis: nil - end - def start_link(opts) do GenServer.start_link(__MODULE__, opts) end def init(opts) do - state = %State{ - name: Exq.Manager.Server.server_name(opts[:name]), - redis: opts[:redis], - node_id: Config.node_identifier.node_id(), - namespace: opts[:namespace], - queues: opts[:queues], - poll_timeout: opts[:poll_timeout] - } - schedule_work(state, true) - {:ok, state} + schedule_work(Exq.Manager.Server.server_name(opts[:name]), true) + {:ok, nil} end - def handle_cast({:heartbeat, master_state, status}, state) do - schedule_work(state) - master_data = Map.from_struct(master_state) - current_state = %{struct(State, master_data) | name: state.name} - init_data = if status, do: [["DEL", "#{current_state.name}:workers"]], else: [] + def handle_cast({:heartbeat, master_state, name, status}, _state) do + schedule_work(master_state.pid) + init_data = if status, do: [["DEL", "#{name}:workers"]], else: [] data = init_data ++ JobStat.status_process_commands( - current_state.namespace, - current_state.node_id, - current_state.started_at, - current_state.pid, - current_state.queues, - current_state.work_table, - current_state.poll_timeout + master_state.namespace, + master_state.node_id, + master_state.started_at, + master_state.pid, + master_state.queues, + master_state.work_table, + master_state.poll_timeout ) Connection.qp!( - current_state.redis, + master_state.redis, data ) - {:noreply, current_state} + {:noreply, nil} end - defp schedule_work(state, status \\ false) do - Process.send_after(state.name, {:get_state, self(), status}, 1000) + defp schedule_work(name, status \\ false) do + Process.send_after(name, {:get_state, self(), name, status}, 1000) end end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index b89eabed..c54ccd62 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -222,9 +222,9 @@ defmodule Exq.Manager.Server do {:noreply, updated_state, timeout} end - def handle_info({:get_state, pid, status}, state) do - GenServer.cast(pid, {:heartbeat, state, status}) - {:noreply, state} + def handle_info({:get_state, pid, name, status}, state) do + GenServer.cast(pid, {:heartbeat, state, name, status}) + {:noreply, state, 0} end def handle_info(_info, state) do From 3a610b063b94449592cd0919875fe64874d4ef81 Mon Sep 17 00:00:00 2001 From: Antonin Date: Tue, 3 Apr 2018 17:39:55 +0200 Subject: [PATCH 15/15] Add HeartBeat tests --- lib/exq/manager/server.ex | 4 ---- test/exq_test.exs | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index bc53d385..0e1ad7f3 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -206,10 +206,6 @@ defmodule Exq.Manager.Server do {:reply, :ok, updated_state, 0} end - def handle_call(:get_state, _from, state) do - {:reply, state, state, state.poll_timeout} - end - def handle_cast({:re_enqueue_backup, queue}, state) do rescue_timeout(fn -> JobQueue.re_enqueue_backup(state.redis, state.namespace, state.node_id, queue) diff --git a/test/exq_test.exs b/test/exq_test.exs index ce65855b..5d0b2d82 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -1,6 +1,7 @@ defmodule ExqTest do use ExUnit.Case alias Exq.Redis.JobQueue + alias Exq.Redis.Connection import ExqTestUtil defmodule PerformWorker do @@ -387,4 +388,21 @@ defmodule ExqTest do stop_process(sup) end + test "Heartbeat set namespace:key data to redis" do + {:ok, sup} = Exq.start_link([]) + :timer.sleep(1100) + [head | tail] = Connection.smembers!(:testredis, "test:processes") + assert head =~ "test" + stop_process(sup) + end + + test "Simple stop" do + {:ok, sup} = Exq.start_link([]) + Exq.stop(sup) + end + + test "Stop by name" do + {:ok, _sup} = Exq.start_link([name: CustomManager]) + Exq.stop(CustomManager) + end end