Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/phoenix/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ defmodule Phoenix.PubSub do
* `:adapter` - the adapter to use (defaults to `Phoenix.PubSub.PG2`)
* `:pool_size` - number of pubsub partitions to launch
(defaults to one partition for every 4 cores)
* `:registry_size` - number of `Registry` partitions to launch
(defaults to `:pool_size`). This controls the number of Registry partitions
used for storing subscriptions and can be tuned independently from `:pool_size`
for better performance characteristics.
* `:broadcast_pool_size` - number of pubsub partitions used for broadcasting messages
(defaults to `:pool_size`). This option is used during pool size migrations to ensure
no messages are lost. See the "Safe Pool Size Migration" section in the module documentation.
Expand Down
2 changes: 1 addition & 1 deletion lib/phoenix/pubsub/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Phoenix.PubSub.Supervisor do
adapter_name = Module.concat(name, "Adapter")

partitions =
opts[:pool_size] ||
opts[:registry_size] || opts[:pool_size] ||
System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc()

registry = [
Expand Down
25 changes: 23 additions & 2 deletions test/shared/pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ defmodule Phoenix.PubSubTest do

setup config do
size = config[:pool_size] || 1
registry_size = config[:registry_size] || config[:registry_pool_size] || config[:pool_size] || 1
{adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter)
adapter_opts = [adapter: adapter, name: config.test, pool_size: size] ++ adapter_opts
adapter_opts = [adapter: adapter, name: config.test, pool_size: size, registry_size: registry_size] ++ adapter_opts
start_supervised!({Phoenix.PubSub, adapter_opts})

opts = %{
pubsub: config.test,
topic: to_string(config.test),
pool_size: size,
node: Phoenix.PubSub.node_name(config.test)
node: Phoenix.PubSub.node_name(config.test),
adapter_name: Module.concat(config.test, "Adapter")
}

{:ok, opts}
Expand Down Expand Up @@ -174,4 +176,23 @@ defmodule Phoenix.PubSubTest do
assert_receive {:custom, :special, :none, :direct}
end
end

@tag pool_size: 4
@tag registry_size: 2
test "PubSub pool size can be configured separately from the Registry partitions",
config do
assert {:duplicate, 2, _} = :ets.lookup_element(config.pubsub, -2, 2)

assert :persistent_term.get(config.adapter_name) ==
{config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3", :"#{config.adapter_name}_4"}
end

@tag pool_size: 3
test "Registry partitions are configured with the same pool size as PubSub if not specified",
config do
assert {:duplicate, 3, _} = :ets.lookup_element(config.pubsub, -2, 2)

assert :persistent_term.get(config.adapter_name) ==
{config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3"}
end
end