diff --git a/README.md b/README.md index 75199ce6..926c6cbf 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ Add exq to your mix.exs deps (replace version with the latest hex.pm package ver defp deps do [ # ... other deps - {:exq, "~> 0.9.0"} + {:exq, "~> 0.9.1"} ] end ``` @@ -182,6 +182,25 @@ config :exq, ] ``` +### Sentinel: + +Exq by default uses [Redix](https://github.com/whatyouhide/redix) +which doesn't support Redis Sentinel. To use Sentinel, add +[RedixSentinel](https://github.com/ananthakumaran/redix_sentinel) to +the list of dependencies. Configure `:redis_worker ({module, start_link_args})` appropriately. + + +```elixir +config :exq + redis_worker: {RedixSentinel, [ + [role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]], + [database: 0, password: nil], + [backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []] + ]} + ... +``` + + ## Using iex: If you'd like to try Exq out on the iex console, you can do this by typing ``` @@ -424,11 +443,18 @@ By default, Exq will register itself under the ```Elixir.Exq``` atom. You can c {:ok, exq} = Exq.start_link(name: Exq.Custom) ``` +## Donation + +To donate, send to: + +Bitcoin (BTC): `17j52Veb8qRmVKVvTDijVtmRXvTUpsAWHv` +Ethereum (ETH): `0xA0add27EBdB4394E15b7d1F84D4173aDE1b5fBB3` + + ## Questions? Issues? For issues, please submit a Github issue with steps on how to reproduce the problem. -For questions, stop in at the [#elixir-lang Slack group](https://elixir-slackin.herokuapp.com/) ## Contributions diff --git a/lib/exq/manager/heartbeat_server.ex b/lib/exq/manager/heartbeat_server.ex new file mode 100644 index 00000000..6c45760d --- /dev/null +++ b/lib/exq/manager/heartbeat_server.ex @@ -0,0 +1,37 @@ +defmodule Exq.Heartbeat.Server do + use GenServer + alias Exq.Redis.Connection + alias Exq.Redis.JobStat + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + def init(opts) do + schedule_work(Exq.Manager.Server.server_name(opts[:name]), true) + {:ok, nil} + end + + def handle_cast({:heartbeat, master_state, name, status}, _state) do + schedule_work(master_state.pid) + init_data = if status, do: [["DEL", "#{name}:workers"]], else: [] + data = init_data ++ JobStat.status_process_commands( + master_state.namespace, + master_state.node_id, + master_state.started_at, + master_state.pid, + master_state.queues, + master_state.work_table, + master_state.poll_timeout + ) + Connection.qp!( + master_state.redis, + data + ) + {:noreply, nil} + end + + defp schedule_work(name, status \\ false) do + Process.send_after(name, {:get_state, self(), name, status}, 1000) + end +end diff --git a/lib/exq/manager/server.ex b/lib/exq/manager/server.ex index ab904641..0e1ad7f3 100644 --- a/lib/exq/manager/server.ex +++ b/lib/exq/manager/server.ex @@ -113,6 +113,7 @@ defmodule Exq.Manager.Server do use GenServer alias Exq.Enqueuer alias Exq.Support.Config + alias Exq.Support.Time alias Exq.Redis.JobQueue @backoff_mult 10 @@ -120,7 +121,7 @@ defmodule Exq.Manager.Server do defmodule State do defstruct redis: nil, stats: nil, enqueuer: nil, pid: nil, node_id: nil, namespace: nil, work_table: nil, queues: nil, poll_timeout: nil, scheduler_poll_timeout: nil, workers_sup: nil, - middleware: nil, metadata: nil + middleware: nil, metadata: nil, started_at: nil end def start_link(opts\\[]) do @@ -135,20 +136,16 @@ defmodule Exq.Manager.Server do def server_name(nil), do: Config.get(:name) def server_name(name), do: name - ##=========================================================== ## gen server callbacks ##=========================================================== def init(opts) do - - # Cleanup stale stats - GenServer.cast(self(), :cleanup_host_stats) - # Setup queues work_table = setup_queues(opts) - state = %State{work_table: work_table, + state = %State{ + work_table: work_table, redis: opts[:redis], stats: opts[:stats], workers_sup: opts[:workers_sup], @@ -160,10 +157,17 @@ defmodule Exq.Manager.Server do queues: opts[:queues], pid: self(), poll_timeout: opts[:poll_timeout], - scheduler_poll_timeout: opts[:scheduler_poll_timeout] + scheduler_poll_timeout: opts[:scheduler_poll_timeout], + started_at: Time.unix_seconds } check_redis_connection(opts) + + # Cleanup stale stats + rescue_timeout(fn -> + Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid) + end) + {:ok, state, 0} end @@ -209,16 +213,6 @@ defmodule Exq.Manager.Server do {:noreply, state, 0} end - @doc """ - Cleanup host stats on boot - """ - def handle_cast(:cleanup_host_stats, state) do - rescue_timeout(fn -> - Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id) - end) - {:noreply, state, 0} - end - def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do update_worker_count(state.work_table, queue, -1) {:noreply, state, 0} @@ -229,6 +223,11 @@ defmodule Exq.Manager.Server do {:noreply, updated_state, timeout} end + def handle_info({:get_state, pid, name, status}, state) do + GenServer.cast(pid, {:heartbeat, state, name, status}) + {:noreply, state, 0} + end + def handle_info(_info, state) do {:noreply, state, state.poll_timeout} end @@ -248,7 +247,6 @@ defmodule Exq.Manager.Server do def dequeue_and_dispatch(state, queues) do rescue_timeout({state, state.poll_timeout}, fn -> jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues) - job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end) cond do @@ -297,7 +295,6 @@ defmodule Exq.Manager.Server do update_worker_count(state.work_table, queue, 1) end - # Setup queues from options / configs. # The following is done: @@ -365,7 +362,7 @@ defmodule Exq.Manager.Server do {:ok, _} = Exq.Redis.Connection.q(opts[:redis], ~w(PING)) catch err, reason -> - opts = Exq.Support.Opts.redis_opts(opts) + opts = Exq.Support.Opts.redis_opts(opts) |> Enum.into(Map.new) |> Map.delete(:password) raise """ \n\n\n#{String.duplicate("=", 100)} ERROR! Could not connect to Redis! diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index 42667132..8afe7a20 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -6,6 +6,7 @@ defmodule Exq.Redis.Connection do require Logger alias Exq.Support.Config + import Exq.Support.Opts, only: [redis_worker_module: 0] def flushdb!(redis) do {:ok, res} = q(redis, ["flushdb"]) @@ -27,6 +28,21 @@ defmodule Exq.Redis.Connection do val end + def hget!(redis, key, field) do + {:ok, val} = q(redis, ["HGET", key, field]) + val + end + + def hvals!(redis, key) do + {:ok, val} = q(redis, ["HVALS", key]) + val + end + + def hlen!(redis, key) do + {:ok, val} = q(redis, ["HLEN", key]) + val + end + def set!(redis, key, val \\ 0) do q(redis, ["SET", key, val]) end @@ -155,15 +171,14 @@ defmodule Exq.Redis.Connection do end def q(redis, command) do - Redix.command(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().command(redis, command, [timeout: Config.get(:redis_timeout)]) end def qp(redis, command) do - Redix.pipeline(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().pipeline(redis, command, [timeout: Config.get(:redis_timeout)]) end def qp!(redis, command) do - Redix.pipeline!(redis, command, [timeout: Config.get(:redis_timeout)]) + redis_worker_module().pipeline!(redis, command, [timeout: Config.get(:redis_timeout)]) end - end diff --git a/lib/exq/redis/job_stat.ex b/lib/exq/redis/job_stat.ex index 042dbf34..89b8518d 100644 --- a/lib/exq/redis/job_stat.ex +++ b/lib/exq/redis/job_stat.ex @@ -38,9 +38,20 @@ defmodule Exq.Redis.JobStat do {:ok, count} end - def add_process_commands(namespace, process_info, serialized_process \\ nil) do - serialized = serialized_process || Exq.Support.Process.encode(process_info) - [["SADD", JobQueue.full_key(namespace, "processes"), serialized]] + def add_process_commands(namespace, process_info, _) do + name = supervisor_worker_name(namespace, process_info) + string_pid = :erlang.pid_to_list(process_info.pid) + [ + ["SADD", JobQueue.full_key(namespace, "processes"), name], # ensure supervisor worker is added to list + ["HINCRBY", name, "busy", "1"], + ["HSET", "#{name}:workers", string_pid, Poison.encode!(%{ + run_at: process_info.started_at, + pid: string_pid, + payload: serialize_processing_payload(process_info.job), + hostname: process_info.hostname, + queue: process_info.job && process_info.job.queue + })] + ] end def add_process(redis, namespace, process_info, serialized_process \\ nil) do instr = add_process_commands(namespace, process_info, serialized_process) @@ -48,9 +59,26 @@ defmodule Exq.Redis.JobStat do :ok end - def remove_process_commands(namespace, process_info, serialized_process \\ nil) do - serialized = serialized_process || Exq.Support.Process.encode(process_info) - [["SREM", JobQueue.full_key(namespace, "processes"), serialized]] + defp serialize_processing_payload(nil) do + %{} + end + defp serialize_processing_payload(job) do + %{ + queue: job.queue, + class: job.class, + args: job.args, + jid: job.jid, + created_at: job.enqueued_at, + enqueued_at: job.enqueued_at + } + end + + def remove_process_commands(namespace, process_info, _) do + name = supervisor_worker_name(namespace, process_info) + [ + ["HINCRBY", name, "busy", "-1"], + ["HDEL", "#{name}:workers", :erlang.pid_to_list(process_info.pid)], + ] end def remove_process(redis, namespace, process_info, serialized_process \\ nil) do instr = remove_process_commands(namespace, process_info, serialized_process) @@ -58,21 +86,30 @@ defmodule Exq.Redis.JobStat do :ok end - def cleanup_processes(redis, namespace, host) do - Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) - |> Enum.map(fn(serialized) -> {Process.decode(serialized), serialized} end) - |> Enum.filter(fn({process, _}) -> process.host == host end) - |> Enum.each(fn({process, serialized}) -> remove_process(redis, namespace, process, serialized) end) + def cleanup_processes(redis, namespace, hostname, master_pid) do + processes = JobQueue.full_key(namespace, "processes") + master_pid_string = "#{:erlang.pid_to_list(master_pid)}" + instr = Connection.smembers!(redis, processes) + |> Enum.filter(fn(key) -> key =~ "#{hostname}:" end) + |> Enum.filter(fn(key) -> ((Connection.hget!(redis, key, "info") || '{}') |> Poison.decode!)["pid"] != master_pid_string end) + |> Enum.flat_map(fn(key) -> [["SREM", processes, key], ["DEL", "#{processes}:workers"]] end) + + if Enum.count(instr) > 0 do + Connection.qp!(redis, instr) + end :ok end def busy(redis, namespace) do - Connection.scard!(redis, JobQueue.full_key(namespace, "processes")) + (Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []) + |> Enum.map(fn(key) -> Connection.hlen!(redis, "#{key}:workers") end) + |> Enum.sum end def processes(redis, namespace) do - list = Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [] - Enum.map(list, &Process.decode/1) + (Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []) + |> Enum.flat_map(fn(key) -> Connection.hvals!(redis, "#{key}:workers") end) + |> Enum.map(&Process.decode(&1)) end def find_failed(redis, namespace, jid) do @@ -120,6 +157,10 @@ defmodule Exq.Redis.JobStat do {:ok, failures, successes} end + defp supervisor_worker_name(namespace, process_info) do + JobQueue.full_key(namespace, "#{process_info.hostname}:elixir") + end + defp realtime_stats_formatter(redis, namespace) do fn(keys, ns) -> if Enum.empty?(keys) do @@ -145,4 +186,30 @@ defmodule Exq.Redis.JobStat do val end end + + def status_process_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do + name = redis_worker_name(namespace, node_id) + [ + ["SADD", JobQueue.full_key(namespace, "processes"), name], + ["HSET", name, "quiet", "false"], + ["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: concurrency_count(queues, work_table), queues: queues})], + ["HSET", name, "beat", Time.unix_seconds], + ["EXPIRE", name, (round(poll_timeout / 1000) + 5)] + ] + end + + defp redis_worker_name(namespace, node_id) do + JobQueue.full_key(namespace, "#{node_id}:elixir") + end + + defp concurrency_count(queues, work_table) do + Enum.map(queues, fn(q) -> + [{_, concurrency, _}] = :ets.lookup(work_table, q) + cond do + concurrency == :infinite -> 1000000 + true -> concurrency + end + end) + |> Enum.sum + end end diff --git a/lib/exq/serializers/json_serializer.ex b/lib/exq/serializers/json_serializer.ex index b3db3794..2ef91b8f 100644 --- a/lib/exq/serializers/json_serializer.ex +++ b/lib/exq/serializers/json_serializer.ex @@ -56,7 +56,9 @@ defmodule Exq.Serializers.JsonSerializer do deserialized = decode!(serialized) %Exq.Support.Process{ pid: Map.get(deserialized, "pid"), - host: Map.get(deserialized, "host"), + hostname: Map.get(deserialized, "hostname"), + queues: Map.get(deserialized, "queues"), + concurrency: Map.get(deserialized, "concurrency"), job: Map.get(deserialized, "job"), started_at: Map.get(deserialized, "started_at") } @@ -66,7 +68,9 @@ defmodule Exq.Serializers.JsonSerializer do formatted_pid = to_string(:io_lib.format("~p", [process.pid])) deserialized = Enum.into([ pid: formatted_pid, - host: process.host, + hostname: process.hostname, + queues: process.queues, + concurrency: process.concurrency, job: process.job, started_at: process.started_at], Map.new) diff --git a/lib/exq/stats/server.ex b/lib/exq/stats/server.ex index b2eabdb2..809b0ac4 100644 --- a/lib/exq/stats/server.ex +++ b/lib/exq/stats/server.ex @@ -24,9 +24,9 @@ defmodule Exq.Stats.Server do @doc """ Add in progress worker process """ - def add_process(stats, namespace, worker, host, job_serialized) do + def add_process(stats, namespace, worker, hostname, job_serialized) do process_info = %Process{pid: worker, - host: host, + hostname: hostname, job: Exq.Support.Config.serializer.decode_job(job_serialized), started_at: Time.unix_seconds} serialized = Exq.Support.Process.encode(process_info) @@ -62,8 +62,8 @@ defmodule Exq.Stats.Server do @doc """ Cleanup stats on boot. This includes cleaning up busy workers. """ - def cleanup_host_stats(stats, namespace, host) do - GenServer.call(stats, {:cleanup_host_stats, namespace, host}) + def cleanup_host_stats(stats, namespace, host, master_pid) do + GenServer.call(stats, {:cleanup_host_stats, namespace, host, master_pid}) :ok end @@ -76,7 +76,6 @@ defmodule Exq.Stats.Server do GenServer.call(stats, :force_flush) end - ##=========================================================== ## gen server callbacks ##=========================================================== @@ -102,9 +101,9 @@ defmodule Exq.Stats.Server do {:reply, :ok, state} end - def handle_call({:cleanup_host_stats, namespace, host}, _from, state) do + def handle_call({:cleanup_host_stats, namespace, host, master_pid}, _from, state) do try do - JobStat.cleanup_processes(state.redis, namespace, host) + JobStat.cleanup_processes(state.redis, namespace, host, master_pid) rescue e -> Logger.error("Error cleaning up processes - #{Kernel.inspect e}") end @@ -124,7 +123,6 @@ defmodule Exq.Stats.Server do :ok end - ##=========================================================== ## Methods ##=========================================================== diff --git a/lib/exq/support/mode.ex b/lib/exq/support/mode.ex index 4c0342f5..7a5d85a6 100644 --- a/lib/exq/support/mode.ex +++ b/lib/exq/support/mode.ex @@ -11,14 +11,13 @@ defmodule Exq.Support.Mode do Returns child list for the main Exq supervisor """ - import Exq.Support.Opts, only: [conform_opts: 1] + import Exq.Support.Opts, only: [redis_worker_opts: 1] import Supervisor.Spec def children(opts) do - {redis_opts, connection_opts, opts} = conform_opts(opts) - + {module, args, opts} = redis_worker_opts(opts) # make sure redis always first(start in order) - children = [worker(Redix, [redis_opts, connection_opts])] + children = [worker(module, args)] children = children ++ children(opts[:mode], opts) children end @@ -31,7 +30,8 @@ defmodule Exq.Support.Mode do worker(Exq.Manager.Server, [opts]), worker(Exq.WorkerDrainer.Server, [opts]), worker(Exq.Enqueuer.Server, [opts]), - worker(Exq.Api.Server, [opts]) + worker(Exq.Api.Server, [opts]), + worker(Exq.Heartbeat.Server, [opts]) ] if opts[:scheduler_enable] do diff --git a/lib/exq/support/opts.ex b/lib/exq/support/opts.ex index 4ea04229..7c030fd2 100644 --- a/lib/exq/support/opts.ex +++ b/lib/exq/support/opts.ex @@ -11,10 +11,7 @@ defmodule Exq.Support.Opts do "#{name}.Sup" |> String.to_atom end - @doc """ - Return {redis_options, redis_connection_opts, gen_server_opts} - """ - def conform_opts(opts \\ []) do + defp conform_opts(opts) do mode = opts[:mode] || Config.get(:mode) redis = redis_client_name(opts[:name]) opts = [{:redis, redis}|opts] @@ -42,6 +39,24 @@ defmodule Exq.Support.Opts do end end + @doc """ + Return {redis_module, redis_args, gen_server_opts} + """ + def redis_worker_opts(opts) do + {redis_opts, connection_opts, opts} = conform_opts(opts) + case Config.get(:redis_worker) do + {module, args} -> {module, args, opts} + _ -> {Redix, [redis_opts, connection_opts], opts} + end + end + + def redis_worker_module() do + case Config.get(:redis_worker) do + {module, _args} -> module + _ -> Redix + end + end + def connection_opts(opts \\ []) do reconnect_on_sleep = opts[:reconnect_on_sleep] || Config.get(:reconnect_on_sleep) timeout = opts[:redis_timeout] || Config.get(:redis_timeout) diff --git a/lib/exq/support/process.ex b/lib/exq/support/process.ex index c40bb16d..e90ddd99 100644 --- a/lib/exq/support/process.ex +++ b/lib/exq/support/process.ex @@ -2,7 +2,7 @@ defmodule Exq.Support.Process do @moduledoc """ Struct for in progress worker """ - defstruct pid: nil, host: nil, job: nil, started_at: nil + defstruct pid: nil, hostname: nil, job: nil, started_at: nil, concurrency: nil, job: nil, queues: nil alias Exq.Support.Config diff --git a/lib/exq/worker/server.ex b/lib/exq/worker/server.ex index 85765976..261d2e77 100644 --- a/lib/exq/worker/server.ex +++ b/lib/exq/worker/server.ex @@ -73,7 +73,6 @@ defmodule Exq.Worker.Server do {:noreply, state} end - @doc """ Dispatch work to the target module (call :perform method of target) """ diff --git a/mix.exs b/mix.exs index 85d15f56..8d6997f1 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule Exq.Mixfile do def project do [ app: :exq, - version: "0.9.0", + version: "0.9.1", elixir: "~> 1.3", elixirc_paths: ["lib"], package: [ @@ -38,6 +38,7 @@ defmodule Exq.Mixfile do { :redix, ">= 0.5.0"}, { :poison, ">= 1.2.0 or ~> 2.0"}, { :excoveralls, "~> 0.6", only: :test }, + { :redix_sentinel, "~> 0.5.0", only: :test }, { :flaky_connection, git: "https://github.com/hamiltop/flaky_connection.git", only: :test}, # docs diff --git a/mix.lock b/mix.lock index 8d7efa58..bd492b4b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ -%{"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, +%{ + "certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, "earmark": {:hex, :earmark, "1.2.2", "f718159d6b65068e8daeef709ccddae5f7fdc770707d82e7d126f584cd925b74", [:mix], []}, "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, @@ -13,7 +14,9 @@ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, "poison": {:hex, :poison, "2.1.0", "f583218ced822675e484648fa26c933d621373f01c6c76bd00005d7bd4b82e27", [:mix], []}, "ranch": {:hex, :ranch, "1.1.0", "f7ed6d97db8c2a27cca85cacbd543558001fc5a355e93a7bff1e9a9065a8545b", [:make], []}, - "redix": {:hex, :redix, "0.5.0", "46b81bdad24d4a330cd9f9eb26d6da6c40c8e5d1ce0c1607a82b4739f8f678c4", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}]}, + "redix": {:hex, :redix, "0.6.1", "20986b0e02f02b13e6f53c79a1ae70aa83147488c408f40275ec261f5bb0a6d0", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, + "redix_sentinel": {:hex, :redix_sentinel, "0.5.0", "cd15dc6ff0b676c974bb2694aeb57bc4c4dfc3c5335900fd7b9cee2b1208ab64", [:mix], [{:connection, "~> 1.0.3", [hex: :connection, repo: "hexpm", optional: false]}, {:redix, "~> 0.6", [hex: :redix, repo: "hexpm", optional: false]}], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}, "ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5", "2e73e068cd6393526f9fa6d399353d7c9477d6886ba005f323b592d389fb47be", [:make], []}, - "uuid": {:hex, :uuid, "1.0.1", "ebb032f2b429761540ca3e67436d1eb140206f139ddb7e1f2cc24b77cb4af45b", [:mix], []}} + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm"}, +} diff --git a/test/api_test.exs b/test/api_test.exs index 0b33e4a8..f5d3585e 100644 --- a/test/api_test.exs +++ b/test/api_test.exs @@ -67,8 +67,8 @@ defmodule ApiTest do test "processes with data" do JobStat.add_process(:testredis, "test", %Process{pid: self()}) assert {:ok, [processes]} = Exq.Api.processes(Exq.Api) - my_pid_str = to_string(:erlang.pid_to_list(self())) - assert %Process{pid: ^my_pid_str} = processes + pid = to_string(:erlang.pid_to_list(self())) + assert pid = processes.pid end test "jobs when empty" do diff --git a/test/config_test.exs b/test/config_test.exs index a67940c7..3fa7c8aa 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -1,6 +1,7 @@ defmodule Exq.ConfigTest do use ExUnit.Case require Mix.Config + import ExqTestUtil setup_all do ExqTestUtil.reset_config @@ -107,7 +108,7 @@ defmodule Exq.ConfigTest do assert client_name == nil end - test "default conform_opts" do + test "default redis_worker_opts" do Mix.Config.persist([ exq: [ queues: ["default"], @@ -120,7 +121,7 @@ defmodule Exq.ConfigTest do shutdown_timeout: 7000, ] ]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) [scheduler_enable: scheduler_enable, namespace: namespace, scheduler_poll_timeout: scheduler_poll_timeout, workers_sup: workers_sup, poll_timeout: poll_timeout, enqueuer: enqueuer, metadata: metadata, stats: stats, name: name, scheduler: scheduler, queues: queues, redis: redis, concurrency: concurrency, middleware: middleware, @@ -145,18 +146,26 @@ defmodule Exq.ConfigTest do assert mode == :default Mix.Config.persist([exq: [queues: [{"default", 1000}, {"test1", 2000}]]]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :default]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) assert server_opts[:queues] == ["default", "test1"] assert server_opts[:concurrency] == [{"default", 1000, 0}, {"test1", 2000, 0}] end - test "api conform_opts" do + test "api redis_worker_opts" do Mix.Config.persist([exq: []]) - {_redis_opts, _connection_opts, server_opts} = Exq.Support.Opts.conform_opts([mode: :api]) + {Redix, [_redis_opts, _connection_opts], server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :api]) [name: name, namespace: namespace, redis: redis, mode: mode] = server_opts assert namespace == "test" assert name == nil assert redis == Exq.Redis.Client assert mode == :api end + + test "custom redis module" do + with_application_env(:exq, :redis_worker, {RedisWorker, [1, 2]}, fn -> + {module, args, server_opts} = Exq.Support.Opts.redis_worker_opts([mode: :default]) + assert module == RedisWorker + assert args == [1, 2] + end) + end end diff --git a/test/exq_test.exs b/test/exq_test.exs index 3799554a..5d0b2d82 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -1,6 +1,7 @@ defmodule ExqTest do use ExUnit.Case alias Exq.Redis.JobQueue + alias Exq.Redis.Connection import ExqTestUtil defmodule PerformWorker do @@ -78,6 +79,21 @@ defmodule ExqTest do stop_process(sup) end + test "enqueue and run job via redis sentinel" do + sentinel_args = [ + [role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]], + [database: 0, password: nil], + [backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []] + ] + with_application_env(:exq, :redis_worker, {RedixSentinel, sentinel_args}, fn -> + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link + {:ok, _} = Exq.enqueue(Exq, "default", ExqTest.PerformWorker, []) + assert_receive {:worked} + stop_process(sup) + end) + end + test "run jobs from backup queue on boot" do host = elem(:inet.gethostname(), 1) Process.register(self(), :exqtest) @@ -266,7 +282,7 @@ defmodule ExqTest do # Clear processes for this node host = Exq.NodeIdentifier.HostnameIdentifier.node_id() - Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host) + Exq.Stats.Server.cleanup_host_stats(ExqP.Stats, "test", host, self()) # Check that process has been cleared processes = Exq.Redis.JobStat.processes(state.redis, "test") @@ -305,7 +321,6 @@ defmodule ExqTest do {:ok, count} = TestStats.failed_count(state.redis, "test") assert count == "2" - {:ok, jid} = Exq.enqueue(Exq, "default", "ExqTest.FailWorker/failure_perform", []) # if we kill Exq too fast we dont record the failure because exq is gone @@ -373,4 +388,21 @@ defmodule ExqTest do stop_process(sup) end + test "Heartbeat set namespace:key data to redis" do + {:ok, sup} = Exq.start_link([]) + :timer.sleep(1100) + [head | tail] = Connection.smembers!(:testredis, "test:processes") + assert head =~ "test" + stop_process(sup) + end + + test "Simple stop" do + {:ok, sup} = Exq.start_link([]) + Exq.stop(sup) + end + + test "Stop by name" do + {:ok, _sup} = Exq.start_link([name: CustomManager]) + Exq.stop(CustomManager) + end end diff --git a/test/job_stat_test.exs b/test/job_stat_test.exs index 77d141ef..4c91d11a 100644 --- a/test/job_stat_test.exs +++ b/test/job_stat_test.exs @@ -28,9 +28,9 @@ defmodule JobStatTest do {:ok, jid} end - def create_process_info(host) do + def create_process_info(hostname) do process_info = %Process{pid: self(), - host: host, + hostname: hostname, job: %Job{}, started_at: Time.unix_seconds} serialized = Exq.Support.Process.encode(process_info) @@ -122,11 +122,10 @@ defmodule JobStatTest do assert Enum.count(Exq.Redis.JobStat.processes(:testredis, namespace)) == 2 # Should cleanup only the host that is passed in - JobStat.cleanup_processes(:testredis, namespace, "host123") + JobStat.cleanup_processes(:testredis, namespace, "host123", self()) processes = Exq.Redis.JobStat.processes(:testredis, namespace) assert Enum.count(processes) == 1 - assert Enum.find(processes, fn(process) -> process.host == "host456" end) != nil + assert Enum.find(processes, fn(process) -> process.hostname == "host456" end) != nil end - end diff --git a/test/test-sentinel.conf b/test/test-sentinel.conf new file mode 100644 index 00000000..250c50c5 --- /dev/null +++ b/test/test-sentinel.conf @@ -0,0 +1,6 @@ +port 6666 +daemonize yes +logfile stdout +pidfile /tmp/resquex-redis-sentinel.pid + +sentinel monitor exq 127.0.0.1 6555 1 diff --git a/test/test_helper.exs b/test/test_helper.exs index a6b1ac85..39650a9f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -85,6 +85,16 @@ defmodule ExqTestUtil do config = Mix.Config.read!(Path.join([Path.dirname(__DIR__), "config", "config.exs"])) Mix.Config.persist(config) end + + def with_application_env(app, key, new, context) do + old = Application.get_env(app, key) + Application.put_env(app, key, new) + try do + context.() + after + Application.put_env(app, key, old) + end + end end defmodule TestRedis do @@ -96,13 +106,15 @@ defmodule TestRedis do def start do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-server test/test-redis.conf') - :timer.sleep(100) + [] = :os.cmd('redis-server test/test-sentinel.conf --sentinel') + :timer.sleep(500) end end def stop do unless Config.get(:test_with_local_redis) == false do [] = :os.cmd('redis-cli -p 6555 shutdown') + [] = :os.cmd('redis-cli -p 6666 shutdown') end end