This repository has been archived by the owner on Jul 14, 2021. It is now read-only.

Refactor of node_state fsm to fix rehab race conditions #38

Closed
wants to merge 16 commits
apps/pushy/src/pushy_command_switch.erl (18 changes: 12 additions & 6 deletions)
@@ -161,9 +161,12 @@ do_send(#state{addr_node_map = AddrNodeMap,
                command_sock = CommandSocket}=State,
         Method, NodeRef, Message) ->
     {ok, Key} = get_key_for_method(Method, State, NodeRef),
-    Address = addr_node_map_lookup_by_node(AddrNodeMap, NodeRef),
-    Packets = ?TIME_IT(pushy_messaging, make_message, (proto_v2, Method, Key, Message)),
-    ok = pushy_messaging:send_message(CommandSocket, [Address | Packets]),
+    case addr_node_map_lookup_by_node(AddrNodeMap, NodeRef) of
+        error -> ok;
+        Address ->
+            Packets = ?TIME_IT(pushy_messaging, make_message, (proto_v2, Method, Key, Message)),
+            ok = pushy_messaging:send_message(CommandSocket, [Address | Packets])
+    end,
     State.
 
 %%%
@@ -244,9 +247,9 @@ send_node_event(JobId, NodeRef, <<"nack_run">>) ->
 send_node_event(JobId, NodeRef, <<"complete">>)->
     pushy_job_state:node_complete(JobId, NodeRef);
 send_node_event(null, NodeRef, <<"aborted">>) ->
-    pushy_node_state:node_aborted(NodeRef);
+    pushy_node_state:aborted(NodeRef);
 send_node_event(JobId, NodeRef, <<"aborted">>) ->
-    pushy_node_state:node_aborted(NodeRef),
+    pushy_node_state:aborted(NodeRef),
     pushy_job_state:node_aborted(JobId, NodeRef);
 send_node_event(JobId, NodeRef, undefined) ->
     lager:error("Status message for job ~p and node ~p was missing type field!~n", [JobId, NodeRef]);
@@ -301,4 +304,7 @@ addr_node_map_lookup_by_addr({AddrToNode, _}, Addr) ->
 addr_node_map_lookup_by_node(#state{addr_node_map = AddrNodeMap}, Node) ->
     addr_node_map_lookup_by_addr(AddrNodeMap, Node);
 addr_node_map_lookup_by_node({_, NodeToAddr}, Node) ->
-    dict:fetch(Node, NodeToAddr).
+    case dict:find(Node, NodeToAddr) of
+        error -> error;
+        {ok, Value} -> Value
+    end.
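
Taken together with the do_send/4 change above, the helper now degrades gracefully: an unknown node yields the atom error and the send becomes a no-op instead of crashing the switch. A minimal sketch of the difference between the two dict lookups (keys and values here are hypothetical):

    D = dict:from_list([{node_a, addr_1}]),
    {ok, addr_1} = dict:find(node_a, D),   % present key -> {ok, Value}
    error = dict:find(node_b, D),          % missing key -> error, no exception
    addr_1 = dict:fetch(node_a, D).        % fetch/2 returns the bare value,
                                           % but raises an exception when the
                                           % key is absent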
apps/pushy/src/pushy_job_state.erl (22 changes: 17 additions & 5 deletions)
@@ -125,6 +125,7 @@ voting({ack_commit, NodeRef}, State) ->
     end,
     maybe_finished_voting(State2);
 voting({nack_commit, NodeRef}, State) ->
+    lager:info("~p Nacking", [NodeRef]),
Contributor: Does anything else print when we receive a nack? It seems like we should either print on all messages, or none. (All seems right.)

Author: That was just some debug code. Didn't mean to leave it in.

     % Node from new -> nacked.
     State2 = case get_node_state(NodeRef, State) of
         new -> set_node_state(NodeRef, nacked, State);
@@ -185,7 +186,7 @@ handle_sync_event(Event, From, StateName, State) ->
 
 -spec handle_info(any(), job_status(), #state{}) ->
     {'next_state', job_status(), #state{}}.
-handle_info({down, NodeRef}, StateName, State) ->
+handle_info({state_change, NodeRef, _Current, shutdown}, StateName, State) ->
     pushy_job_state:StateName({down,NodeRef}, State);
 handle_info(voting_timeout, voting,
             #state{job = Job, voting_timeout = VotingTimeout} = State) ->
@@ -299,11 +300,21 @@ start_running(#state{job = Job} = State) ->
     {ok, _} = timer:send_after(Job2#pushy_job.run_timeout*1000, running_timeout),
     maybe_finished_running(State2).
 
-finish_job(Reason, #state{job = Job} = State) ->
+%% TODO this needs refactoring. We don't want to send nodes to rehab if we get
+%% a quorum_failed message because that stops jobs that are already running on
+%% nodes, even if they are valid.
+finish_job(quorum_failed, #state{job = Job} = State) ->
+    lager:info("Job ~p -> ~p", [Job#pushy_job.id, quorum_failed]),
+    Job2 = Job#pushy_job{status = quorum_failed},
+    State2 = State#state{job = Job2},
+    pushy_object:update_object(update_job, Job2, Job2#pushy_job.id),
+    {stop, {shutdown, quorum_failed}, State2};
+finish_job(Reason, #state{job = Job, job_nodes = JobNodes} = State) ->
     lager:info("Job ~p -> ~p", [Job#pushy_job.id, Reason]),
     Job2 = Job#pushy_job{status = Reason},
     State2 = State#state{job = Job2},
     pushy_object:update_object(update_job, Job2, Job2#pushy_job.id),
+    [ pushy_node_state:rehab(NodeRef) || NodeRef <- dict:fetch_keys(JobNodes) ],
     {stop, {shutdown, Reason}, State2}.
 
 count_nodes_in_state(NodeStates, #state{job_nodes = JobNodes}) ->
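
With the clause split above, a quorum failure stops the job without touching node state, while every other terminal reason sends all of the job's nodes back through rehab. An illustrative dispatch, with hypothetical State values:

    %% First clause: job marked quorum_failed, no rehab calls issued.
    {stop, {shutdown, quorum_failed}, _} = finish_job(quorum_failed, StateA),
    %% Second clause: job marked complete, then each node in job_nodes
    %% gets a pushy_node_state:rehab/1 call before the fsm stops.
    {stop, {shutdown, complete}, _} = finish_job(complete, StateB).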
@@ -317,9 +328,10 @@ count_nodes_in_state(NodeStates, #state{job_nodes = JobNodes}) ->
 
 listen_for_down_nodes([]) -> ok;
 listen_for_down_nodes([NodeRef|JobNodes]) ->
-    pushy_node_state:start_watching(NodeRef),
-    case pushy_node_state:current_state(NodeRef) of
-        down -> gen_fsm:send_event(self(), {down, NodeRef});
+    pushy_node_state:watch(NodeRef),
+    case pushy_node_state:status(NodeRef) of
+        {_, {unavailable, _}} ->
+            gen_fsm:send_event(self(), {down, NodeRef});
         _ -> ok
     end,
     listen_for_down_nodes(JobNodes).
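
The job fsm now leans on the refactored pushy_node_state surface rather than start_watching/current_state/in_rehab. The names below come straight from the calls in this diff; the specs are guesses, for orientation only:

    -spec watch(node_ref()) -> ok.      % register the caller as a watcher;
                                        % watchers receive messages shaped like
                                        % {state_change, NodeRef, Current, New}
                                        % (see the handle_info clause above)
    -spec status(node_ref()) -> {NodeState :: atom(),
                                 {available | unavailable, Job :: term()}}.
    -spec rehab(node_ref()) -> ok.      % push the node back through rehab
    -spec aborted(node_ref()) -> ok.    % node reported aborting its work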
apps/pushy/src/pushy_named_node_state_resource.erl (5 changes: 2 additions & 3 deletions)
@@ -44,11 +44,10 @@ content_types_provided(Req, State) ->
 to_json(Req, #config_state{organization_guid=OrgId}=State) ->
     NodeName = list_to_binary(wrq:path_info(node_name, Req)),
     % TODO handle missing node
-    NodeState = pushy_node_state:current_state({OrgId, NodeName}),
-    InRehab = pushy_node_state:in_rehab({OrgId, NodeName}),
+    {NodeState, {Availability, _Job}} = pushy_node_state:status({OrgId, NodeName}),
     Result = jiffy:encode({[
         {<<"node_name">>, NodeName},
         {<<"status">>, NodeState},
-        {<<"is_in_rehab">>, InRehab}
+        {<<"availability">>, Availability}
     ]}),
     {Result, Req, State}.
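
The endpoint now surfaces the availability half of the status tuple in place of the old is_in_rehab flag. For a node that is, say, online and idle, the response body would look roughly like this (field values are illustrative; they come from the node fsm, not this diff):

    {
        "node_name": "my_node",
        "status": "online",
        "availability": "available"
    }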