From 4bd2f0b12930f2cb57db722215a98fe95eccb417 Mon Sep 17 00:00:00 2001 From: jamesc Date: Fri, 30 Nov 2012 11:46:55 -0800 Subject: [PATCH 1/4] Command switch queue length monitoring --- apps/pushy/src/pushy_process_monitor.erl | 174 +++++++++++++++++++++++ apps/pushy/src/pushy_sup.erl | 1 + 2 files changed, 175 insertions(+) create mode 100644 apps/pushy/src/pushy_process_monitor.erl diff --git a/apps/pushy/src/pushy_process_monitor.erl b/apps/pushy/src/pushy_process_monitor.erl new file mode 100644 index 00000000..843f5b53 --- /dev/null +++ b/apps/pushy/src/pushy_process_monitor.erl @@ -0,0 +1,174 @@ +%%%------------------------------------------------------------------- +%%% @author James Casey +%%% @copyright 2012 Opscode, Inc. +%%% @doc +%%% +%%% @end +%%%------------------------------------------------------------------- + +-module(pushy_process_monitor). + +-behaviour(gen_server). + +%% API +-export([start_link/3, + measure/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-type process() :: atom() | pid(). +-record(state, {name :: binary(), + process :: process(), + interval :: integer() + }). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link(Name, Process, Interval) -> + gen_server:start_link(?MODULE, [Name, Process, Interval], []). + +measure(Pid) -> + gen_server:cast(Pid, measure). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([Name, Process, Interval]) -> + {ok, _Timer} = timer:send_after(Interval, start_measure), + {ok, #state{name = Name, + process = Process, + interval = Interval}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(measure, #state{name = Name, + process = Process} = State) -> + case mbox_stats(Process) of + undefined -> + lager:warning("Process undefined : ~p", [Process]), + ignore; + Len when is_integer(Len) -> + send_metric(Name, Len) + end, + {noreply, State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(start_measure, #state{process = Process, + interval = Interval} = State) -> + lager:info("Starting Process Monitor for ~p", [Process]), + timer:apply_interval(Interval, ?MODULE, measure, [self()]), + {noreply, State}; +handle_info(Info, State) -> + lager:warning("handle_info: [~s] unhandled message ~w:", [State, Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + + +mbox_stats(Name) when is_atom(Name) -> + case erlang:whereis(Name) of + undefined -> + undefined; + Pid -> + mbox_stats(Pid) + end; +mbox_stats(Pid) -> + {message_queue_len, Len} = erlang:process_info(Pid, message_queue_len), + Len. + +send_metric(Name, Len) -> + folsom_metrics:notify(metric_for_name(Name), Len, gauge). + +metric_for_name(Name) -> + <<"process.", Name/binary, ".message_queue_len">>. diff --git a/apps/pushy/src/pushy_sup.erl b/apps/pushy/src/pushy_sup.erl index db202e3f..00609f11 100644 --- a/apps/pushy/src/pushy_sup.erl +++ b/apps/pushy/src/pushy_sup.erl @@ -60,6 +60,7 @@ init([#pushy_state{ctx=_Ctx} = PushyState]) -> ?WORKER(chef_keyring, []), ?WORKER(pushy_heartbeat_generator, [PushyState]), ?WORKER(pushy_command_switch, [PushyState]), + ?WORKER(pushy_process_monitor, [<<"pushy_command_switch">>, pushy_command_switch, 1000]), ?SUP(pushy_node_state_sup, []), ?SUP(pushy_job_state_sup, []), ?WORKERNL(webmachine_mochiweb, [WebMachineConfig]) %% FIXME start or start_link here? From 4e40a393b8b570c12ef677cec9d32e614a5c792f Mon Sep 17 00:00:00 2001 From: jamesc Date: Tue, 4 Dec 2012 13:06:04 -0800 Subject: [PATCH 2/4] startup folsom in tests --- apps/pushy/test/test_util.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/pushy/test/test_util.erl b/apps/pushy/test/test_util.erl index a19ce1fb..c50fbb8c 100644 --- a/apps/pushy/test/test_util.erl +++ b/apps/pushy/test/test_util.erl @@ -19,4 +19,5 @@ -include_lib("eunit/include/eunit.hrl"). start_apps() -> - application:start(gproc). + application:start(gproc), + application:start(folsom). From aef5cd6707c29150c68b921593e661cfa1485a55 Mon Sep 17 00:00:00 2001 From: jamesc Date: Fri, 30 Nov 2012 11:47:26 -0800 Subject: [PATCH 3/4] Log node_stats info to folsom --- apps/pushy/src/pushy_node_stats.erl | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/apps/pushy/src/pushy_node_stats.erl b/apps/pushy/src/pushy_node_stats.erl index e3f73695..068c632d 100644 --- a/apps/pushy/src/pushy_node_stats.erl +++ b/apps/pushy/src/pushy_node_stats.erl @@ -20,7 +20,9 @@ -define(NOW_WEIGHT, (1.0/decay_window())). -define(HISTORY_WEIGHT, (1.0-?NOW_WEIGHT)). --define(MEGA, 1000000). %% because I can't count zeros reliably +-define(HEALTH_METRIC, metric_for_name(<<"health">>)). +-define(HEARTBEATS_METRIC, metric_for_name(<<"heartbeats">>)). +-define(INTERVALS_METRIC, metric_for_name(<<"intervals">>)). -export([init/0, heartbeat/1, @@ -29,6 +31,9 @@ -spec init() -> atom() | ets:tid(). init() -> + folsom_metrics:new_histogram(?HEALTH_METRIC, slide, 20), + folsom_metrics:new_histogram(?INTERVALS_METRIC, slide, 20), + folsom_metrics:new_histogram(?HEARTBEATS_METRIC, slide, 20), ets:new(?MODULE, [set, public, named_table, {keypos, 2}, {write_concurrency, true}, {read_concurrency, true}]). @@ -131,4 +136,10 @@ advance_interval(#metric{avg=Avg, interval_start=StartI, heartbeats=Hb} = M, ICo %% The first interval may have accumulated heartbeats. Later intervals will not and can be %% aggregated into one step using pow. NAvg = ((Avg * ?HISTORY_WEIGHT) + (Hb * ?NOW_WEIGHT)) * math:pow(?HISTORY_WEIGHT, ICount-1), + folsom_metrics:notify(?HEALTH_METRIC, NAvg), + folsom_metrics:notify(?INTERVALS_METRIC, ICount), + folsom_metrics:notify(?HEARTBEATS_METRIC, Hb), M#metric{avg=NAvg, interval_start=NextI, heartbeats=0}. + +metric_for_name(Name) -> + <<"node.stats.", Name/binary>>. From 0045ed1649cab6c4d9475e362b30db03d5a8a735 Mon Sep 17 00:00:00 2001 From: jamesc Date: Tue, 4 Dec 2012 12:03:49 -0800 Subject: [PATCH 4/4] Use new pushy_metrics:app_metric call --- apps/pushy/src/pushy_node_stats.erl | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/apps/pushy/src/pushy_node_stats.erl b/apps/pushy/src/pushy_node_stats.erl index 068c632d..145dc6d3 100644 --- a/apps/pushy/src/pushy_node_stats.erl +++ b/apps/pushy/src/pushy_node_stats.erl @@ -20,10 +20,6 @@ -define(NOW_WEIGHT, (1.0/decay_window())). -define(HISTORY_WEIGHT, (1.0-?NOW_WEIGHT)). --define(HEALTH_METRIC, metric_for_name(<<"health">>)). --define(HEARTBEATS_METRIC, metric_for_name(<<"heartbeats">>)). --define(INTERVALS_METRIC, metric_for_name(<<"intervals">>)). - -export([init/0, heartbeat/1, stop/0, @@ -31,9 +27,6 @@ -spec init() -> atom() | ets:tid(). init() -> - folsom_metrics:new_histogram(?HEALTH_METRIC, slide, 20), - folsom_metrics:new_histogram(?INTERVALS_METRIC, slide, 20), - folsom_metrics:new_histogram(?HEARTBEATS_METRIC, slide, 20), ets:new(?MODULE, [set, public, named_table, {keypos, 2}, {write_concurrency, true}, {read_concurrency, true}]). @@ -136,10 +129,10 @@ advance_interval(#metric{avg=Avg, interval_start=StartI, heartbeats=Hb} = M, ICo %% The first interval may have accumulated heartbeats. Later intervals will not and can be %% aggregated into one step using pow. NAvg = ((Avg * ?HISTORY_WEIGHT) + (Hb * ?NOW_WEIGHT)) * math:pow(?HISTORY_WEIGHT, ICount-1), - folsom_metrics:notify(?HEALTH_METRIC, NAvg), - folsom_metrics:notify(?INTERVALS_METRIC, ICount), - folsom_metrics:notify(?HEARTBEATS_METRIC, Hb), + folsom_metrics:notify(app_metric(<<"health">>), NAvg, histogram), + folsom_metrics:notify(app_metric(<<"interval">>), ICount, histogram), + folsom_metrics:notify(app_metric(<<"heartbeat">>), Hb, histogram), M#metric{avg=NAvg, interval_start=NextI, heartbeats=0}. -metric_for_name(Name) -> - <<"node.stats.", Name/binary>>. +app_metric(Name) -> + pushy_metrics:app_metric(?MODULE, Name).