Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ the ring to change.

The whitelist and blacklist only have an effect when `monitor_nodes: true`.

It is possible to have the ring wait until an application starts before it is included in the ring.
This can be accomplished by setting `wait_for_readiness: true` and listing the app dependencies in
`readiness_deps: [:app1, :app2]`.

## Configuration

Below is an example configuration:
Expand All @@ -113,7 +117,9 @@ config :libring,
# but does not allow nodes named "a" or "remsh*" to be added to the ring
ring_a: [monitor_nodes: true,
node_type: :visible,
node_blacklist: ["a", ~r/^remsh.*$/]],
node_blacklist: ["a", ~r/^remsh.*$/],
wait_for_readiness: true,
readiness_deps: [:myapp]],
# A ring which is composed of three nodes, of which "c" has a non-default weight of 200
# The default weight is 128
ring_b: [nodes: ["a", "b", {"c", 200}]]
Expand Down
25 changes: 22 additions & 3 deletions lib/managed_ring.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ defmodule HashRing.Managed do
@type weight :: pos_integer
@type node_list :: [term() | {term(), weight}]
@type pattern_list :: [String.t() | Regex.t()]
@type app_list :: [atom()]
@type ring_options :: [
nodes: node_list,
monitor_nodes: boolean,
node_blacklist: pattern_list,
node_whitelist: pattern_list,
node_type: :all | :hidden | :visible
node_type: :all | :hidden | :visible,
wait_for_readiness: boolean,
readiness_deps: app_list
]

@type child_spec_options :: [
Expand All @@ -41,10 +44,22 @@ defmodule HashRing.Managed do
:nodes => node_list,
:monitor_nodes => boolean,
:node_blacklist => pattern_list,
:node_whitelist => pattern_list
:node_whitelist => pattern_list,
:node_type => :all | :hidden | :visible,
:wait_for_readiness => boolean,
:readiness_deps: app_list,
]

@valid_ring_opts [:name, :nodes, :monitor_nodes, :node_blacklist, :node_whitelist, :node_type]
@valid_ring_opts [
:name,
:nodes,
:monitor_nodes,
:node_blacklist,
:node_whitelist,
:node_type,
:wait_for_readiness,
:readiness_deps
]

@spec child_spec(child_spec_options) :: Supervisor.child_spec
def child_spec(opts) do
Expand Down Expand Up @@ -76,6 +91,8 @@ defmodule HashRing.Managed do
is provided, the blacklist has no effect.
* `node_whitelist: [String.t | Regex.t]` - The same as `node_blacklist`, except the opposite; only nodes
which match a pattern in the whitelist will result in the ring being updated.
* `wait_for_readiness: boolean` - Wait for apps listed in `readiness_deps` to start before adding to the ring.
* `readiness_deps: [atom]` - List of dependency apps that need to start before the node is considered ready.
* `node_type: :all | :hidden | :visible`: refers what kind of nodes will be monitored
when `monitor_nodes` is `true`. For more information, see `:net_kernel.monitor_nodes/2`.

Expand Down Expand Up @@ -111,6 +128,8 @@ defmodule HashRing.Managed do
:node_blacklist when is_list(value) -> false
:node_whitelist when is_list(value) -> false
:node_type when value in [:all, :hidden, :visible] -> false
:wait_for_readiness when is_boolean(value) -> false
:readiness_deps when is_list(value) -> false
_ -> true
end
end)
Expand Down
205 changes: 184 additions & 21 deletions lib/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@ defmodule HashRing.Worker do
@moduledoc false
use GenServer

@erpc_timeout 500
@node_readiness_check_interval :timer.seconds(1)

defstruct [
:table,
:node_blacklist,
:node_whitelist,
:wait_for_readiness,
:readiness_deps_set
]

alias __MODULE__, as: State

def nodes(pid_or_name)

def nodes(pid) when is_pid(pid) do
Expand All @@ -14,7 +27,7 @@ defmodule HashRing.Worker do
|> get_ring()
|> HashRing.nodes()
rescue
ArgumentError ->
ArgumentError ->
{:error, :no_such_ring}
end

Expand Down Expand Up @@ -69,55 +82,135 @@ defmodule HashRing.Worker do
nodes = [Node.self() | Node.list(:connected)]
node_blacklist = Keyword.get(options, :node_blacklist, [~r/^remsh.*$/, ~r/^rem-.*$/])
node_whitelist = Keyword.get(options, :node_whitelist, [])
wait_for_readiness = Keyword.get(options, :wait_for_readiness, false)
readiness_deps_set = Keyword.get(options, :readiness_deps, []) |> MapSet.new()

ring =
Enum.reduce(nodes, ring, fn node, acc ->
cond do
HashRing.Utils.ignore_node?(node, node_blacklist, node_whitelist) ->
acc

:else ->
if HashRing.Utils.ignore_node?(node, node_blacklist, node_whitelist) do
acc
else
if wait_for_readiness do
if node_ready?(node, readiness_deps_set) do
HashRing.add_node(acc, node)
else
schedule_check_for_node_readiness(node)
acc
end
else
HashRing.add_node(acc, node)
end
end
end)

node_type = Keyword.get(options, :node_type, :all)
:ok = :net_kernel.monitor_nodes(true, node_type: node_type)
true = :ets.insert_new(table, {:ring, ring})
{:ok, {table, node_blacklist, node_whitelist}}

{:ok,
%State{
table: table,
node_blacklist: node_blacklist,
node_whitelist: node_whitelist,
wait_for_readiness: wait_for_readiness,
readiness_deps_set: readiness_deps_set
}}

:else ->
nodes = Keyword.get(options, :nodes, [])
ring = HashRing.add_nodes(ring, nodes)
true = :ets.insert_new(table, {:ring, ring})
{:ok, {table, [], []}}

{:ok,
%State{
table: table,
node_blacklist: [],
node_whitelist: [],
wait_for_readiness: false,
readiness_deps_set: MapSet.new()
}}
end
end

def handle_call(:list_nodes, _from, {table, _b, _w} = state) do
def handle_call(:list_nodes, _from, %State{table: table} = state) do
{:reply, HashRing.nodes(get_ring(table)), state}
end

def handle_call({:key_to_node, key}, _from, {table, _b, _w} = state) do
def handle_call({:key_to_node, key}, _from, %State{table: table} = state) do
{:reply, HashRing.key_to_node(get_ring(table), key), state}
end

def handle_call({:add_node, node}, _from, {table, _b, _w} = state) do
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
def handle_call(
{:add_node, node},
_from,
%State{
table: table,
wait_for_readiness: wait_for_readiness,
readiness_deps_set: readiness_deps_set
} = state
) do
if wait_for_readiness and not node_ready?(node, readiness_deps_set) do
schedule_check_for_node_readiness(node)
else
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
end

{:reply, :ok, state}
end

def handle_call({:add_node, node, weight}, _from, {table, _b, _w} = state) do
get_ring(table) |> HashRing.add_node(node, weight) |> update_ring(table)
def handle_call(
{:add_node, node, weight},
_from,
%State{
table: table,
wait_for_readiness: wait_for_readiness,
readiness_deps_set: readiness_deps_set
} = state
) do
if wait_for_readiness and not node_ready?(node, readiness_deps_set) do
schedule_check_for_node_readiness({node, weight})
else
get_ring(table) |> HashRing.add_node(node, weight) |> update_ring(table)
end

{:reply, :ok, state}
end

def handle_call({:add_nodes, nodes}, _from, {table, _b, _w} = state) do
get_ring(table) |> HashRing.add_nodes(nodes) |> update_ring(table)
def handle_call(
{:add_nodes, nodes},
_from,
%State{
table: table,
wait_for_readiness: wait_for_readiness,
readiness_deps_set: readiness_deps_set
} = state
) do
if wait_for_readiness do
%{true: ready_nodes, false: starting_nodes} =
Enum.group_by(
nodes,
fn
{node, _weight} ->
node_ready?(node, readiness_deps_set)

node ->
node_ready?(node, readiness_deps_set)
end
)

get_ring(table) |> HashRing.add_nodes(ready_nodes) |> update_ring(table)

for starting_node <- starting_nodes do
schedule_check_for_node_readiness(starting_node)
end
else
get_ring(table) |> HashRing.add_nodes(nodes) |> update_ring(table)
end

{:reply, :ok, state}
end

def handle_call({:remove_node, node}, _from, {table, _b, _w} = state) do
def handle_call({:remove_node, node}, _from, %State{table: table} = state) do
get_ring(table) |> HashRing.remove_node(node) |> update_ring(table)
{:reply, :ok, state}
end
Expand All @@ -127,19 +220,62 @@ defmodule HashRing.Worker do
{:stop, :shutdown, state}
end

def handle_info({:nodeup, node, _info}, {table, b, w} = state) do
def handle_info(
{:nodeup, node, _info},
%State{
table: table,
node_blacklist: b,
node_whitelist: w,
wait_for_readiness: wait_for_readiness,
readiness_deps_set: readiness_deps_set
} = state
) do
unless HashRing.Utils.ignore_node?(node, b, w) do
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
if wait_for_readiness and not node_ready?(node, readiness_deps_set) do
schedule_check_for_node_readiness(node)
else
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
end
end

{:noreply, state}
end

def handle_info({:nodedown, node, _info}, state = {table, _b, _w}) do
def handle_info({:nodedown, node, _info}, %State{table: table} = state) do
get_ring(table) |> HashRing.remove_node(node) |> update_ring(table)
{:noreply, state}
end

def handle_info(
{:check_node_readiness, node, weight},
%State{table: table, readiness_deps_set: readiness_deps_set} = state
) do
if node_ready?(node, readiness_deps_set) do
get_ring(table) |> HashRing.add_node(node, weight) |> update_ring(table)
else
schedule_check_for_node_readiness({node, weight})
end

{:noreply, state}
end

def handle_info(
{:check_node_readiness, node},
%State{table: table, readiness_deps_set: readiness_deps_set} = state
) do
if node_ready?(node, readiness_deps_set) do
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
else
schedule_check_for_node_readiness(node)
end

{:noreply, state}
end

def handle_info(_msg, state) do
{:noreply, state}
end

defp get_ets_name(name), do: :"libring_#{name}"

defp do_call(pid_or_name, msg)
Expand All @@ -160,6 +296,33 @@ defmodule HashRing.Worker do

defp get_ring(table), do: :ets.lookup_element(table, :ring, 2)

defp update_ring(ring, table),
defp update_ring(ring, table),
do: :ets.update_element(table, :ring, {2, ring})

defp get_started_apps_set(node) do
try do
:erpc.call(node, Application, :started_applications, [], @erpc_timeout)
|> Enum.map(&elem(&1, 0))
|> MapSet.new()
rescue
_e -> MapSet.new()
end
end

defp node_ready?(node, readiness_deps_set) do
MapSet.difference(readiness_deps_set, get_started_apps_set(node))
|> MapSet.equal?(MapSet.new())
end

defp schedule_check_for_node_readiness({node, weight}) do
if node in Node.list() do
:timer.send_after(@node_readiness_check_interval, {:check_node_readiness, node, weight})
end
end

defp schedule_check_for_node_readiness(node) do
if node in Node.list() do
:timer.send_after(@node_readiness_check_interval, {:check_node_readiness, node})
end
end
end
5 changes: 3 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ defmodule HashRing.Mixfile do

{:ex_doc, ">= 0.0.0", only: [:docs]},
{:benchee, "~> 1.0", only: [:dev]},
{:dialyxir, "~> 1.0", only: [:test], runtime: false},
{:stream_data, "~> 0.5", only: [:test]}
{:dialyxir, "~> 1.0", only: [:dev], runtime: false},
{:stream_data, "~> 0.5", only: [:test]},
{:local_cluster, "~> 1.2", only: [:test]}
]
end

Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.18", "e1b2be73eb08a49fb032a0208bf647380682374a725dfb5b9e510def8397f6f2", [:mix], [], "hexpm", "114a0e85ec3cf9e04b811009e73c206394ffecfcc313e0b346de0d557774ee97"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.26.0", "1922164bac0b18b02f84d6f69cab1b93bc3e870e2ad18d5dacb50a9e06b542a3", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2775d66e494a9a48355db7867478ffd997864c61c65a47d31c4949459281c78d"},
"global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"},
"local_cluster": {:hex, :local_cluster, "1.2.1", "8eab3b8a387680f0872eacfb1a8bd5a91cb1d4d61256eec6a655b07ac7030c73", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "aae80c9bc92c911cb0be085fdeea2a9f5b88f81b6bec2ff1fec244bb0acc232c"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
Expand Down
Loading