diff --git a/lib/nebulex/adapters/local.ex b/lib/nebulex/adapters/local.ex index 2a36fff5..0d8023cc 100644 --- a/lib/nebulex/adapters/local.ex +++ b/lib/nebulex/adapters/local.ex @@ -309,6 +309,7 @@ defmodule Nebulex.Adapters.Local do # Supported Backends @backends ~w(ets shards)a + # Inline common instructions @compile {:inline, list_gen: 1, newer_gen: 1} ## Nebulex.Adapter @@ -319,22 +320,41 @@ defmodule Nebulex.Adapters.Local do @doc """ A convenience function for creating new generations. """ - defdelegate new_generation(name_or_pid \\ __MODULE__, opts \\ []), to: Generation, as: :new + defdelegate new_generation(name_or_pid \\ __MODULE__, opts \\ []), + to: Generation, + as: :new + + @doc """ + A convenience function for reset the GC timer. + """ + defdelegate reset_generation_timer(name_or_pid \\ __MODULE__), + to: Generation, + as: :reset_timer @doc """ A convenience function for retrieving the current generations. """ - defdelegate generations(name_or_pid \\ __MODULE__), to: Generation, as: :list + defdelegate generations(name_or_pid \\ __MODULE__), + to: Generation, + as: :list @doc """ A convenience function for retrieving the newer generation. """ - defdelegate newer_generation(name_or_pid \\ __MODULE__), to: Generation, as: :newer + defdelegate newer_generation(name_or_pid \\ __MODULE__), + to: Generation, + as: :newer end end @impl true def init(opts) do + # init internal metadata table + meta_tab = opts[:meta_tab] || Metadata.init() + + # init stats_counter + stats_counter = opts[:stats_counter] || Stats.init(opts) + # resolve the backend to be used backend = opts @@ -348,22 +368,19 @@ defmodule Nebulex.Adapters.Local do "#{inspect(@backends)}, got: #{inspect(val)}" end - # init internal metadata table - meta_tab = opts[:meta_tab] || Metadata.init() - - child = + child_spec = Backend.child_spec( backend, - [backend: backend, meta_tab: meta_tab] ++ opts + [backend: backend, stats_counter: stats_counter, meta_tab: meta_tab] ++ opts ) meta = %{ - backend: backend, meta_tab: meta_tab, - stats_counter: opts[:stats_counter] || Stats.init(opts) + stats_counter: stats_counter, + backend: backend } - {:ok, child, meta} + {:ok, child_spec, meta} end ## Nebulex.Adapter.Entry diff --git a/lib/nebulex/adapters/local/generation.ex b/lib/nebulex/adapters/local/generation.ex index 80ef6322..8a855730 100644 --- a/lib/nebulex/adapters/local/generation.ex +++ b/lib/nebulex/adapters/local/generation.ex @@ -50,6 +50,7 @@ defmodule Nebulex.Adapters.Local.Generation do :meta_tab, :backend, :backend_opts, + :stats_counter, :gc_interval, :gc_heartbeat_ref, :max_size, @@ -64,14 +65,13 @@ defmodule Nebulex.Adapters.Local.Generation do import Nebulex.Helpers alias Nebulex.Adapter + alias Nebulex.Adapter.Stats alias Nebulex.Adapters.Local alias Nebulex.Adapters.Local.{Backend, Metadata} @type t :: :ets.tid() @type server_ref :: pid | atom | :ets.tid() - @compile {:inline, server: 1, list: 1, newer: 1} - ## API @doc """ @@ -94,7 +94,9 @@ defmodule Nebulex.Adapters.Local.Generation do ## Example - Nebulex.Adapters.Local.Generation.new(MyCache, reset_timer: :false) + Nebulex.Adapters.Local.Generation.new(MyCache) + + Nebulex.Adapters.Local.Generation.new(MyCache, reset_timer: false) """ @spec new(server_ref, Nebulex.Cache.opts()) :: [atom] def new(server_ref, opts \\ []) do @@ -139,6 +141,19 @@ defmodule Nebulex.Adapters.Local.Generation do do_call(server_ref, :memory_info) end + @doc """ + Resets the timer for pushing new cache generations. + + ## Example + + Nebulex.Adapters.Local.Generation.reset_timer(MyCache) + """ + def reset_timer(server_ref) do + server_ref + |> server() + |> GenServer.cast(:reset_timer) + end + @doc """ Returns the list of the generations in the form `[newer, older]`. @@ -190,6 +205,12 @@ defmodule Nebulex.Adapters.Local.Generation do defp get_meta_tab(server_ref), do: server_ref + defp do_call(tab, message) do + tab + |> server() + |> GenServer.call(message) + end + ## GenServer Callbacks @impl true @@ -216,6 +237,7 @@ defmodule Nebulex.Adapters.Local.Generation do meta_tab: meta_tab, backend: backend, backend_opts: backend_opts, + stats_counter: opts[:stats_counter], gc_interval: gc_interval, max_size: max_size, allocated_memory: allocated_memory, @@ -234,7 +256,7 @@ defmodule Nebulex.Adapters.Local.Generation do end @impl true - def handle_call({:new_generation, opts}, _from, %__MODULE__{} = state) do + def handle_call({:new_generation, opts}, _from, state) do :ok = new_gen(state) ref = @@ -257,7 +279,7 @@ defmodule Nebulex.Adapters.Local.Generation do {:reply, size, state} end - def handle_call({:realloc, mem_size}, _from, %__MODULE__{} = state) do + def handle_call({:realloc, mem_size}, _from, state) do {:reply, :ok, %{state | allocated_memory: mem_size}} end @@ -269,6 +291,11 @@ defmodule Nebulex.Adapters.Local.Generation do {:reply, {memory_info(backend, meta_tab), allocated}, state} end + @impl true + def handle_cast(:reset_timer, state) do + {:noreply, %{state | gc_heartbeat_ref: maybe_reset_timer(true, state)}} + end + @impl true def handle_info(:heartbeat, %__MODULE__{gc_interval: time, gc_heartbeat_ref: ref} = state) do :ok = new_gen(state) @@ -358,26 +385,33 @@ defmodule Nebulex.Adapters.Local.Generation do ## Private Functions - defp do_call(tab, message) do - tab - |> server() - |> GenServer.call(message) - end - - defp new_gen(%__MODULE__{meta_tab: meta_tab, backend: backend, backend_opts: backend_opts}) do + defp new_gen(%__MODULE__{ + meta_tab: meta_tab, + backend: backend, + backend_opts: backend_opts, + stats_counter: stats_counter + }) do # create new generation gen_tab = Backend.new(backend, meta_tab, backend_opts) # update generation list case list(meta_tab) do [newer, older] -> + # since the older generation is deleted, update evictions count + :ok = Stats.incr(stats_counter, :evictions, backend.info(older, :size)) + + # delete older generation _ = Backend.delete(backend, meta_tab, older) + + # update generations Metadata.put(meta_tab, :generations, [gen_tab, newer]) [newer] -> + # update generations Metadata.put(meta_tab, :generations, [gen_tab, newer]) [] -> + # update generations Metadata.put(meta_tab, :generations, [gen_tab]) end end diff --git a/lib/nebulex/adapters/replicated.ex b/lib/nebulex/adapters/replicated.ex index b72b1099..2f4a5318 100644 --- a/lib/nebulex/adapters/replicated.ex +++ b/lib/nebulex/adapters/replicated.ex @@ -476,6 +476,7 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do {name, self()}, fn -> nodes + |> maybe_push_generation(cache) |> Enum.reduce_while([], &stream_entries(meta, &1, &2)) |> Enum.each(&cache.__primary__.put(&1.key, &1.value, ttl: Entry.ttl(&1))) end, @@ -484,6 +485,20 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do end end + defp maybe_push_generation(nodes, cache) do + # This is to ensure the GC timer is in-sync (making the gap as short as + # possible) among the cache nodes when using the built-in local adapter + # as primary store. This will make the GC on all cluster nodes run almost + # at the same time. + :ok = + if cache.__primary__.__adapter__() == Nebulex.Adapters.Local do + {_, []} = :rpc.multicall(nodes, cache.__primary__, :reset_generation_timer, []) + :ok + end + + nodes + end + defp stream_entries(meta, node, acc) do # FIXME: this is because coveralls does not check this as covered # coveralls-ignore-start diff --git a/lib/nebulex/cache/cluster.ex b/lib/nebulex/cache/cluster.ex index 76f4eacd..f3f90382 100644 --- a/lib/nebulex/cache/cluster.ex +++ b/lib/nebulex/cache/cluster.ex @@ -3,8 +3,6 @@ defmodule Nebulex.Cache.Cluster do # distributed caching functionality. @moduledoc false - @compile {:inline, get_nodes: 1, get_node: 3} - @doc """ Joins the node where the cache `name`'s supervisor process is running to the `name`'s node group. @@ -65,6 +63,7 @@ defmodule Nebulex.Cache.Cluster do :pg.get_members(__MODULE__, name) end else + # Inline common instructions @compile {:inline, pg2_namespace: 1} defp pg_join(name, pid) do diff --git a/lib/nebulex/rpc.ex b/lib/nebulex/rpc.ex index 11b3d48a..a73b4282 100644 --- a/lib/nebulex/rpc.ex +++ b/lib/nebulex/rpc.ex @@ -26,9 +26,6 @@ defmodule Nebulex.RPC do @typedoc "Reducer spec" @type reducer :: {acc :: term, reducer_fun} - # Inline common instructions - @compile {:inline, rpc_call: 6, rpc_multi_call: 3, rpc_multi_call: 6} - ## API @doc """ diff --git a/test/nebulex/adapters/local/generation_test.exs b/test/nebulex/adapters/local/generation_test.exs index 945191e7..965a610c 100644 --- a/test/nebulex/adapters/local/generation_test.exs +++ b/test/nebulex/adapters/local/generation_test.exs @@ -59,6 +59,19 @@ defmodule Nebulex.Adapters.Local.GenerationTest do :ok = Process.sleep(500) assert generations_len(name) == 2 end + + test "reset timer", %{cache: cache, name: name} do + assert generations_len(name) == 1 + + :ok = Process.sleep(800) + :ok = cache.reset_generation_timer(name) + + :ok = Process.sleep(220) + assert generations_len(name) == 1 + + :ok = Process.sleep(1000) + assert generations_len(name) == 2 + end end describe "allocated memory" do diff --git a/test/nebulex/cache/stats_test.exs b/test/nebulex/cache/stats_test.exs index 4489659c..ffe3462d 100644 --- a/test/nebulex/cache/stats_test.exs +++ b/test/nebulex/cache/stats_test.exs @@ -134,6 +134,29 @@ defmodule Nebulex.Cache.StatsTest do end end + describe "new generation" do + alias Cache.L1 + + setup_with_cache(Cache, [stats: true] ++ @config) + + test "updates evictions" do + :ok = Cache.put_all(a: 1, b: 2, c: 3) + assert Cache.size() == 9 + + assert stats = Cache.stats_info() + assert stats.writes == 9 + assert stats.evictions == 0 + + _ = L1.new_generation() + assert Cache.size() == 9 + assert Cache.stats_info(:evictions) == 0 + + _ = L1.new_generation() + assert Cache.size() == 6 + assert Cache.stats_info(:evictions) == 3 + end + end + describe "disabled stats" do setup_with_cache(Cache, @config)