diff --git a/lib/elixometer.ex b/lib/elixometer.ex index fbbda9e..62c53f9 100644 --- a/lib/elixometer.ex +++ b/lib/elixometer.ex @@ -60,9 +60,6 @@ defmodule Elixometer do end end - @elixometer_table :elixometer - use GenServer - defmodule Config do defstruct table_name: nil, counters: Map.new end @@ -71,6 +68,12 @@ defmodule Elixometer do defstruct method_name: nil, key: nil, units: :micros, args: nil, guards: nil, body: nil end + @elixometer_table :elixometer + alias Elixometer.Updater + import Elixometer.Utils + use GenServer + + defmacro __using__(_mod) do quote do @@ -156,6 +159,7 @@ defmodule Elixometer do def init(:ok) do table_name = :ets.new(@elixometer_table, [:set, :named_table, read_concurrency: true]) :timer.send_interval(250, :tick) + {:ok, %Config{table_name: table_name}} end @@ -180,38 +184,12 @@ defmodule Elixometer do end end - @doc """ - Register a metric. - If the metric name does not exist, then the block of code passed in is executed - and the metric is defined and subscribed to. - """ - defmacro register_metric_once(name, do: block) do - quote do - try do - ensure_metric_defined(unquote(name), - fn() -> - unquote(block) - end) - subscribe(unquote(name)) - - rescue - e in ErlangError -> e - end - end - end - @doc """ Updates a histogram with a new value. If the metric doesn't exist, a new metric is created and subscribed to. """ def update_histogram(name, delta, aggregate_seconds\\60) when is_bitstring(name) do - monitor = name_to_exometer(:histograms, name) - - register_metric_once(monitor) do - :exometer.new(monitor, :histogram, [time_span: :timer.seconds(aggregate_seconds)]) - end - - :exometer.update(monitor, delta) + Updater.histogram(name, delta, aggregate_seconds) end @doc """ @@ -220,12 +198,7 @@ defmodule Elixometer do maintaining QPS stats. """ def update_spiral(name, delta, opts \\ [time_span: :timer.seconds(60), slot_period: 1000]) do - monitor = name_to_exometer(:spirals, name) - register_metric_once(monitor) do - :exometer.new(monitor, :spiral, opts) - end - - :exometer.update(monitor, delta) + Updater.spiral(name, delta, opts) end @doc """ @@ -236,19 +209,7 @@ defmodule Elixometer do automatically at the specified interval. """ def update_counter(name, delta, [reset_seconds: secs] \\ [reset_seconds: nil]) when is_bitstring(name) and (is_nil(secs) or secs >= 1) do - monitor = name_to_exometer(:counters, name) - - register_metric_once(monitor) do - :exometer.new(monitor, :counter, []) - - if is_nil secs do - add_counter(monitor, secs) - else - add_counter(monitor, secs * 1000) - end - end - - :exometer.update(monitor, delta) + Updater.counter(name, delta, secs) end @doc """ @@ -267,13 +228,7 @@ defmodule Elixometer do and the metric is subscribed to the default reporter. """ def update_gauge(name, value) when is_bitstring(name) do - monitor = name_to_exometer(:gauges, name) - - register_metric_once(monitor) do - :exometer.new(monitor, :gauge, []) - end - - :exometer.update(monitor, value) + Updater.gauge(name, value) end @doc """ @@ -284,28 +239,23 @@ defmodule Elixometer do :millis and the value will be converted. """ defmacro timed(name, units \\ :micros, do: block) do - quote do - monitor_name = name_to_exometer(:timers, unquote(name)) - - register_metric_once(monitor_name) do - :exometer.new(monitor_name, :histogram, []) - end + converted_name = Elixometer.Utils.name_to_exometer(:timers, name) + quote do {elapsed_us, rv} = :timer.tc(fn -> unquote(block) end) - elapsed_time = case unquote(units) do - :micros -> elapsed_us - :millis -> elapsed_us / 1000 - end - - :exometer.update(monitor_name, elapsed_time) + Updater.timer(unquote(converted_name), unquote(units), elapsed_us) rv end end - defp add_counter(metric_name, ttl_millis) do + def add_counter(metric_name, ttl_millis) do GenServer.cast(__MODULE__, {:add_counter, metric_name, ttl_millis}) end + def add_counter(metric_name) do + GenServer.cast(__MODULE__, {:add_counter, metric_name, nil}) + end + def metric_defined?(name) when is_bitstring(name) do name |> to_atom_list |> metric_defined? end @@ -332,28 +282,40 @@ defmodule Elixometer do :ok end - def handle_call({:subscribe, name}, _caller, state) do - if not metric_subscribed?(name) do - cfg = Application.get_all_env(:elixometer) - reporter = cfg[:reporter] - interval = cfg[:update_frequency] + @doc """ + Ensures a metric is correctly registered in Elixometer. + This means that Elixometer knows about it and its metrics are + subscribed to an exometer reporter + """ + def ensure_registered(metric_name, register_fn) do + try do + ensure_metric_defined(metric_name, register_fn) + subscribe(metric_name) + rescue + e in ErlangError -> e + end + end - if reporter do - :exometer.info(name) - |> Keyword.get(:datapoints) - |> Enum.map(&(:exometer_report.subscribe(reporter, name, &1, interval))) - end - :ets.insert(@elixometer_table, {{:subscriptions, name}, true}) + @doc """ + Ensures that a metric is subscribed to an exometer reporter. + """ + def subscribe(metric_name) do + if not metric_subscribed?(metric_name) do + GenServer.call(__MODULE__, {:subscribe, metric_name}) end + end + + def handle_call({:subscribe, metric_name}, _caller, state) do + create_subscription(metric_name) {:reply, :ok, state} end - def handle_call({:define_metric, name, defn_fn}, _caller, state) do + def handle_call({:define_metric, metric_name, defn_fn}, _caller, state) do # we re-check whether the metric is defined here to prevent # a race condition in ensure_metric_defined - if not metric_defined?(name) do + if not metric_defined?(metric_name) do defn_fn.() - :ets.insert(@elixometer_table, {{:definitions, name}, true}) + :ets.insert(@elixometer_table, {{:definitions, metric_name}, true}) end {:reply, :ok, state} @@ -376,27 +338,21 @@ defmodule Elixometer do {:noreply, config} end - def name_to_exometer(metric_type, name) when is_bitstring(name) do - config = Application.get_all_env(:elixometer) - prefix = config[:metric_prefix] || "elixometer" - base_name = case config[:env] do - nil -> "#{prefix}.#{metric_type}.#{name}" - :prod -> "#{prefix}.#{metric_type}.#{name}" - env -> "#{prefix}.#{env}.#{metric_type}.#{name}" - end - - to_atom_list(base_name) - end + defp create_subscription(metric_name) do + # If a metric isn't subscribed to our reporters, create a subscription in our + # ets table and subscribe our metric to exometer's reporters. + if not metric_subscribed?(metric_name) do + cfg = Application.get_all_env(:elixometer) + reporter = cfg[:reporter] + interval = cfg[:update_frequency] - def subscribe(monitor) do - if not metric_subscribed?(monitor) do - GenServer.call(__MODULE__, {:subscribe, monitor}) + if reporter do + :exometer.info(metric_name) + |> Keyword.get(:datapoints) + |> Enum.map(&(:exometer_report.subscribe(reporter, metric_name, &1, interval))) + end + :ets.insert(@elixometer_table, {{:subscriptions, metric_name}, true}) end end - defp to_atom_list(s) when is_bitstring(s) do - s - |> String.split(".") - |> Enum.map(&String.to_atom/1) - end end diff --git a/lib/supervisor.ex b/lib/supervisor.ex index 1af7eb3..659734c 100644 --- a/lib/supervisor.ex +++ b/lib/supervisor.ex @@ -6,7 +6,9 @@ defmodule Elixometer.Supervisor do end def init([]) do - children = [worker(Elixometer, [])] + children = [worker(Elixometer, []), + worker(Elixometer.Updater, []) + ] supervise(children, strategy: :one_for_one) end end diff --git a/lib/updater.ex b/lib/updater.ex new file mode 100644 index 0000000..63c1ced --- /dev/null +++ b/lib/updater.ex @@ -0,0 +1,118 @@ +defmodule Elixometer.Updater do + @moduledoc """ + A capped worker that updates metrics. + """ + + @max_messages 1000 + + import Elixometer, only: [ensure_registered: 2, add_counter: 2, add_counter: 1] + import Elixometer.Utils + use GenServer + + def init([]) do + {:ok, pobox} = :pobox.start_link(self, @max_messages, :queue) + Process.register(pobox, :elixometer_pobox) + activate_pobox + {:ok, nil} + end + + def start_link do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def timer(name, units, elapsed) when is_bitstring(name) do + elixometer_name = name_to_exometer(:timers, name) + timer(elixometer_name, units, elapsed) + end + + def timer(name, units, elapsed) when is_list(name) do + :pobox.post(:elixometer_pobox, {:timer, name, units, elapsed}) + end + + def gauge(name, value) do + :pobox.post(:elixometer_pobox, {:gauge, name, value}) + end + + def counter(name, delta, reset_seconds) do + :pobox.post(:elixometer_pobox, {:counter, name, delta, reset_seconds}) + end + + def spiral(name, delta, opts) do + :pobox.post(:elixometer_pobox, {:spiral, name, delta, opts}) + end + + def histogram(name, delta, reset_seconds) do + :pobox.post(:elixometer_pobox, {:histogram, name, delta, reset_seconds}) + end + + def handle_info({:mail, _pid, messages, _, _}, state) do + messages + |> Enum.each(&do_update/1) + + activate_pobox + + {:noreply, state} + end + + def do_update({:histogram, name, delta, aggregate_seconds}) do + monitor = name_to_exometer(:histograms, name) + + ensure_registered(monitor, fn -> + :exometer.new(monitor, :histogram, [time_span: :timer.seconds(aggregate_seconds)]) + end) + + :exometer.update(monitor, delta) + end + + def do_update({:spiral, name, delta, opts}) do + monitor = name_to_exometer(:spirals, name) + ensure_registered(monitor, fn -> + :exometer.new(monitor, :spiral, opts) + end) + + :exometer.update(monitor, delta) + end + + def do_update({:counter, name, delta, reset_seconds}) do + monitor = name_to_exometer(:counters, name) + + ensure_registered(monitor, fn -> + :exometer.new(monitor, :counter, []) + + if is_nil reset_seconds do + add_counter(monitor) + else + add_counter(monitor, reset_seconds * 1000) + end + end) + + :exometer.update(monitor, delta) + end + + def do_update({:gauge, name, value}) do + monitor = name_to_exometer(:gauges, name) + + ensure_registered(monitor, fn -> + :exometer.new(monitor, :gauge, []) + end) + + :exometer.update(monitor, value) + end + + def do_update({:timer, name, units, elapsed_us}) do + ensure_registered(name, fn -> + :exometer.new(name, :histogram, []) + end) + + elapsed_time = case units do + :micros -> elapsed_us + :millis -> elapsed_us / 1000 + end + + :exometer.update(name, elapsed_time) + end + + defp activate_pobox do + :pobox.active(:elixometer_pobox, fn(msg, _) -> {{:ok, msg}, :nostate} end, :nostate) + end +end diff --git a/lib/utils.ex b/lib/utils.ex new file mode 100644 index 0000000..cc5aee8 --- /dev/null +++ b/lib/utils.ex @@ -0,0 +1,20 @@ +defmodule Elixometer.Utils do + def name_to_exometer(metric_type, name) when is_bitstring(name) do + config = Application.get_all_env(:elixometer) + prefix = config[:metric_prefix] || "elixometer" + base_name = case config[:env] do + nil -> "#{prefix}.#{metric_type}.#{name}" + :prod -> "#{prefix}.#{metric_type}.#{name}" + env -> "#{prefix}.#{env}.#{metric_type}.#{name}" + end + + to_atom_list(base_name) + end + + def to_atom_list(s) when is_bitstring(s) do + s + |> String.split(".") + |> Enum.map(&String.to_atom/1) + end + +end diff --git a/mix.exs b/mix.exs index 8cb0c85..d78e07d 100644 --- a/mix.exs +++ b/mix.exs @@ -20,7 +20,7 @@ defmodule Elixometer.Mixfile do def application do [mod: {Elixometer.App, []}, - applications: [:lager, :exometer_core], + applications: [:lager, :exometer_core, :pobox], erl_opts: [parse_transform: "lager_transform"], env: default_config(Mix.env) ] @@ -40,7 +40,8 @@ defmodule Elixometer.Mixfile do {:edown, github: "uwiger/edown", tag: "0.7", override: true}, {:lager, github: "basho/lager", tag: "2.1.0", override: true}, {:exometer_core, github: "PSPDFKit-labs/exometer_core"}, - {:excoveralls, github: "parroty/excoveralls", tag: "v0.4.5", override: true, only: :test} + {:excoveralls, github: "parroty/excoveralls", tag: "v0.4.5", override: true, only: :test}, + {:pobox, github: "ferd/pobox"}, ] end diff --git a/mix.lock b/mix.lock index 42bc80a..83f9e42 100644 --- a/mix.lock +++ b/mix.lock @@ -11,5 +11,6 @@ "meck": {:hex, :meck, "0.8.4"}, "mimerl": {:hex, :mimerl, "1.0.2"}, "parse_trans": {:git, "git://github.com/uwiger/parse_trans.git", "82cc00264aa1bad8fc5c0739b7541feb4a843432", [tag: "2.9"]}, + "pobox": {:git, "https://github.com/ferd/pobox.git", "8f9f227e1de2c0342ff1dac01f0219996dc08c4c", []}, "setup": {:git, "git://github.com/uwiger/setup.git", "51ee7c9f64d2bbe9dcbb58c278e8fbfd4d0ca5e2", [tag: "1.4"]}, "ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.5"}} diff --git a/test/elixometer_test.exs b/test/elixometer_test.exs index c18540d..2b7ab14 100644 --- a/test/elixometer_test.exs +++ b/test/elixometer_test.exs @@ -56,6 +56,10 @@ defmodule ElixometerTest do :ok end + defp wait_for_messages do + :timer.sleep 10 + end + defp to_elixometer_name(metric_name) when is_bitstring(metric_name) do metric_name |> String.split(".") @@ -67,6 +71,7 @@ defmodule ElixometerTest do end def metric_exists(metric_name) when is_list(metric_name) do + wait_for_messages metric_name in Reporter.metric_names end @@ -75,6 +80,7 @@ defmodule ElixometerTest do end def subscription_exists(metric_name) when is_list(metric_name) do + wait_for_messages metric_name in Reporter.subscriptions end @@ -117,6 +123,8 @@ defmodule ElixometerTest do test "a counter resets itself after its time has elapsed" do update_counter("reset", 1, reset_seconds: 1) + wait_for_messages + expected_name = [:elixometer, :test, :counters, :reset] {:ok, [value: val]} = :exometer.get_value(expected_name, :value) assert val == 1 @@ -134,6 +142,8 @@ defmodule ElixometerTest do test "a counter does not reset itself if reset_seconds is nil" do update_counter("no_reset", 1, reset_seconds: nil) + wait_for_messages + expected_name = [:elixometer, :test, :counters, :no_reset] {:ok, [value: val]} = :exometer.get_value(expected_name, :value) assert val == 1 @@ -234,6 +244,9 @@ defmodule ElixometerTest do test "getting a specific metric" do update_gauge "value_2", 23 + + wait_for_messages + assert {:ok, 23} == get_metric_value("elixometer.test.gauges.value_2", :value) end @@ -243,6 +256,9 @@ defmodule ElixometerTest do test "getting a datapoint that doesn't exist" do update_gauge "no_datapoint", 22 + + wait_for_messages + assert {:ok, :undefined} == get_metric_value("elixometer.test.gauges.no_datapoint", :bad_datapoint) end