Skip to content

Commit

Permalink
Controlling process (#41)
Browse files Browse the repository at this point in the history
* Addressing bottleneck

In one of our high-throughput systems, timing messages were a bottleneck
because the update calls were synchronous. This PR moves the calls to an
updater process that is capped at 1000 messages in its queue using the
pobox framework. This should alleviate the bottleneck and prevent the
process mailboxes from getting too big.

* Addressed PR comments
  • Loading branch information
scohen committed May 18, 2016
1 parent cd778fd commit 9dae8d8
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 105 deletions.
160 changes: 58 additions & 102 deletions lib/elixometer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ defmodule Elixometer do
end
end

@elixometer_table :elixometer
use GenServer

defmodule Config do
defstruct table_name: nil, counters: Map.new
end
Expand All @@ -71,6 +68,12 @@ defmodule Elixometer do
defstruct method_name: nil, key: nil, units: :micros, args: nil, guards: nil, body: nil
end

@elixometer_table :elixometer
alias Elixometer.Updater
import Elixometer.Utils
use GenServer


defmacro __using__(_mod) do

quote do
Expand Down Expand Up @@ -156,6 +159,7 @@ defmodule Elixometer do
def init(:ok) do
table_name = :ets.new(@elixometer_table, [:set, :named_table, read_concurrency: true])
:timer.send_interval(250, :tick)

{:ok, %Config{table_name: table_name}}
end

Expand All @@ -180,38 +184,12 @@ defmodule Elixometer do
end
end

@doc """
Register a metric.
If the metric name does not exist, then the block of code passed in is executed
and the metric is defined and subscribed to.
"""
defmacro register_metric_once(name, do: block) do
quote do
try do
ensure_metric_defined(unquote(name),
fn() ->
unquote(block)
end)
subscribe(unquote(name))

rescue
e in ErlangError -> e
end
end
end

@doc """
Updates a histogram with a new value. If the metric doesn't exist, a new metric
is created and subscribed to.
"""
def update_histogram(name, delta, aggregate_seconds\\60) when is_bitstring(name) do
monitor = name_to_exometer(:histograms, name)

register_metric_once(monitor) do
:exometer.new(monitor, :histogram, [time_span: :timer.seconds(aggregate_seconds)])
end

:exometer.update(monitor, delta)
Updater.histogram(name, delta, aggregate_seconds)
end

@doc """
Expand All @@ -220,12 +198,7 @@ defmodule Elixometer do
maintaining QPS stats.
"""
def update_spiral(name, delta, opts \\ [time_span: :timer.seconds(60), slot_period: 1000]) do
monitor = name_to_exometer(:spirals, name)
register_metric_once(monitor) do
:exometer.new(monitor, :spiral, opts)
end

:exometer.update(monitor, delta)
Updater.spiral(name, delta, opts)
end

@doc """
Expand All @@ -236,19 +209,7 @@ defmodule Elixometer do
automatically at the specified interval.
"""
def update_counter(name, delta, [reset_seconds: secs] \\ [reset_seconds: nil]) when is_bitstring(name) and (is_nil(secs) or secs >= 1) do
monitor = name_to_exometer(:counters, name)

register_metric_once(monitor) do
:exometer.new(monitor, :counter, [])

if is_nil secs do
add_counter(monitor, secs)
else
add_counter(monitor, secs * 1000)
end
end

:exometer.update(monitor, delta)
Updater.counter(name, delta, secs)
end

@doc """
Expand All @@ -267,13 +228,7 @@ defmodule Elixometer do
and the metric is subscribed to the default reporter.
"""
def update_gauge(name, value) when is_bitstring(name) do
monitor = name_to_exometer(:gauges, name)

register_metric_once(monitor) do
:exometer.new(monitor, :gauge, [])
end

:exometer.update(monitor, value)
Updater.gauge(name, value)
end

@doc """
Expand All @@ -284,28 +239,23 @@ defmodule Elixometer do
:millis and the value will be converted.
"""
defmacro timed(name, units \\ :micros, do: block) do
quote do
monitor_name = name_to_exometer(:timers, unquote(name))

register_metric_once(monitor_name) do
:exometer.new(monitor_name, :histogram, [])
end
converted_name = Elixometer.Utils.name_to_exometer(:timers, name)

quote do
{elapsed_us, rv} = :timer.tc(fn -> unquote(block) end)
elapsed_time = case unquote(units) do
:micros -> elapsed_us
:millis -> elapsed_us / 1000
end

:exometer.update(monitor_name, elapsed_time)
Updater.timer(unquote(converted_name), unquote(units), elapsed_us)
rv
end
end

defp add_counter(metric_name, ttl_millis) do
def add_counter(metric_name, ttl_millis) do
GenServer.cast(__MODULE__, {:add_counter, metric_name, ttl_millis})
end

def add_counter(metric_name) do
GenServer.cast(__MODULE__, {:add_counter, metric_name, nil})
end

def metric_defined?(name) when is_bitstring(name) do
name |> to_atom_list |> metric_defined?
end
Expand All @@ -332,28 +282,40 @@ defmodule Elixometer do
:ok
end

def handle_call({:subscribe, name}, _caller, state) do
if not metric_subscribed?(name) do
cfg = Application.get_all_env(:elixometer)
reporter = cfg[:reporter]
interval = cfg[:update_frequency]
@doc """
Ensures a metric is correctly registered in Elixometer.
This means that Elixometer knows about it and its metrics are
subscribed to an exometer reporter
"""
def ensure_registered(metric_name, register_fn) do
try do
ensure_metric_defined(metric_name, register_fn)
subscribe(metric_name)
rescue
e in ErlangError -> e
end
end

if reporter do
:exometer.info(name)
|> Keyword.get(:datapoints)
|> Enum.map(&(:exometer_report.subscribe(reporter, name, &1, interval)))
end
:ets.insert(@elixometer_table, {{:subscriptions, name}, true})
@doc """
Ensures that a metric is subscribed to an exometer reporter.
"""
def subscribe(metric_name) do
if not metric_subscribed?(metric_name) do
GenServer.call(__MODULE__, {:subscribe, metric_name})
end
end

def handle_call({:subscribe, metric_name}, _caller, state) do
create_subscription(metric_name)
{:reply, :ok, state}
end

def handle_call({:define_metric, name, defn_fn}, _caller, state) do
def handle_call({:define_metric, metric_name, defn_fn}, _caller, state) do
# we re-check whether the metric is defined here to prevent
# a race condition in ensure_metric_defined
if not metric_defined?(name) do
if not metric_defined?(metric_name) do
defn_fn.()
:ets.insert(@elixometer_table, {{:definitions, name}, true})
:ets.insert(@elixometer_table, {{:definitions, metric_name}, true})
end

{:reply, :ok, state}
Expand All @@ -376,27 +338,21 @@ defmodule Elixometer do
{:noreply, config}
end

def name_to_exometer(metric_type, name) when is_bitstring(name) do
config = Application.get_all_env(:elixometer)
prefix = config[:metric_prefix] || "elixometer"
base_name = case config[:env] do
nil -> "#{prefix}.#{metric_type}.#{name}"
:prod -> "#{prefix}.#{metric_type}.#{name}"
env -> "#{prefix}.#{env}.#{metric_type}.#{name}"
end

to_atom_list(base_name)
end
defp create_subscription(metric_name) do
# If a metric isn't subscribed to our reporters, create a subscription in our
# ets table and subscribe our metric to exometer's reporters.
if not metric_subscribed?(metric_name) do
cfg = Application.get_all_env(:elixometer)
reporter = cfg[:reporter]
interval = cfg[:update_frequency]

def subscribe(monitor) do
if not metric_subscribed?(monitor) do
GenServer.call(__MODULE__, {:subscribe, monitor})
if reporter do
:exometer.info(metric_name)
|> Keyword.get(:datapoints)
|> Enum.map(&(:exometer_report.subscribe(reporter, metric_name, &1, interval)))
end
:ets.insert(@elixometer_table, {{:subscriptions, metric_name}, true})
end
end

defp to_atom_list(s) when is_bitstring(s) do
s
|> String.split(".")
|> Enum.map(&String.to_atom/1)
end
end
4 changes: 3 additions & 1 deletion lib/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ defmodule Elixometer.Supervisor do
end

def init([]) do
children = [worker(Elixometer, [])]
children = [worker(Elixometer, []),
worker(Elixometer.Updater, [])
]
supervise(children, strategy: :one_for_one)
end
end
118 changes: 118 additions & 0 deletions lib/updater.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule Elixometer.Updater do
@moduledoc """
A capped worker that updates metrics.
"""

@max_messages 1000

import Elixometer, only: [ensure_registered: 2, add_counter: 2, add_counter: 1]
import Elixometer.Utils
use GenServer

def init([]) do
{:ok, pobox} = :pobox.start_link(self, @max_messages, :queue)
Process.register(pobox, :elixometer_pobox)
activate_pobox
{:ok, nil}
end

def start_link do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def timer(name, units, elapsed) when is_bitstring(name) do
elixometer_name = name_to_exometer(:timers, name)
timer(elixometer_name, units, elapsed)
end

def timer(name, units, elapsed) when is_list(name) do
:pobox.post(:elixometer_pobox, {:timer, name, units, elapsed})
end

def gauge(name, value) do
:pobox.post(:elixometer_pobox, {:gauge, name, value})
end

def counter(name, delta, reset_seconds) do
:pobox.post(:elixometer_pobox, {:counter, name, delta, reset_seconds})
end

def spiral(name, delta, opts) do
:pobox.post(:elixometer_pobox, {:spiral, name, delta, opts})
end

def histogram(name, delta, reset_seconds) do
:pobox.post(:elixometer_pobox, {:histogram, name, delta, reset_seconds})
end

def handle_info({:mail, _pid, messages, _, _}, state) do
messages
|> Enum.each(&do_update/1)

activate_pobox

{:noreply, state}
end

def do_update({:histogram, name, delta, aggregate_seconds}) do
monitor = name_to_exometer(:histograms, name)

ensure_registered(monitor, fn ->
:exometer.new(monitor, :histogram, [time_span: :timer.seconds(aggregate_seconds)])
end)

:exometer.update(monitor, delta)
end

def do_update({:spiral, name, delta, opts}) do
monitor = name_to_exometer(:spirals, name)
ensure_registered(monitor, fn ->
:exometer.new(monitor, :spiral, opts)
end)

:exometer.update(monitor, delta)
end

def do_update({:counter, name, delta, reset_seconds}) do
monitor = name_to_exometer(:counters, name)

ensure_registered(monitor, fn ->
:exometer.new(monitor, :counter, [])

if is_nil reset_seconds do
add_counter(monitor)
else
add_counter(monitor, reset_seconds * 1000)
end
end)

:exometer.update(monitor, delta)
end

def do_update({:gauge, name, value}) do
monitor = name_to_exometer(:gauges, name)

ensure_registered(monitor, fn ->
:exometer.new(monitor, :gauge, [])
end)

:exometer.update(monitor, value)
end

def do_update({:timer, name, units, elapsed_us}) do
ensure_registered(name, fn ->
:exometer.new(name, :histogram, [])
end)

elapsed_time = case units do
:micros -> elapsed_us
:millis -> elapsed_us / 1000
end

:exometer.update(name, elapsed_time)
end

defp activate_pobox do
:pobox.active(:elixometer_pobox, fn(msg, _) -> {{:ok, msg}, :nostate} end, :nostate)
end
end
Loading

0 comments on commit 9dae8d8

Please sign in to comment.