From 272b75ac83415980e127d441f67d55978add5189 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Fri, 20 Jun 2025 07:52:42 +0200 Subject: [PATCH 1/2] Allow separate Registry pool size configuration --- lib/phoenix/pubsub/supervisor.ex | 2 +- test/shared/pubsub_test.exs | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/phoenix/pubsub/supervisor.ex b/lib/phoenix/pubsub/supervisor.ex index ecdbdee9..2f1a5fba 100644 --- a/lib/phoenix/pubsub/supervisor.ex +++ b/lib/phoenix/pubsub/supervisor.ex @@ -22,7 +22,7 @@ defmodule Phoenix.PubSub.Supervisor do adapter_name = Module.concat(name, "Adapter") partitions = - opts[:pool_size] || + opts[:registry_pool_size] || opts[:pool_size] || System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc() registry = [ diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index 265cf2de..501d9528 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -41,15 +41,17 @@ defmodule Phoenix.PubSubTest do setup config do size = config[:pool_size] || 1 + registry_size = config[:registry_pool_size] || config[:pool_size] || 1 {adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter) - adapter_opts = [adapter: adapter, name: config.test, pool_size: size] ++ adapter_opts + adapter_opts = [adapter: adapter, name: config.test, pool_size: size, registry_pool_size: registry_size] ++ adapter_opts start_supervised!({Phoenix.PubSub, adapter_opts}) opts = %{ pubsub: config.test, topic: to_string(config.test), pool_size: size, - node: Phoenix.PubSub.node_name(config.test) + node: Phoenix.PubSub.node_name(config.test), + adapter_name: Module.concat(config.test, "Adapter") } {:ok, opts} @@ -174,4 +176,16 @@ defmodule Phoenix.PubSubTest do assert_receive {:custom, :special, :none, :direct} end end + + @tag pool_size: 4 + @tag registry_pool_size: 2 + test "PubSub pool size can be configured separately from the Registry partitions", + config do + # This is looking into Registry and PubSub internal implementation, but + # i don't know how to test this better. + assert {:duplicate, 2, _} = :ets.lookup_element(config.pubsub, -2, 2) + + assert :persistent_term.get(config.adapter_name) == + {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3", :"#{config.adapter_name}_4"} + end end From e64c5d97f92a48197b55b7870a35e32f2a1e7f25 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Sat, 21 Jun 2025 09:46:13 +0200 Subject: [PATCH 2/2] Rename to rgistry_size; add documentation --- lib/phoenix/pubsub.ex | 4 ++++ lib/phoenix/pubsub/supervisor.ex | 2 +- test/shared/pubsub_test.exs | 23 +++++++++++++++-------- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 8cbed738..9f4430a0 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -164,6 +164,10 @@ defmodule Phoenix.PubSub do * `:adapter` - the adapter to use (defaults to `Phoenix.PubSub.PG2`) * `:pool_size` - number of pubsub partitions to launch (defaults to one partition for every 4 cores) + * `:registry_size` - number of `Registry` partitions to launch + (defaults to `:pool_size`). This controls the number of Registry partitions + used for storing subscriptions and can be tuned independently from `:pool_size` + for better performance characteristics. * `:broadcast_pool_size` - number of pubsub partitions used for broadcasting messages (defaults to `:pool_size`). This option is used during pool size migrations to ensure no messages are lost. See the "Safe Pool Size Migration" section in the module documentation. diff --git a/lib/phoenix/pubsub/supervisor.ex b/lib/phoenix/pubsub/supervisor.ex index 2f1a5fba..a1b93eaa 100644 --- a/lib/phoenix/pubsub/supervisor.ex +++ b/lib/phoenix/pubsub/supervisor.ex @@ -22,7 +22,7 @@ defmodule Phoenix.PubSub.Supervisor do adapter_name = Module.concat(name, "Adapter") partitions = - opts[:registry_pool_size] || opts[:pool_size] || + opts[:registry_size] || opts[:pool_size] || System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc() registry = [ diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index 501d9528..fe1c0091 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -41,9 +41,9 @@ defmodule Phoenix.PubSubTest do setup config do size = config[:pool_size] || 1 - registry_size = config[:registry_pool_size] || config[:pool_size] || 1 + registry_size = config[:registry_size] || config[:registry_pool_size] || config[:pool_size] || 1 {adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter) - adapter_opts = [adapter: adapter, name: config.test, pool_size: size, registry_pool_size: registry_size] ++ adapter_opts + adapter_opts = [adapter: adapter, name: config.test, pool_size: size, registry_size: registry_size] ++ adapter_opts start_supervised!({Phoenix.PubSub, adapter_opts}) opts = %{ @@ -178,14 +178,21 @@ defmodule Phoenix.PubSubTest do end @tag pool_size: 4 - @tag registry_pool_size: 2 + @tag registry_size: 2 test "PubSub pool size can be configured separately from the Registry partitions", config do - # This is looking into Registry and PubSub internal implementation, but - # i don't know how to test this better. - assert {:duplicate, 2, _} = :ets.lookup_element(config.pubsub, -2, 2) + assert {:duplicate, 2, _} = :ets.lookup_element(config.pubsub, -2, 2) - assert :persistent_term.get(config.adapter_name) == - {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3", :"#{config.adapter_name}_4"} + assert :persistent_term.get(config.adapter_name) == + {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3", :"#{config.adapter_name}_4"} + end + + @tag pool_size: 3 + test "Registry partitions are configured with the same pool size as PubSub if not specified", + config do + assert {:duplicate, 3, _} = :ets.lookup_element(config.pubsub, -2, 2) + + assert :persistent_term.get(config.adapter_name) == + {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3"} end end