Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Add exq to your mix.exs deps (replace version with the latest hex.pm package ver
defp deps do
[
# ... other deps
{:exq, "~> 0.9.0"}
{:exq, "~> 0.9.1"}
]
end
```
Expand Down Expand Up @@ -182,6 +182,25 @@ config :exq,
]
```

### Sentinel:

Exq by default uses [Redix](https://github.com/whatyouhide/redix)
which doesn't support Redis Sentinel. To use Sentinel, add
[RedixSentinel](https://github.com/ananthakumaran/redix_sentinel) to
the list of dependencies. Configure `:redis_worker ({module, start_link_args})` appropriately.


```elixir
config :exq
redis_worker: {RedixSentinel, [
[role: "master", group: "exq", sentinels: [[host: "127.0.0.1", port: 6666]]],
[database: 0, password: nil],
[backoff: 100, timeout: 5000, name: Exq.Redis.Client, socket_opts: []]
]}
...
```


## Using iex:
If you'd like to try Exq out on the iex console, you can do this by typing
```
Expand Down Expand Up @@ -424,11 +443,18 @@ By default, Exq will register itself under the ```Elixir.Exq``` atom. You can c
{:ok, exq} = Exq.start_link(name: Exq.Custom)
```

## Donation

To donate, send to:

Bitcoin (BTC): `17j52Veb8qRmVKVvTDijVtmRXvTUpsAWHv`
Ethereum (ETH): `0xA0add27EBdB4394E15b7d1F84D4173aDE1b5fBB3`


## Questions? Issues?

For issues, please submit a Github issue with steps on how to reproduce the problem.

For questions, stop in at the [#elixir-lang Slack group](https://elixir-slackin.herokuapp.com/)

## Contributions

Expand Down
37 changes: 37 additions & 0 deletions lib/exq/manager/heartbeat_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Exq.Heartbeat.Server do
use GenServer
alias Exq.Redis.Connection
alias Exq.Redis.JobStat

def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

def init(opts) do
schedule_work(Exq.Manager.Server.server_name(opts[:name]), true)
{:ok, nil}
end

def handle_cast({:heartbeat, master_state, name, status}, _state) do
schedule_work(master_state.pid)
init_data = if status, do: [["DEL", "#{name}:workers"]], else: []
data = init_data ++ JobStat.status_process_commands(
master_state.namespace,
master_state.node_id,
master_state.started_at,
master_state.pid,
master_state.queues,
master_state.work_table,
master_state.poll_timeout
)
Connection.qp!(
master_state.redis,
data
)
{:noreply, nil}
end

defp schedule_work(name, status \\ false) do
Process.send_after(name, {:get_state, self(), name, status}, 1000)
end
end
39 changes: 18 additions & 21 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ defmodule Exq.Manager.Server do
use GenServer
alias Exq.Enqueuer
alias Exq.Support.Config
alias Exq.Support.Time
alias Exq.Redis.JobQueue

@backoff_mult 10

defmodule State do
defstruct redis: nil, stats: nil, enqueuer: nil, pid: nil, node_id: nil, namespace: nil, work_table: nil,
queues: nil, poll_timeout: nil, scheduler_poll_timeout: nil, workers_sup: nil,
middleware: nil, metadata: nil
middleware: nil, metadata: nil, started_at: nil
end

def start_link(opts\\[]) do
Expand All @@ -135,20 +136,16 @@ defmodule Exq.Manager.Server do
def server_name(nil), do: Config.get(:name)
def server_name(name), do: name


##===========================================================
## gen server callbacks
##===========================================================

def init(opts) do

# Cleanup stale stats
GenServer.cast(self(), :cleanup_host_stats)

# Setup queues
work_table = setup_queues(opts)

state = %State{work_table: work_table,
state = %State{
work_table: work_table,
redis: opts[:redis],
stats: opts[:stats],
workers_sup: opts[:workers_sup],
Expand All @@ -160,10 +157,17 @@ defmodule Exq.Manager.Server do
queues: opts[:queues],
pid: self(),
poll_timeout: opts[:poll_timeout],
scheduler_poll_timeout: opts[:scheduler_poll_timeout]
scheduler_poll_timeout: opts[:scheduler_poll_timeout],
started_at: Time.unix_seconds
}

check_redis_connection(opts)

# Cleanup stale stats
rescue_timeout(fn ->
Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id, state.pid)
end)

{:ok, state, 0}
end

Expand Down Expand Up @@ -209,16 +213,6 @@ defmodule Exq.Manager.Server do
{:noreply, state, 0}
end

@doc """
Cleanup host stats on boot
"""
def handle_cast(:cleanup_host_stats, state) do
rescue_timeout(fn ->
Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id)
end)
{:noreply, state, 0}
end

def handle_cast({:job_terminated, _namespace, queue, _job_serialized}, state) do
update_worker_count(state.work_table, queue, -1)
{:noreply, state, 0}
Expand All @@ -229,6 +223,11 @@ defmodule Exq.Manager.Server do
{:noreply, updated_state, timeout}
end

def handle_info({:get_state, pid, name, status}, state) do
GenServer.cast(pid, {:heartbeat, state, name, status})
{:noreply, state, 0}
end

def handle_info(_info, state) do
{:noreply, state, state.poll_timeout}
end
Expand All @@ -248,7 +247,6 @@ defmodule Exq.Manager.Server do
def dequeue_and_dispatch(state, queues) do
rescue_timeout({state, state.poll_timeout}, fn ->
jobs = Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)

job_results = jobs |> Enum.map(fn(potential_job) -> dispatch_job(state, potential_job) end)

cond do
Expand Down Expand Up @@ -297,7 +295,6 @@ defmodule Exq.Manager.Server do
update_worker_count(state.work_table, queue, 1)
end


# Setup queues from options / configs.

# The following is done:
Expand Down Expand Up @@ -365,7 +362,7 @@ defmodule Exq.Manager.Server do
{:ok, _} = Exq.Redis.Connection.q(opts[:redis], ~w(PING))
catch
err, reason ->
opts = Exq.Support.Opts.redis_opts(opts)
opts = Exq.Support.Opts.redis_opts(opts) |> Enum.into(Map.new) |> Map.delete(:password)
raise """
\n\n\n#{String.duplicate("=", 100)}
ERROR! Could not connect to Redis!
Expand Down
23 changes: 19 additions & 4 deletions lib/exq/redis/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Exq.Redis.Connection do
require Logger

alias Exq.Support.Config
import Exq.Support.Opts, only: [redis_worker_module: 0]

def flushdb!(redis) do
{:ok, res} = q(redis, ["flushdb"])
Expand All @@ -27,6 +28,21 @@ defmodule Exq.Redis.Connection do
val
end

def hget!(redis, key, field) do
{:ok, val} = q(redis, ["HGET", key, field])
val
end

def hvals!(redis, key) do
{:ok, val} = q(redis, ["HVALS", key])
val
end

def hlen!(redis, key) do
{:ok, val} = q(redis, ["HLEN", key])
val
end

def set!(redis, key, val \\ 0) do
q(redis, ["SET", key, val])
end
Expand Down Expand Up @@ -155,15 +171,14 @@ defmodule Exq.Redis.Connection do
end

def q(redis, command) do
Redix.command(redis, command, [timeout: Config.get(:redis_timeout)])
redis_worker_module().command(redis, command, [timeout: Config.get(:redis_timeout)])
end

def qp(redis, command) do
Redix.pipeline(redis, command, [timeout: Config.get(:redis_timeout)])
redis_worker_module().pipeline(redis, command, [timeout: Config.get(:redis_timeout)])
end

def qp!(redis, command) do
Redix.pipeline!(redis, command, [timeout: Config.get(:redis_timeout)])
redis_worker_module().pipeline!(redis, command, [timeout: Config.get(:redis_timeout)])
end

end
95 changes: 81 additions & 14 deletions lib/exq/redis/job_stat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,78 @@ defmodule Exq.Redis.JobStat do
{:ok, count}
end

def add_process_commands(namespace, process_info, serialized_process \\ nil) do
serialized = serialized_process || Exq.Support.Process.encode(process_info)
[["SADD", JobQueue.full_key(namespace, "processes"), serialized]]
def add_process_commands(namespace, process_info, _) do
name = supervisor_worker_name(namespace, process_info)
string_pid = :erlang.pid_to_list(process_info.pid)
[
["SADD", JobQueue.full_key(namespace, "processes"), name], # ensure supervisor worker is added to list
["HINCRBY", name, "busy", "1"],
["HSET", "#{name}:workers", string_pid, Poison.encode!(%{
run_at: process_info.started_at,
pid: string_pid,
payload: serialize_processing_payload(process_info.job),
hostname: process_info.hostname,
queue: process_info.job && process_info.job.queue
})]
]
end
def add_process(redis, namespace, process_info, serialized_process \\ nil) do
instr = add_process_commands(namespace, process_info, serialized_process)
Connection.qp!(redis, instr)
:ok
end

def remove_process_commands(namespace, process_info, serialized_process \\ nil) do
serialized = serialized_process || Exq.Support.Process.encode(process_info)
[["SREM", JobQueue.full_key(namespace, "processes"), serialized]]
defp serialize_processing_payload(nil) do
%{}
end
defp serialize_processing_payload(job) do
%{
queue: job.queue,
class: job.class,
args: job.args,
jid: job.jid,
created_at: job.enqueued_at,
enqueued_at: job.enqueued_at
}
end

def remove_process_commands(namespace, process_info, _) do
name = supervisor_worker_name(namespace, process_info)
[
["HINCRBY", name, "busy", "-1"],
["HDEL", "#{name}:workers", :erlang.pid_to_list(process_info.pid)],
]
end
def remove_process(redis, namespace, process_info, serialized_process \\ nil) do
instr = remove_process_commands(namespace, process_info, serialized_process)
Connection.qp!(redis, instr)
:ok
end

def cleanup_processes(redis, namespace, host) do
Connection.smembers!(redis, JobQueue.full_key(namespace, "processes"))
|> Enum.map(fn(serialized) -> {Process.decode(serialized), serialized} end)
|> Enum.filter(fn({process, _}) -> process.host == host end)
|> Enum.each(fn({process, serialized}) -> remove_process(redis, namespace, process, serialized) end)
def cleanup_processes(redis, namespace, hostname, master_pid) do
processes = JobQueue.full_key(namespace, "processes")
master_pid_string = "#{:erlang.pid_to_list(master_pid)}"
instr = Connection.smembers!(redis, processes)
|> Enum.filter(fn(key) -> key =~ "#{hostname}:" end)
|> Enum.filter(fn(key) -> ((Connection.hget!(redis, key, "info") || '{}') |> Poison.decode!)["pid"] != master_pid_string end)
|> Enum.flat_map(fn(key) -> [["SREM", processes, key], ["DEL", "#{processes}:workers"]] end)

if Enum.count(instr) > 0 do
Connection.qp!(redis, instr)
end
:ok
end

def busy(redis, namespace) do
Connection.scard!(redis, JobQueue.full_key(namespace, "processes"))
(Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [])
|> Enum.map(fn(key) -> Connection.hlen!(redis, "#{key}:workers") end)
|> Enum.sum
end

def processes(redis, namespace) do
list = Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || []
Enum.map(list, &Process.decode/1)
(Connection.smembers!(redis, JobQueue.full_key(namespace, "processes")) || [])
|> Enum.flat_map(fn(key) -> Connection.hvals!(redis, "#{key}:workers") end)
|> Enum.map(&Process.decode(&1))
end

def find_failed(redis, namespace, jid) do
Expand Down Expand Up @@ -120,6 +157,10 @@ defmodule Exq.Redis.JobStat do
{:ok, failures, successes}
end

defp supervisor_worker_name(namespace, process_info) do
JobQueue.full_key(namespace, "#{process_info.hostname}:elixir")
end

defp realtime_stats_formatter(redis, namespace) do
fn(keys, ns) ->
if Enum.empty?(keys) do
Expand All @@ -145,4 +186,30 @@ defmodule Exq.Redis.JobStat do
val
end
end

def status_process_commands(namespace, node_id, started_at, master_pid, queues, work_table, poll_timeout) do
name = redis_worker_name(namespace, node_id)
[
["SADD", JobQueue.full_key(namespace, "processes"), name],
["HSET", name, "quiet", "false"],
["HSET", name, "info", Poison.encode!(%{ hostname: node_id, started_at: started_at, pid: "#{:erlang.pid_to_list(master_pid)}", concurrency: concurrency_count(queues, work_table), queues: queues})],
["HSET", name, "beat", Time.unix_seconds],
["EXPIRE", name, (round(poll_timeout / 1000) + 5)]
]
end

defp redis_worker_name(namespace, node_id) do
JobQueue.full_key(namespace, "#{node_id}:elixir")
end

defp concurrency_count(queues, work_table) do
Enum.map(queues, fn(q) ->
[{_, concurrency, _}] = :ets.lookup(work_table, q)
cond do
concurrency == :infinite -> 1000000
true -> concurrency
end
end)
|> Enum.sum
end
end
Loading