From 7d4ee1ee6b2cefe3745786f199556cc9a25719b3 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Mon, 5 Nov 2012 17:15:04 -0500 Subject: [PATCH 01/15] Don't forget the booting state --- apps/pushy/src/pushy_node_state2.erl | 165 +++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 apps/pushy/src/pushy_node_state2.erl diff --git a/apps/pushy/src/pushy_node_state2.erl b/apps/pushy/src/pushy_node_state2.erl new file mode 100644 index 00000000..bab2e853 --- /dev/null +++ b/apps/pushy/src/pushy_node_state2.erl @@ -0,0 +1,165 @@ +-module(pushy_node_state2). + +-behaviour(gen_fsm). + +-include("pushy.hrl"). + +-type logging_level() :: 'verbose' | 'normal'. +-type eavg() :: any(). + + +%% API +-export([start_link/1, + status/1]). + +%% States +-export([booting/2, + idle/2, + running/2, + rehab/2]). + +-define(DEFAULT_DECAY_INTERVAL, 4). +-define(DEFAULT_UP_THRESHOLD, 0.5). +-define(DEFAULT_DOWN_THRESHOLD, 0.4). + +-record(state, {node_ref :: node_ref(), + heartbeat_interval :: integer(), + decay_window :: integer(), + logging = verbose :: logging_level(), + heartbeats_rcvd = 0 :: integer(), + up_threshold :: float(), + down_threshold :: float(), + job :: any(), + rehab_timer, + heartbeat_rate :: eavg() + }). + +%% gen_fsm callbacks +-export([init/1, state_name/2, state_name/3, handle_event/3, + handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). + +start_link(NodeRef) -> + gen_fsm:start_link(?MODULE, [NodeRef], []). + +status(NodeRef) -> + case pushy_node_state_sup:get_process(NodeRef) of + undefined -> + {offline, unavailable}; + Pid -> + eval_state_name(gen_fsm:sync_send_all_state_event(Pid, current_state, infinity)) + end. + +init([NodeRef]) -> + GprocName = pushy_node_state_sup:mk_gproc_name(NodeRef), + try + %% The most important thing to have happen is this registration; we need to get this + %% assigned before anyone else tries to start things up gproc:reg can only return + %% true or throw + true = gproc:reg({n, l, GprocName}), + HeartbeatInterval = heartbeat_interval(), + DecayWindow = envy:get(pushy, decay_window, ?DEFAULT_DECAY_INTERVAL, integer), + UpThresh = envy:get(pushy, up_threshold, ?DEFAULT_UP_THRESHOLD, number), %% TODO constrain to float + DownThresh = envy:get(pushy, down_threshold, ?DEFAULT_DOWN_THRESHOLD, number), %% TODO constrain to float + {ok, TimerRef} = timer:send_interval(rehab_interval(), send_abort), + + State = #state{node_ref = NodeRef, + decay_window = DecayWindow, + heartbeat_interval = HeartbeatInterval, + heartbeat_rate = pushy_ema:init(DecayWindow, HeartbeatInterval, 1.0), + up_threshold = UpThresh, + down_threshold = DownThresh, + rehab_timer=TimerRef}, + {ok, booting, State} + catch + error:badarg -> + %% When we start up from a previous run, we have two ways that the FSM might be started; + %% from an incoming packet, or the database record for a prior run + %% There may be some nasty race conditions surrounding this. + %% We may also want to *not* automatically reanimate FSMs for nodes that aren't + %% actively reporting; but rather keep them in a 'limbo' waiting for the first + %% packet, and if one doesn't arrive within a certain time mark them down. + lager:error("Failed to register:~p for ~p (already exists as ~p?)", + [NodeRef,self(), gproc:lookup_pid({n,l,GprocName}) ]), + {stop, shutdown, undefined} + end. + +rehab(aborted, #state{rehab_timer=TRef}=State) -> + timer:cancel(TRef), + {next_state, idle, State}; +rehab(Message, #state{node_ref=NodeRef}=State) -> + error_logger:error_msg("~p in rehab. 
Ignoring message: ~p~n", [NodeRef, Message]), + Message = {[{type, abort}]}, + ok = pushy_command_switch:send_command(NodeRef, Message), + {next_state, rehab, State}. + +idle({job, Job}, State) -> + {next_state, running, State#state{job=Job}}. + +running(aborted, State) -> + {next_state, idle, State}; +running({complete, Job}, #state{job=Job}=State) -> + {next_state, idle, State}. + +state_name(_Event, State) -> + {next_state, state_name, State}. + +state_name(_Event, _From, State) -> + Reply = ok, + {reply, Reply, state_name, State}. + +handle_event(_Event, StateName, State) -> + {next_state, StateName, State}. + +handle_sync_event(current_state, _From, StateName, State) -> + {reply, StateName, StateName, State}. + +handle_info(heartbeat, booting, #state{node_ref=NodeRef, heartbeats_rcvd=HeartBeats, heartbeat_rate=HRate, up_threshold=UThresh}=State) -> + NHRate = pushy_ema:tick(HRate), + EAvg = pushy_ema:value(NHRate), + State1 = State#state{heartbeat_rate=NHRate, heartbeats_rcvd=HeartBeats + 1}, + case EAvg > UThresh of + true -> + {next_state, idle, State1, heartbeat_interval()}; + false -> + Message = {[{type, abort}]}, + ok = pushy_command_switch:send_command(NodeRef, Message), + {next_state, rehab, State1, heartbeat_interval()} + end; +handle_info(send_abort, rehab, #state{node_ref=NodeRef}=State) -> + Message = {[{type, abort}]}, + ok = pushy_command_switch:send_command(NodeRef, Message), + {next_state, rehab, State}; +handle_info(timeout, CurrentState, #state{heartbeat_rate=HRate, down_threshold=DThresh}=State) -> + NHRate = pushy_ema:tick(HRate), + EAvg = pushy_ema:value(NHRate), + NState = State#state{heartbeat_rate=NHRate}, + case EAvg < DThresh of + true -> + {stop, shutdown, NState}; + false -> + {next_state, CurrentState, NState} + end; +handle_info(_Info, StateName, State) -> + {next_state, StateName, State}. + +terminate(_Reason, _StateName, _State) -> + ok. + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + +%% Internal functions +eval_state_name(booting) -> + {online, unvailable}; +eval_state_name(idle) -> + {online, available}; +eval_state_name(rehab) -> + {online, unavailable}; +eval_state_name(running) -> + {online, unvailable}. + +rehab_interval() -> + envy:get(pushy, rehab_timer, 1000, integer). + +heartbeat_interval() -> + envy:get(pushy, heartbeat_interval, integer). From 065439491c7549904acf88769d766af4f3c4d1a6 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 6 Nov 2012 10:14:11 -0500 Subject: [PATCH 02/15] More work this morning * Added watchers and notifications on state transitions * Cleaned up states * Fixed rehab handling * Moved to a simpler heartbeat algo until we can integrate pushy_ema again --- apps/pushy/src/pushy_node_state2.erl | 211 +++++++++++++++++---------- 1 file changed, 134 insertions(+), 77 deletions(-) diff --git a/apps/pushy/src/pushy_node_state2.erl b/apps/pushy/src/pushy_node_state2.erl index bab2e853..b5279127 100644 --- a/apps/pushy/src/pushy_node_state2.erl +++ b/apps/pushy/src/pushy_node_state2.erl @@ -5,12 +5,13 @@ -include("pushy.hrl"). -type logging_level() :: 'verbose' | 'normal'. --type eavg() :: any(). %% API -export([start_link/1, - status/1]). + heartbeat/1, + status/1, + watch/1]). %% States -export([booting/2, @@ -23,53 +24,54 @@ -define(DEFAULT_DOWN_THRESHOLD, 0.4). 
-record(state, {node_ref :: node_ref(), - heartbeat_interval :: integer(), - decay_window :: integer(), logging = verbose :: logging_level(), - heartbeats_rcvd = 0 :: integer(), - up_threshold :: float(), - down_threshold :: float(), + heartbeats = 1 :: pos_integer(), job :: any(), - rehab_timer, - heartbeat_rate :: eavg() + watchers = [], + state_timer }). %% gen_fsm callbacks --export([init/1, state_name/2, state_name/3, handle_event/3, - handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). +-export([init/1, + handle_event/3, + handle_sync_event/4, + handle_info/3, + terminate/3, + code_change/4]). start_link(NodeRef) -> gen_fsm:start_link(?MODULE, [NodeRef], []). +heartbeat(NodeRef) -> + send_info(NodeRef, heartbeat), + ok. + status(NodeRef) -> - case pushy_node_state_sup:get_process(NodeRef) of + case call(NodeRef, current_state) of undefined -> {offline, unavailable}; - Pid -> - eval_state_name(gen_fsm:sync_send_all_state_event(Pid, current_state, infinity)) + CurrentState -> + eval_state(CurrentState) + end. + +watch(NodeRef) -> + case call(NodeRef, {watch, self()}) of + ok -> + ok; + Error -> + Error end. init([NodeRef]) -> GprocName = pushy_node_state_sup:mk_gproc_name(NodeRef), + State = #state{node_ref = NodeRef}, try %% The most important thing to have happen is this registration; we need to get this %% assigned before anyone else tries to start things up gproc:reg can only return %% true or throw true = gproc:reg({n, l, GprocName}), - HeartbeatInterval = heartbeat_interval(), - DecayWindow = envy:get(pushy, decay_window, ?DEFAULT_DECAY_INTERVAL, integer), - UpThresh = envy:get(pushy, up_threshold, ?DEFAULT_UP_THRESHOLD, number), %% TODO constrain to float - DownThresh = envy:get(pushy, down_threshold, ?DEFAULT_DOWN_THRESHOLD, number), %% TODO constrain to float - {ok, TimerRef} = timer:send_interval(rehab_interval(), send_abort), - - State = #state{node_ref = NodeRef, - decay_window = DecayWindow, - heartbeat_interval = HeartbeatInterval, - heartbeat_rate = pushy_ema:init(DecayWindow, HeartbeatInterval, 1.0), - up_threshold = UpThresh, - down_threshold = DownThresh, - rehab_timer=TimerRef}, - {ok, booting, State} + State = #state{node_ref = NodeRef}, + {ok, state_transition(init, booting, State), State} catch error:badarg -> %% When we start up from a previous run, we have two ways that the FSM might be started; @@ -80,64 +82,86 @@ init([NodeRef]) -> %% packet, and if one doesn't arrive within a certain time mark them down. lager:error("Failed to register:~p for ~p (already exists as ~p?)", [NodeRef,self(), gproc:lookup_pid({n,l,GprocName}) ]), - {stop, shutdown, undefined} + {stop, state_transition(init, shutdown, State), State} end. -rehab(aborted, #state{rehab_timer=TRef}=State) -> +booting(Message, #state{node_ref=NodeRef}=State) -> + lager:info("~p is booting. Ignoring message: ~p~n", [NodeRef, Message]), + {next_state, booting, State, 60000}. + +rehab(aborted, #state{state_timer=TRef}=State) -> timer:cancel(TRef), - {next_state, idle, State}; + {next_state, state_transition(rehab, idle, State), State}; rehab(Message, #state{node_ref=NodeRef}=State) -> - error_logger:error_msg("~p in rehab. Ignoring message: ~p~n", [NodeRef, Message]), - Message = {[{type, abort}]}, - ok = pushy_command_switch:send_command(NodeRef, Message), + lager:info("~p in rehab. Ignoring message: ~p~n", [NodeRef, Message]), {next_state, rehab, State}. idle({job, Job}, State) -> - {next_state, running, State#state{job=Job}}. 
- -running(aborted, State) -> - {next_state, idle, State}; -running({complete, Job}, #state{job=Job}=State) -> - {next_state, idle, State}. - -state_name(_Event, State) -> - {next_state, state_name, State}. + {next_state, state_transition(idle, running, State), State#state{job=Job}}. -state_name(_Event, _From, State) -> - Reply = ok, - {reply, Reply, state_name, State}. +running(aborted, #state{node_ref=NodeRef}=State) -> + lager:info("~p aborted during job.~n", [NodeRef]), + {next_state, state_transition(running, idle, State), State}; +running({complete, Job}, #state{job=Job, node_ref=NodeRef}=State) -> + lager:info("~p completed job.~n", [NodeRef]), + State1 = State#state{job=undefined}, + {next_state, state_transition(running, idle, State1), State1}. handle_event(_Event, StateName, State) -> {next_state, StateName, State}. -handle_sync_event(current_state, _From, StateName, State) -> - {reply, StateName, StateName, State}. +handle_sync_event({watch, WatcherPid}, _From, StateName, #state{watchers=Watchers}=State) -> + MRef = erlang:monitor(process, WatcherPid), + Watcher = {WatcherPid, MRef}, + {reply, ok, StateName, State#state{watchers=[Watcher|Watchers]}}; +handle_sync_event({unwatch, WatcherPid}, _From, StateName, #state{watchers=Watchers}=State) -> + case lists:keytake(WatcherPid, 1, Watchers) of + false -> + {reply, ok, StateName, State}; + {value, {WatcherPid, MRef}, Watchers1} -> + erlang:demonitor(MRef, [flush]), + {reply, ok, StateName, State#state{watchers=Watchers1}} + end; +handle_sync_event(current_state, _From, StateName, #state{job=Job}=State) -> + {reply, {StateName, Job}, StateName, State}. -handle_info(heartbeat, booting, #state{node_ref=NodeRef, heartbeats_rcvd=HeartBeats, heartbeat_rate=HRate, up_threshold=UThresh}=State) -> - NHRate = pushy_ema:tick(HRate), - EAvg = pushy_ema:value(NHRate), - State1 = State#state{heartbeat_rate=NHRate, heartbeats_rcvd=HeartBeats + 1}, - case EAvg > UThresh of +handle_info(heartbeat, booting, #state{heartbeats=HBeats}=State) -> + HBeats1 = HBeats + 1, + State1 = State#state{heartbeats=HBeats1}, + case HBeats1 > 3 of true -> - {next_state, idle, State1, heartbeat_interval()}; + State2 = force_abort(State1), + {next_state, state_transition(booting, rehab, State2), State2}; false -> - Message = {[{type, abort}]}, - ok = pushy_command_switch:send_command(NodeRef, Message), - {next_state, rehab, State1, heartbeat_interval()} + {next_state, booting, State1} end; -handle_info(send_abort, rehab, #state{node_ref=NodeRef}=State) -> - Message = {[{type, abort}]}, - ok = pushy_command_switch:send_command(NodeRef, Message), - {next_state, rehab, State}; -handle_info(timeout, CurrentState, #state{heartbeat_rate=HRate, down_threshold=DThresh}=State) -> - NHRate = pushy_ema:tick(HRate), - EAvg = pushy_ema:value(NHRate), - NState = State#state{heartbeat_rate=NHRate}, - case EAvg < DThresh of - true -> - {stop, shutdown, NState}; +handle_info(heartbeat, CurrentState, #state{heartbeats=HBeats}=State) -> + HBeats1 = HBeats + 1, + State1 = if + HBeats1 < 5 -> + State#state{heartbeats=HBeats1}; + true -> + State#state{heartbeats=5} + end, + {next_state, CurrentState, State1, heartbeat_interval()}; +handle_info(timeout, CurrentState, #state{heartbeats=HBeats}=State) -> + HBeats1 = HBeats - 1, + State1 = State#state{heartbeats=HBeats1}, + case HBeats1 of + 0 -> + {stop, state_transition(CurrentState, shutdown, State1), State1}; + _ -> + {next_state, CurrentState, State, heartbeat_interval()} + end; +handle_info(rehab_again, rehab, State) -> + State1 = 
force_abort(State), + {next_state, rehab, State1}; +handle_info({'DOWN', _MRef, _Type, Pid, _Reason}, StateName, #state{watchers=Watchers}=State) -> + case lists:keytake(Pid, Watchers) of false -> - {next_state, CurrentState, NState} + {next_state, StateName, State}; + {value, _, Watchers1} -> + {next_state, StateName, State#state{watchers=Watchers1}} end; handle_info(_Info, StateName, State) -> {next_state, StateName, State}. @@ -149,17 +173,50 @@ code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. %% Internal functions -eval_state_name(booting) -> - {online, unvailable}; -eval_state_name(idle) -> - {online, available}; -eval_state_name(rehab) -> - {online, unavailable}; -eval_state_name(running) -> - {online, unvailable}. +eval_state({booting, undefined}) -> + {online, {unvailable, none}}; +eval_state({idle, undefined}) -> + {online, {available, none}}; +eval_state({rehab, undefined}) -> + {online, {unavailable, none}}; +eval_state({running, Job}) -> + {online, {unvailable, Job}}. rehab_interval() -> envy:get(pushy, rehab_timer, 1000, integer). heartbeat_interval() -> envy:get(pushy, heartbeat_interval, integer). + +call(NodeRef, Message) -> + case pushy_node_state_sup:get_process(NodeRef) of + Pid when is_pid(Pid) -> + gen_fsm:sync_send_all_state_event(Pid, Message, infinity); + Error -> + Error + end. + +send_info(NodeRef, Message) -> + case pushy_node_state_sup:get_process(NodeRef) of + Pid when is_pid(Pid) -> + Pid ! Message; + Error -> + Error + end. + +force_abort(#state{node_ref=NodeRef}=State) -> + Message = {[{type, abort}]}, + ok = pushy_command_switch:send_command(NodeRef, Message), + TRef = timer:send_after(rehab_interval(), rehab_again), + State#state{state_timer=TRef}. + +state_transition(Current, New, #state{node_ref=NodeRef, watchers=Watchers}) -> + lager:debug("~p transitioning from ~p to ~p~n", [NodeRef, Current, New]), + notify_watchers(Watchers, NodeRef, Current, New), + New. + +notify_watchers([], _NodeRef, _Current, _New) -> + ok; +notify_watchers(Watchers, NodeRef, Current, New) -> + F = fun(Watcher) -> Watcher ! {state_change, NodeRef, Current, New} end, + [F(Watcher) || {Watcher, _Monitor} <- Watchers]. 
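
A note on the watcher interface added in this patch: a subscriber calls
watch/1 from its own process; the FSM monitors the caller (dead watchers are
pruned by the 'DOWN' clause in handle_info/3) and delivers every transition
as a plain message. A minimal consumer sketch, where NodeRef is whichever
node_ref() the caller holds and the tuple shape matches what
notify_watchers/4 sends:

    %% Subscribe the current process to NodeRef's state transitions.
    ok = pushy_node_state2:watch(NodeRef),
    receive
        {state_change, NodeRef, OldState, NewState} ->
            lager:info("~p: ~p -> ~p", [NodeRef, OldState, NewState])
    end.
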
From 899bcc151277df61468fe70906aaa0114ebe7ba0 Mon Sep 17 00:00:00 2001 From: Matthew Peck Date: Tue, 6 Nov 2012 15:33:33 -0500 Subject: [PATCH 03/15] started implementation of cleaned up node fsm --- apps/pushy/src/pushy_command_switch.erl | 4 +- apps/pushy/src/pushy_node_state.erl | 431 ++++++++++-------------- apps/pushy/src/pushy_node_state2.erl | 222 ------------ apps/pushy/src/pushy_node_state_sup.erl | 3 +- 4 files changed, 186 insertions(+), 474 deletions(-) delete mode 100644 apps/pushy/src/pushy_node_state2.erl diff --git a/apps/pushy/src/pushy_command_switch.erl b/apps/pushy/src/pushy_command_switch.erl index 7726997c..53a53fbb 100644 --- a/apps/pushy/src/pushy_command_switch.erl +++ b/apps/pushy/src/pushy_command_switch.erl @@ -244,9 +244,9 @@ send_node_event(JobId, NodeRef, <<"nack_run">>) -> send_node_event(JobId, NodeRef, <<"complete">>)-> pushy_job_state:node_complete(JobId, NodeRef); send_node_event(null, NodeRef, <<"aborted">>) -> - pushy_node_state:node_aborted(NodeRef); + pushy_node_state:aborted(NodeRef); send_node_event(JobId, NodeRef, <<"aborted">>) -> - pushy_node_state:node_aborted(NodeRef), + pushy_node_state:aborted(NodeRef), pushy_job_state:node_aborted(JobId, NodeRef); send_node_event(JobId, NodeRef, undefined) -> lager:error("Status message for job ~p and node ~p was missing type field!~n", [JobId, NodeRef]); diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl index cd5d6980..e28690ba 100644 --- a/apps/pushy/src/pushy_node_state.erl +++ b/apps/pushy/src/pushy_node_state.erl @@ -1,154 +1,87 @@ -%% -*- erlang-indent-level: 4;indent-tabs-mode: nil; fill-column: 92 -*- -%% ex: ts=4 sw=4 et -%% -%% @author Mark Anderson -%% @author John Keiser -%% -%% @copyright 2012 Opscode Inc. -%% @end - -%% -%% @doc simple FSM for tracking node heartbeats and thus up/down status -%% -module(pushy_node_state). -behaviour(gen_fsm). -%% API --export([current_state/1, - in_rehab/1, - heartbeat/1, - rehab/1, - node_aborted/1, - set_logging/2, - start_link/1, - start_link/2]). - -%% Observers --export([start_watching/1, - stop_watching/1]). - --define(SAVE_MODE, gen_server). % direct or gen_server --define(NO_NODE, {error, no_node}). - -%% gen_fsm callbacks --export([code_change/4, - handle_event/3, - handle_info/3, - handle_sync_event/4, - init/1, - init/2, - terminate/3]). - -include("pushy.hrl"). --include("pushy_sql.hrl"). - --include_lib("eunit/include/eunit.hrl"). -type logging_level() :: 'verbose' | 'normal'. --type eavg() :: any(). + +%% API +-export([start_link/1, + heartbeat/1, + status/1, + watch/1, + aborted/1]). + +%% States +-export([booting/2, + idle/2, + running/2, + rehab/2]). -define(DEFAULT_DECAY_INTERVAL, 4). -define(DEFAULT_UP_THRESHOLD, 0.5). -define(DEFAULT_DOWN_THRESHOLD, 0.4). -record(state, {node_ref :: node_ref(), - heartbeat_interval :: integer(), - decay_window :: integer(), logging = verbose :: logging_level(), - current_status = down :: node_status(), - heartbeats_rcvd = 0 :: integer(), - up_threshold :: float(), - down_threshold :: float(), - rehab_timer, - heartbeat_rate :: eavg() + heartbeats = 1 :: pos_integer(), + job :: any(), + watchers = [], + state_timer }). -%%% -%%% External API -%%% --spec start_link(node_ref() ) -> 'ignore' | {'error',_} | {'ok',pid()}. -start_link(NodeRef) -> - start_link(NodeRef, down). +%% gen_fsm callbacks +-export([init/1, + handle_event/3, + handle_sync_event/4, + handle_info/3, + terminate/3, + code_change/4]). 
--spec start_link(node_ref(), 'up' | 'down' ) -> 'ignore' | {'error',_} | {'ok',pid()}. -start_link(NodeRef, StartState) -> - gen_fsm:start_link(?MODULE, {NodeRef, StartState}, []). +start_link(NodeRef) -> + gen_fsm:start_link(?MODULE, [NodeRef], []). --spec heartbeat(node_ref()) -> 'ok'. heartbeat(NodeRef) -> - Pid = pushy_node_state_sup:get_process(NodeRef), - gen_fsm:send_all_state_event(Pid, heartbeat). - --spec current_state(node_ref()) -> node_status(). -current_state(NodeRef) -> - Pid = pushy_node_state_sup:get_process(NodeRef), - gen_fsm:sync_send_all_state_event(Pid, current_state, infinity). + send_info(NodeRef, heartbeat), + ok. --spec set_logging(node_ref(), logging_level()) -> ok. -set_logging(NodeRef, Level) when Level =:= verbose orelse Level =:= normal -> - Pid = pushy_node_state_sup:get_process(NodeRef), - gen_fsm:send_all_state_event(Pid, {logging, Level}). +status(NodeRef) -> + case call(NodeRef, current_state) of + undefined -> + {offline, unavailable}; + CurrentState -> + eval_state(CurrentState) + end. --spec start_watching(node_ref()) -> true. -start_watching(NodeRef) -> - gproc:reg(subscribers_key(NodeRef)). +watch(NodeRef) -> + case call(NodeRef, {watch, self()}) of + ok -> + ok; + Error -> + Error + end. --spec stop_watching(node_ref()) -> true. -stop_watching(NodeRef) -> - try - gproc:unreg(subscribers_key(NodeRef)) - catch error:badarg -> - ok +aborted(NodeRef) -> + case pushy_node_state_sup:get_process(NodeRef) of + Pid when is_pid(Pid) -> + gen_fsm:send_event(Pid, aborted); + Error -> + Error end. -in_rehab(NodeRef) -> - Pid = pushy_node_state_sup:get_process(NodeRef), - gen_fsm:sync_send_all_state_event(Pid, current_rehab_status, infinity). - -rehab(NodeRef) -> - Pid = pushy_node_state_sup:get_process(NodeRef), - gen_fsm:send_all_state_event(Pid, rehab). - -node_aborted(NodeRef) -> - Pid = pushy_node_state_sup:get_process(NodeRef), - gen_fsm:send_all_state_event(Pid, aborted). - -init({NodeRef,StartState}) -> - init(NodeRef,StartState); -init(NodeRef) -> - init(NodeRef, down). 
-% -% This is split into two phases: an 'upper half' to get the minimimal work done required to wire things up -% and a 'lower half' that takes care of things that can wait -% -init(NodeRef, StartState) -> + +init([NodeRef]) -> GprocName = pushy_node_state_sup:mk_gproc_name(NodeRef), + State = #state{node_ref = NodeRef}, try %% The most important thing to have happen is this registration; we need to get this %% assigned before anyone else tries to start things up gproc:reg can only return %% true or throw true = gproc:reg({n, l, GprocName}), - HeartbeatInterval = envy:get(pushy, heartbeat_interval, integer), - DecayWindow = envy:get(pushy, decay_window, ?DEFAULT_DECAY_INTERVAL, integer), - UpThresh = envy:get(pushy, up_threshold, ?DEFAULT_UP_THRESHOLD, number), %% TODO constrain to float - DownThresh = envy:get(pushy, down_threshold, ?DEFAULT_DOWN_THRESHOLD, number), %% TODO constrain to float - - InitAvg = case StartState of - up -> 1.0; - down -> 0.0 - end, - - State = #state{node_ref = NodeRef, - decay_window = DecayWindow, - heartbeat_interval = HeartbeatInterval, - heartbeat_rate = pushy_ema:init(DecayWindow, HeartbeatInterval, InitAvg), - up_threshold = UpThresh, - down_threshold = DownThresh, - current_status = StartState - }, - {ok, StartState, create_status_record(StartState, State)} + State = #state{node_ref = NodeRef}, + {ok, state_transition(init, booting, State), State} catch error:badarg -> %% When we start up from a previous run, we have two ways that the FSM might be started; @@ -159,144 +92,146 @@ init(NodeRef, StartState) -> %% packet, and if one doesn't arrive within a certain time mark them down. lager:error("Failed to register:~p for ~p (already exists as ~p?)", [NodeRef,self(), gproc:lookup_pid({n,l,GprocName}) ]), - {stop, shutdown, undefined} + {stop, state_transition(init, shutdown, State), State} end. -%% -%% These events are handled the same for every state -%% --spec handle_event(any(), node_status(), #state{}) -> {any(), node_status(), #state{}}. -handle_event({logging, Level}, StateName, State) -> - State1 = State#state{logging=Level}, - {next_state, StateName, State1}; -handle_event(rehab, up, State) -> - State1 = send_to_rehab(up, State), - {next_state, up, State1}; -handle_event(rehab, down, State) -> - {next_state, down, State}; -handle_event(aborted, up, State) -> - State1 = kick_from_rehab(State), - {next_state, up, State1}; -handle_event(heartbeat, - StateName, - #state{node_ref=NodeRef, heartbeats_rcvd=HeartBeats, logging=Level, current_status=CurStatus, heartbeat_rate=HRate}=State) -> - nlog(Level, "Heartbeat received from ~p. Currently ~p / ~p", [NodeRef, CurStatus, HRate]), - %% Note that we got a heartbeat - State1 = State#state{ - heartbeat_rate=pushy_ema:inc(HRate,1), - current_status=CurStatus, - heartbeats_rcvd=HeartBeats+1 - }, - {next_state, StateName, State1}; -handle_event(Event, StateName, #state{node_ref=NodeRef}=State) -> - lager:error("FSM for ~p received unexpected handle_event(~p)", [NodeRef, Event]), +booting(Message, #state{node_ref=NodeRef}=State) -> + lager:info("~p is booting. Ignoring message: ~p~n", [NodeRef, Message]), + {next_state, booting, State, 60000}. + +rehab(aborted, #state{state_timer=TRef}=State) -> + timer:cancel(TRef), + {next_state, state_transition(rehab, idle, State), State}; +rehab(Message, #state{node_ref=NodeRef}=State) -> + lager:info("~p in rehab. Ignoring message: ~p~n", [NodeRef, Message]), + {next_state, rehab, State}. 
+ +idle({job, Job}, State) -> + {next_state, state_transition(idle, running, State), State#state{job=Job}}; +idle(Event, State) -> + lager:info("IDLE EVENT: ~p~n", [Event]), + {next_state, idle, State}. + +running(aborted, #state{node_ref=NodeRef}=State) -> + lager:info("~p aborted during job.~n", [NodeRef]), + {next_state, state_transition(running, idle, State), State}; +running({complete, Job}, #state{job=Job, node_ref=NodeRef}=State) -> + lager:info("~p completed job.~n", [NodeRef]), + State1 = State#state{job=undefined}, + {next_state, state_transition(running, idle, State1), State1}. + +handle_event(_Event, StateName, State) -> {next_state, StateName, State}. -handle_sync_event(current_state, _From, StateName, State) -> - {reply, StateName, StateName, State}; -handle_sync_event(current_rehab_status, _From, StateName, #state{rehab_timer = RehabTimer} = State) -> - InRehab = case RehabTimer of - undefined -> false; - _ -> true - end, - {reply, InRehab, StateName, State}; -handle_sync_event(Event, _From, StateName, #state{node_ref=NodeRef}=State) -> - lager:error("FSM for ~p received unexpected handle_sync_event(~p)", [NodeRef, Event]), - {reply, ignored, StateName, State}. - -%% -%% Handle info -%% -handle_info(down, down, State) -> - {next_state, down, State}; -handle_info(send_abort, StateName, #state{node_ref = NodeRef} = State) -> - Message = {[{type, abort}]}, - ok = pushy_command_switch:send_command(NodeRef, Message), - {next_state, StateName, State}; -handle_info({timeout, _Ref, update_avg}, CurStatus, #state{heartbeat_rate=HRate, up_threshold=UThresh, down_threshold=DThresh}=State) -> - NHRate = pushy_ema:tick(HRate), - EAvg = pushy_ema:value(NHRate), - {NStatus, NState} = - case CurStatus of - up when EAvg < DThresh -> - NS = update_status(down, State), - {down, NS}; - down when EAvg > UThresh -> - NS = update_status(up, State), - {up, NS}; - S -> {S, State} - end, - {next_state, NStatus, NState#state{heartbeat_rate=NHRate} }; -handle_info(Info, CurState, #state{node_ref=NodeRef}=State) -> - lager:error("FSM for ~p received unexpected handle_info(~p)", [NodeRef, Info]), - {next_state, CurState, State}. - - -terminate(_Reason, _CurState, _State) -> +handle_sync_event({watch, WatcherPid}, _From, StateName, #state{watchers=Watchers}=State) -> + MRef = erlang:monitor(process, WatcherPid), + Watcher = {WatcherPid, MRef}, + {reply, ok, StateName, State#state{watchers=[Watcher|Watchers]}}; +handle_sync_event({unwatch, WatcherPid}, _From, StateName, #state{watchers=Watchers}=State) -> + case lists:keytake(WatcherPid, 1, Watchers) of + false -> + {reply, ok, StateName, State}; + {value, {WatcherPid, MRef}, Watchers1} -> + erlang:demonitor(MRef, [flush]), + {reply, ok, StateName, State#state{watchers=Watchers1}} + end; +handle_sync_event(current_state, _From, StateName, #state{job=Job}=State) -> + {reply, {StateName, Job}, StateName, State}. 
+ +handle_info(heartbeat, booting, #state{heartbeats=HBeats}=State) -> + lager:info("BOOTING SELF: ~p~n", [self()]), + HBeats1 = HBeats + 1, + State1 = State#state{heartbeats=HBeats1}, + case HBeats1 > 3 of + true -> + State2 = force_abort(State1), + {next_state, state_transition(booting, rehab, State2), State2}; + false -> + {next_state, booting, State1} + end; +handle_info(heartbeat, CurrentState, #state{heartbeats=HBeats}=State) -> + lager:info("~p SELF: ~p~n", [CurrentState, self()]), + HBeats1 = HBeats + 1, + State1 = if + HBeats1 < 5 -> + State#state{heartbeats=HBeats1}; + true -> + State#state{heartbeats=5} + end, + {next_state, CurrentState, State1, heartbeat_interval()}; +handle_info(timeout, CurrentState, #state{heartbeats=HBeats}=State) -> + HBeats1 = HBeats - 1, + State1 = State#state{heartbeats=HBeats1}, + case HBeats1 of + 0 -> + {stop, state_transition(CurrentState, shutdown, State1), State1}; + _ -> + {next_state, CurrentState, State, heartbeat_interval()} + end; +handle_info(rehab_again, rehab, State) -> + State1 = force_abort(State), + {next_state, rehab, State1}; +handle_info({'DOWN', _MRef, _Type, Pid, _Reason}, StateName, #state{watchers=Watchers}=State) -> + case lists:keytake(Pid, Watchers) of + false -> + {next_state, StateName, State}; + {value, _, Watchers1} -> + {next_state, StateName, State#state{watchers=Watchers1}} + end; +handle_info(_Info, StateName, State) -> + {next_state, StateName, State}. + +terminate(_Reason, _StateName, _State) -> ok. -code_change(_OldVsn, CurState, State, _Extra) -> - {ok, CurState, State}. +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. %% Internal functions - -create_status_record(Status, #state{node_ref=NodeRef}=State) -> - State1 = notify_status_change(Status, State), - pushy_node_status_updater:create(NodeRef, ?POC_ACTOR_ID, Status), - State1. - -update_status(Status, #state{node_ref=NodeRef}=State) -> - State1 = notify_status_change(Status, State), - pushy_node_status_updater:update(NodeRef, ?POC_ACTOR_ID, Status), - State1. - -notify_status_change(Status, #state{node_ref=NodeRef} = State) -> - lager:info("Status change for ~p : ~p", [NodeRef, Status]), - - case Status of - down -> - gproc:send(subscribers_key(NodeRef), {down, NodeRef}), - kick_from_rehab(State); - up -> send_to_rehab(Status, State) +eval_state({booting, undefined}) -> + {online, {unvailable, none}}; +eval_state({idle, undefined}) -> + {online, {available, none}}; +eval_state({rehab, undefined}) -> + {online, {unavailable, none}}; +eval_state({running, Job}) -> + {online, {unvailable, Job}}. + +rehab_interval() -> + envy:get(pushy, rehab_timer, 1000, integer). + +heartbeat_interval() -> + envy:get(pushy, heartbeat_interval, integer). + +call(NodeRef, Message) -> + case pushy_node_state_sup:get_process(NodeRef) of + Pid when is_pid(Pid) -> + gen_fsm:sync_send_all_state_event(Pid, Message, infinity); + Error -> + Error end. -nlog(normal, Format, Args) -> - lager:debug(Format, Args); -nlog(verbose, Format, Args) -> - lager:info(Format, Args). - --spec subscribers_key(node_ref()) -> {p,l,{node_state_monitor,node_ref()}}. -subscribers_key(NodeRef) -> - {p,l,{node_state_monitor,NodeRef}}. - -%%----------------------------------------------------------------------------- -%% Private Functions -%%----------------------------------------------------------------------------- - -send_to_rehab(up, #state{node_ref = NodeRef, rehab_timer = undefined} = State) -> - self() ! 
send_abort, - {ok, TimerRef} = timer:send_interval(rehab_timer(), send_abort), - lager:info("Added ~p to Rehab", [NodeRef]), - State#state{rehab_timer = TimerRef}; -send_to_rehab(down, #state{node_ref = NodeRef} = State) -> - lager:info("Node ~p is down can't be sent to Rehab", [NodeRef]), - State; -send_to_rehab(_Status, #state{node_ref = NodeRef} = State) -> - lager:info("~p already in rehab", [NodeRef]), - State. - -kick_from_rehab(#state{rehab_timer = undefined} = State) -> - State; -kick_from_rehab(#state{rehab_timer = TimerRef, node_ref = NodeRef} = State) -> - case timer:cancel(TimerRef) of - {ok, cancel} -> - lager:info("Removed ~p from Rehab", [NodeRef]), - State#state{rehab_timer = undefined}; +send_info(NodeRef, Message) -> + case pushy_node_state_sup:get_process(NodeRef) of + Pid when is_pid(Pid) -> + Pid ! Message; Error -> - lager:info("Error Canceling Timer: ~p~n", [Error]), - State + Error end. - -rehab_timer() -> - pushy_util:get_env(pushy, rehab_timer, 1000, fun is_integer/1). +force_abort(#state{node_ref=NodeRef}=State) -> + Message = {[{type, abort}]}, + ok = pushy_command_switch:send_command(NodeRef, Message), + TRef = timer:send_after(rehab_interval(), rehab_again), + State#state{state_timer=TRef}. + +state_transition(Current, New, #state{node_ref=NodeRef, watchers=Watchers}) -> + lager:info("~p transitioning from ~p to ~p~n", [NodeRef, Current, New]), + notify_watchers(Watchers, NodeRef, Current, New), + New. + +notify_watchers([], _NodeRef, _Current, _New) -> + ok; +notify_watchers(Watchers, NodeRef, Current, New) -> + F = fun(Watcher) -> Watcher ! {state_change, NodeRef, Current, New} end, + [F(Watcher) || {Watcher, _Monitor} <- Watchers]. diff --git a/apps/pushy/src/pushy_node_state2.erl b/apps/pushy/src/pushy_node_state2.erl deleted file mode 100644 index b5279127..00000000 --- a/apps/pushy/src/pushy_node_state2.erl +++ /dev/null @@ -1,222 +0,0 @@ --module(pushy_node_state2). - --behaviour(gen_fsm). - --include("pushy.hrl"). - --type logging_level() :: 'verbose' | 'normal'. - - -%% API --export([start_link/1, - heartbeat/1, - status/1, - watch/1]). - -%% States --export([booting/2, - idle/2, - running/2, - rehab/2]). - --define(DEFAULT_DECAY_INTERVAL, 4). --define(DEFAULT_UP_THRESHOLD, 0.5). --define(DEFAULT_DOWN_THRESHOLD, 0.4). - --record(state, {node_ref :: node_ref(), - logging = verbose :: logging_level(), - heartbeats = 1 :: pos_integer(), - job :: any(), - watchers = [], - state_timer - }). - -%% gen_fsm callbacks --export([init/1, - handle_event/3, - handle_sync_event/4, - handle_info/3, - terminate/3, - code_change/4]). - -start_link(NodeRef) -> - gen_fsm:start_link(?MODULE, [NodeRef], []). - -heartbeat(NodeRef) -> - send_info(NodeRef, heartbeat), - ok. - -status(NodeRef) -> - case call(NodeRef, current_state) of - undefined -> - {offline, unavailable}; - CurrentState -> - eval_state(CurrentState) - end. - -watch(NodeRef) -> - case call(NodeRef, {watch, self()}) of - ok -> - ok; - Error -> - Error - end. 
- -init([NodeRef]) -> - GprocName = pushy_node_state_sup:mk_gproc_name(NodeRef), - State = #state{node_ref = NodeRef}, - try - %% The most important thing to have happen is this registration; we need to get this - %% assigned before anyone else tries to start things up gproc:reg can only return - %% true or throw - true = gproc:reg({n, l, GprocName}), - State = #state{node_ref = NodeRef}, - {ok, state_transition(init, booting, State), State} - catch - error:badarg -> - %% When we start up from a previous run, we have two ways that the FSM might be started; - %% from an incoming packet, or the database record for a prior run - %% There may be some nasty race conditions surrounding this. - %% We may also want to *not* automatically reanimate FSMs for nodes that aren't - %% actively reporting; but rather keep them in a 'limbo' waiting for the first - %% packet, and if one doesn't arrive within a certain time mark them down. - lager:error("Failed to register:~p for ~p (already exists as ~p?)", - [NodeRef,self(), gproc:lookup_pid({n,l,GprocName}) ]), - {stop, state_transition(init, shutdown, State), State} - end. - -booting(Message, #state{node_ref=NodeRef}=State) -> - lager:info("~p is booting. Ignoring message: ~p~n", [NodeRef, Message]), - {next_state, booting, State, 60000}. - -rehab(aborted, #state{state_timer=TRef}=State) -> - timer:cancel(TRef), - {next_state, state_transition(rehab, idle, State), State}; -rehab(Message, #state{node_ref=NodeRef}=State) -> - lager:info("~p in rehab. Ignoring message: ~p~n", [NodeRef, Message]), - {next_state, rehab, State}. - -idle({job, Job}, State) -> - {next_state, state_transition(idle, running, State), State#state{job=Job}}. - -running(aborted, #state{node_ref=NodeRef}=State) -> - lager:info("~p aborted during job.~n", [NodeRef]), - {next_state, state_transition(running, idle, State), State}; -running({complete, Job}, #state{job=Job, node_ref=NodeRef}=State) -> - lager:info("~p completed job.~n", [NodeRef]), - State1 = State#state{job=undefined}, - {next_state, state_transition(running, idle, State1), State1}. - -handle_event(_Event, StateName, State) -> - {next_state, StateName, State}. - -handle_sync_event({watch, WatcherPid}, _From, StateName, #state{watchers=Watchers}=State) -> - MRef = erlang:monitor(process, WatcherPid), - Watcher = {WatcherPid, MRef}, - {reply, ok, StateName, State#state{watchers=[Watcher|Watchers]}}; -handle_sync_event({unwatch, WatcherPid}, _From, StateName, #state{watchers=Watchers}=State) -> - case lists:keytake(WatcherPid, 1, Watchers) of - false -> - {reply, ok, StateName, State}; - {value, {WatcherPid, MRef}, Watchers1} -> - erlang:demonitor(MRef, [flush]), - {reply, ok, StateName, State#state{watchers=Watchers1}} - end; -handle_sync_event(current_state, _From, StateName, #state{job=Job}=State) -> - {reply, {StateName, Job}, StateName, State}. 
- -handle_info(heartbeat, booting, #state{heartbeats=HBeats}=State) -> - HBeats1 = HBeats + 1, - State1 = State#state{heartbeats=HBeats1}, - case HBeats1 > 3 of - true -> - State2 = force_abort(State1), - {next_state, state_transition(booting, rehab, State2), State2}; - false -> - {next_state, booting, State1} - end; -handle_info(heartbeat, CurrentState, #state{heartbeats=HBeats}=State) -> - HBeats1 = HBeats + 1, - State1 = if - HBeats1 < 5 -> - State#state{heartbeats=HBeats1}; - true -> - State#state{heartbeats=5} - end, - {next_state, CurrentState, State1, heartbeat_interval()}; -handle_info(timeout, CurrentState, #state{heartbeats=HBeats}=State) -> - HBeats1 = HBeats - 1, - State1 = State#state{heartbeats=HBeats1}, - case HBeats1 of - 0 -> - {stop, state_transition(CurrentState, shutdown, State1), State1}; - _ -> - {next_state, CurrentState, State, heartbeat_interval()} - end; -handle_info(rehab_again, rehab, State) -> - State1 = force_abort(State), - {next_state, rehab, State1}; -handle_info({'DOWN', _MRef, _Type, Pid, _Reason}, StateName, #state{watchers=Watchers}=State) -> - case lists:keytake(Pid, Watchers) of - false -> - {next_state, StateName, State}; - {value, _, Watchers1} -> - {next_state, StateName, State#state{watchers=Watchers1}} - end; -handle_info(_Info, StateName, State) -> - {next_state, StateName, State}. - -terminate(_Reason, _StateName, _State) -> - ok. - -code_change(_OldVsn, StateName, State, _Extra) -> - {ok, StateName, State}. - -%% Internal functions -eval_state({booting, undefined}) -> - {online, {unvailable, none}}; -eval_state({idle, undefined}) -> - {online, {available, none}}; -eval_state({rehab, undefined}) -> - {online, {unavailable, none}}; -eval_state({running, Job}) -> - {online, {unvailable, Job}}. - -rehab_interval() -> - envy:get(pushy, rehab_timer, 1000, integer). - -heartbeat_interval() -> - envy:get(pushy, heartbeat_interval, integer). - -call(NodeRef, Message) -> - case pushy_node_state_sup:get_process(NodeRef) of - Pid when is_pid(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, Message, infinity); - Error -> - Error - end. - -send_info(NodeRef, Message) -> - case pushy_node_state_sup:get_process(NodeRef) of - Pid when is_pid(Pid) -> - Pid ! Message; - Error -> - Error - end. - -force_abort(#state{node_ref=NodeRef}=State) -> - Message = {[{type, abort}]}, - ok = pushy_command_switch:send_command(NodeRef, Message), - TRef = timer:send_after(rehab_interval(), rehab_again), - State#state{state_timer=TRef}. - -state_transition(Current, New, #state{node_ref=NodeRef, watchers=Watchers}) -> - lager:debug("~p transitioning from ~p to ~p~n", [NodeRef, Current, New]), - notify_watchers(Watchers, NodeRef, Current, New), - New. - -notify_watchers([], _NodeRef, _Current, _New) -> - ok; -notify_watchers(Watchers, NodeRef, Current, New) -> - F = fun(Watcher) -> Watcher ! {state_change, NodeRef, Current, New} end, - [F(Watcher) || {Watcher, _Monitor} <- Watchers]. diff --git a/apps/pushy/src/pushy_node_state_sup.erl b/apps/pushy/src/pushy_node_state_sup.erl index aa889143..67e44632 100644 --- a/apps/pushy/src/pushy_node_state_sup.erl +++ b/apps/pushy/src/pushy_node_state_sup.erl @@ -34,13 +34,12 @@ start_link() -> -spec get_process(node_ref()) -> pid(). get_process(NodeRef) -> - StartState = down, GprocName = mk_gproc_name(NodeRef), case catch gproc:lookup_pid({n,l,GprocName}) of {'EXIT', _} -> % Run start_child asynchronously; we only need to wait until the % process registers itself before we can send it messages. 
- spawn(supervisor, start_child, [?SERVER, [NodeRef, StartState]]), + spawn(supervisor, start_child, [?SERVER, [NodeRef]]), {Pid, _Value} = gproc:await({n,l,GprocName},1000), Pid; Pid -> Pid From 4c275dd89758d735d4fafae52453e49ae97bdfe5 Mon Sep 17 00:00:00 2001 From: Matthew Peck Date: Tue, 6 Nov 2012 15:54:12 -0500 Subject: [PATCH 04/15] idle is no longer timing out --- apps/pushy/src/pushy_node_state.erl | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl index e28690ba..83be10bc 100644 --- a/apps/pushy/src/pushy_node_state.erl +++ b/apps/pushy/src/pushy_node_state.erl @@ -4,9 +4,6 @@ -include("pushy.hrl"). --type logging_level() :: 'verbose' | 'normal'. - - %% API -export([start_link/1, heartbeat/1, @@ -25,7 +22,6 @@ -define(DEFAULT_DOWN_THRESHOLD, 0.4). -record(state, {node_ref :: node_ref(), - logging = verbose :: logging_level(), heartbeats = 1 :: pos_integer(), job :: any(), watchers = [], @@ -107,10 +103,7 @@ rehab(Message, #state{node_ref=NodeRef}=State) -> {next_state, rehab, State}. idle({job, Job}, State) -> - {next_state, state_transition(idle, running, State), State#state{job=Job}}; -idle(Event, State) -> - lager:info("IDLE EVENT: ~p~n", [Event]), - {next_state, idle, State}. + {next_state, state_transition(idle, running, State), State#state{job=Job}}. running(aborted, #state{node_ref=NodeRef}=State) -> lager:info("~p aborted during job.~n", [NodeRef]), @@ -139,7 +132,6 @@ handle_sync_event(current_state, _From, StateName, #state{job=Job}=State) -> {reply, {StateName, Job}, StateName, State}. handle_info(heartbeat, booting, #state{heartbeats=HBeats}=State) -> - lager:info("BOOTING SELF: ~p~n", [self()]), HBeats1 = HBeats + 1, State1 = State#state{heartbeats=HBeats1}, case HBeats1 > 3 of @@ -150,7 +142,6 @@ handle_info(heartbeat, booting, #state{heartbeats=HBeats}=State) -> {next_state, booting, State1} end; handle_info(heartbeat, CurrentState, #state{heartbeats=HBeats}=State) -> - lager:info("~p SELF: ~p~n", [CurrentState, self()]), HBeats1 = HBeats + 1, State1 = if HBeats1 < 5 -> @@ -158,7 +149,7 @@ handle_info(heartbeat, CurrentState, #state{heartbeats=HBeats}=State) -> true -> State#state{heartbeats=5} end, - {next_state, CurrentState, State1, heartbeat_interval()}; + {next_state, CurrentState, State1}; handle_info(timeout, CurrentState, #state{heartbeats=HBeats}=State) -> HBeats1 = HBeats - 1, State1 = State#state{heartbeats=HBeats1}, @@ -226,7 +217,7 @@ force_abort(#state{node_ref=NodeRef}=State) -> State#state{state_timer=TRef}. state_transition(Current, New, #state{node_ref=NodeRef, watchers=Watchers}) -> - lager:info("~p transitioning from ~p to ~p~n", [NodeRef, Current, New]), + lager:debug("~p transitioning from ~p to ~p~n", [NodeRef, Current, New]), notify_watchers(Watchers, NodeRef, Current, New), New. From 4ea69d5c55f7a5b22d2ae4193e627598bd1a5ac0 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Wed, 7 Nov 2012 11:19:48 -0500 Subject: [PATCH 05/15] New node stats module * New stats module incrementally computes node status based on messaging history to the node. Requires a single timer to scan the ETS table periodically to pick up any zombie nodes. 
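
The update rule (evaluate_node_health/1 below) is an exponentially weighted
moving average over elapsed heartbeat intervals:

    NAvg = (Avg * ?HISTORY_WEIGHT) + (WindowAvg * ?NOW_WEIGHT)
         = (Avg * 0.85) + (WindowAvg * 0.15)

where WindowAvg is the number of heartbeats received divided by the count
expected over the elapsed window. As a worked sketch, assuming a
down_threshold of 0.4 (the DEFAULT_DOWN_THRESHOLD the node FSM had been
using): a node that goes completely silent (WindowAvg = 0.0) decays from its
initial avg of down_threshold() * 2 = 0.8 through 0.68, 0.58, 0.49 and 0.42
to 0.35, so it is declared dead on the fifth evaluated interval.
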
--- apps/pushy/src/pushy_node_stats.erl | 111 ++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 apps/pushy/src/pushy_node_stats.erl diff --git a/apps/pushy/src/pushy_node_stats.erl b/apps/pushy/src/pushy_node_stats.erl new file mode 100644 index 00000000..b5b84b4f --- /dev/null +++ b/apps/pushy/src/pushy_node_stats.erl @@ -0,0 +1,111 @@ +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil; fill-column: 92 -*- +%% ex: ts=4 sw=4 et +%%% @copyright Copyright 2012-2012 Opscode Inc. +%%% @doc +%%% Node health statistics +%%% @end + +-module(pushy_node_stats). + +-record(metric, {node_pid :: pid(), + avg=down_threshold() * 2 :: float(), + last=os:timestamp() :: {pos_integer(), pos_integer(), pos_integer()}, + heartbeats=1 :: pos_integer()}). + +%% These two weights must total to 1.0 +-define(HISTORY_WEIGHT, 0.85). +-define(NOW_WEIGHT, 0.15). + +-export([init/0, + heartbeat/1, + scan/0]). + +-spec init() -> atom() | ets:tid(). +init() -> + ets:new(?MODULE, [ordered_set, public, named_table, {keypos, 2}, + {write_concurrency, true}, {read_concurrency, true}]). + +-spec heartbeat(pid()) -> ok | should_die. +heartbeat(NodePid) -> + case ets:lookup(?MODULE, NodePid) of + [] -> + ets:insert_new(?MODULE, #metric{node_pid=NodePid}), + ok; + [Node] -> + Node1 = hb(Node), + case evaluate_node_health(Node1) of + {reset, Node2} -> + ets:insert(?MODULE, reset(Node2)), + ok; + {ok, Node2} -> + ets:insert(?MODULE, Node2), + ok; + {should_die, _Node2} -> + ets:delete_object(?MODULE, Node1), + should_die + end + end. + +-spec scan() -> ok. +scan() -> + ets:safe_fixtable(?MODULE, true), + try + scan(ets:first(?MODULE)) + after + %% Make sure we unfix the table even + %% if an error was thrown + ets:safe_fixtable(?MODULE, false) + end. + +scan('$end_of_table') -> + ok; +scan(NodePid) -> + [Node] = ets:lookup(?MODULE, NodePid), + case evaluate_node_health(Node) of + {ok, Node1} -> + ets:insert(?MODULE, Node1), + ok; + {reset, Node1} -> + ets:insert(?MODULE, Node1), + ok; + {should_die, _Node1} -> + ets:delete_object(?MODULE, Node), + NodePid ! should_die + end, + scan(ets:next(?MODULE, NodePid)). + +reset(Node) -> + Node#metric{heartbeats=0}. + +hb(#metric{heartbeats=Heartbeats}=Node) -> + Node#metric{heartbeats=Heartbeats + 1, last=os:timestamp()}. + +evaluate_node_health(#metric{avg=Avg, heartbeats=Heartbeats}=Node) -> + case elapsed_intervals(Node) of + 0 -> + {ok, Node}; + X -> + Window = (decay_window() - 1) * X, + WindowAvg = Heartbeats / Window, + NAvg = (Avg * ?HISTORY_WEIGHT) + (WindowAvg * ?NOW_WEIGHT), + Node1 = Node#metric{avg=NAvg, heartbeats=0}, + case NAvg < down_threshold() of + true -> + {should_die, Node1}; + false -> + {reset, Node1} + end + end. + +elapsed_intervals(#metric{last=TS}) -> + ET = timer:now_diff(os:timestamp(), TS) div 1000, + ET div heartbeat_interval(). + +heartbeat_interval() -> + envy:get(pushy, heartbeat_interval, number). + +down_threshold() -> + envy:get(pushy, down_threshold, number). + +decay_window() -> + envy:get(pushy, decay_window, integer). 
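
The public surface of the new module is small: init/0 creates the named ETS
table, heartbeat/1 records a beat for a node FSM pid, and scan/0 sweeps for
zombies. A minimal exercise, assuming the pushy application environment
defines heartbeat_interval, down_threshold, and decay_window (the envy
accessors at the bottom of the module carry no defaults):

    pushy_node_stats:init(),
    ok = pushy_node_stats:heartbeat(self()),  %% first beat inserts a fresh #metric{}
    ok = pushy_node_stats:scan().             %% stale pids are sent should_die
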
From c33adac8ab0d1ec9dbd7c630c0f5b4e112b503e2 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Wed, 7 Nov 2012 11:39:14 -0500 Subject: [PATCH 06/15] Periodic scans to find and kill zombie nodes --- apps/pushy/src/pushy_node_stats_scanner.erl | 54 +++++++++++++++++++++ apps/pushy/src/pushy_sup.erl | 21 ++++---- 2 files changed, 63 insertions(+), 12 deletions(-) create mode 100644 apps/pushy/src/pushy_node_stats_scanner.erl diff --git a/apps/pushy/src/pushy_node_stats_scanner.erl b/apps/pushy/src/pushy_node_stats_scanner.erl new file mode 100644 index 00000000..0b77db10 --- /dev/null +++ b/apps/pushy/src/pushy_node_stats_scanner.erl @@ -0,0 +1,54 @@ +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil; fill-column: 92 -*- +%% ex: ts=4 sw=4 et +%%% @copyright Copyright 2012-2012 Opscode Inc. +%%% @doc +%%% Conducts background scans on node stats +%%% looking for zombie nodes +%%% @end + +-module(pushy_node_stats_scanner). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + random:seed(erlang:now()), + {ok, none, wait_interval()}. + +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(timeout, State) -> + pushy_node_stats:scan(), + {noreply, State, wait_interval()}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%% Internal functions +wait_interval() -> + HeartbeatInterval = envy:get(pushy, heartbeat_interval, number), + (HeartbeatInterval * 4) + random:uniform(HeartbeatInterval). diff --git a/apps/pushy/src/pushy_sup.erl b/apps/pushy/src/pushy_sup.erl index 26acbb82..3c63c596 100644 --- a/apps/pushy/src/pushy_sup.erl +++ b/apps/pushy/src/pushy_sup.erl @@ -12,10 +12,6 @@ %% Supervisor callbacks -export([init/1]). -%-ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). -%-endif. - -include("pushy.hrl"). %% Helper macro for declaring children of supervisor @@ -55,15 +51,16 @@ init([#pushy_state{ctx=_Ctx} = PushyState]) -> {log_dir, LogDir}, {dispatch, Dispatch}, {enable_perf_logger, true}], - ?debugVal(WebMachineConfig), - Workers = [?SUP(pushy_node_state_sup, []), - ?SUP(pushy_job_state_sup, []), - ?WORKER(chef_keyring, []), - ?WORKER(pushy_node_status_updater, []), - ?WORKER(pushy_heartbeat_generator, [PushyState]), - ?WORKER(pushy_command_switch, [PushyState]), - ?WORKERNL(webmachine_mochiweb, [WebMachineConfig]) %% FIXME start or start_link here? + Workers = [?WORKER(pushy_node_stats_scanner, []), + ?SUP(pushy_node_state_sup, []), + ?SUP(pushy_job_state_sup, []), + ?WORKER(chef_keyring, []), + ?WORKER(pushy_node_status_updater, []), + ?WORKER(pushy_heartbeat_generator, [PushyState]), + ?WORKER(pushy_command_switch, [PushyState]), + ?WORKERNL(webmachine_mochiweb, [WebMachineConfig]) %% FIXME start or start_link here? ], + pushy_node_stats:init(), {ok, {{one_for_one, 60, 120}, maybe_run_graphite(EnableGraphite, Workers)}}. 
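
Note that wait_interval/0 jitters the sweep period rather than fixing it.
With a heartbeat_interval of, say, 1000 ms (an assumed value; the config key
has no default), the scanner sleeps

    (1000 * 4) + random:uniform(1000)    %% i.e. somewhere in 4001..5000 ms

between sweeps, roughly four to five heartbeat periods, so the scan never
locks step with the heartbeat cycle it is sampling.
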
From 3bc0ff67c57c9b596a150ca3613bf27247272089 Mon Sep 17 00:00:00 2001 From: Matthew Peck Date: Wed, 7 Nov 2012 14:50:53 -0500 Subject: [PATCH 07/15] minor modifications --- apps/pushy/src/pushy_node_state.erl | 4 ---- apps/pushy/src/pushy_node_stats.erl | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl index 83be10bc..3cec887f 100644 --- a/apps/pushy/src/pushy_node_state.erl +++ b/apps/pushy/src/pushy_node_state.erl @@ -17,10 +17,6 @@ running/2, rehab/2]). --define(DEFAULT_DECAY_INTERVAL, 4). --define(DEFAULT_UP_THRESHOLD, 0.5). --define(DEFAULT_DOWN_THRESHOLD, 0.4). - -record(state, {node_ref :: node_ref(), heartbeats = 1 :: pos_integer(), job :: any(), diff --git a/apps/pushy/src/pushy_node_stats.erl b/apps/pushy/src/pushy_node_stats.erl index b5b84b4f..07d69f5b 100644 --- a/apps/pushy/src/pushy_node_stats.erl +++ b/apps/pushy/src/pushy_node_stats.erl @@ -22,7 +22,7 @@ -spec init() -> atom() | ets:tid(). init() -> - ets:new(?MODULE, [ordered_set, public, named_table, {keypos, 2}, + ets:new(?MODULE, [set, public, named_table, {keypos, 2}, {write_concurrency, true}, {read_concurrency, true}]). -spec heartbeat(pid()) -> ok | should_die. @@ -88,7 +88,7 @@ evaluate_node_health(#metric{avg=Avg, heartbeats=Heartbeats}=Node) -> Window = (decay_window() - 1) * X, WindowAvg = Heartbeats / Window, NAvg = (Avg * ?HISTORY_WEIGHT) + (WindowAvg * ?NOW_WEIGHT), - Node1 = Node#metric{avg=NAvg, heartbeats=0}, + Node1 = Node#metric{avg=NAvg}, case NAvg < down_threshold() of true -> {should_die, Node1}; From 9e22703708aba9ac6363ac92a1ecd9941eb86e6d Mon Sep 17 00:00:00 2001 From: Matthew Peck Date: Wed, 7 Nov 2012 15:59:03 -0500 Subject: [PATCH 08/15] integrated pushy_node_stats --- apps/pushy/src/pushy_node_state.erl | 52 ++++++----------------------- apps/pushy/src/pushy_node_stats.erl | 41 ++++++++++++++++------- 2 files changed, 40 insertions(+), 53 deletions(-) diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl index 3cec887f..1c9ae8eb 100644 --- a/apps/pushy/src/pushy_node_state.erl +++ b/apps/pushy/src/pushy_node_state.erl @@ -12,8 +12,7 @@ aborted/1]). %% States --export([booting/2, - idle/2, +-export([idle/2, running/2, rehab/2]). @@ -72,8 +71,8 @@ init([NodeRef]) -> %% assigned before anyone else tries to start things up gproc:reg can only return %% true or throw true = gproc:reg({n, l, GprocName}), - State = #state{node_ref = NodeRef}, - {ok, state_transition(init, booting, State), State} + State1 = force_abort(State), + {ok, state_transition(init, rehab, State1), State1} catch error:badarg -> %% When we start up from a previous run, we have two ways that the FSM might be started; @@ -87,10 +86,6 @@ init([NodeRef]) -> {stop, state_transition(init, shutdown, State), State} end. -booting(Message, #state{node_ref=NodeRef}=State) -> - lager:info("~p is booting. Ignoring message: ~p~n", [NodeRef, Message]), - {next_state, booting, State, 60000}. 
- rehab(aborted, #state{state_timer=TRef}=State) -> timer:cancel(TRef), {next_state, state_transition(rehab, idle, State), State}; @@ -103,7 +98,8 @@ idle({job, Job}, State) -> running(aborted, #state{node_ref=NodeRef}=State) -> lager:info("~p aborted during job.~n", [NodeRef]), - {next_state, state_transition(running, idle, State), State}; + State1 = State#state{job=undefined}, + {next_state, state_transition(running, idle, State1), State1}; running({complete, Job}, #state{job=Job, node_ref=NodeRef}=State) -> lager:info("~p completed job.~n", [NodeRef]), State1 = State#state{job=undefined}, @@ -127,34 +123,13 @@ handle_sync_event({unwatch, WatcherPid}, _From, StateName, #state{watchers=Watch handle_sync_event(current_state, _From, StateName, #state{job=Job}=State) -> {reply, {StateName, Job}, StateName, State}. -handle_info(heartbeat, booting, #state{heartbeats=HBeats}=State) -> - HBeats1 = HBeats + 1, - State1 = State#state{heartbeats=HBeats1}, - case HBeats1 > 3 of - true -> - State2 = force_abort(State1), - {next_state, state_transition(booting, rehab, State2), State2}; - false -> - {next_state, booting, State1} - end; -handle_info(heartbeat, CurrentState, #state{heartbeats=HBeats}=State) -> - HBeats1 = HBeats + 1, - State1 = if - HBeats1 < 5 -> - State#state{heartbeats=HBeats1}; - true -> - State#state{heartbeats=5} - end, - {next_state, CurrentState, State1}; -handle_info(timeout, CurrentState, #state{heartbeats=HBeats}=State) -> - HBeats1 = HBeats - 1, - State1 = State#state{heartbeats=HBeats1}, - case HBeats1 of - 0 -> - {stop, state_transition(CurrentState, shutdown, State1), State1}; - _ -> - {next_state, CurrentState, State, heartbeat_interval()} +handle_info(heartbeat, CurrentState, State) -> + case pushy_node_stats:heartbeat(self()) of + ok -> {next_state, CurrentState, State}; + should_die -> {stop, state_transition(CurrentState, shutdown, State), State} end; +handle_info(should_die, CurrentState, State) -> + {stop, state_transition(CurrentState, shutdown, State), State}; handle_info(rehab_again, rehab, State) -> State1 = force_abort(State), {next_state, rehab, State1}; @@ -175,8 +150,6 @@ code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. %% Internal functions -eval_state({booting, undefined}) -> - {online, {unvailable, none}}; eval_state({idle, undefined}) -> {online, {available, none}}; eval_state({rehab, undefined}) -> @@ -187,9 +160,6 @@ eval_state({running, Job}) -> rehab_interval() -> envy:get(pushy, rehab_timer, 1000, integer). -heartbeat_interval() -> - envy:get(pushy, heartbeat_interval, integer). - call(NodeRef, Message) -> case pushy_node_state_sup:get_process(NodeRef) of Pid when is_pid(Pid) -> diff --git a/apps/pushy/src/pushy_node_stats.erl b/apps/pushy/src/pushy_node_stats.erl index 07d69f5b..079129d0 100644 --- a/apps/pushy/src/pushy_node_stats.erl +++ b/apps/pushy/src/pushy_node_stats.erl @@ -80,21 +80,32 @@ reset(Node) -> hb(#metric{heartbeats=Heartbeats}=Node) -> Node#metric{heartbeats=Heartbeats + 1, last=os:timestamp()}. 
-evaluate_node_health(#metric{avg=Avg, heartbeats=Heartbeats}=Node) -> +evaluate_node_health(#metric{heartbeats=Heartbeats, node_pid=Pid}=Node) -> case elapsed_intervals(Node) of 0 -> - {ok, Node}; - X -> - Window = (decay_window() - 1) * X, - WindowAvg = Heartbeats / Window, - NAvg = (Avg * ?HISTORY_WEIGHT) + (WindowAvg * ?NOW_WEIGHT), - Node1 = Node#metric{avg=NAvg}, - case NAvg < down_threshold() of + if + Heartbeats > 4 -> + evaluate_node_health(1, Node); true -> - {should_die, Node1}; - false -> - {reset, Node1} - end + lager:debug("Skipping Node: ~p~n", [Pid]), + {ok, Node} + end; + X -> + evaluate_node_health(X, Node) + end. +evaluate_node_health(IntervalCount, #metric{avg=Avg, heartbeats=Heartbeats, node_pid=Pid}=Node) -> + Window = (decay_window() - 1) * IntervalCount, + WindowAvg = Heartbeats / Window, + NAvg = floor((Avg * ?HISTORY_WEIGHT) + (WindowAvg * ?NOW_WEIGHT), 1.0), + lager:debug("~p avg:~p old_avg:~p~n", [Pid, NAvg, Avg]), + Node1 = Node#metric{avg=NAvg}, + case NAvg < down_threshold() of + true -> + lager:debug("Killing Node: ~p~n", [Pid]), + {should_die, Node1}; + false -> + lager:debug("Resetting Node: ~p~n", [Pid]), + {reset, Node1} end. elapsed_intervals(#metric{last=TS}) -> @@ -109,3 +120,9 @@ down_threshold() -> decay_window() -> envy:get(pushy, decay_window, integer). + +floor(X, Y) when X =< Y -> + X; +floor(_X, Y) -> + Y. + From 074024b5f4a744169144152396cd5ce4f7e0e619 Mon Sep 17 00:00:00 2001 From: Matthew Peck Date: Wed, 7 Nov 2012 16:40:50 -0500 Subject: [PATCH 09/15] fixed ordering problem with child startup in pushy_sup --- apps/pushy/src/pushy_command_switch.erl | 14 ++++++++++---- apps/pushy/src/pushy_sup.erl | 6 +++--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/apps/pushy/src/pushy_command_switch.erl b/apps/pushy/src/pushy_command_switch.erl index 53a53fbb..7578ab2d 100644 --- a/apps/pushy/src/pushy_command_switch.erl +++ b/apps/pushy/src/pushy_command_switch.erl @@ -161,9 +161,12 @@ do_send(#state{addr_node_map = AddrNodeMap, command_sock = CommandSocket}=State, Method, NodeRef, Message) -> {ok, Key} = get_key_for_method(Method, State, NodeRef), - Address = addr_node_map_lookup_by_node(AddrNodeMap, NodeRef), - Packets = ?TIME_IT(pushy_messaging, make_message, (proto_v2, Method, Key, Message)), - ok = pushy_messaging:send_message(CommandSocket, [Address | Packets]), + case addr_node_map_lookup_by_node(AddrNodeMap, NodeRef) of + error -> ok; + Address -> + Packets = ?TIME_IT(pushy_messaging, make_message, (proto_v2, Method, Key, Message)), + ok = pushy_messaging:send_message(CommandSocket, [Address | Packets]) + end, State. %%% @@ -301,4 +304,7 @@ addr_node_map_lookup_by_addr({AddrToNode, _}, Addr) -> addr_node_map_lookup_by_node(#state{addr_node_map = AddrNodeMap}, Node) -> addr_node_map_lookup_by_addr(AddrNodeMap, Node); addr_node_map_lookup_by_node({_, NodeToAddr}, Node) -> - dict:fetch(Node, NodeToAddr). + case dict:find(Node, NodeToAddr) of + error -> error; + {ok, Value} -> Value + end. 
diff --git a/apps/pushy/src/pushy_sup.erl b/apps/pushy/src/pushy_sup.erl
index 3c63c596..a73599b4 100644
--- a/apps/pushy/src/pushy_sup.erl
+++ b/apps/pushy/src/pushy_sup.erl
@@ -52,12 +52,12 @@ init([#pushy_state{ctx=_Ctx} = PushyState]) ->
                          {dispatch, Dispatch},
                          {enable_perf_logger, true}],
     Workers = [?WORKER(pushy_node_stats_scanner, []),
-               ?SUP(pushy_node_state_sup, []),
-               ?SUP(pushy_job_state_sup, []),
                ?WORKER(chef_keyring, []),
-               ?WORKER(pushy_node_status_updater, []),
                ?WORKER(pushy_heartbeat_generator, [PushyState]),
                ?WORKER(pushy_command_switch, [PushyState]),
+               ?SUP(pushy_node_state_sup, []),
+               ?SUP(pushy_job_state_sup, []),
+               ?WORKER(pushy_node_status_updater, []),
                ?WORKERNL(webmachine_mochiweb, [WebMachineConfig]) %% FIXME start or start_link here?
               ],
     pushy_node_stats:init(),

From a68e00a70f1bbe0bc9c29f185dce2b3b64277334 Mon Sep 17 00:00:00 2001
From: Matthew Peck
Date: Wed, 7 Nov 2012 17:20:17 -0500
Subject: [PATCH 10/15] node statuses now persist

---
 apps/pushy/src/pushy_node_state.erl     |  3 +++
 apps/pushy/src/pushy_node_state_sup.erl | 29 ++++++++++++-------------
 apps/pushy/src/pushy_sql.erl            | 12 ++++++----
 3 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl
index 1c9ae8eb..ea3167e5 100644
--- a/apps/pushy/src/pushy_node_state.erl
+++ b/apps/pushy/src/pushy_node_state.erl
@@ -3,6 +3,7 @@
 -behaviour(gen_fsm).
 
 -include("pushy.hrl").
+-include("pushy_sql.hrl").
 
 %% API
 -export([start_link/1,
@@ -72,6 +73,7 @@ init([NodeRef]) ->
         %% true or throw
         true = gproc:reg({n, l, GprocName}),
         State1 = force_abort(State),
+        pushy_node_status_updater:create(NodeRef, ?POC_ACTOR_ID, shutdown),
         {ok, state_transition(init, rehab, State1), State1}
     catch
         error:badarg ->
@@ -184,6 +186,7 @@ force_abort(#state{node_ref=NodeRef}=State) ->
 
 state_transition(Current, New, #state{node_ref=NodeRef, watchers=Watchers}) ->
     lager:debug("~p transitioning from ~p to ~p~n", [NodeRef, Current, New]),
+    pushy_node_status_updater:update(NodeRef, ?POC_ACTOR_ID, New),
     notify_watchers(Watchers, NodeRef, Current, New),
     New.
 
diff --git a/apps/pushy/src/pushy_node_state_sup.erl b/apps/pushy/src/pushy_node_state_sup.erl
index 67e44632..89315a4a 100644
--- a/apps/pushy/src/pushy_node_state_sup.erl
+++ b/apps/pushy/src/pushy_node_state_sup.erl
@@ -26,7 +26,6 @@ start_link() ->
     case supervisor:start_link({local, ?SERVER}, ?MODULE, []) of
         {ok, Pid} ->
-            load_from_db(),
             {ok, Pid};
         Error ->
             Error
@@ -62,18 +61,18 @@ init([]) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-load_from_db() ->
-    case pushy_sql:fetch_node_statuses(?POC_ORG_ID) of
-        {ok, none} ->
-            lager:info("No existing node status records found in database, FSM proceses will not be pre-created.");
-        {ok, NodeStatuses} ->
-            create_processes(NodeStatuses);
-        {error, Reason} ->
-            lager:error("Error loading existing node status records from the database: ~p", [Reason])
-    end.
+%load_from_db() ->
+    %case pushy_sql:fetch_node_statuses(?POC_ORG_ID) of
+    %{ok, none} ->
+        %lager:info("No existing node status records found in database, FSM processes will not be pre-created.");
+    %{ok, NodeStatuses} ->
+        %create_processes(NodeStatuses);
+    %{error, Reason} ->
+        %lager:error("Error loading existing node status records from the database: ~p", [Reason])
+    %end.
 
-create_processes([]) ->
-    {ok, done};
-create_processes([#pushy_node_status{org_id=OrgId,node_name=NodeName} | Rest]) ->
-    get_process({OrgId, NodeName}),
-    create_processes(Rest).
+%create_processes([]) ->
+    %{ok, done};
+%create_processes([#pushy_node_status{org_id=OrgId,node_name=NodeName} | Rest]) ->
+    %get_process({OrgId, NodeName}),
+    %create_processes(Rest).

diff --git a/apps/pushy/src/pushy_sql.erl b/apps/pushy/src/pushy_sql.erl
index e9e080a2..e693852f 100644
--- a/apps/pushy/src/pushy_sql.erl
+++ b/apps/pushy/src/pushy_sql.erl
@@ -252,11 +252,15 @@ proplist_to_job_node(Proplist) ->
 %% Heartbeat Status translators
 hb_status_as_int(X) when is_integer(X) -> X;
-hb_status_as_int(down) -> 0;
-hb_status_as_int(up) -> 1.
+hb_status_as_int(shutdown) -> 0;
+hb_status_as_int(rehab) -> 1;
+hb_status_as_int(idle) -> 2;
+hb_status_as_int(running) -> 3.
 hb_status_as_atom(X) when is_atom(X) -> X;
-hb_status_as_atom(0) -> down;
-hb_status_as_atom(1) -> up.
+hb_status_as_atom(0) -> shutdown;
+hb_status_as_atom(1) -> rehab;
+hb_status_as_atom(2) -> idle;
+hb_status_as_atom(3) -> running.
 
 %% Job Status translators
 job_status(voting) -> 0;

From 7e1d974018716e9e33b56b6277d25fadd7466ced Mon Sep 17 00:00:00 2001
From: Matthew Peck
Date: Wed, 7 Nov 2012 17:34:51 -0500
Subject: [PATCH 11/15] job can now send nodes to rehab

---
 apps/pushy/src/pushy_job_state.erl  |  2 +-
 apps/pushy/src/pushy_node_state.erl | 28 ++++++++++++++++++++++----
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/apps/pushy/src/pushy_job_state.erl b/apps/pushy/src/pushy_job_state.erl
index 585d79f8..221ec08d 100644
--- a/apps/pushy/src/pushy_job_state.erl
+++ b/apps/pushy/src/pushy_job_state.erl
@@ -317,7 +317,7 @@ count_nodes_in_state(NodeStates, #state{job_nodes = JobNodes}) ->
 listen_for_down_nodes([]) -> ok;
 listen_for_down_nodes([NodeRef|JobNodes]) ->
-    pushy_node_state:start_watching(NodeRef),
+    pushy_node_state:watch(NodeRef),
     case pushy_node_state:current_state(NodeRef) of
         down -> gen_fsm:send_event(self(), {down, NodeRef});
         _ -> ok

diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl
index ea3167e5..fc3e2045 100644
--- a/apps/pushy/src/pushy_node_state.erl
+++ b/apps/pushy/src/pushy_node_state.erl
@@ -10,7 +10,8 @@
          heartbeat/1,
          status/1,
          watch/1,
-         aborted/1]).
+         aborted/1,
+         rehab/1]).
 
 %% States
 -export([idle/2,
@@ -56,9 +57,17 @@ watch(NodeRef) ->
     end.
 
 aborted(NodeRef) ->
-    case pushy_node_state_sup:get_process(NodeRef) of
-        Pid when is_pid(Pid) ->
-            gen_fsm:send_event(Pid, aborted);
+    case cast(NodeRef, aborted) of
+        ok ->
+            ok;
+        Error ->
+            Error
+    end.
+
+rehab(NodeRef) ->
+    case cast(NodeRef, rehab) of
+        ok ->
+            ok;
         Error ->
             Error
     end.
@@ -95,6 +104,9 @@ rehab(Message, #state{node_ref=NodeRef}=State) ->
     lager:info("~p in rehab. Ignoring message: ~p~n", [NodeRef, Message]),
     {next_state, rehab, State}.
 
+idle(rehab, State) ->
+    force_abort(State),
+    {next_state, state_transition(idle, rehab, State), State};
 idle({job, Job}, State) ->
     {next_state, state_transition(idle, running, State), State#state{job=Job}}.
 
@@ -170,6 +182,14 @@ call(NodeRef, Message) ->
         Error
     end.
 
+cast(NodeRef, Message) ->
+    case pushy_node_state_sup:get_process(NodeRef) of
+        Pid when is_pid(Pid) ->
+            gen_fsm:send_event(Pid, Message);
+        Error ->
+            Error
+    end.
+
 send_info(NodeRef, Message) ->
     case pushy_node_state_sup:get_process(NodeRef) of
         Pid when is_pid(Pid) ->
             Pid ! Message;

From 827b7918d758038e68c920249365a3a35bb54df4 Mon Sep 17 00:00:00 2001
From: Matthew Peck
Date: Thu, 8 Nov 2012 18:15:36 -0500
Subject: [PATCH 12/15] more integration changes to node_state_fsm

---
 apps/pushy/src/pushy_job_state.erl            | 20 ++++++++++---
 .../src/pushy_named_node_state_resource.erl   |  5 ++--
 apps/pushy/src/pushy_node_state.erl           |  6 ++--
 apps/pushy/src/pushy_node_state_sup.erl       | 28 ++++++++-----------
 4 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/apps/pushy/src/pushy_job_state.erl b/apps/pushy/src/pushy_job_state.erl
index 221ec08d..97c3ce51 100644
--- a/apps/pushy/src/pushy_job_state.erl
+++ b/apps/pushy/src/pushy_job_state.erl
@@ -125,6 +125,7 @@ voting({ack_commit, NodeRef}, State) ->
     end,
     maybe_finished_voting(State2);
 voting({nack_commit, NodeRef}, State) ->
+    lager:info("~p Nacking", [NodeRef]),
     % Node from new -> nacked.
     State2 = case get_node_state(NodeRef, State) of
         new -> set_node_state(NodeRef, nacked, State);
@@ -185,7 +186,7 @@ handle_sync_event(Event, From, StateName, State) ->
 
 -spec handle_info(any(), job_status(), #state{}) ->
     {'next_state', job_status(), #state{}}.
-handle_info({down, NodeRef}, StateName, State) ->
+handle_info({state_change, NodeRef, _Current, shutdown}, StateName, State) ->
     pushy_job_state:StateName({down,NodeRef}, State);
 handle_info(voting_timeout, voting,
         #state{job = Job, voting_timeout = VotingTimeout} = State) ->
@@ -299,11 +300,21 @@ start_running(#state{job = Job} = State) ->
     {ok, _} = timer:send_after(Job2#pushy_job.run_timeout*1000, running_timeout),
     maybe_finished_running(State2).
 
-finish_job(Reason, #state{job = Job} = State) ->
+%% TODO this needs refactoring. We don't want to send nodes to rehab if we get
+%% a quorum_failed message because that stops jobs that are already running on
+%% nodes, even if they are valid.
+finish_job(quorum_failed, #state{job = Job} = State) ->
+    lager:info("Job ~p -> ~p", [Job#pushy_job.id, quorum_failed]),
+    Job2 = Job#pushy_job{status = quorum_failed},
+    State2 = State#state{job = Job2},
+    pushy_object:update_object(update_job, Job2, Job2#pushy_job.id),
+    {stop, {shutdown, quorum_failed}, State2};
+finish_job(Reason, #state{job = Job, job_nodes = JobNodes} = State) ->
     lager:info("Job ~p -> ~p", [Job#pushy_job.id, Reason]),
     Job2 = Job#pushy_job{status = Reason},
     State2 = State#state{job = Job2},
     pushy_object:update_object(update_job, Job2, Job2#pushy_job.id),
+    [ pushy_node_state:rehab(NodeRef) || NodeRef <- dict:fetch_keys(JobNodes) ],
     {stop, {shutdown, Reason}, State2}.
 
 count_nodes_in_state(NodeStates, #state{job_nodes = JobNodes}) ->
@@ -318,8 +329,9 @@ count_nodes_in_state(NodeStates, #state{job_nodes = JobNodes}) ->
 listen_for_down_nodes([]) -> ok;
 listen_for_down_nodes([NodeRef|JobNodes]) ->
     pushy_node_state:watch(NodeRef),
-    case pushy_node_state:current_state(NodeRef) of
-        down -> gen_fsm:send_event(self(), {down, NodeRef});
+    case pushy_node_state:status(NodeRef) of
+        {_, {unavailable, _}} ->
+            gen_fsm:send_event(self(), {down, NodeRef});
         _ -> ok
     end,
     listen_for_down_nodes(JobNodes).
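
[Review note] Taken together, PATCH 10 through 12 settle the observer contract for node FSMs: pushy_node_state:watch/1 subscribes the calling process, transitions then arrive as {state_change, NodeRef, OldState, NewState} messages (which is exactly what the new handle_info clause in pushy_job_state matches on), and status/1 answers {online | offline, {available | unavailable, Job}}. A minimal sketch of a consumer follows, assuming only those two calls and the message shape visible in the diffs; the module and its printouts are illustrative, not code from the series.

%% Illustrative sketch only -- not part of the patch series.
-module(watcher_sketch).
-export([follow/1]).

follow(NodeRef) ->
    pushy_node_state:watch(NodeRef),
    %% Catch nodes that were already unavailable before we subscribed --
    %% the same race listen_for_down_nodes/1 guards against above.
    case pushy_node_state:status(NodeRef) of
        {_, {unavailable, _}} -> already_down;
        _ -> loop(NodeRef)
    end.

loop(NodeRef) ->
    receive
        {state_change, NodeRef, _Old, shutdown} ->
            %% The node FSM stopped; this mirrors the job FSM's down handling.
            node_gone;
        {state_change, NodeRef, Old, New} ->
            io:format("~p: ~p -> ~p~n", [NodeRef, Old, New]),
            loop(NodeRef)
    end.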
diff --git a/apps/pushy/src/pushy_named_node_state_resource.erl b/apps/pushy/src/pushy_named_node_state_resource.erl
index 4da92310..06b73b0b 100644
--- a/apps/pushy/src/pushy_named_node_state_resource.erl
+++ b/apps/pushy/src/pushy_named_node_state_resource.erl
@@ -44,11 +44,10 @@ content_types_provided(Req, State) ->
 to_json(Req, #config_state{organization_guid=OrgId}=State) ->
     NodeName = list_to_binary(wrq:path_info(node_name, Req)),
     % TODO handle missing node
-    NodeState = pushy_node_state:current_state({OrgId, NodeName}),
-    InRehab = pushy_node_state:in_rehab({OrgId, NodeName}),
+    {NodeState, {Availability, _Job}} = pushy_node_state:status({OrgId, NodeName}),
     Result = jiffy:encode({[
         {<<"node_name">>, NodeName},
         {<<"status">>, NodeState},
-        {<<"is_in_rehab">>, InRehab}
+        {<<"availability">>, Availability}
     ]}),
     {Result, Req, State}.

diff --git a/apps/pushy/src/pushy_node_state.erl b/apps/pushy/src/pushy_node_state.erl
index fc3e2045..2592ded1 100644
--- a/apps/pushy/src/pushy_node_state.erl
+++ b/apps/pushy/src/pushy_node_state.erl
@@ -43,7 +43,7 @@ heartbeat(NodeRef) ->
 status(NodeRef) ->
     case call(NodeRef, current_state) of
         undefined ->
-            {offline, unavailable};
+            {offline, {unavailable, none}};
         CurrentState ->
             eval_state(CurrentState)
     end.
@@ -148,7 +148,7 @@ handle_info(rehab_again, rehab, State) ->
     State1 = force_abort(State),
     {next_state, rehab, State1};
 handle_info({'DOWN', _MRef, _Type, Pid, _Reason}, StateName, #state{watchers=Watchers}=State) ->
-    case lists:keytake(Pid, Watchers) of
+    case lists:keytake(Pid, 1, Watchers) of
         false ->
             {next_state, StateName, State};
         {value, _, Watchers1} ->
@@ -191,7 +191,7 @@ cast(NodeRef, Message) ->
     end.
 
 send_info(NodeRef, Message) ->
-    case pushy_node_state_sup:get_process(NodeRef) of
+    case pushy_node_state_sup:get_or_create_process(NodeRef) of
         Pid when is_pid(Pid) ->
             Pid ! Message;
         Error ->

diff --git a/apps/pushy/src/pushy_node_state_sup.erl b/apps/pushy/src/pushy_node_state_sup.erl
index 89315a4a..4461fac5 100644
--- a/apps/pushy/src/pushy_node_state_sup.erl
+++ b/apps/pushy/src/pushy_node_state_sup.erl
@@ -11,6 +11,7 @@
 
 %% API
 -export([start_link/0,
+         get_or_create_process/1,
          get_process/1,
          mk_gproc_name/1]).
 
@@ -31,8 +32,8 @@ start_link() ->
             Error
     end.
 
--spec get_process(node_ref()) -> pid().
-get_process(NodeRef) ->
+-spec get_or_create_process(node_ref()) -> pid().
+get_or_create_process(NodeRef) ->
     GprocName = mk_gproc_name(NodeRef),
     case catch gproc:lookup_pid({n,l,GprocName}) of
         {'EXIT', _} ->
@@ -44,6 +45,14 @@
         Pid -> Pid
     end.
 
+get_process(NodeRef) ->
+    GprocName = mk_gproc_name(NodeRef),
+    case catch gproc:lookup_pid({n,l,GprocName}) of
+        {'EXIT', _} ->
+            undefined;
+        Pid -> Pid
+    end.
+
 -spec mk_gproc_name(node_ref()) -> {'heartbeat', org_id(), node_name()}.
 mk_gproc_name({OrgId, NodeName}) when is_binary(OrgId) andalso is_binary(NodeName) ->
     {heartbeat, OrgId, NodeName}.
@@ -61,18 +70,3 @@ init([]) ->
 %% Internal Function Definitions
 %% ------------------------------------------------------------------
 
-%load_from_db() ->
-    %case pushy_sql:fetch_node_statuses(?POC_ORG_ID) of
-    %{ok, none} ->
-        %lager:info("No existing node status records found in database, FSM processes will not be pre-created.");
-    %{ok, NodeStatuses} ->
-        %create_processes(NodeStatuses);
-    %{error, Reason} ->
-        %lager:error("Error loading existing node status records from the database: ~p", [Reason])
-    %end.
-
-%create_processes([]) ->
-    %{ok, done};
-%create_processes([#pushy_node_status{org_id=OrgId,node_name=NodeName} | Rest]) ->
-    %get_process({OrgId, NodeName}),
-    %create_processes(Rest).

From fbba70b0cb1530d59b60c012f3ec4d16df5fb611 Mon Sep 17 00:00:00 2001
From: Matthew Peck
Date: Wed, 14 Nov 2012 11:10:02 -0500
Subject: [PATCH 13/15] Removed some debug logging

---
 apps/pushy/src/pushy_job_state.erl | 1 -
 1 file changed, 1 deletion(-)

diff --git a/apps/pushy/src/pushy_job_state.erl b/apps/pushy/src/pushy_job_state.erl
index 97c3ce51..b8d80812 100644
--- a/apps/pushy/src/pushy_job_state.erl
+++ b/apps/pushy/src/pushy_job_state.erl
@@ -125,7 +125,6 @@ voting({ack_commit, NodeRef}, State) ->
     end,
     maybe_finished_voting(State2);
 voting({nack_commit, NodeRef}, State) ->
-    lager:info("~p Nacking", [NodeRef]),
     % Node from new -> nacked.
     State2 = case get_node_state(NodeRef, State) of
         new -> set_node_state(NodeRef, nacked, State);

From 90c3c0d6c59429d460eb907c252f5539b5dad03e Mon Sep 17 00:00:00 2001
From: Matthew Peck
Date: Wed, 14 Nov 2012 11:24:23 -0500
Subject: [PATCH 14/15] removed pushy_ema

---
 apps/pushy/src/pushy_ema.erl | 50 ------------------------------------
 1 file changed, 50 deletions(-)
 delete mode 100644 apps/pushy/src/pushy_ema.erl

diff --git a/apps/pushy/src/pushy_ema.erl b/apps/pushy/src/pushy_ema.erl
deleted file mode 100644
index 9e2b2a3d..00000000
--- a/apps/pushy/src/pushy_ema.erl
+++ /dev/null
@@ -1,50 +0,0 @@
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil; fill-column: 92 -*-
-%% ex: ts=4 sw=4 et
-%% @copyright 2011-2012 Opscode Inc.
-
--module(pushy_ema).
-
--include_lib("eunit/include/eunit.hrl").
-
-%% API
--export([init/2,
-         init/3,
-         reset_timer/1,
-         tick/1,
-         inc/2,
-         value/1]).
-
-
--record(eavg, {acc = 0 :: integer(),
-               avg = 0 :: float(),
-               window = 0 :: integer(),
-               tick_interval :: integer(),
-               tick_count = 0 :: integer()
-              }).
--spec init(integer(), integer()) -> #eavg{}.
-init(Window, Interval) when Window>0 ->
-    init(Window, Interval, 0.0).
-
--spec init(integer(), integer(), float()) -> #eavg{}.
-init(Window, Interval, Avg) when Window>0 ->
-    EAvg = #eavg{acc=0, avg=Avg, window=Window, tick_interval=Interval},
-    reset_timer(EAvg),
-    EAvg.
-
-reset_timer(#eavg{tick_interval=I}) ->
-    erlang:start_timer(I, self(), update_avg).
-
--spec tick(#eavg{}) -> #eavg{}.
-tick(#eavg{acc=Acc, avg=Avg, window=Window, tick_count=C}=EAvg) ->
-    NAvg = (Avg * (Window-1) + Acc)/Window,
-    reset_timer(EAvg),
-    EAvg#eavg{acc=0, avg=NAvg, tick_count=C+1}.
-
--spec inc(#eavg{}, number()) -> #eavg{}.
-inc(#eavg{acc=Acc}=EAvg, Count) ->
-    EAvg#eavg{acc=Acc+Count}.
-
--spec value(#eavg{}) -> float().
-value(#eavg{avg=Avg}) ->
-    Avg.
-

From 9a390476b6eea060a194a9bc69aa4a54e4cc3056 Mon Sep 17 00:00:00 2001
From: Matthew Peck
Date: Thu, 15 Nov 2012 14:33:53 -0500
Subject: [PATCH 15/15] make the wait interval configurable

---
 apps/pushy/src/pushy_node_stats_scanner.erl | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/apps/pushy/src/pushy_node_stats_scanner.erl b/apps/pushy/src/pushy_node_stats_scanner.erl
index 0b77db10..b0f5242c 100644
--- a/apps/pushy/src/pushy_node_stats_scanner.erl
+++ b/apps/pushy/src/pushy_node_stats_scanner.erl
@@ -50,5 +50,4 @@ code_change(_OldVsn, State, _Extra) ->
 
 %%% Internal functions
 wait_interval() ->
-    HeartbeatInterval = envy:get(pushy, heartbeat_interval, number),
-    (HeartbeatInterval * 4) + random:uniform(HeartbeatInterval).
+    envy:get(pushy, detect_offline_nodes_interval, number).
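
[Review note] After PATCH 15 the stats scanner no longer derives its sleep from heartbeat_interval; it reads its own key. Collecting the envy:get lookups that appear across the series, a sys.config for the pushy application might look like the sketch below. Only rehab_timer's 1000 is a default visible in the hunks above; every other value is an illustrative assumption, not a shipped default.

%% Illustrative sys.config sketch only -- values are assumptions.
[{pushy, [
    {heartbeat_interval, 1000},            %% ms between server heartbeats
    {detect_offline_nodes_interval, 4000}, %% ms the scanner waits (new in PATCH 15)
    {decay_window, 4},                     %% intervals folded into the decayed average
    {down_threshold, 0.4},                 %% decayed average below this stops the node FSM
    {rehab_timer, 1000}                    %% ms between abort nudges while in rehab
]}].

Keeping the scanner interval as its own key, rather than a multiple of heartbeat_interval, also removes the random:uniform/1 jitter the old wait_interval/0 applied, so deployments that relied on that spreading may want to choose staggered values per server.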