Skip to content
This repository has been archived by the owner on Jul 14, 2021. It is now read-only.

Expose updated_at and created_at in API responses #84

Merged
merged 6 commits into from
Jan 8, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/pushy/include/pushy_sql.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
%% length, might be able to constrain further for range of elements.
-type object_id() :: <<_:256>>.

%% We use this to track when the pushy server itself updates DB records,
%% for example on job crash
-define(PUSHY_ACTOR_ID, <<"00000000000000000000000000000000">>).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we weren't going to do this? Should we just not set it instead, and leave it as last_created_by or something? Or leave that for a future PBI?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used for the job monitor and not for any of the ZeroMQ message-triggered state transitions

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in this case right now we get back a minimal pushy_node object from the DB. We'd have to change that logic which pulls from the DB if we wanted to not set this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right, at least it doesn't kill the mainline cases :)

%% job status
-type job_status() :: new |
voting |
Expand All @@ -25,9 +29,6 @@
was_ready |
timed_out.

%% random PoC hard-codings
-define(POC_HB_THRESHOLD, 3).

-record(pushy_job_node, {'job_id'::object_id(), % guid for object (unique)
'org_id'::object_id(), % organization guid
'node_name'::binary(), % node name
Expand Down
14 changes: 8 additions & 6 deletions apps/pushy/priv/pgsql_statements.config
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
" INNER JOIN job_status js ON j.status = js.id"
" WHERE j.id = $1">>}.

{find_incomplete_jobs,
<<"SELECT j.id, j.created_at, j.last_updated_by, js.description AS status",
" FROM jobs j, job_status js",
" WHERE j.status = js.id AND js.description in ('voting', 'running', 'new')">>}.

{find_jobs_by_org,
<<"SELECT j.id, j.created_at, js.description AS status",
<<"SELECT j.id, j.org_id, j.command, j.quorum, js.description AS status,"
" j.run_timeout, j.last_updated_by, j.created_at, j.updated_at"
" FROM jobs j, job_status js",
" WHERE (org_id = $1) AND j.status = js.id">>}.

{find_incomplete_jobs,
<<"SELECT j.id, j.created_at, j.updated_at, j.last_updated_by,"
" js.description AS status"
" FROM jobs j, job_status js",
" WHERE j.status = js.id AND js.description in ('voting', 'running', 'new')">>}.

{find_incomplete_job_nodes,
<<"SELECT jn.node_name, jn.job_id, j.org_id"
" FROM jobs j"
Expand Down
4 changes: 2 additions & 2 deletions apps/pushy/src/pushy_job_state.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ set_node_state(NodeRef, NewNodeState, #state{job_nodes = JobNodes} = State) ->
terminal -> error("Attempt to change node ~p from terminal state ~p to state ~p");
_ ->
NewPushyJobNode = OldPushyJobNode#pushy_job_node{status = NewNodeState},
{ok, 1} = pushy_sql:update_job_node(NewPushyJobNode),
{ok, 1} = pushy_object:update_object(update_job_node, NewPushyJobNode),
NewPushyJobNode
end
end, JobNodes),
Expand All @@ -238,7 +238,7 @@ send_matching_to_rehab(OldNodeState, NewNodeState, #state{job_nodes = JobNodes}
case OldPushyJobNode#pushy_job_node.status of
OldNodeState ->
NewPushyJobNode = OldPushyJobNode#pushy_job_node{status = NewNodeState},
{ok, 1} = pushy_sql:update_job_node(NewPushyJobNode),
{ok, 1} = pushy_object:update_object(update_job_node, NewPushyJobNode),
pushy_node_state:rehab({NewPushyJobNode#pushy_job_node.org_id,
NewPushyJobNode#pushy_job_node.node_name}),
NewPushyJobNode;
Expand Down
5 changes: 3 additions & 2 deletions apps/pushy/src/pushy_job_state_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ mark_incomplete_job_nodes_as_crashed() ->
update_job_node([]) ->
ok;
update_job_node([Node | Nodes]) ->
{ok, 1} = pushy_sql:update_job_node(Node#pushy_job_node{status=crashed}),
{ok, 1} = pushy_object:update_object(update_job_node,
Node#pushy_job_node{status=crashed}),
update_job_node(Nodes).


Expand All @@ -105,7 +106,7 @@ update_job([]) ->
update_job([Node | Nodes]) ->
pushy_object:update_object(update_job,
Node#pushy_job{status=crashed},
Node#pushy_job.id),
?PUSHY_ACTOR_ID),
update_job(Nodes).

%% ------------------------------------------------------------------
Expand Down
6 changes: 5 additions & 1 deletion apps/pushy/src/pushy_jobs_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,14 @@ from_json(Req, #config_state{pushy_job = Job} = State) ->
%% GET /pushy/jobs
to_json(Req, #config_state{organization_guid = OrgId} = State) ->
{ok, Jobs} = pushy_sql:fetch_jobs(OrgId),
{jiffy:encode(Jobs), Req, State}.
EJson = assemble_jobs_ejson(Jobs),
{jiffy:encode(EJson), Req, State}.

% Private stuff

%% Map each fetched job record to its EJSON representation.
assemble_jobs_ejson(Jobs) ->
    lists:map(fun pushy_object:assemble_job_ejson/1, Jobs).

parse_post_body(Req) ->
Body = wrq:req_body(Req),
JobJson = jiffy:decode(Body),
Expand Down
48 changes: 2 additions & 46 deletions apps/pushy/src/pushy_named_job_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,50 +52,6 @@ resource_exists(Req, State) ->
end.

to_json(Req, #config_state{pushy_job = Job} = State) ->
{jiffy:encode(job_to_json(Job)), Req, State}.
EJson = pushy_object:assemble_job_ejson_with_nodes(Job),
{jiffy:encode(EJson), Req, State}.

%{
% id: 2001,
% command: "chef-client",
% status: "complete",
% run_timeout: 1304914, # Seconds. Could be null
% nodes: {
% "complete": [ "DERPY", "RAINBOWDASH" ]
% }
% created_at = "<some date format>",
% updated_at = "<some date format>"
%}

job_to_json(#pushy_job{
id = Id,
command = Command,
status = Status,
run_timeout = RunTimeout,
% created_at = CreatedAt,
% updated_at = UpdatedAt,
job_nodes = Nodes
}) ->
% CreatedAtDate = iolist_to_binary(httpd_util:rfc1123_date(CreatedAt)),
% UpdatedAtDate = iolist_to_binary(httpd_util:rfc1123_date(UpdatedAt)),
NodesJson = job_nodes_json_by_status(Nodes),
{[ {<<"id">>, iolist_to_binary(Id)},
{<<"command">>, iolist_to_binary(Command)},
{<<"status">>, Status},
{<<"run_timeout">>, RunTimeout},
{<<"nodes">>, NodesJson}
% {<<"created_at">>, CreatedAtDate},
% {<<"updated_at">>, UpdatedAtDate}
]}.

job_nodes_json_by_status(Nodes) ->
NodesByStatus = job_nodes_by_status(Nodes, dict:new()),
{[
{ erlang:atom_to_binary(Status, utf8), dict:fetch(Status, NodesByStatus) }
|| Status <- dict:fetch_keys(NodesByStatus)
]}.

job_nodes_by_status([], Dict) ->
Dict;
job_nodes_by_status([#pushy_job_node{node_name = Name, status = Status} | Nodes], Dict) ->
Dict2 = dict:append(Status, Name, Dict),
job_nodes_by_status(Nodes, Dict2).
39 changes: 38 additions & 1 deletion apps/pushy/src/pushy_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
new_record/6,

make_org_prefix_id/1,
make_org_prefix_id/2
make_org_prefix_id/2,

assemble_job_ejson/1,
assemble_job_ejson_with_nodes/1

]).

fetch_org_id(OrgName) ->
Expand Down Expand Up @@ -150,3 +154,36 @@ make_org_prefix_id(OrgId, Name) ->
Bin = iolist_to_binary([OrgId, Name, crypto:rand_bytes(6)]),
<<ObjectPart:80, _/binary>> = crypto:md5(Bin),
iolist_to_binary(io_lib:format("~s~20.16.0b", [OrgSuffix, ObjectPart])).

%% Like assemble_job_ejson/1, but prepends a "nodes" member grouping the
%% job's node names by their current status.
assemble_job_ejson_with_nodes(#pushy_job{job_nodes = Nodes} = Job) ->
    {ScalarProps} = assemble_job_ejson(Job),
    NodesMember = {<<"nodes">>, job_nodes_json_by_status(Nodes)},
    {[NodesMember | ScalarProps]}.

%% Render the scalar fields of a #pushy_job{} record as an EJSON term
%% suitable for jiffy:encode/1. Per-node detail is intentionally omitted;
%% see assemble_job_ejson_with_nodes/1 for the grouped node listing.
assemble_job_ejson(#pushy_job{} = Job) ->
    #pushy_job{id = Id,
               command = Command,
               status = Status,
               run_timeout = RunTimeout,
               created_at = CreatedAt,
               updated_at = UpdatedAt} = Job,
    {[{<<"id">>, Id},
      {<<"command">>, Command},
      {<<"status">>, Status},
      {<<"run_timeout">>, RunTimeout},
      {<<"created_at">>, CreatedAt},
      {<<"updated_at">>, UpdatedAt}]}.

%% Build an EJSON object mapping each node status (as a utf8 binary) to the
%% list of node names currently in that status.
job_nodes_json_by_status(Nodes) ->
    ByStatus = job_nodes_by_status(Nodes, dict:new()),
    Members = [{erlang:atom_to_binary(StatusAtom, utf8),
                dict:fetch(StatusAtom, ByStatus)}
               || StatusAtom <- dict:fetch_keys(ByStatus)],
    {Members}.

%% Accumulate node names into Dict keyed by status atom.
%% lists:foldl/3 visits left-to-right, so dict:append/3 preserves the same
%% per-status name ordering as the original hand-rolled recursion.
job_nodes_by_status(Nodes, Dict) ->
    lists:foldl(
      fun(#pushy_job_node{node_name = Name, status = Status}, Acc) ->
              dict:append(Status, Name, Acc)
      end,
      Dict,
      Nodes).

45 changes: 17 additions & 28 deletions apps/pushy/src/pushy_sql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fetch_incomplete_jobs() ->
{ok, none} ->
{ok, []};
{ok, Rows} ->
{ok, [prepare_pushy_job_record(Row) || Row <- Rows]};
{ok, [prepare_incomplete_job(Row) || Row <- Rows]};
{error, Error} ->
{error, Error}
end.
Expand All @@ -57,7 +57,7 @@ fetch_jobs(OrgId) ->
{ok, none} ->
{ok, []};
{ok, Rows} ->
{ok, [prepare_job(Row) || Row <- Rows]};
{ok, [prepare_pushy_job_record(Row) || Row <- Rows]};
{error, Error} ->
{error, Error}
end.
Expand Down Expand Up @@ -116,8 +116,8 @@ update_job(#pushy_job{id = JobId,
update_job_node(#pushy_job_node{job_id = JobId,
node_name = NodeName,
org_id = OrgId,
updated_at = UpdatedAt,
status = Status}) ->
UpdatedAt = sql_date(now),
UpdateFields = [job_node_status(Status), UpdatedAt, OrgId, NodeName, JobId],
do_update(update_job_node_by_orgid_nodename_jobid, UpdateFields).

Expand Down Expand Up @@ -186,17 +186,8 @@ job_join_rows_to_record(Rows) ->
-spec job_join_rows_to_record(Rows :: [proplist()], [#pushy_job_node{}]) -> #pushy_job{}.
job_join_rows_to_record([LastRow|[]], JobNodes) ->
C = proplist_to_job_node(LastRow),
CreatedAt = safe_get(<<"created_at">>, LastRow),
UpdatedAt = safe_get(<<"updated_at">>, LastRow),
#pushy_job{id = safe_get(<<"id">>, LastRow),
org_id = safe_get(<<"org_id">>, LastRow),
command = safe_get(<<"command">>, LastRow),
status = safe_get(<<"status">>, LastRow),
run_timeout = safe_get(<<"run_timeout">>, LastRow),
last_updated_by = safe_get(<<"last_updated_by">>, LastRow),
created_at = date_time_to_sql_date(CreatedAt),
updated_at = date_time_to_sql_date(UpdatedAt),
job_nodes = lists:flatten(lists:reverse([C|JobNodes]))};
Job = prepare_pushy_job_record(LastRow),
Job#pushy_job{job_nodes = lists:flatten(lists:reverse([C|JobNodes]))};
job_join_rows_to_record([Row|Rest], JobNodes ) ->
C = proplist_to_job_node(Row),
job_join_rows_to_record(Rest, [C|JobNodes]).
Expand All @@ -215,26 +206,24 @@ sql_date({_,_,_} = TS) ->
iolist_to_binary(io_lib:format("~4w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w",
[Year, Month, Day, Hour, Minute, Second])).



prepare_job(Job) ->
CreatedAt = safe_get(<<"created_at">>, Job),
CreatedAtFormatted = date_time_to_sql_date(CreatedAt),
Status = safe_get(<<"status">>, Job),

{[{<<"id">>, safe_get(<<"id">>, Job)},
{<<"created_at">>, CreatedAtFormatted},
{<<"status">>, Status}]}.

prepare_pushy_job_record(Job) ->
CreatedAt = safe_get(<<"created_at">>, Job),
CreatedAtFormatted = date_time_to_sql_date(CreatedAt),
Status = safe_get(<<"status">>, Job),
UpdatedAt = safe_get(<<"updated_at">>, Job),
UpdatedAtFormatted = date_time_to_sql_date(UpdatedAt),

#pushy_job{id = safe_get(<<"id">>, Job),
org_id = safe_get(<<"org_id">>, Job),
command = safe_get(<<"command">>, Job),
status = safe_get(<<"status">>, Job),
run_timeout = safe_get(<<"run_timeout">>, Job),
updated_at = UpdatedAtFormatted,
created_at = CreatedAtFormatted,
last_updated_by = safe_get(<<"last_updated_by">>, Job),
status = Status}.
last_updated_by = safe_get(<<"last_updated_by">>, Job)}.

%% Build a minimal #pushy_job{} from one find_incomplete_jobs result row,
%% carrying only the id and status needed to crash-mark stale jobs.
%% NOTE(review): the find_incomplete_jobs statement also selects created_at,
%% updated_at and last_updated_by, but those columns are discarded here —
%% confirm the extra columns are intentionally unused or trim the query.
prepare_incomplete_job(Job) ->
#pushy_job{id = safe_get(<<"id">>, Job),
status = safe_get(<<"status">>, Job)}.

prepare_incomplete_job_nodes(Node) ->
#pushy_job_node{job_id = safe_get(<<"job_id">>, Node),
Expand Down