Skip to content
This repository has been archived by the owner on Jul 14, 2021. It is now read-only.

Dialyzer fixes for job_monitor #77

Merged
merged 3 commits into from
Dec 26, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/pushy/include/pushy_sql.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
running |
complete |
quorum_failed |
crashed |
aborted |
timed_out.

Expand Down
43 changes: 27 additions & 16 deletions apps/pushy/src/pushy_job_state_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
start/1,
get_process/1,
get_job_processes/0,
mark_incomplete_job_nodes_as_crashed/0,
register_process/1]).

%% Supervisor callbacks
Expand Down Expand Up @@ -71,39 +70,51 @@ register_process(JobId) ->
false
end.

%% ------------------------------------------------------------------
%% Internal functions
%% ------------------------------------------------------------------

-spec mark_incomplete_job_nodes_as_crashed() -> ok | {error, term()}.
%% Find running job nodes associated with crashed jobs. Mark them as crashed in the db.
mark_incomplete_job_nodes_as_crashed() ->
case pushy_sql:fetch_incomplete_job_nodes() of
{ok, Nodes} ->
[pushy_sql:update_job_node(Node#pushy_job_node{status=crashed})
|| Node <- Nodes];
{error, Error} -> {error, Error}
update_job_node(Nodes);
{error, Error} ->
{error, Error}
end.

%% ------------------------------------------------------------------
%% Internal functions
%% ------------------------------------------------------------------
update_job_node([]) ->
ok;
update_job_node([Node | Nodes]) ->
{ok, 1} = pushy_sql:update_job_node(Node#pushy_job_node{status=crashed}),
update_job_node(Nodes).


-spec mark_incomplete_jobs_as_crashed() -> ok | {error, term()}.
mark_incomplete_jobs_as_crashed() ->
case pushy_sql:fetch_incomplete_jobs() of
{ok, Jobs} ->
Update = fun(J) ->
pushy_object:update_object(update_job,
J#pushy_job{status=crashed},
J#pushy_job.id)
end,
[ Update(Job) || Job <- Jobs];
{error, Error} -> {error, Error}
update_job(Jobs);
{error, Error} ->
{error, Error}
end.

update_job([]) ->
ok;
update_job([Node | Nodes]) ->
pushy_object:update_object(update_job,
Node#pushy_job{status=crashed},
Node#pushy_job.id),
update_job(Nodes).

%% ------------------------------------------------------------------
%% supervisor Function Definitions
%% ------------------------------------------------------------------

init([]) ->
mark_incomplete_jobs_as_crashed(),
mark_incomplete_job_nodes_as_crashed(),
ok = mark_incomplete_jobs_as_crashed(),
ok = mark_incomplete_job_nodes_as_crashed(),
{ok, {{simple_one_for_one, 0, 1},
[{pushy_job_state, {pushy_job_state, start_link, []},
temporary, brutal_kill, worker, [pushy_job_state]}]}}.
13 changes: 13 additions & 0 deletions apps/pushy/src/pushy_sql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
sql_date/1
]).

-type property() :: atom() | tuple(). %% As defined in proplists
-type proplist() :: [ property() ].
%% job ops

-spec fetch_job(JobId :: object_id()) ->
{ok, not_found | #pushy_job{}} | {error, term()}.
fetch_job(JobId) ->
case sqerl:select(find_job_by_id, [JobId]) of
{ok, none} ->
Expand All @@ -36,6 +40,7 @@ fetch_job(JobId) ->
{error, Error}
end.

-spec fetch_incomplete_jobs() -> {ok, [ #pushy_job{} ] } | {error, term()}.
fetch_incomplete_jobs() ->
case sqerl:select(find_incomplete_jobs, []) of
{ok, none} ->
Expand All @@ -46,6 +51,7 @@ fetch_incomplete_jobs() ->
{error, Error}
end.

-spec fetch_jobs(OrgId :: object_id() ) -> {ok, [ #pushy_job{} ] } | {error, term()}.
fetch_jobs(OrgId) ->
case sqerl:select(find_jobs_by_org, [OrgId]) of
{ok, none} ->
Expand All @@ -56,6 +62,7 @@ fetch_jobs(OrgId) ->
{error, Error}
end.

-spec fetch_incomplete_job_nodes() -> {ok, [ #pushy_job_node{} ] } | {error, term()}.
fetch_incomplete_job_nodes() ->
case sqerl:select(find_incomplete_job_nodes, []) of
{ok, none} ->
Expand All @@ -66,6 +73,7 @@ fetch_incomplete_job_nodes() ->
{error, Error}
end.

-spec create_job(#pushy_job{}) -> {ok, 1} | {error, term()}.
create_job(#pushy_job{status = Status, job_nodes = JobNodes}=Job) ->
%% convert status into an integer
Job1 = Job#pushy_job{status=Status},
Expand Down Expand Up @@ -136,6 +144,7 @@ job_fields_for_insert(CbFields) ->
end,
lists:filter(Pred,CbFields).

-spec insert_job_nodes([#pushy_job_node{}]) -> ok | {error, term()}.
%% @doc Inserts job_nodes records into the database. All records are timestamped
%% with the same stamp, namely `CreatedAt`, which is a binary string in SQL date time
%% format.
Expand Down Expand Up @@ -166,6 +175,7 @@ insert_job_nodes([#pushy_job_node{job_id=JobId,
trunc_date_time_to_second({{YY,MM,DD},{H,M,S}}) ->
{{YY,MM,DD},{H,M, erlang:trunc(S)}}.

-spec job_join_rows_to_record(Rows :: [proplist()]) -> #pushy_job{}.
%% @doc Transforms a collection of proplists representing a job / job_nodes join query
%% result and collapses them all into a single job record. There is a row for each
%% job_node. A job_node tuple is extracted from each row; job information is extracted
Expand All @@ -174,6 +184,8 @@ trunc_date_time_to_second({{YY,MM,DD},{H,M,S}}) ->
%% See the 'find_job_by_id' prepared query for the row "shape".
job_join_rows_to_record(Rows) ->
job_join_rows_to_record(Rows, []).

-spec job_join_rows_to_record(Rows :: [proplist()], [#pushy_job_node{}]) -> #pushy_job{}.
job_join_rows_to_record([LastRow|[]], JobNodes) ->
C = proplist_to_job_node(LastRow),
CreatedAt = safe_get(<<"created_at">>, LastRow),
Expand Down Expand Up @@ -231,6 +243,7 @@ prepare_incomplete_job_nodes(Node) ->
org_id = safe_get(<<"org_id">>, Node),
node_name = safe_get(<<"node_name">>, Node)}.

-spec proplist_to_job_node(Proplist:: proplist()) -> #pushy_job_node{}.
%% @doc Convenience function for assembling a job_node tuple from a proplist
proplist_to_job_node(Proplist) ->
case safe_get(<<"node_name">>, Proplist) of
Expand Down