From 00150f623e365a9b1a168358d4c14a401a868a07 Mon Sep 17 00:00:00 2001 From: Rafal Studnicki Date: Fri, 10 Dec 2021 23:32:25 +0100 Subject: [PATCH 01/17] Use dirty list --- lib/phoenix/tracker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index 0819893b..8b8620db 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -202,7 +202,7 @@ defmodule Phoenix.Tracker do def list(tracker_name, topic) do tracker_name |> Shard.name_for_topic(topic, pool_size(tracker_name)) - |> Phoenix.Tracker.Shard.list(topic) + |> Phoenix.Tracker.Shard.dirty_list(topic) end @doc """ From 788cd9265b031ce68c9573b5fb57d3f6bc7713e5 Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Tue, 11 Oct 2022 16:45:21 +0200 Subject: [PATCH 02/17] Fix compatibility between most recent official PubSub version and the one used by live-service (#2) --- lib/phoenix/pubsub/pg2.ex | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/phoenix/pubsub/pg2.ex b/lib/phoenix/pubsub/pg2.ex index 83907490..801fefd6 100644 --- a/lib/phoenix/pubsub/pg2.ex +++ b/lib/phoenix/pubsub/pg2.ex @@ -62,20 +62,16 @@ defmodule Phoenix.PubSub.PG2 do name = Keyword.fetch!(opts, :name) pool_size = Keyword.get(opts, :pool_size, 1) adapter_name = Keyword.fetch!(opts, :adapter_name) - Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: :"#{adapter_name}_supervisor") + Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: adapter_name) end @impl true def init({name, adapter_name, pool_size}) do - [_ | groups] = + groups = for number <- 1..pool_size do :"#{adapter_name}_#{number}" end - # Use `adapter_name` for the first in the pool for backwards compatability - # with v2.0 when the pool_size is 1. - groups = [adapter_name | groups] - :persistent_term.put(adapter_name, List.to_tuple(groups)) children = From f0267fc25a1916e6deefcb028d5f922ebe746061 Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Thu, 13 Jul 2023 18:50:29 +0200 Subject: [PATCH 03/17] Add `dirty_get_by_key` (#5) This pull request includes a dirty version of the get_by_key function. It also modifies Tracker.list to use the standard presence list mechanism, and it introduces dirty_list as a standalone function. --- lib/phoenix/tracker.ex | 7 +++++++ lib/phoenix/tracker/shard.ex | 5 +++++ test/phoenix/tracker/shard_replication_test.exs | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index dcb579f5..e194b798 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -205,6 +205,13 @@ defmodule Phoenix.Tracker do """ @spec list(atom, topic) :: [presence] def list(tracker_name, topic) do + tracker_name + |> Shard.name_for_topic(topic, pool_size(tracker_name)) + |> Phoenix.Tracker.Shard.list(topic) + end + + @spec dirty_list(atom, topic) :: [presence] + def dirty_list(tracker_name, topic) do tracker_name |> Shard.name_for_topic(topic, pool_size(tracker_name)) |> Phoenix.Tracker.Shard.dirty_list(topic) diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index 6fd792b6..ec91ad40 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -86,6 +86,11 @@ defmodule Phoenix.Tracker.Shard do |> State.get_by_key(topic, key) end + @spec dirty_get_by_key(atom, topic, term) :: [presence] + def dirty_get_by_key(shard_name, topic, key) do + State.tracked_key(shard_name, topic, key, []) + end + @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do GenServer.call(server_pid, :graceful_permdown) diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index 0a6fed5e..ae505356 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -244,6 +244,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"local1", _}, {"node1", _}] = list(shard, topic) assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) + assert [_] = Shard.dirty_get_by_key(shard, topic, "local1") + assert [_] = Shard.dirty_get_by_key(shard, topic, "node1") # nodedown Process.unlink(node_pid) @@ -252,9 +254,13 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert %{@node1 => %Replica{status: :down}} = replicas(shard) assert [{"local1", _}] = list(shard, topic) assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) + assert [_] = Shard.dirty_get_by_key(shard, topic, "local1") + assert [_] = Shard.dirty_get_by_key(shard, topic, "node1") :timer.sleep(@permdown + 2*@heartbeat) assert [{"local1", _}] = dirty_list(shard, topic) + assert [_] = Shard.dirty_get_by_key(shard, topic, "local1") + assert [] = Shard.dirty_get_by_key(shard, topic, "node1") end From 111352b90a54f399a7027a776bbb0382e0ab38b3 Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Thu, 13 Jul 2023 19:37:39 +0200 Subject: [PATCH 04/17] Add `Tracker.dirty_get_by_key` (#6) This was supposed to be added in https://github.com/Whatnot-Inc/phoenix_pubsub/pull/5. --- lib/phoenix/tracker.ex | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index e194b798..5fa2d2e7 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -238,6 +238,13 @@ defmodule Phoenix.Tracker do |> Phoenix.Tracker.Shard.get_by_key(topic, key) end + @spec dirty_get_by_key(atom, topic, term) :: [presence] + def dirty_get_by_key(tracker_name, topic, key) do + tracker_name + |> Shard.name_for_topic(topic, pool_size(tracker_name)) + |> Phoenix.Tracker.Shard.get_by_key(topic, key) + end + @doc """ Gracefully shuts down by broadcasting permdown to all replicas. From 8eac4173473b9b456b78309e275b671d6260d4ef Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Mon, 18 Sep 2023 15:40:07 +0200 Subject: [PATCH 05/17] Fix `dirty_get_by_key` to actually perform a dirty read (#7) --- lib/phoenix/tracker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index 5fa2d2e7..becd60b3 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -242,7 +242,7 @@ defmodule Phoenix.Tracker do def dirty_get_by_key(tracker_name, topic, key) do tracker_name |> Shard.name_for_topic(topic, pool_size(tracker_name)) - |> Phoenix.Tracker.Shard.get_by_key(topic, key) + |> Phoenix.Tracker.Shard.dirty_get_by_key(topic, key) end @doc """ From 9a437176ed0f6a1a7292b04c01f94de735db0a21 Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Wed, 8 Nov 2023 20:21:47 +0100 Subject: [PATCH 06/17] Allow to dirty get by key only (#8) --- lib/phoenix/tracker.ex | 10 ++++++++++ lib/phoenix/tracker/shard.ex | 5 +++++ lib/phoenix/tracker/state.ex | 15 ++++++++++++++- test/phoenix/tracker/pool_test.exs | 29 +++++++++++++++++++++++++++-- 4 files changed, 56 insertions(+), 3 deletions(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index becd60b3..e3e70169 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -245,6 +245,16 @@ defmodule Phoenix.Tracker do |> Phoenix.Tracker.Shard.dirty_get_by_key(topic, key) end + @spec dirty_get_by_key(atom, term) :: [{topic, pid, meta :: map()}] + def dirty_get_by_key(tracker_name, key) do + 0..(pool_size(tracker_name) - 1) + |> Enum.flat_map(fn n -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.dirty_get_by_key(shard_name, key) + end) + end + @doc """ Gracefully shuts down by broadcasting permdown to all replicas. diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index ec91ad40..c5385324 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -91,6 +91,11 @@ defmodule Phoenix.Tracker.Shard do State.tracked_key(shard_name, topic, key, []) end + @spec dirty_get_by_key(atom, term) :: [presence] + def dirty_get_by_key(shard_name, key) do + State.tracked_key(shard_name, key, []) + end + @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do GenServer.call(server_pid, :graceful_permdown) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 1d5949cc..a25ef67c 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,7 +57,7 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end @@ -161,6 +161,19 @@ defmodule Phoenix.Tracker.State do [{{:"$1", :"$2"}}]}]) end + def tracked_key(table, key, down_replicas) do + :ets.select( + table, + [ + { + {{:"$1", :"$2", key}, :"$3", {:"$4", :_}}, + not_in(:"$4", down_replicas), + [{{:"$1", :"$2", :"$3"}}] + } + ] + ) + end + defp not_in(_pos, []), do: [] defp not_in(pos, replicas), do: [not: ors(pos, replicas)] defp ors(pos, [rep]), do: {:"=:=", pos, {rep}} diff --git a/test/phoenix/tracker/pool_test.exs b/test/phoenix/tracker/pool_test.exs index e7bd1071..9ce09d3c 100644 --- a/test/phoenix/tracker/pool_test.exs +++ b/test/phoenix/tracker/pool_test.exs @@ -35,9 +35,34 @@ defmodule Phoenix.Tracker.PoolTest do end @tag pool_size: pool_size - test "pool #{pool_size}: Untrack/4 results in all ids being untracked", - %{server: server} do + test "pool #{pool_size}: dirty_get_by_key/2 returns presences from all shards", %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + + refs = + for topic <- topics do + {:ok, ref} = Tracker.track(server, self(), topic, "me", %{name: "me"}) + ref + end + for topic <- topics do + {:ok, _} = Tracker.track(server, self(), topic, "other", %{name: "me"}) + end + + by_key = Tracker.dirty_get_by_key(server, "me") + assert length(by_key) == 100 + + topics_and_refs = + for {topic, pid, %{name: "me", phx_ref: ref}} <- by_key do + assert pid == self() + {topic, ref} + end + + assert Enum.sort(topics_and_refs) == Enum.sort(List.zip([topics, refs])) + end + + @tag pool_size: pool_size + test "pool #{pool_size}: Untrack/4 results in all ids being untracked", + %{server: server} do topics = for i <- 1..100, do: "topic_#{i}" for t <- topics do {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) From 245044a87c3c856bb75ffe6f6653d89f767ba15a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Studnicki?= Date: Tue, 28 Nov 2023 17:19:42 +0100 Subject: [PATCH 07/17] Add Tracker.size/1 api (#9) --- lib/phoenix/tracker.ex | 11 ++++++++ lib/phoenix/tracker/shard.ex | 5 ++++ lib/phoenix/tracker/state.ex | 5 ++++ test/phoenix/tracker/pool_test.exs | 11 ++++++++ test/phoenix/tracker/shard_test.exs | 41 +++++++++++++++++++++++++++++ test/phoenix/tracker/state_test.exs | 32 ++++++++++++++++++++++ 6 files changed, 105 insertions(+) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index e3e70169..61261eed 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -269,6 +269,17 @@ defmodule Phoenix.Tracker do Supervisor.stop(tracker_name) end + @doc false + @spec size(atom) :: non_neg_integer + def size(tracker_name) do + 0..(pool_size(tracker_name) - 1) + |> Enum.reduce(0, fn n, acc -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.size(shard_name) + acc + end) + end + @doc """ Starts a tracker pool. diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index c5385324..993ed735 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -101,6 +101,11 @@ defmodule Phoenix.Tracker.Shard do GenServer.call(server_pid, :graceful_permdown) end + @spec size(atom) :: non_neg_integer + def size(shard_name) do + State.size(shard_name) + end + ## Server def start_link(tracker, tracker_opts, pool_opts) do diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index a25ef67c..7449a39c 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -62,6 +62,11 @@ defmodule Phoenix.Tracker.State do replicas: %{replica => :up}}) end + @spec size(atom) :: non_neg_integer + def size(table) do + :ets.info(table, :size) + end + @doc """ Returns the causal context for the set. """ diff --git a/test/phoenix/tracker/pool_test.exs b/test/phoenix/tracker/pool_test.exs index 9ce09d3c..fb569f4e 100644 --- a/test/phoenix/tracker/pool_test.exs +++ b/test/phoenix/tracker/pool_test.exs @@ -168,5 +168,16 @@ defmodule Phoenix.Tracker.PoolTest do for t <- topics, do: assert Tracker.list(server, t) == [] end + + @tag pool_size: pool_size + test "pool #{pool_size}: count/1 returns number of entries across all shards", + %{server: server} do + topics = for i <- 1..100, do: "topic_#{i}" + for t <- topics do + {:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"}) + end + + assert Tracker.size(server) == 100 + end end end diff --git a/test/phoenix/tracker/shard_test.exs b/test/phoenix/tracker/shard_test.exs index f6338af2..169126a3 100644 --- a/test/phoenix/tracker/shard_test.exs +++ b/test/phoenix/tracker/shard_test.exs @@ -13,4 +13,45 @@ defmodule Phoenix.Tracker.ShardTest do assert Phoenix.Tracker.Shard.init([nil, nil, opts]) == {:error, "permdown_period must be at least larger than the down_period"} end + + defmodule TestTracker do + use Phoenix.Tracker + def init(state), do: {:ok, state} + def handle_diff(_diff, state), do: {:ok, state} + end + + describe "size/1" do + test "returns 0 when there are no entries in the shard" do + tracker = TestTracker + name = :"#{inspect(make_ref())}" + shard_name = Phoenix.Tracker.Shard.name_for_number(name, 1) + given_pubsub(name) + opts = [pubsub_server: name, name: name, shard_number: 1] + {:ok, _pid} = Phoenix.Tracker.Shard.start_link(tracker, %{}, opts) + + assert Phoenix.Tracker.Shard.size(shard_name) == 0 + end + + test "returns number of tracked entries in the shard" do + tracker = TestTracker + name = :"#{inspect(make_ref())}" + shard_name = Phoenix.Tracker.Shard.name_for_number(name, 1) + given_pubsub(name) + opts = [pubsub_server: name, name: name, shard_number: 1] + {:ok, pid} = Phoenix.Tracker.Shard.start_link(tracker, %{}, opts) + + for i <- 1..100 do + Phoenix.Tracker.Shard.track(pid, self(), "topic", "user#{i}", %{}) + end + + assert Phoenix.Tracker.Shard.size(shard_name) == 100 + end + + defp given_pubsub(name) do + size = 1 + {adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter) + adapter_opts = [adapter: adapter, name: name, pool_size: size] ++ adapter_opts + start_supervised!({Phoenix.PubSub, adapter_opts}) + end + end end diff --git a/test/phoenix/tracker/state_test.exs b/test/phoenix/tracker/state_test.exs index 081bbff4..9f7f4eb2 100644 --- a/test/phoenix/tracker/state_test.exs +++ b/test/phoenix/tracker/state_test.exs @@ -409,4 +409,36 @@ defmodule Phoenix.Tracker.StateTest do end) end) end + + describe "size/1" do + test "returns 0 for empty state" do + shard_name = :shard + State.new(:s1, shard_name) + assert State.size(shard_name) == 0 + end + + test "returns size of the values tables" do + shard_name = :shard + state = State.new(:s1, shard_name) + + Enum.reduce(1..100, state, fn i, acc -> + State.join(acc, self(), "lobby", "user#{i}", %{}) + end) + + assert State.size(shard_name) == 100 + end + + test "leaves are accounted for in the returned size" do + shard_name = :shard + state = State.new(:s1, shard_name) + + state = Enum.reduce(1..100, state, fn i, acc -> + State.join(acc, self(), "topic#{i}", "user#{i}", %{}) + end) + + State.leave(state, self(), "topic1", "user1") + + assert State.size(shard_name) == 99 + end + end end From a3ab4612dcb0ab1fced7419586e9a29b95808fc9 Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Mon, 11 Dec 2023 14:10:55 +0000 Subject: [PATCH 08/17] [Get By Key] Allow limits --- lib/phoenix/tracker.ex | 10 ++++++++++ lib/phoenix/tracker/shard.ex | 5 +++++ lib/phoenix/tracker/state.ex | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index 61261eed..e6008145 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -255,6 +255,16 @@ defmodule Phoenix.Tracker do end) end + @spec dirty_get_by_key(atom, term, integer()) :: [{topic, pid, meta :: map()}] + def dirty_get_by_key(tracker_name, key, limit) do + 0..(pool_size(tracker_name) - 1) + |> Enum.flat_map(fn n -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.dirty_get_by_key(shard_name, key) + end) + end + @doc """ Gracefully shuts down by broadcasting permdown to all replicas. diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index 993ed735..cf9ddbd2 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -96,6 +96,11 @@ defmodule Phoenix.Tracker.Shard do State.tracked_key(shard_name, key, []) end + @spec dirty_get_by_key(atom, term, integer()) :: [presence] + def dirty_get_by_key(shard_name, key, limit) do + State.tracked_key(shard_name, key, [], limit) + end + @spec graceful_permdown(pid) :: :ok def graceful_permdown(server_pid) do GenServer.call(server_pid, :graceful_permdown) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 7449a39c..5b5c86bb 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -179,6 +179,24 @@ defmodule Phoenix.Tracker.State do ) end + def tracked_key(table, key, down_replicas, limit) do + :ets.select( + table, + [ + { + {{:"$1", :"$2", key}, :"$3", {:"$4", :_}}, + not_in(:"$4", down_replicas), + [{{:"$1", :"$2", :"$3"}}] + } + ], + limit + ) + |> case do + {lst, _} -> lst + _ -> [] + end + end + defp not_in(_pos, []), do: [] defp not_in(pos, replicas), do: [not: ors(pos, replicas)] defp ors(pos, [rep]), do: {:"=:=", pos, {rep}} From 1c565aba28fcce3aaec579394527892ce9e8ccdb Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Mon, 11 Dec 2023 14:51:39 +0000 Subject: [PATCH 09/17] Phoenix dirty_get_by_key_with_limit --- lib/phoenix/tracker.ex | 6 +++--- lib/phoenix/tracker/shard.ex | 6 +++--- lib/phoenix/tracker/state.ex | 2 +- test/phoenix/tracker/pool_test.exs | 17 +++++++++++++++++ 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index e6008145..3daca779 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -255,13 +255,13 @@ defmodule Phoenix.Tracker do end) end - @spec dirty_get_by_key(atom, term, integer()) :: [{topic, pid, meta :: map()}] - def dirty_get_by_key(tracker_name, key, limit) do + @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [{topic, pid, meta :: map()}] + def dirty_get_by_key_with_limit(tracker_name, key, limit) do 0..(pool_size(tracker_name) - 1) |> Enum.flat_map(fn n -> shard_name = Shard.name_for_number(tracker_name, n) - Phoenix.Tracker.Shard.dirty_get_by_key(shard_name, key) + Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, limit) end) end diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index cf9ddbd2..4c77b9d4 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -96,9 +96,9 @@ defmodule Phoenix.Tracker.Shard do State.tracked_key(shard_name, key, []) end - @spec dirty_get_by_key(atom, term, integer()) :: [presence] - def dirty_get_by_key(shard_name, key, limit) do - State.tracked_key(shard_name, key, [], limit) + @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [presence] + def dirty_get_by_key_with_limit(shard_name, key, limit) do + State.tracked_key_with_limit(shard_name, key, [], limit) end @spec graceful_permdown(pid) :: :ok diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 5b5c86bb..5419bf84 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -179,7 +179,7 @@ defmodule Phoenix.Tracker.State do ) end - def tracked_key(table, key, down_replicas, limit) do + def tracked_key_with_limit(table, key, down_replicas, limit) do :ets.select( table, [ diff --git a/test/phoenix/tracker/pool_test.exs b/test/phoenix/tracker/pool_test.exs index fb569f4e..eaf58a44 100644 --- a/test/phoenix/tracker/pool_test.exs +++ b/test/phoenix/tracker/pool_test.exs @@ -60,6 +60,23 @@ defmodule Phoenix.Tracker.PoolTest do assert Enum.sort(topics_and_refs) == Enum.sort(List.zip([topics, refs])) end + @tag pool_size: pool_size + test "pool #{pool_size}: dirty_get_by_key_with_limit/2 returns results with (limit*pool_size)", %{server: server, pool_size: pool_size} do + limit = Enum.random([1,2,3]) + topics = for i <- 1..2000, do: "topic_#{i}" + + for topic <- topics do + {:ok, _} = Tracker.track(server, self(), topic, "me", %{name: "me"}) + end + + for topic <- topics do + {:ok, _} = Tracker.track(server, self(), topic, "other", %{name: "me"}) + end + + res = Tracker.dirty_get_by_key_with_limit(server, "me", limit) + assert length(res) <= (limit*pool_size) + end + @tag pool_size: pool_size test "pool #{pool_size}: Untrack/4 results in all ids being untracked", %{server: server} do From 05e2d7a0f916a3387f1d12ec1cbd024f47deda8e Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Tue, 12 Dec 2023 00:56:53 +0000 Subject: [PATCH 10/17] Use async_stream --- lib/phoenix/tracker.ex | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index 3daca779..bb333a1c 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -258,10 +258,27 @@ defmodule Phoenix.Tracker do @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [{topic, pid, meta :: map()}] def dirty_get_by_key_with_limit(tracker_name, key, limit) do 0..(pool_size(tracker_name) - 1) - |> Enum.flat_map(fn n -> - shard_name = Shard.name_for_number(tracker_name, n) - - Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, limit) + |> Task.async_stream( + fn n -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, limit) + end, + on_timeout: :kill_task, + zip_input_on_exit: true + ) + |> Enum.flat_map(fn + {:ok, presences} -> + presences + + {:exit, {shard_n, reason}} -> + Logger.warning("Failed to fetch presences by key", + key: key, + shard: shard_n, + error: inspect(reason) + ) + + [] end) end From a1923a5586d4221c2bdd42ddcdddd0d766e3b27f Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Tue, 12 Dec 2023 01:19:36 +0000 Subject: [PATCH 11/17] Set write concurrency to true --- lib/phoenix/tracker/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 5419bf84..a47cc640 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,7 +57,7 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true, write_concurrency: true]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end From 6bb9609c6bb370394d1e227583e1a6a31b37ab00 Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Tue, 12 Dec 2023 02:07:20 +0000 Subject: [PATCH 12/17] Use write_concurrency auto --- lib/phoenix/tracker/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index a47cc640..10d27a06 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,7 +57,7 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true, write_concurrency: true]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true, write_concurrency: :auto]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end From 1e6ac4d08ac4242272148f9d41fdadfba6ae6aaa Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Tue, 12 Dec 2023 02:26:51 +0000 Subject: [PATCH 13/17] Go back to sync fetching of data --- lib/phoenix/tracker.ex | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index bb333a1c..3daca779 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -258,27 +258,10 @@ defmodule Phoenix.Tracker do @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [{topic, pid, meta :: map()}] def dirty_get_by_key_with_limit(tracker_name, key, limit) do 0..(pool_size(tracker_name) - 1) - |> Task.async_stream( - fn n -> - shard_name = Shard.name_for_number(tracker_name, n) - - Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, limit) - end, - on_timeout: :kill_task, - zip_input_on_exit: true - ) - |> Enum.flat_map(fn - {:ok, presences} -> - presences - - {:exit, {shard_n, reason}} -> - Logger.warning("Failed to fetch presences by key", - key: key, - shard: shard_n, - error: inspect(reason) - ) - - [] + |> Enum.flat_map(fn n -> + shard_name = Shard.name_for_number(tracker_name, n) + + Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, limit) end) end From c00eae2a5a6124c26a10fe3722d99528826c8b25 Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Wed, 13 Dec 2023 12:31:21 +0000 Subject: [PATCH 14/17] write_concurrency: true --- lib/phoenix/tracker/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 10d27a06..94a6fd49 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,7 +57,7 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true, write_concurrency: :auto]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true, write_concurrency: :true]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end From 287141a3a99cf3e93ffde435c9249a7d3cbf12d7 Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Wed, 13 Dec 2023 13:34:06 +0000 Subject: [PATCH 15/17] Remove both read and write concurrency --- lib/phoenix/tracker/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 94a6fd49..32474305 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,7 +57,7 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, read_concurrency: true, write_concurrency: :true]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end From cfed0c6873b79f62bc2b85ca4e10e3c6057445cf Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Wed, 13 Dec 2023 16:18:55 +0000 Subject: [PATCH 16/17] Use just write_concurrency --- lib/phoenix/tracker/state.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/phoenix/tracker/state.ex b/lib/phoenix/tracker/state.ex index 32474305..75526f86 100644 --- a/lib/phoenix/tracker/state.ex +++ b/lib/phoenix/tracker/state.ex @@ -57,7 +57,7 @@ defmodule Phoenix.Tracker.State do replica: replica, context: %{replica => 0}, mode: :normal, - values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]), + values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, write_concurrency: :true]), pids: :ets.new(:pids, [:duplicate_bag]), replicas: %{replica => :up}}) end From a48eb0d5f3c01340ca6f599cdcd48dbb805d8b0f Mon Sep 17 00:00:00 2001 From: Pranav Vadrevu Date: Wed, 3 Jan 2024 16:35:07 +0100 Subject: [PATCH 17/17] Global limit (not limit per shard) + write_concurrency: true --- lib/phoenix/tracker.ex | 4 +++- test/phoenix/tracker/pool_test.exs | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/phoenix/tracker.ex b/lib/phoenix/tracker.ex index 3daca779..d276115b 100644 --- a/lib/phoenix/tracker.ex +++ b/lib/phoenix/tracker.ex @@ -257,12 +257,14 @@ defmodule Phoenix.Tracker do @spec dirty_get_by_key_with_limit(atom, term, integer()) :: [{topic, pid, meta :: map()}] def dirty_get_by_key_with_limit(tracker_name, key, limit) do + upper_bound_limit_per_shard = ceil(limit / pool_size(tracker_name)) 0..(pool_size(tracker_name) - 1) |> Enum.flat_map(fn n -> shard_name = Shard.name_for_number(tracker_name, n) - Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, limit) + Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, upper_bound_limit_per_shard) end) + |> Enum.take(limit) end @doc """ diff --git a/test/phoenix/tracker/pool_test.exs b/test/phoenix/tracker/pool_test.exs index eaf58a44..74715ae2 100644 --- a/test/phoenix/tracker/pool_test.exs +++ b/test/phoenix/tracker/pool_test.exs @@ -61,8 +61,8 @@ defmodule Phoenix.Tracker.PoolTest do end @tag pool_size: pool_size - test "pool #{pool_size}: dirty_get_by_key_with_limit/2 returns results with (limit*pool_size)", %{server: server, pool_size: pool_size} do - limit = Enum.random([1,2,3]) + test "pool #{pool_size}: dirty_get_by_key_with_limit/2 returns at most limit num results", %{server: server} do + limit = Enum.random([1, 2, 5, 1000]) topics = for i <- 1..2000, do: "topic_#{i}" for topic <- topics do @@ -74,7 +74,7 @@ defmodule Phoenix.Tracker.PoolTest do end res = Tracker.dirty_get_by_key_with_limit(server, "me", limit) - assert length(res) <= (limit*pool_size) + assert length(res) <= limit end @tag pool_size: pool_size