Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Get By Key] get by key with limits #180

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions lib/phoenix/pubsub/pg2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,16 @@ defmodule Phoenix.PubSub.PG2 do
name = Keyword.fetch!(opts, :name)
pool_size = Keyword.get(opts, :pool_size, 1)
adapter_name = Keyword.fetch!(opts, :adapter_name)
Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: :"#{adapter_name}_supervisor")
Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size}, name: adapter_name)
end

@impl true
def init({name, adapter_name, pool_size}) do
[_ | groups] =
groups =
for number <- 1..pool_size do
:"#{adapter_name}_#{number}"
end

# Use `adapter_name` for the first in the pool for backwards compatability
# with v2.0 when the pool_size is 1.
groups = [adapter_name | groups]

:persistent_term.put(adapter_name, List.to_tuple(groups))

children =
Expand Down
47 changes: 47 additions & 0 deletions lib/phoenix/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ defmodule Phoenix.Tracker do
|> Phoenix.Tracker.Shard.list(topic)
end

@spec dirty_list(atom, topic) :: [presence]
def dirty_list(tracker_name, topic) do
tracker_name
|> Shard.name_for_topic(topic, pool_size(tracker_name))
|> Phoenix.Tracker.Shard.dirty_list(topic)
end

@doc """
Gets presences tracked under a given topic and key pair.

Expand All @@ -231,6 +238,35 @@ defmodule Phoenix.Tracker do
|> Phoenix.Tracker.Shard.get_by_key(topic, key)
end

@spec dirty_get_by_key(atom, topic, term) :: [presence]
def dirty_get_by_key(tracker_name, topic, key) do
tracker_name
|> Shard.name_for_topic(topic, pool_size(tracker_name))
|> Phoenix.Tracker.Shard.dirty_get_by_key(topic, key)
end

@spec dirty_get_by_key(atom, term) :: [{topic, pid, meta :: map()}]
def dirty_get_by_key(tracker_name, key) do
0..(pool_size(tracker_name) - 1)
|> Enum.flat_map(fn n ->
shard_name = Shard.name_for_number(tracker_name, n)

Phoenix.Tracker.Shard.dirty_get_by_key(shard_name, key)
end)
end

@spec dirty_get_by_key_with_limit(atom, term, integer()) :: [{topic, pid, meta :: map()}]
def dirty_get_by_key_with_limit(tracker_name, key, limit) do
upper_bound_limit_per_shard = ceil(limit / pool_size(tracker_name))
0..(pool_size(tracker_name) - 1)
|> Enum.flat_map(fn n ->
shard_name = Shard.name_for_number(tracker_name, n)

Phoenix.Tracker.Shard.dirty_get_by_key_with_limit(shard_name, key, upper_bound_limit_per_shard)
end)
|> Enum.take(limit)
end

@doc """
Gracefully shuts down by broadcasting permdown to all replicas.

Expand All @@ -245,6 +281,17 @@ defmodule Phoenix.Tracker do
Supervisor.stop(tracker_name)
end

@doc false
@spec size(atom) :: non_neg_integer
def size(tracker_name) do
0..(pool_size(tracker_name) - 1)
|> Enum.reduce(0, fn n, acc ->
shard_name = Shard.name_for_number(tracker_name, n)

Phoenix.Tracker.Shard.size(shard_name) + acc
end)
end

@doc """
Starts a tracker pool.

Expand Down
20 changes: 20 additions & 0 deletions lib/phoenix/tracker/shard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,31 @@ defmodule Phoenix.Tracker.Shard do
|> State.get_by_key(topic, key)
end

@spec dirty_get_by_key(atom, topic, term) :: [presence]
def dirty_get_by_key(shard_name, topic, key) do
State.tracked_key(shard_name, topic, key, [])
end

@spec dirty_get_by_key(atom, term) :: [presence]
def dirty_get_by_key(shard_name, key) do
State.tracked_key(shard_name, key, [])
end

@spec dirty_get_by_key_with_limit(atom, term, integer()) :: [presence]
def dirty_get_by_key_with_limit(shard_name, key, limit) do
State.tracked_key_with_limit(shard_name, key, [], limit)
end

@spec graceful_permdown(pid) :: :ok
def graceful_permdown(server_pid) do
GenServer.call(server_pid, :graceful_permdown)
end

@spec size(atom) :: non_neg_integer
def size(shard_name) do
State.size(shard_name)
end

## Server

def start_link(tracker, tracker_opts, pool_opts) do
Expand Down
38 changes: 37 additions & 1 deletion lib/phoenix/tracker/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ defmodule Phoenix.Tracker.State do
replica: replica,
context: %{replica => 0},
mode: :normal,
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set, write_concurrency: :true]),
pids: :ets.new(:pids, [:duplicate_bag]),
replicas: %{replica => :up}})
end

@spec size(atom) :: non_neg_integer
def size(table) do
:ets.info(table, :size)
end

@doc """
Returns the causal context for the set.
"""
Expand Down Expand Up @@ -161,6 +166,37 @@ defmodule Phoenix.Tracker.State do
[{{:"$1", :"$2"}}]}])
end

def tracked_key(table, key, down_replicas) do
:ets.select(
table,
[
{
{{:"$1", :"$2", key}, :"$3", {:"$4", :_}},
not_in(:"$4", down_replicas),
[{{:"$1", :"$2", :"$3"}}]
}
]
)
end

def tracked_key_with_limit(table, key, down_replicas, limit) do
:ets.select(
table,
[
{
{{:"$1", :"$2", key}, :"$3", {:"$4", :_}},
not_in(:"$4", down_replicas),
[{{:"$1", :"$2", :"$3"}}]
}
],
limit
)
|> case do
{lst, _} -> lst
_ -> []
end
end

defp not_in(_pos, []), do: []
defp not_in(pos, replicas), do: [not: ors(pos, replicas)]
defp ors(pos, [rep]), do: {:"=:=", pos, {rep}}
Expand Down
57 changes: 55 additions & 2 deletions test/phoenix/tracker/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,51 @@ defmodule Phoenix.Tracker.PoolTest do
end

@tag pool_size: pool_size
test "pool #{pool_size}: Untrack/4 results in all ids being untracked",
%{server: server} do
test "pool #{pool_size}: dirty_get_by_key/2 returns presences from all shards", %{server: server} do
topics = for i <- 1..100, do: "topic_#{i}"

refs =
for topic <- topics do
{:ok, ref} = Tracker.track(server, self(), topic, "me", %{name: "me"})
ref
end

for topic <- topics do
{:ok, _} = Tracker.track(server, self(), topic, "other", %{name: "me"})
end

by_key = Tracker.dirty_get_by_key(server, "me")
assert length(by_key) == 100

topics_and_refs =
for {topic, pid, %{name: "me", phx_ref: ref}} <- by_key do
assert pid == self()
{topic, ref}
end

assert Enum.sort(topics_and_refs) == Enum.sort(List.zip([topics, refs]))
end

@tag pool_size: pool_size
test "pool #{pool_size}: dirty_get_by_key_with_limit/2 returns at most limit num results", %{server: server} do
limit = Enum.random([1, 2, 5, 1000])
topics = for i <- 1..2000, do: "topic_#{i}"

for topic <- topics do
{:ok, _} = Tracker.track(server, self(), topic, "me", %{name: "me"})
end

for topic <- topics do
{:ok, _} = Tracker.track(server, self(), topic, "other", %{name: "me"})
end

res = Tracker.dirty_get_by_key_with_limit(server, "me", limit)
assert length(res) <= limit
end

@tag pool_size: pool_size
test "pool #{pool_size}: Untrack/4 results in all ids being untracked",
%{server: server} do
topics = for i <- 1..100, do: "topic_#{i}"
for t <- topics do
{:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"})
Expand Down Expand Up @@ -143,5 +185,16 @@ defmodule Phoenix.Tracker.PoolTest do

for t <- topics, do: assert Tracker.list(server, t) == []
end

@tag pool_size: pool_size
test "pool #{pool_size}: count/1 returns number of entries across all shards",
%{server: server} do
topics = for i <- 1..100, do: "topic_#{i}"
for t <- topics do
{:ok, _ref} = Tracker.track(server, self(), t, "me", %{a: "b"})
end

assert Tracker.size(server) == 100
end
end
end
6 changes: 6 additions & 0 deletions test/phoenix/tracker/shard_replication_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
assert %{@node1 => %Replica{status: :up}} = replicas(shard)
assert [{"local1", _}, {"node1", _}] = list(shard, topic)
assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic)
assert [_] = Shard.dirty_get_by_key(shard, topic, "local1")
assert [_] = Shard.dirty_get_by_key(shard, topic, "node1")

# nodedown
Process.unlink(node_pid)
Expand All @@ -252,9 +254,13 @@ defmodule Phoenix.Tracker.ShardReplicationTest do
assert %{@node1 => %Replica{status: :down}} = replicas(shard)
assert [{"local1", _}] = list(shard, topic)
assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic)
assert [_] = Shard.dirty_get_by_key(shard, topic, "local1")
assert [_] = Shard.dirty_get_by_key(shard, topic, "node1")

:timer.sleep(@permdown + 2*@heartbeat)
assert [{"local1", _}] = dirty_list(shard, topic)
assert [_] = Shard.dirty_get_by_key(shard, topic, "local1")
assert [] = Shard.dirty_get_by_key(shard, topic, "node1")
end


Expand Down
41 changes: 41 additions & 0 deletions test/phoenix/tracker/shard_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,45 @@ defmodule Phoenix.Tracker.ShardTest do
assert Phoenix.Tracker.Shard.init([nil, nil, opts]) ==
{:error, "permdown_period must be at least larger than the down_period"}
end

defmodule TestTracker do
use Phoenix.Tracker
def init(state), do: {:ok, state}
def handle_diff(_diff, state), do: {:ok, state}
end

describe "size/1" do
test "returns 0 when there are no entries in the shard" do
tracker = TestTracker
name = :"#{inspect(make_ref())}"
shard_name = Phoenix.Tracker.Shard.name_for_number(name, 1)
given_pubsub(name)
opts = [pubsub_server: name, name: name, shard_number: 1]
{:ok, _pid} = Phoenix.Tracker.Shard.start_link(tracker, %{}, opts)

assert Phoenix.Tracker.Shard.size(shard_name) == 0
end

test "returns number of tracked entries in the shard" do
tracker = TestTracker
name = :"#{inspect(make_ref())}"
shard_name = Phoenix.Tracker.Shard.name_for_number(name, 1)
given_pubsub(name)
opts = [pubsub_server: name, name: name, shard_number: 1]
{:ok, pid} = Phoenix.Tracker.Shard.start_link(tracker, %{}, opts)

for i <- 1..100 do
Phoenix.Tracker.Shard.track(pid, self(), "topic", "user#{i}", %{})
end

assert Phoenix.Tracker.Shard.size(shard_name) == 100
end

defp given_pubsub(name) do
size = 1
{adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter)
adapter_opts = [adapter: adapter, name: name, pool_size: size] ++ adapter_opts
start_supervised!({Phoenix.PubSub, adapter_opts})
end
end
end
32 changes: 32 additions & 0 deletions test/phoenix/tracker/state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,36 @@ defmodule Phoenix.Tracker.StateTest do
end)
end)
end

describe "size/1" do
test "returns 0 for empty state" do
shard_name = :shard
State.new(:s1, shard_name)
assert State.size(shard_name) == 0
end

test "returns size of the values tables" do
shard_name = :shard
state = State.new(:s1, shard_name)

Enum.reduce(1..100, state, fn i, acc ->
State.join(acc, self(), "lobby", "user#{i}", %{})
end)

assert State.size(shard_name) == 100
end

test "leaves are accounted for in the returned size" do
shard_name = :shard
state = State.new(:s1, shard_name)

state = Enum.reduce(1..100, state, fn i, acc ->
State.join(acc, self(), "topic#{i}", "user#{i}", %{})
end)

State.leave(state, self(), "topic1", "user1")

assert State.size(shard_name) == 99
end
end
end