Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controlling process #41

Merged
merged 2 commits into from
May 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 58 additions & 102 deletions lib/elixometer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ defmodule Elixometer do
end
end

@elixometer_table :elixometer
use GenServer

defmodule Config do
defstruct table_name: nil, counters: Map.new
end
Expand All @@ -71,6 +68,12 @@ defmodule Elixometer do
defstruct method_name: nil, key: nil, units: :micros, args: nil, guards: nil, body: nil
end

@elixometer_table :elixometer
alias Elixometer.Updater
import Elixometer.Utils
use GenServer


defmacro __using__(_mod) do

quote do
Expand Down Expand Up @@ -156,6 +159,7 @@ defmodule Elixometer do
def init(:ok) do
table_name = :ets.new(@elixometer_table, [:set, :named_table, read_concurrency: true])
:timer.send_interval(250, :tick)

{:ok, %Config{table_name: table_name}}
end

Expand All @@ -180,38 +184,12 @@ defmodule Elixometer do
end
end

@doc """
Register a metric.
If the metric name does not exist, then the block of code passed in is executed
and the metric is defined and subscribed to.
"""
defmacro register_metric_once(name, do: block) do
quote do
try do
ensure_metric_defined(unquote(name),
fn() ->
unquote(block)
end)
subscribe(unquote(name))

rescue
e in ErlangError -> e
end
end
end

@doc """
Updates a histogram with a new value. If the metric doesn't exist, a new metric
is created and subscribed to.
"""
def update_histogram(name, delta, aggregate_seconds\\60) when is_bitstring(name) do
monitor = name_to_exometer(:histograms, name)

register_metric_once(monitor) do
:exometer.new(monitor, :histogram, [time_span: :timer.seconds(aggregate_seconds)])
end

:exometer.update(monitor, delta)
Updater.histogram(name, delta, aggregate_seconds)
end

@doc """
Expand All @@ -220,12 +198,7 @@ defmodule Elixometer do
maintaining QPS stats.
"""
def update_spiral(name, delta, opts \\ [time_span: :timer.seconds(60), slot_period: 1000]) do
monitor = name_to_exometer(:spirals, name)
register_metric_once(monitor) do
:exometer.new(monitor, :spiral, opts)
end

:exometer.update(monitor, delta)
Updater.spiral(name, delta, opts)
end

@doc """
Expand All @@ -236,19 +209,7 @@ defmodule Elixometer do
automatically at the specified interval.
"""
def update_counter(name, delta, [reset_seconds: secs] \\ [reset_seconds: nil]) when is_bitstring(name) and (is_nil(secs) or secs >= 1) do
monitor = name_to_exometer(:counters, name)

register_metric_once(monitor) do
:exometer.new(monitor, :counter, [])

if is_nil secs do
add_counter(monitor, secs)
else
add_counter(monitor, secs * 1000)
end
end

:exometer.update(monitor, delta)
Updater.counter(name, delta, secs)
end

@doc """
Expand All @@ -267,13 +228,7 @@ defmodule Elixometer do
and the metric is subscribed to the default reporter.
"""
def update_gauge(name, value) when is_bitstring(name) do
monitor = name_to_exometer(:gauges, name)

register_metric_once(monitor) do
:exometer.new(monitor, :gauge, [])
end

:exometer.update(monitor, value)
Updater.gauge(name, value)
end

@doc """
Expand All @@ -284,28 +239,23 @@ defmodule Elixometer do
:millis and the value will be converted.
"""
defmacro timed(name, units \\ :micros, do: block) do
quote do
monitor_name = name_to_exometer(:timers, unquote(name))

register_metric_once(monitor_name) do
:exometer.new(monitor_name, :histogram, [])
end
converted_name = Elixometer.Utils.name_to_exometer(:timers, name)

quote do
{elapsed_us, rv} = :timer.tc(fn -> unquote(block) end)
elapsed_time = case unquote(units) do
:micros -> elapsed_us
:millis -> elapsed_us / 1000
end

:exometer.update(monitor_name, elapsed_time)
Updater.timer(unquote(converted_name), unquote(units), elapsed_us)
rv
end
end

defp add_counter(metric_name, ttl_millis) do
def add_counter(metric_name, ttl_millis) do
GenServer.cast(__MODULE__, {:add_counter, metric_name, ttl_millis})
end

def add_counter(metric_name) do
GenServer.cast(__MODULE__, {:add_counter, metric_name, nil})
end

def metric_defined?(name) when is_bitstring(name) do
name |> to_atom_list |> metric_defined?
end
Expand All @@ -332,28 +282,40 @@ defmodule Elixometer do
:ok
end

def handle_call({:subscribe, name}, _caller, state) do
if not metric_subscribed?(name) do
cfg = Application.get_all_env(:elixometer)
reporter = cfg[:reporter]
interval = cfg[:update_frequency]
@doc """
Ensures a metric is correctly registered in Elixometer.
This means that Elixometer knows about it and its metrics are
subscribed to an exometer reporter
"""
def ensure_registered(metric_name, register_fn) do
try do
ensure_metric_defined(metric_name, register_fn)
subscribe(metric_name)
rescue
e in ErlangError -> e
end
end

if reporter do
:exometer.info(name)
|> Keyword.get(:datapoints)
|> Enum.map(&(:exometer_report.subscribe(reporter, name, &1, interval)))
end
:ets.insert(@elixometer_table, {{:subscriptions, name}, true})
@doc """
Ensures that a metric is subscribed to an exometer reporter.
"""
def subscribe(metric_name) do
if not metric_subscribed?(metric_name) do
GenServer.call(__MODULE__, {:subscribe, metric_name})
end
end

def handle_call({:subscribe, metric_name}, _caller, state) do
create_subscription(metric_name)
{:reply, :ok, state}
end

def handle_call({:define_metric, name, defn_fn}, _caller, state) do
def handle_call({:define_metric, metric_name, defn_fn}, _caller, state) do
# we re-check whether the metric is defined here to prevent
# a race condition in ensure_metric_defined
if not metric_defined?(name) do
if not metric_defined?(metric_name) do
defn_fn.()
:ets.insert(@elixometer_table, {{:definitions, name}, true})
:ets.insert(@elixometer_table, {{:definitions, metric_name}, true})
end

{:reply, :ok, state}
Expand All @@ -376,27 +338,21 @@ defmodule Elixometer do
{:noreply, config}
end

def name_to_exometer(metric_type, name) when is_bitstring(name) do
config = Application.get_all_env(:elixometer)
prefix = config[:metric_prefix] || "elixometer"
base_name = case config[:env] do
nil -> "#{prefix}.#{metric_type}.#{name}"
:prod -> "#{prefix}.#{metric_type}.#{name}"
env -> "#{prefix}.#{env}.#{metric_type}.#{name}"
end

to_atom_list(base_name)
end
defp create_subscription(metric_name) do
# If a metric isn't subscribed to our reporters, create a subscription in our
# ets table and subscribe our metric to exometer's reporters.
if not metric_subscribed?(metric_name) do
cfg = Application.get_all_env(:elixometer)
reporter = cfg[:reporter]
interval = cfg[:update_frequency]

def subscribe(monitor) do
if not metric_subscribed?(monitor) do
GenServer.call(__MODULE__, {:subscribe, monitor})
if reporter do
:exometer.info(metric_name)
|> Keyword.get(:datapoints)
|> Enum.map(&(:exometer_report.subscribe(reporter, metric_name, &1, interval)))
end
:ets.insert(@elixometer_table, {{:subscriptions, metric_name}, true})
end
end

defp to_atom_list(s) when is_bitstring(s) do
s
|> String.split(".")
|> Enum.map(&String.to_atom/1)
end
end
4 changes: 3 additions & 1 deletion lib/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ defmodule Elixometer.Supervisor do
end

def init([]) do
children = [worker(Elixometer, [])]
children = [worker(Elixometer, []),
worker(Elixometer.Updater, [])
]
supervise(children, strategy: :one_for_one)
end
end
118 changes: 118 additions & 0 deletions lib/updater.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule Elixometer.Updater do
@moduledoc """
A capped worker that updates metrics.
"""

@max_messages 1000

import Elixometer, only: [ensure_registered: 2, add_counter: 2, add_counter: 1]
import Elixometer.Utils
use GenServer

def init([]) do
{:ok, pobox} = :pobox.start_link(self, @max_messages, :queue)
Process.register(pobox, :elixometer_pobox)
activate_pobox
{:ok, nil}
end

def start_link do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def timer(name, units, elapsed) when is_bitstring(name) do
elixometer_name = name_to_exometer(:timers, name)
timer(elixometer_name, units, elapsed)
end

def timer(name, units, elapsed) when is_list(name) do
:pobox.post(:elixometer_pobox, {:timer, name, units, elapsed})
end

def gauge(name, value) do
:pobox.post(:elixometer_pobox, {:gauge, name, value})
end

def counter(name, delta, reset_seconds) do
:pobox.post(:elixometer_pobox, {:counter, name, delta, reset_seconds})
end

def spiral(name, delta, opts) do
:pobox.post(:elixometer_pobox, {:spiral, name, delta, opts})
end

def histogram(name, delta, reset_seconds) do
:pobox.post(:elixometer_pobox, {:histogram, name, delta, reset_seconds})
end

def handle_info({:mail, _pid, messages, _, _}, state) do
messages
|> Enum.each(&do_update/1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like Enum.each(messages, &do_update/1) for these simple calls, but it's totally fine either way.


activate_pobox

{:noreply, state}
end

def do_update({:histogram, name, delta, aggregate_seconds}) do
monitor = name_to_exometer(:histograms, name)

ensure_registered(monitor, fn ->
:exometer.new(monitor, :histogram, [time_span: :timer.seconds(aggregate_seconds)])
end)

:exometer.update(monitor, delta)
end

def do_update({:spiral, name, delta, opts}) do
monitor = name_to_exometer(:spirals, name)
ensure_registered(monitor, fn ->
:exometer.new(monitor, :spiral, opts)
end)

:exometer.update(monitor, delta)
end

def do_update({:counter, name, delta, reset_seconds}) do
monitor = name_to_exometer(:counters, name)

ensure_registered(monitor, fn ->
:exometer.new(monitor, :counter, [])

if is_nil reset_seconds do
add_counter(monitor)
else
add_counter(monitor, reset_seconds * 1000)
end
end)

:exometer.update(monitor, delta)
end

def do_update({:gauge, name, value}) do
monitor = name_to_exometer(:gauges, name)

ensure_registered(monitor, fn ->
:exometer.new(monitor, :gauge, [])
end)

:exometer.update(monitor, value)
end

def do_update({:timer, name, units, elapsed_us}) do
ensure_registered(name, fn ->
:exometer.new(name, :histogram, [])
end)

elapsed_time = case units do
:micros -> elapsed_us
:millis -> elapsed_us / 1000
end

:exometer.update(name, elapsed_time)
end

defp activate_pobox do
:pobox.active(:elixometer_pobox, fn(msg, _) -> {{:ok, msg}, :nostate} end, :nostate)
end
end
Loading