Skip to content

Commit

Permalink
[#98] Update evictions when a new generation is created and the older…
Browse files Browse the repository at this point in the history
… is deleted
  • Loading branch information
cabol committed Jan 4, 2021
1 parent 2b87aae commit d275a65
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 28 deletions.
39 changes: 28 additions & 11 deletions lib/nebulex/adapters/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ defmodule Nebulex.Adapters.Local do
# Supported Backends
@backends ~w(ets shards)a

# Inline common instructions
@compile {:inline, list_gen: 1, newer_gen: 1}

## Nebulex.Adapter
Expand All @@ -319,22 +320,41 @@ defmodule Nebulex.Adapters.Local do
@doc """
A convenience function for creating new generations.
"""
defdelegate new_generation(name_or_pid \\ __MODULE__, opts \\ []), to: Generation, as: :new
defdelegate new_generation(name_or_pid \\ __MODULE__, opts \\ []),
to: Generation,
as: :new

@doc """
A convenience function for reset the GC timer.
"""
defdelegate reset_generation_timer(name_or_pid \\ __MODULE__),
to: Generation,
as: :reset_timer

@doc """
A convenience function for retrieving the current generations.
"""
defdelegate generations(name_or_pid \\ __MODULE__), to: Generation, as: :list
defdelegate generations(name_or_pid \\ __MODULE__),
to: Generation,
as: :list

@doc """
A convenience function for retrieving the newer generation.
"""
defdelegate newer_generation(name_or_pid \\ __MODULE__), to: Generation, as: :newer
defdelegate newer_generation(name_or_pid \\ __MODULE__),
to: Generation,
as: :newer
end
end

@impl true
def init(opts) do
# init internal metadata table
meta_tab = opts[:meta_tab] || Metadata.init()

# init stats_counter
stats_counter = opts[:stats_counter] || Stats.init(opts)

# resolve the backend to be used
backend =
opts
Expand All @@ -348,22 +368,19 @@ defmodule Nebulex.Adapters.Local do
"#{inspect(@backends)}, got: #{inspect(val)}"
end

# init internal metadata table
meta_tab = opts[:meta_tab] || Metadata.init()

child =
child_spec =
Backend.child_spec(
backend,
[backend: backend, meta_tab: meta_tab] ++ opts
[backend: backend, stats_counter: stats_counter, meta_tab: meta_tab] ++ opts
)

meta = %{
backend: backend,
meta_tab: meta_tab,
stats_counter: opts[:stats_counter] || Stats.init(opts)
stats_counter: stats_counter,
backend: backend
}

{:ok, child, meta}
{:ok, child_spec, meta}
end

## Nebulex.Adapter.Entry
Expand Down
58 changes: 46 additions & 12 deletions lib/nebulex/adapters/local/generation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ defmodule Nebulex.Adapters.Local.Generation do
:meta_tab,
:backend,
:backend_opts,
:stats_counter,
:gc_interval,
:gc_heartbeat_ref,
:max_size,
Expand All @@ -64,14 +65,13 @@ defmodule Nebulex.Adapters.Local.Generation do
import Nebulex.Helpers

alias Nebulex.Adapter
alias Nebulex.Adapter.Stats
alias Nebulex.Adapters.Local
alias Nebulex.Adapters.Local.{Backend, Metadata}

@type t :: :ets.tid()
@type server_ref :: pid | atom | :ets.tid()

@compile {:inline, server: 1, list: 1, newer: 1}

## API

@doc """
Expand All @@ -94,7 +94,9 @@ defmodule Nebulex.Adapters.Local.Generation do
## Example
Nebulex.Adapters.Local.Generation.new(MyCache, reset_timer: :false)
Nebulex.Adapters.Local.Generation.new(MyCache)
Nebulex.Adapters.Local.Generation.new(MyCache, reset_timer: false)
"""
@spec new(server_ref, Nebulex.Cache.opts()) :: [atom]
def new(server_ref, opts \\ []) do
Expand Down Expand Up @@ -139,6 +141,19 @@ defmodule Nebulex.Adapters.Local.Generation do
do_call(server_ref, :memory_info)
end

@doc """
Resets the timer for pushing new cache generations.
## Example
Nebulex.Adapters.Local.Generation.reset_timer(MyCache)
"""
def reset_timer(server_ref) do
server_ref
|> server()
|> GenServer.cast(:reset_timer)
end

@doc """
Returns the list of the generations in the form `[newer, older]`.
Expand Down Expand Up @@ -190,6 +205,12 @@ defmodule Nebulex.Adapters.Local.Generation do

defp get_meta_tab(server_ref), do: server_ref

defp do_call(tab, message) do
tab
|> server()
|> GenServer.call(message)
end

## GenServer Callbacks

@impl true
Expand All @@ -216,6 +237,7 @@ defmodule Nebulex.Adapters.Local.Generation do
meta_tab: meta_tab,
backend: backend,
backend_opts: backend_opts,
stats_counter: opts[:stats_counter],
gc_interval: gc_interval,
max_size: max_size,
allocated_memory: allocated_memory,
Expand All @@ -234,7 +256,7 @@ defmodule Nebulex.Adapters.Local.Generation do
end

@impl true
def handle_call({:new_generation, opts}, _from, %__MODULE__{} = state) do
def handle_call({:new_generation, opts}, _from, state) do
:ok = new_gen(state)

ref =
Expand All @@ -257,7 +279,7 @@ defmodule Nebulex.Adapters.Local.Generation do
{:reply, size, state}
end

def handle_call({:realloc, mem_size}, _from, %__MODULE__{} = state) do
def handle_call({:realloc, mem_size}, _from, state) do
{:reply, :ok, %{state | allocated_memory: mem_size}}
end

Expand All @@ -269,6 +291,11 @@ defmodule Nebulex.Adapters.Local.Generation do
{:reply, {memory_info(backend, meta_tab), allocated}, state}
end

@impl true
def handle_cast(:reset_timer, state) do
{:noreply, %{state | gc_heartbeat_ref: maybe_reset_timer(true, state)}}
end

@impl true
def handle_info(:heartbeat, %__MODULE__{gc_interval: time, gc_heartbeat_ref: ref} = state) do
:ok = new_gen(state)
Expand Down Expand Up @@ -358,26 +385,33 @@ defmodule Nebulex.Adapters.Local.Generation do

## Private Functions

defp do_call(tab, message) do
tab
|> server()
|> GenServer.call(message)
end

defp new_gen(%__MODULE__{meta_tab: meta_tab, backend: backend, backend_opts: backend_opts}) do
defp new_gen(%__MODULE__{
meta_tab: meta_tab,
backend: backend,
backend_opts: backend_opts,
stats_counter: stats_counter
}) do
# create new generation
gen_tab = Backend.new(backend, meta_tab, backend_opts)

# update generation list
case list(meta_tab) do
[newer, older] ->
# since the older generation is deleted, update evictions count
:ok = Stats.incr(stats_counter, :evictions, backend.info(older, :size))

# delete older generation
_ = Backend.delete(backend, meta_tab, older)

# update generations
Metadata.put(meta_tab, :generations, [gen_tab, newer])

[newer] ->
# update generations
Metadata.put(meta_tab, :generations, [gen_tab, newer])

[] ->
# update generations
Metadata.put(meta_tab, :generations, [gen_tab])
end
end
Expand Down
15 changes: 15 additions & 0 deletions lib/nebulex/adapters/replicated.ex
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do
{name, self()},
fn ->
nodes
|> maybe_push_generation(cache)
|> Enum.reduce_while([], &stream_entries(meta, &1, &2))
|> Enum.each(&cache.__primary__.put(&1.key, &1.value, ttl: Entry.ttl(&1)))
end,
Expand All @@ -484,6 +485,20 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do
end
end

defp maybe_push_generation(nodes, cache) do
# This is to ensure the GC timer is in-sync (making the gap as short as
# possible) among the cache nodes when using the built-in local adapter
# as primary store. This will make the GC on all cluster nodes run almost
# at the same time.
:ok =
if cache.__primary__.__adapter__() == Nebulex.Adapters.Local do
{_, []} = :rpc.multicall(nodes, cache.__primary__, :reset_generation_timer, [])
:ok
end

nodes
end

defp stream_entries(meta, node, acc) do
# FIXME: this is because coveralls does not check this as covered
# coveralls-ignore-start
Expand Down
3 changes: 1 addition & 2 deletions lib/nebulex/cache/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ defmodule Nebulex.Cache.Cluster do
# distributed caching functionality.
@moduledoc false

@compile {:inline, get_nodes: 1, get_node: 3}

@doc """
Joins the node where the cache `name`'s supervisor process is running to the
`name`'s node group.
Expand Down Expand Up @@ -65,6 +63,7 @@ defmodule Nebulex.Cache.Cluster do
:pg.get_members(__MODULE__, name)
end
else
# Inline common instructions
@compile {:inline, pg2_namespace: 1}

defp pg_join(name, pid) do
Expand Down
3 changes: 0 additions & 3 deletions lib/nebulex/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ defmodule Nebulex.RPC do
@typedoc "Reducer spec"
@type reducer :: {acc :: term, reducer_fun}

# Inline common instructions
@compile {:inline, rpc_call: 6, rpc_multi_call: 3, rpc_multi_call: 6}

## API

@doc """
Expand Down
13 changes: 13 additions & 0 deletions test/nebulex/adapters/local/generation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ defmodule Nebulex.Adapters.Local.GenerationTest do
:ok = Process.sleep(500)
assert generations_len(name) == 2
end

test "reset timer", %{cache: cache, name: name} do
assert generations_len(name) == 1

:ok = Process.sleep(800)
:ok = cache.reset_generation_timer(name)

:ok = Process.sleep(220)
assert generations_len(name) == 1

:ok = Process.sleep(1000)
assert generations_len(name) == 2
end
end

describe "allocated memory" do
Expand Down
23 changes: 23 additions & 0 deletions test/nebulex/cache/stats_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ defmodule Nebulex.Cache.StatsTest do
end
end

describe "new generation" do
alias Cache.L1

setup_with_cache(Cache, [stats: true] ++ @config)

test "updates evictions" do
:ok = Cache.put_all(a: 1, b: 2, c: 3)
assert Cache.size() == 9

assert stats = Cache.stats_info()
assert stats.writes == 9
assert stats.evictions == 0

_ = L1.new_generation()
assert Cache.size() == 9
assert Cache.stats_info(:evictions) == 0

_ = L1.new_generation()
assert Cache.size() == 6
assert Cache.stats_info(:evictions) == 3
end
end

describe "disabled stats" do
setup_with_cache(Cache, @config)

Expand Down

0 comments on commit d275a65

Please sign in to comment.