Skip to content
This repository has been archived by the owner on Jul 14, 2021. It is now read-only.

folsom metrics rework for pushy + pushysim #57

Merged
merged 4 commits into from
Dec 6, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions apps/pushy/src/pushy_node_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
-define(NOW_WEIGHT, (1.0/decay_window())).
-define(HISTORY_WEIGHT, (1.0-?NOW_WEIGHT)).

-define(MEGA, 1000000). %% because I can't count zeros reliably

-export([init/0,
heartbeat/1,
stop/0,
Expand Down Expand Up @@ -131,4 +129,10 @@ advance_interval(#metric{avg=Avg, interval_start=StartI, heartbeats=Hb} = M, ICo
%% The first interval may have accumulated heartbeats. Later intervals will not and can be
%% aggregated into one step using pow.
NAvg = ((Avg * ?HISTORY_WEIGHT) + (Hb * ?NOW_WEIGHT)) * math:pow(?HISTORY_WEIGHT, ICount-1),
folsom_metrics:notify(app_metric(<<"health">>), NAvg, histogram),
folsom_metrics:notify(app_metric(<<"interval">>), ICount, histogram),
folsom_metrics:notify(app_metric(<<"heartbeat">>), Hb, histogram),
M#metric{avg=NAvg, interval_start=NextI, heartbeats=0}.

%% Builds the metric identifier for `MetricName' by delegating to
%% pushy_metrics:app_metric/2 with this module as the first argument.
app_metric(MetricName) ->
    Owner = ?MODULE,
    pushy_metrics:app_metric(Owner, MetricName).
174 changes: 174 additions & 0 deletions apps/pushy/src/pushy_process_monitor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
%%%-------------------------------------------------------------------
%%% @author James Casey
%%% @copyright 2012 Opscode, Inc.
%%% @doc
%%%
%%% @end
%%%-------------------------------------------------------------------

-module(pushy_process_monitor).

-behaviour(gen_server).

%% API
-export([start_link/3,
measure/1]).

%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

-type process() :: atom() | pid().
-record(state, {name :: binary(),
process :: process(),
interval :: integer()
}).

%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts an unregistered process monitor linked to the caller.
%%
%% `Name' is the binary embedded in the reported metric name,
%% `Process' is the registered name or pid whose mailbox is sampled and
%% `Interval' is the period in milliseconds; it is passed unchanged to
%% the `timer' module, both for the initial delay and for the sampling
%% period.
%% @end
%%--------------------------------------------------------------------
-spec start_link(binary(), process(), integer()) ->
          {ok, pid()} | ignore | {error, term()}.
start_link(Name, Process, Interval) ->
    gen_server:start_link(?MODULE, [Name, Process, Interval], []).

%%--------------------------------------------------------------------
%% @doc
%% Asks the monitor `Pid' to take one sample of the watched process.
%%
%% The request is sent as a cast, so this call returns `ok' without
%% waiting for the sample to be taken.
%% @end
%%--------------------------------------------------------------------
-spec measure(pid() | atom()) -> ok.
measure(Pid) ->
    gen_server:cast(Pid, measure).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server init callback.
%%
%% Builds the server state from the three start arguments and arranges
%% for a single `start_measure' message to be delivered to this process
%% once `Interval' has elapsed.
%%
%% @spec init([Name, Process, Interval]) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([Name, Process, Interval]) ->
    InitialState = #state{name = Name,
                          process = Process,
                          interval = Interval},
    %% Crash during start-up if the delayed kick-off cannot be scheduled.
    {ok, _TRef} = timer:send_after(Interval, start_measure),
    {ok, InitialState}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server call callback.
%%
%% This server defines no synchronous requests: every call is answered
%% with `ok' and the state is returned untouched.
%%
%% @spec handle_call(Request, From, State) -> {reply, ok, State}
%% @end
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server cast callback.
%%
%% On `measure' the watched process is sampled with mbox_stats/1. An
%% integer result is reported through send_metric/2; an `undefined'
%% result is only logged as a warning. Any other cast is ignored. The
%% state is never modified.
%%
%% @spec handle_cast(Msg, State) -> {noreply, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(measure, #state{name = MetricName,
                            process = Watched} = State) ->
    case mbox_stats(Watched) of
        QueueLen when is_integer(QueueLen) ->
            send_metric(MetricName, QueueLen);
        undefined ->
            lager:warning("Process undefined : ~p", [Watched])
    end,
    {noreply, State};
handle_cast(_Other, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server info callback.
%%
%% `start_measure' (scheduled by init/1) installs the periodic timer
%% that calls measure/1 on this process every `Interval'. Any other
%% message is logged as a warning and dropped. The state is never
%% modified.
%%
%% @spec handle_info(Info, State) -> {noreply, State}
%% @end
%%--------------------------------------------------------------------
handle_info(start_measure, #state{process = Process,
                                  interval = Interval} = State) ->
    lager:info("Starting Process Monitor for ~p", [Process]),
    %% Assert success, as init/1 does for its timer: a monitor that never
    %% samples should crash rather than sit idle unnoticed.
    {ok, _TRef} = timer:apply_interval(Interval, ?MODULE, measure, [self()]),
    {noreply, State};
handle_info(Info, State) ->
    %% State is a record (a tuple), which ~s cannot format; use ~p.
    lager:warning("handle_info: [~p] unhandled message ~w:", [State, Info]),
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server terminate callback.
%%
%% Performs no cleanup of its own; both arguments are ignored and `ok'
%% is returned.
%%
%% @spec terminate(Reason, State) -> ok
%% @end
%%--------------------------------------------------------------------
terminate(_Why, _LastState) ->
    ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% gen_server code_change callback.
%%
%% No state conversion is performed: the current state is handed back
%% unchanged.
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.

%%%===================================================================
%%% Internal functions
%%%===================================================================


%% Returns the message queue length of a process, or `undefined' when
%% there is no such live process.
%%
%% An atom is treated as a registered name and resolved with whereis/1;
%% anything else is passed to erlang:process_info/2 as a pid.
mbox_stats(Name) when is_atom(Name) ->
    case erlang:whereis(Name) of
        undefined ->
            undefined;
        Pid ->
            mbox_stats(Pid)
    end;
mbox_stats(Pid) ->
    %% process_info/2 returns `undefined' for a dead process. That can
    %% happen for a pid argument, or when a named process exits between
    %% whereis/1 and this call, so it must not crash the monitor.
    case erlang:process_info(Pid, message_queue_len) of
        {message_queue_len, Len} ->
            Len;
        undefined ->
            undefined
    end.

%% Reports `Len' to folsom as a gauge reading, under the metric name
%% derived from `Name' by metric_for_name/1.
send_metric(Name, Len) ->
    Metric = metric_for_name(Name),
    folsom_metrics:notify(Metric, Len, gauge).

%% Builds the metric name <<"process.NAME.message_queue_len">> from the
%% binary `Name'.
metric_for_name(Name) ->
    Prefix = <<"process.">>,
    Suffix = <<".message_queue_len">>,
    <<Prefix/binary, Name/binary, Suffix/binary>>.
1 change: 1 addition & 0 deletions apps/pushy/src/pushy_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ init([#pushy_state{ctx=_Ctx} = PushyState]) ->
?WORKER(chef_keyring, []),
?WORKER(pushy_heartbeat_generator, [PushyState]),
?WORKER(pushy_command_switch, [PushyState]),
?WORKER(pushy_process_monitor, [<<"pushy_command_switch">>, pushy_command_switch, 1000]),
?SUP(pushy_node_state_sup, []),
?SUP(pushy_job_state_sup, []),
?WORKERNL(webmachine_mochiweb, [WebMachineConfig]) %% FIXME start or start_link here?
Expand Down
3 changes: 2 additions & 1 deletion apps/pushy/test/test_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
-include_lib("eunit/include/eunit.hrl").

start_apps() ->
application:start(gproc).
application:start(gproc),
application:start(folsom).