Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/elixir.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
test:
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
env:
MIX_ENV: test
strategy:
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ config :libring,
rings: [
# A ring which automatically changes based on Erlang cluster membership,
# but does not allow nodes named "a" or "remsh*" to be added to the ring
# All nodes without explicit weights will have a weight of 256
ring_a: [monitor_nodes: true,
node_type: :visible,
node_weight: 256,
node_blacklist: ["a", ~r/^remsh.*$/]],
# A ring which is composed of three nodes, of which "c" has a non-default weight of 200
# The default weight is 128
Expand Down
20 changes: 18 additions & 2 deletions lib/managed_ring.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ defmodule HashRing.Managed do
monitor_nodes: boolean,
node_blacklist: pattern_list,
node_whitelist: pattern_list,
node_type: :all | :hidden | :visible
node_type: :all | :hidden | :visible,
node_weight: pos_integer
]

@type child_spec_option ::
Expand All @@ -42,10 +43,19 @@ defmodule HashRing.Managed do
| {:monitor_nodes, boolean}
| {:node_blacklist, pattern_list}
| {:node_whitelist, pattern_list}
| {:node_weight, pos_integer}

@type child_spec_options :: [child_spec_option()]

@valid_ring_opts [:name, :nodes, :monitor_nodes, :node_blacklist, :node_whitelist, :node_type]
@valid_ring_opts [
:name,
:nodes,
:monitor_nodes,
:node_blacklist,
:node_whitelist,
:node_type,
:node_weight
]

@spec child_spec(child_spec_options) :: Supervisor.child_spec()
def child_spec(opts) do
Expand Down Expand Up @@ -82,6 +92,11 @@ defmodule HashRing.Managed do
which match a pattern in the whitelist will result in the ring being updated.
* `node_type: :all | :hidden | :visible`: refers what kind of nodes will be monitored
when `monitor_nodes` is `true`. For more information, see `:net_kernel.monitor_nodes/2`.
* `node_weight: pos_integer` - The default weight to assign to nodes when no explicit weight
is provided. This applies to: initial nodes from the `nodes` configuration option,
nodes added automatically when `monitor_nodes` is `true`, and nodes added manually via
`add_node/2` and `add_nodes/2` APIs (but not those with explicit weights like `add_node/3`
or `{node, weight}` tuples). Defaults to `128`.

An error is returned if the ring already exists or if bad ring options are provided.

Expand Down Expand Up @@ -115,6 +130,7 @@ defmodule HashRing.Managed do
:node_blacklist when is_list(value) -> false
:node_whitelist when is_list(value) -> false
:node_type when value in [:all, :hidden, :visible] -> false
:node_weight when is_integer(value) and value > 0 -> false
_ -> true
end
end)
Expand Down
41 changes: 27 additions & 14 deletions lib/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ defmodule HashRing.Worker do

ring = HashRing.new()

node_weight = Keyword.get(options, :node_weight, 128)
monitor_nodes? = Keyword.get(options, :monitor_nodes, false)

cond do
Expand All @@ -93,51 +94,53 @@ defmodule HashRing.Worker do
acc

:else ->
HashRing.add_node(acc, node)
HashRing.add_node(acc, node, node_weight)
end
end)

node_type = Keyword.get(options, :node_type, :all)
:ok = :net_kernel.monitor_nodes(true, node_type: node_type)
true = :ets.insert_new(table, {:ring, ring})
{:ok, {table, node_blacklist, node_whitelist}}
{:ok, {table, node_blacklist, node_whitelist, node_weight}}

:else ->
nodes = Keyword.get(options, :nodes, [])
nodes = apply_default_weights(nodes, node_weight)
ring = HashRing.add_nodes(ring, nodes)
true = :ets.insert_new(table, {:ring, ring})
{:ok, {table, [], []}}
{:ok, {table, [], [], node_weight}}
end
end

def handle_call(:list_nodes, _from, {table, _b, _w} = state) do
def handle_call(:list_nodes, _from, {table, _b, _w, _nw} = state) do
{:reply, HashRing.nodes(get_ring(table)), state}
end

def handle_call({:key_to_node, key}, _from, {table, _b, _w} = state) do
def handle_call({:key_to_node, key}, _from, {table, _b, _w, _nw} = state) do
{:reply, HashRing.key_to_node(get_ring(table), key), state}
end

def handle_call({:key_to_nodes, key, count}, _from, {table, _b, _w} = state) do
def handle_call({:key_to_nodes, key, count}, _from, {table, _b, _w, _nw} = state) do
{:reply, HashRing.key_to_nodes(get_ring(table), key, count), state}
end

def handle_call({:add_node, node}, _from, {table, _b, _w} = state) do
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
def handle_call({:add_node, node}, _from, {table, _b, _w, nw} = state) do
get_ring(table) |> HashRing.add_node(node, nw) |> update_ring(table)
{:reply, :ok, state}
end

def handle_call({:add_node, node, weight}, _from, {table, _b, _w} = state) do
def handle_call({:add_node, node, weight}, _from, {table, _b, _w, _nw} = state) do
get_ring(table) |> HashRing.add_node(node, weight) |> update_ring(table)
{:reply, :ok, state}
end

def handle_call({:add_nodes, nodes}, _from, {table, _b, _w} = state) do
def handle_call({:add_nodes, nodes}, _from, {table, _b, _w, nw} = state) do
nodes = apply_default_weights(nodes, nw)
get_ring(table) |> HashRing.add_nodes(nodes) |> update_ring(table)
{:reply, :ok, state}
end

def handle_call({:remove_node, node}, _from, {table, _b, _w} = state) do
def handle_call({:remove_node, node}, _from, {table, _b, _w, _nw} = state) do
get_ring(table) |> HashRing.remove_node(node) |> update_ring(table)
{:reply, :ok, state}
end
Expand All @@ -147,15 +150,15 @@ defmodule HashRing.Worker do
{:stop, :shutdown, state}
end

def handle_info({:nodeup, node, _info}, {table, b, w} = state) do
def handle_info({:nodeup, node, _info}, {table, b, w, nw} = state) do
unless HashRing.Utils.ignore_node?(node, b, w) do
get_ring(table) |> HashRing.add_node(node) |> update_ring(table)
get_ring(table) |> HashRing.add_node(node, nw) |> update_ring(table)
end

{:noreply, state}
end

def handle_info({:nodedown, node, _info}, state = {table, _b, _w}) do
def handle_info({:nodedown, node, _info}, state = {table, _b, _w, _nw}) do
get_ring(table) |> HashRing.remove_node(node) |> update_ring(table)
{:noreply, state}
end
Expand All @@ -182,4 +185,14 @@ defmodule HashRing.Worker do

defp update_ring(ring, table),
do: :ets.update_element(table, :ring, {2, ring})

defp apply_default_weights(nodes, default_weight) do
Enum.map(nodes, fn
{node, weight} ->
{node, weight}

node ->
{node, default_weight}
end)
end
end
185 changes: 184 additions & 1 deletion test/hashring_worker_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule HashRing.WorkerTest do
use ExUnit.Case
use ExUnit.Case, async: false

describe "when the given node_type is :visible" do
setup do
Expand Down Expand Up @@ -33,4 +33,187 @@ defmodule HashRing.WorkerTest do
assert nodes == HashRing.Worker.nodes(pid)
end
end

describe "node weight distribution" do
test "initial nodes respect node_weight configuration" do
TestCluster.prepare()

{:ok, pid} =
HashRing.Worker.start_link(
name: :initial_nodes_weight_test,
monitor_nodes: false,
node_weight: 100,
nodes: [:node_default, {:node_explicit, 300}]
)

on_exit(fn ->
HashRing.Worker.delete(pid)
TestCluster.teardown()
end)

distribution = distribute_keys(pid, 50_000)

assert_in_delta(distribution[:node_default] / 50_000, 0.25, 0.03)
assert_in_delta(distribution[:node_explicit] / 50_000, 0.75, 0.03)
end

test "add_node API respects node_weight vs explicit weight" do
TestCluster.prepare()

{:ok, pid} =
HashRing.Worker.start_link(
name: :add_node_weight_test,
monitor_nodes: false,
node_weight: 100,
nodes: []
)

on_exit(fn ->
HashRing.Worker.delete(pid)
TestCluster.teardown()
end)

:ok = HashRing.Worker.add_node(pid, :default_weight)
:ok = HashRing.Worker.add_node(pid, :explicit_weight, 200)

distribution = distribute_keys(pid, 50_000)

assert_in_delta(distribution[:default_weight] / 50_000, 0.3333, 0.03)
assert_in_delta(distribution[:explicit_weight] / 50_000, 0.6667, 0.03)
end

test "add_nodes API respects node_weight for unweighted nodes" do
TestCluster.prepare()

{:ok, pid} =
HashRing.Worker.start_link(
name: :add_nodes_weight_test,
monitor_nodes: false,
node_weight: 100,
nodes: []
)

on_exit(fn ->
HashRing.Worker.delete(pid)
TestCluster.teardown()
end)

:ok =
HashRing.Worker.add_nodes(pid, [
:light_node,
{:heavy_node, 300}
])

distribution = distribute_keys(pid, 50_000)

assert_in_delta(distribution[:light_node] / 50_000, 0.25, 0.03)
assert_in_delta(distribution[:heavy_node] / 50_000, 0.75, 0.03)
end

test "nodes with equal weights get equal distribution" do
TestCluster.prepare()

{:ok, pid} =
HashRing.Worker.start_link(
name: :equal_weight_test,
monitor_nodes: false,
node_weight: 150,
nodes: [:node_a, :node_b]
)

on_exit(fn ->
HashRing.Worker.delete(pid)
TestCluster.teardown()
end)

:ok = HashRing.Worker.add_node(pid, :node_c)

distribution = distribute_keys(pid, 60_000)

assert_in_delta(distribution[:node_a] / 60_000, 0.3333, 0.03)
assert_in_delta(distribution[:node_b] / 60_000, 0.3333, 0.03)
assert_in_delta(distribution[:node_c] / 60_000, 0.3333, 0.03)
end

test "monitored nodes respect configured node_weight through distribution variance" do
TestCluster.prepare()

# Ring with LOW weight - expect higher variance
{:ok, pid_low} =
HashRing.Worker.start_link(
name: :monitor_low_weight,
monitor_nodes: true,
node_weight: 10,
node_type: :visible
)

# Ring with HIGH weight - expect lower variance
{:ok, pid_high} =
HashRing.Worker.start_link(
name: :monitor_high_weight,
monitor_nodes: true,
node_weight: 500,
node_type: :visible
)

on_exit(fn ->
HashRing.Worker.delete(pid_low)
HashRing.Worker.delete(pid_high)
TestCluster.teardown()
end)

assert wait_for_nodes(pid_low, 1)
assert wait_for_nodes(pid_high, 1)

{:ok, _peer, new_node} = TestCluster.start_node(~c"test_node")

assert wait_for_nodes(pid_low, 2)
assert wait_for_nodes(pid_high, 2)
assert new_node in HashRing.Worker.nodes(pid_low)
assert new_node in HashRing.Worker.nodes(pid_high)

sample_size = 10_000
dist_low = distribute_keys(pid_low, sample_size)
dist_high = distribute_keys(pid_high, sample_size)

# Calculate how far from perfect 50% each node is
deviation_low = abs(dist_low[Node.self()] / sample_size - 0.5)
deviation_high = abs(dist_high[Node.self()] / sample_size - 0.5)

# Low weight ring should have higher deviation from ideal 50%
assert deviation_low > deviation_high

# More specifically, low weight should be noticeably uneven
assert deviation_low > 0.1
assert deviation_high < 0.015
end

defp wait_for_nodes(pid, expected_count, timeout \\ 5000) do
deadline = System.monotonic_time(:millisecond) + timeout
wait_for_nodes_loop(pid, expected_count, deadline)
end

defp wait_for_nodes_loop(pid, expected_count, deadline) do
case HashRing.Worker.nodes(pid) do
nodes when length(nodes) == expected_count ->
true

_ ->
if System.monotonic_time(:millisecond) < deadline do
:timer.sleep(50)
wait_for_nodes_loop(pid, expected_count, deadline)
else
false
end
end
end

defp distribute_keys(pid, count) do
1..count
|> Enum.map(fn i ->
HashRing.Worker.key_to_node(pid, "test_key_#{i}")
end)
|> Enum.frequencies()
end
end
end