From 1bd3e7af5cba5fb6cedbf753f2170046e10ac90b Mon Sep 17 00:00:00 2001 From: jamesc Date: Thu, 27 Dec 2012 09:09:33 -0800 Subject: [PATCH 1/6] move setting of updated_at out of pushy_sql layer --- apps/pushy/src/pushy_sql.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/pushy/src/pushy_sql.erl b/apps/pushy/src/pushy_sql.erl index 32d30afb..b7404271 100644 --- a/apps/pushy/src/pushy_sql.erl +++ b/apps/pushy/src/pushy_sql.erl @@ -116,8 +116,8 @@ update_job(#pushy_job{id = JobId, update_job_node(#pushy_job_node{job_id = JobId, node_name = NodeName, org_id = OrgId, + updated_at = UpdatedAt, status = Status}) -> - UpdatedAt = sql_date(now), UpdateFields = [job_node_status(Status), UpdatedAt, OrgId, NodeName, JobId], do_update(update_job_node_by_orgid_nodename_jobid, UpdateFields). From ddc0ceb96927ec493249007818f1c204cc75befb Mon Sep 17 00:00:00 2001 From: jamesc Date: Thu, 27 Dec 2012 09:09:52 -0800 Subject: [PATCH 2/6] remove unused POC macro --- apps/pushy/include/pushy_sql.hrl | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/pushy/include/pushy_sql.hrl b/apps/pushy/include/pushy_sql.hrl index 00232378..91e439bd 100644 --- a/apps/pushy/include/pushy_sql.hrl +++ b/apps/pushy/include/pushy_sql.hrl @@ -25,9 +25,6 @@ was_ready | timed_out. -%% random PoC hard-codings --define(POC_HB_THRESHOLD, 3). 
- -record(pushy_job_node, {'job_id'::object_id(), % guid for object (unique) 'org_id'::object_id(), % organization guid 'node_name'::binary(), % node name From c69d05b1306d0b3b451ba7219058ed16307c3471 Mon Sep 17 00:00:00 2001 From: jamesc Date: Thu, 27 Dec 2012 14:59:39 -0800 Subject: [PATCH 3/6] Always call DB update ops through the pushy_object layer which sets update timestamps/actor info --- apps/pushy/include/pushy_sql.hrl | 4 ++++ apps/pushy/src/pushy_job_state.erl | 4 ++-- apps/pushy/src/pushy_job_state_sup.erl | 5 +++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/pushy/include/pushy_sql.hrl b/apps/pushy/include/pushy_sql.hrl index 91e439bd..8e8f0171 100644 --- a/apps/pushy/include/pushy_sql.hrl +++ b/apps/pushy/include/pushy_sql.hrl @@ -3,6 +3,10 @@ %% length, might be able to constrain further for range of elements. -type object_id() :: <<_:256>>. +%% We use this to track when the pushy server itself updates DB records, +%% for example on job crash +-define(PUSHY_ACTOR_ID, <<"00000000000000000000000000000000">>). 
+ %% job status -type job_status() :: new | voting | diff --git a/apps/pushy/src/pushy_job_state.erl b/apps/pushy/src/pushy_job_state.erl index 50aef64e..b690f4d5 100644 --- a/apps/pushy/src/pushy_job_state.erl +++ b/apps/pushy/src/pushy_job_state.erl @@ -215,7 +215,7 @@ set_node_state(NodeRef, NewNodeState, #state{job_nodes = JobNodes} = State) -> terminal -> error("Attempt to change node ~p from terminal state ~p to state ~p"); _ -> NewPushyJobNode = OldPushyJobNode#pushy_job_node{status = NewNodeState}, - {ok, 1} = pushy_sql:update_job_node(NewPushyJobNode), + {ok, 1} = pushy_object:update_object(update_job_node, NewPushyJobNode), NewPushyJobNode end end, JobNodes), @@ -238,7 +238,7 @@ send_matching_to_rehab(OldNodeState, NewNodeState, #state{job_nodes = JobNodes} case OldPushyJobNode#pushy_job_node.status of OldNodeState -> NewPushyJobNode = OldPushyJobNode#pushy_job_node{status = NewNodeState}, - {ok, 1} = pushy_sql:update_job_node(NewPushyJobNode), + {ok, 1} = pushy_object:update_object(update_job_node, NewPushyJobNode), pushy_node_state:rehab({NewPushyJobNode#pushy_job_node.org_id, NewPushyJobNode#pushy_job_node.node_name}), NewPushyJobNode; diff --git a/apps/pushy/src/pushy_job_state_sup.erl b/apps/pushy/src/pushy_job_state_sup.erl index 09832080..9179e415 100644 --- a/apps/pushy/src/pushy_job_state_sup.erl +++ b/apps/pushy/src/pushy_job_state_sup.erl @@ -87,7 +87,8 @@ mark_incomplete_job_nodes_as_crashed() -> update_job_node([]) -> ok; update_job_node([Node | Nodes]) -> - {ok, 1} = pushy_sql:update_job_node(Node#pushy_job_node{status=crashed}), + {ok, 1} = pushy_object:update_object(update_job_node, + Node#pushy_job_node{status=crashed}), update_job_node(Nodes). @@ -105,7 +106,7 @@ update_job([]) -> update_job([Node | Nodes]) -> pushy_object:update_object(update_job, Node#pushy_job{status=crashed}, - Node#pushy_job.id), + ?PUSHY_ACTOR_ID), update_job(Nodes). 
%% ------------------------------------------------------------------ From 228058a3d465029c2faeff4a76e078c3f5e990de Mon Sep 17 00:00:00 2001 From: jamesc Date: Fri, 28 Dec 2012 14:42:22 -0800 Subject: [PATCH 4/6] Add created_at/updated_at to return JSON --- apps/pushy/src/pushy_named_job_resource.erl | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/apps/pushy/src/pushy_named_job_resource.erl b/apps/pushy/src/pushy_named_job_resource.erl index af3a0fa1..fca159bd 100644 --- a/apps/pushy/src/pushy_named_job_resource.erl +++ b/apps/pushy/src/pushy_named_job_resource.erl @@ -71,20 +71,18 @@ job_to_json(#pushy_job{ command = Command, status = Status, run_timeout = RunTimeout, -% created_at = CreatedAt, -% updated_at = UpdatedAt, + created_at = CreatedAt, + updated_at = UpdatedAt, job_nodes = Nodes }) -> -% CreatedAtDate = iolist_to_binary(httpd_util:rfc1123_date(CreatedAt)), -% UpdatedAtDate = iolist_to_binary(httpd_util:rfc1123_date(UpdatedAt)), NodesJson = job_nodes_json_by_status(Nodes), - {[ {<<"id">>, iolist_to_binary(Id)}, - {<<"command">>, iolist_to_binary(Command)}, + {[ {<<"id">>, Id}, + {<<"command">>, Command}, {<<"status">>, Status}, {<<"run_timeout">>, RunTimeout}, - {<<"nodes">>, NodesJson} -% {<<"created_at">>, CreatedAtDate}, -% {<<"updated_at">>, UpdatedAtDate} + {<<"nodes">>, NodesJson}, + {<<"created_at">>, CreatedAt}, + {<<"updated_at">>, UpdatedAt} ]}. 
job_nodes_json_by_status(Nodes) -> From fd16037516f992c6ac8a9e3ae4c263bafd733e1d Mon Sep 17 00:00:00 2001 From: jamesc Date: Fri, 28 Dec 2012 14:43:01 -0800 Subject: [PATCH 5/6] Now we return full records from DB - add in all fields to DB queries for fetch_job/fetch_jobs - remove duplication in construction of job records and ejson - Always return #pushy_job{}, not EJSON --- apps/pushy/priv/pgsql_statements.config | 14 ++++---- apps/pushy/src/pushy_sql.erl | 43 +++++++++---------------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/apps/pushy/priv/pgsql_statements.config b/apps/pushy/priv/pgsql_statements.config index b992a5af..53acc0e0 100644 --- a/apps/pushy/priv/pgsql_statements.config +++ b/apps/pushy/priv/pgsql_statements.config @@ -12,16 +12,18 @@ " INNER JOIN job_status js ON j.status = js.id" " WHERE j.id = $1">>}. -{find_incomplete_jobs, - <<"SELECT j.id, j.created_at, j.last_updated_by, js.description AS status", - " FROM jobs j, job_status js", - " WHERE j.status = js.id AND js.description in ('voting', 'running', 'new')">>}. - {find_jobs_by_org, - <<"SELECT j.id, j.created_at, js.description AS status", + <<"SELECT j.id, j.org_id, j.command, j.quorum, js.description AS status," + " j.run_timeout, j.last_updated_by, j.created_at, j.updated_at" " FROM jobs j, job_status js", " WHERE (org_id = $1) AND j.status = js.id">>}. +{find_incomplete_jobs, + <<"SELECT j.id, j.created_at, j.updated_at, j.last_updated_by," + " js.description AS status" + " FROM jobs j, job_status js", + " WHERE j.status = js.id AND js.description in ('voting', 'running', 'new')">>}. 
+ {find_incomplete_job_nodes, <<"SELECT jn.node_name, jn.job_id, j.org_id" " FROM jobs j" diff --git a/apps/pushy/src/pushy_sql.erl b/apps/pushy/src/pushy_sql.erl index b7404271..50a4532e 100644 --- a/apps/pushy/src/pushy_sql.erl +++ b/apps/pushy/src/pushy_sql.erl @@ -46,7 +46,7 @@ fetch_incomplete_jobs() -> {ok, none} -> {ok, []}; {ok, Rows} -> - {ok, [prepare_pushy_job_record(Row) || Row <- Rows]}; + {ok, [prepare_incomplete_job(Row) || Row <- Rows]}; {error, Error} -> {error, Error} end. @@ -57,7 +57,7 @@ fetch_jobs(OrgId) -> {ok, none} -> {ok, []}; {ok, Rows} -> - {ok, [prepare_job(Row) || Row <- Rows]}; + {ok, [prepare_pushy_job_record(Row) || Row <- Rows]}; {error, Error} -> {error, Error} end. @@ -186,17 +186,8 @@ job_join_rows_to_record(Rows) -> -spec job_join_rows_to_record(Rows :: [proplist()], [#pushy_job_node{}]) -> #pushy_job{}. job_join_rows_to_record([LastRow|[]], JobNodes) -> C = proplist_to_job_node(LastRow), - CreatedAt = safe_get(<<"created_at">>, LastRow), - UpdatedAt = safe_get(<<"updated_at">>, LastRow), - #pushy_job{id = safe_get(<<"id">>, LastRow), - org_id = safe_get(<<"org_id">>, LastRow), - command = safe_get(<<"command">>, LastRow), - status = safe_get(<<"status">>, LastRow), - run_timeout = safe_get(<<"run_timeout">>, LastRow), - last_updated_by = safe_get(<<"last_updated_by">>, LastRow), - created_at = date_time_to_sql_date(CreatedAt), - updated_at = date_time_to_sql_date(UpdatedAt), - job_nodes = lists:flatten(lists:reverse([C|JobNodes]))}; + Job = prepare_pushy_job_record(LastRow), + Job#pushy_job{job_nodes = lists:flatten(lists:reverse([C|JobNodes]))}; job_join_rows_to_record([Row|Rest], JobNodes ) -> C = proplist_to_job_node(Row), job_join_rows_to_record(Rest, [C|JobNodes]). @@ -215,26 +206,24 @@ sql_date({_,_,_} = TS) -> iolist_to_binary(io_lib:format("~4w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Year, Month, Day, Hour, Minute, Second])). 
- - -prepare_job(Job) -> - CreatedAt = safe_get(<<"created_at">>, Job), - CreatedAtFormatted = date_time_to_sql_date(CreatedAt), - Status = safe_get(<<"status">>, Job), - - {[{<<"id">>, safe_get(<<"id">>, Job)}, - {<<"created_at">>, CreatedAtFormatted}, - {<<"status">>, Status}]}. - prepare_pushy_job_record(Job) -> CreatedAt = safe_get(<<"created_at">>, Job), CreatedAtFormatted = date_time_to_sql_date(CreatedAt), - Status = safe_get(<<"status">>, Job), + UpdatedAt = safe_get(<<"updated_at">>, Job), + UpdatedAtFormatted = date_time_to_sql_date(UpdatedAt), #pushy_job{id = safe_get(<<"id">>, Job), + org_id = safe_get(<<"org_id">>, Job), + command = safe_get(<<"command">>, Job), + status = safe_get(<<"status">>, Job), + run_timeout = safe_get(<<"run_timeout">>, Job), + updated_at = UpdatedAtFormatted, created_at = CreatedAtFormatted, - last_updated_by = safe_get(<<"last_updated_by">>, Job), - status = Status}. + last_updated_by = safe_get(<<"last_updated_by">>, Job)}. + +prepare_incomplete_job(Job) -> + #pushy_job{id = safe_get(<<"id">>, Job), + status = safe_get(<<"status">>, Job)}. 
prepare_incomplete_job_nodes(Node) -> #pushy_job_node{job_id = safe_get(<<"job_id">>, Node), From e2bfe30c82b1a9d9d0aca7de1c79b797b274e885 Mon Sep 17 00:00:00 2001 From: jamesc Date: Fri, 28 Dec 2012 15:27:01 -0800 Subject: [PATCH 6/6] Extract out Json construction for pushy jobs into pushy_object and use this shared code in resource modules --- apps/pushy/src/pushy_jobs_resource.erl | 6 ++- apps/pushy/src/pushy_named_job_resource.erl | 46 +-------------------- apps/pushy/src/pushy_object.erl | 39 ++++++++++++++++- 3 files changed, 45 insertions(+), 46 deletions(-) diff --git a/apps/pushy/src/pushy_jobs_resource.erl b/apps/pushy/src/pushy_jobs_resource.erl index 4ff2923c..9f1b0662 100644 --- a/apps/pushy/src/pushy_jobs_resource.erl +++ b/apps/pushy/src/pushy_jobs_resource.erl @@ -83,10 +83,14 @@ from_json(Req, #config_state{pushy_job = Job} = State) -> %% GET /pushy/jobs to_json(Req, #config_state{organization_guid = OrgId} = State) -> {ok, Jobs} = pushy_sql:fetch_jobs(OrgId), - {jiffy:encode(Jobs), Req, State}. + EJson = assemble_jobs_ejson(Jobs), + {jiffy:encode(EJson), Req, State}. % Private stuff +assemble_jobs_ejson(Jobs) -> + [ pushy_object:assemble_job_ejson(Job) || Job <- Jobs ]. + parse_post_body(Req) -> Body = wrq:req_body(Req), JobJson = jiffy:decode(Body), diff --git a/apps/pushy/src/pushy_named_job_resource.erl b/apps/pushy/src/pushy_named_job_resource.erl index fca159bd..c38f5f9c 100644 --- a/apps/pushy/src/pushy_named_job_resource.erl +++ b/apps/pushy/src/pushy_named_job_resource.erl @@ -52,48 +52,6 @@ resource_exists(Req, State) -> end. to_json(Req, #config_state{pushy_job = Job} = State) -> - {jiffy:encode(job_to_json(Job)), Req, State}. + EJson = pushy_object:assemble_job_ejson_with_nodes(Job), + {jiffy:encode(EJson), Req, State}. -%{ -% id: 2001, -% command: "chef-client", -% status: "complete", -% run_timeout: 1304914, # Seconds. 
Could be null -% nodes: { -% "complete": [ "DERPY", "RAINBOWDASH" ] -% } -% created_at = "", -% updated_at = "" -%} - -job_to_json(#pushy_job{ - id = Id, - command = Command, - status = Status, - run_timeout = RunTimeout, - created_at = CreatedAt, - updated_at = UpdatedAt, - job_nodes = Nodes - }) -> - NodesJson = job_nodes_json_by_status(Nodes), - {[ {<<"id">>, Id}, - {<<"command">>, Command}, - {<<"status">>, Status}, - {<<"run_timeout">>, RunTimeout}, - {<<"nodes">>, NodesJson}, - {<<"created_at">>, CreatedAt}, - {<<"updated_at">>, UpdatedAt} - ]}. - -job_nodes_json_by_status(Nodes) -> - NodesByStatus = job_nodes_by_status(Nodes, dict:new()), - {[ - { erlang:atom_to_binary(Status, utf8), dict:fetch(Status, NodesByStatus) } - || Status <- dict:fetch_keys(NodesByStatus) - ]}. - -job_nodes_by_status([], Dict) -> - Dict; -job_nodes_by_status([#pushy_job_node{node_name = Name, status = Status} | Nodes], Dict) -> - Dict2 = dict:append(Status, Name, Dict), - job_nodes_by_status(Nodes, Dict2). diff --git a/apps/pushy/src/pushy_object.erl b/apps/pushy/src/pushy_object.erl index 3c592710..ff8cb61e 100644 --- a/apps/pushy/src/pushy_object.erl +++ b/apps/pushy/src/pushy_object.erl @@ -17,7 +17,11 @@ new_record/6, make_org_prefix_id/1, - make_org_prefix_id/2 + make_org_prefix_id/2, + + assemble_job_ejson/1, + assemble_job_ejson_with_nodes/1 + ]). fetch_org_id(OrgName) -> @@ -150,3 +154,36 @@ make_org_prefix_id(OrgId, Name) -> Bin = iolist_to_binary([OrgId, Name, crypto:rand_bytes(6)]), <<ObjectPart:80, _/binary>> = crypto:md5(Bin), iolist_to_binary(io_lib:format("~s~20.16.0b", [OrgSuffix, ObjectPart])). + +assemble_job_ejson_with_nodes(#pushy_job{job_nodes = Nodes} = Job) -> + {NodePropList} = assemble_job_ejson(Job), + NodesJson = job_nodes_json_by_status(Nodes), + {[ {<<"nodes">>, NodesJson} | NodePropList]}. 
+ +assemble_job_ejson(#pushy_job{id = Id, + command = Command, + status = Status, + run_timeout = RunTimeout, + created_at = CreatedAt, + updated_at = UpdatedAt}) -> + {[ {<<"id">>, Id}, + {<<"command">>, Command}, + {<<"status">>, Status}, + {<<"run_timeout">>, RunTimeout}, + {<<"created_at">>, CreatedAt}, + {<<"updated_at">>, UpdatedAt} + ]}. + +job_nodes_json_by_status(Nodes) -> + NodesByStatus = job_nodes_by_status(Nodes, dict:new()), + {[ + { erlang:atom_to_binary(Status, utf8), dict:fetch(Status, NodesByStatus) } + || Status <- dict:fetch_keys(NodesByStatus) + ]}. + +job_nodes_by_status([], Dict) -> + Dict; +job_nodes_by_status([#pushy_job_node{node_name = Name, status = Status} | Nodes], Dict) -> + Dict2 = dict:append(Status, Name, Dict), + job_nodes_by_status(Nodes, Dict2). +