Skip to content
This repository has been archived by the owner on Jul 14, 2021. It is now read-only.

Merge heartbeat and command channels #6

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apps/pushy/src/pushy_command_switch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ process_message(State, Address, _Header, Body) ->
lager:info([{job_id, JobId}], "Node [~p] finished running Job [~p]", [NodeName, JobId]),
pushy_job_runner:node_command_event(JobId, NodeName, finished),
State2;
<<"heartbeat">> ->
NodeState = ej:get({<<"state">>}, Data),
lager:info("Node [~p] sent a heartbeat in state [~p]", [NodeName, NodeState]),
ok = pushy_node_state:heartbeat(NodeName, NodeState),
State2;
_Else ->
lager:info("I don't know anything about ~p", [Type]),
State2
Expand All @@ -185,7 +190,6 @@ process_message(State, Address, _Header, Body) ->
State
end.


%%
%% Utility functions; we should generalize these and move elsewhere.
%%
Expand Down
3 changes: 0 additions & 3 deletions apps/pushy/src/pushy_config_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ to_json(Req, State) ->
Host = pushy_util:get_env(pushy, server_name, fun is_list/1),
HeartbeatAddress = iolist_to_binary(
pushy_util:make_zmq_socket_addr(Host, server_heartbeat_port, tcp)),
StatusAddress = iolist_to_binary(
pushy_util:make_zmq_socket_addr(Host, node_status_port, tcp)),
CommandAddress = iolist_to_binary(
pushy_util:make_zmq_socket_addr(Host, command_port, tcp)),

Expand All @@ -62,7 +60,6 @@ to_json(Req, State) ->
{<<"push_jobs">>,
{[{<<"heartbeat">>,
{[{<<"out_addr">>, HeartbeatAddress},
{<<"in_addr">>, StatusAddress},
{<<"command_addr">>, CommandAddress},
{<<"interval">>, HeartbeatInterval/1000},
{<<"offline_threshold">>, 3},
Expand Down
2 changes: 0 additions & 2 deletions apps/pushy/src/pushy_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ label(pushy_command_switch, do_send) ->
label(send, all);
label(pushy_command_switch, do_send_multi) ->
label(send, all);
label(pushy_node_status_tracker, do_receive) ->
label('receive', all);
label(pushy_command_switch, do_receive) ->
label('receive', all);
label(pushy_messaging, _) ->
Expand Down
15 changes: 9 additions & 6 deletions apps/pushy/src/pushy_node_state.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

%% API
-export([current_state/1,
heartbeat/1,
heartbeat/2,
set_logging/2,
start_link/3]).

Expand Down Expand Up @@ -90,11 +90,14 @@ eavg_value(#eavg{avg=Avg}) ->
start_link(Name, HeartbeatInterval, DeadIntervalCount) ->
gen_fsm:start_link(?MODULE, [Name, HeartbeatInterval, DeadIntervalCount], []).

-spec heartbeat({'heartbeat', node_name(), node_state()}) -> gproc_error().
heartbeat({heartbeat, NodeName, NodeStatus}) ->
-spec heartbeat(node_name(), node_state()) -> 'ok'.
heartbeat(NodeName, NodeStatus) ->
case catch gproc:send({n,l,NodeName},
{heartbeat, NodeName, status_to_atom(NodeStatus)}) of
{'EXIT', _} -> ?NO_NODE;
{'EXIT', _} ->
% TODO this fails to take into account a failed initialize/gproc registration
pushy_node_state_sup:new(NodeName),
heartbeat(NodeName, NodeStatus);
_ -> ok
end.

Expand Down Expand Up @@ -157,7 +160,7 @@ initializing(timeout, #state{name=Name}=State) ->
{next_state, down, reset_timer(save_status(?SAVE_MODE, down, State))}
catch
error:badarg ->
%% When we start up from a previous run, we have two ways that the FSM might be started;
%% When we start up from a previous run, we have two ways that the FSM might be started;
%% from an incoming packet, or the database record for a prior run
%% There may be some nasty race conditions surrounding this.
%% We may also want to *not* automatically reanimate FSMs for nodes that aren't
Expand Down Expand Up @@ -215,7 +218,7 @@ handle_sync_event(Event, _From, StateName, #state{name=Name}=State) ->

%%
%% Handle info
%%
%%
handle_info({'DOWN', _MonitorRef, _Type, Object, _Info}, CurState, State) ->
Observers = State#state.observers,
State1 = State#state{observers=lists:delete(Object,Observers)},
Expand Down
8 changes: 4 additions & 4 deletions apps/pushy/src/pushy_node_state_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

%% API
-export([start_link/0,
new/3]).
new/1]).

%% Supervisor callbacks
-export([init/1]).
Expand All @@ -30,15 +30,15 @@ start_link() ->
Error
end.

new(Name, HeartbeatInterval, DeadIntervalCount) ->
supervisor:start_child(?SERVER, [Name, HeartbeatInterval, DeadIntervalCount]).

new(Name) ->
lager:info("Creating Process For ~s", [Name]),
{ok, HeartbeatInterval} = application:get_env(pushy, heartbeat_interval),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushy_util:get_env/3 provides type checking, and the code in node_status_tracker uses it; the application:get_env/2 API does not.

There's a general philosophical question of whether we want to cache values like this in the supervisor; I'd prefer that we do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on board with caching in the supervisor if we can make it work. I'll do it after I check in.

{ok, DeadIntervalCount} = application:get_env(pushy, dead_interval),
new(Name, HeartbeatInterval, DeadIntervalCount).

%% Ask the ?SERVER supervisor to start a new child, passing
%% [Name, HeartbeatInterval, DeadIntervalCount] as the second argument of
%% supervisor:start_child/2. The result of that call is returned unchanged.
%% NOTE(review): presumably ?SERVER is a simple_one_for_one supervisor, so the
%% list is appended to the child spec's start arguments -- confirm against init/1.
new(Name, HeartbeatInterval, DeadIntervalCount) ->
supervisor:start_child(?SERVER, [Name, HeartbeatInterval, DeadIntervalCount]).

%% ------------------------------------------------------------------
%% supervisor Function Definitions
%% ------------------------------------------------------------------
Expand Down
131 changes: 0 additions & 131 deletions apps/pushy/src/pushy_node_status_tracker.erl

This file was deleted.

1 change: 0 additions & 1 deletion apps/pushy/src/pushy_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ init([#pushy_state{ctx=Ctx} = PushyState]) ->
?WORKER(chef_keyring, []),
?WORKER(pushy_node_status_updater, []),
?WORKER(pushy_heartbeat_generator, [PushyState]),
?WORKER(pushy_node_status_tracker, [PushyState]),
?WORKER(pushy_command_switch, [PushyState]),
?WORKERNL(webmachine_mochiweb, [WebMachineConfig]) %% FIXME start or start_link here?
]}}.
Expand Down
1 change: 0 additions & 1 deletion rel/files/app.config
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
{heartbeat_interval, {{heartbeat_interval}} },
{dead_interval, {{dead_interval}} },
{zeromq_listen_address, "{{zeromq_listen_address}}"},
{node_status_port, {{node_status_port}} },
{server_heartbeat_port, {{server_heartbeat_port}} },
{command_port, {{command_port}} },
{api_port, {{api_port}} },
Expand Down
1 change: 0 additions & 1 deletion rel/vars.config
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
{dead_interval, 1000 }.
{zeromq_listen_address, "tcp://*"}.
{server_heartbeat_port, 10000}.
{node_status_port, 10001}.
{api_port, 10002}.
{command_port, 10003}.
{log_dir, "priv/log"}.