Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uptime service, retry on connection errors when sending block (for master) #10789

Merged
merged 2 commits into from
May 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 119 additions & 100 deletions src/lib/uptime_service/uptime_service.ml
Original file line number Diff line number Diff line change
Expand Up @@ -79,106 +79,122 @@ let send_uptime_data ~logger ~interruptor ~(submitter_keypair : Keypair.t) ~url
| `Empty | `Pipe _ ->
[]
in
match%map
make_interruptible
(Monitor.try_with ~here:[%here] ~extract_exn:true (fun () ->
let max_attempts = 8 in
let attempt_pause_sec = 10.0 in
let run_attempt attempt =
let interruptible =
match%map
make_interruptible
(Cohttp_async.Client.post ~headers
~body:
( Yojson.Safe.to_string json
|> Cohttp_async.Body.of_string )
url)
with
| { status; _ }, body ->
let succeeded = Cohttp.Code.code_of_status status = 200 in
( if succeeded then
[%log info]
"Sent block with state hash $state_hash to uptime \
service at URL $url"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ( "includes_snark_work"
, `Bool (Option.is_some block_data.snark_work) )
; ("is_produced_block", `Bool produced)
; ("url", `String (Uri.to_string url))
]
else if attempt >= max_attempts then
let base_metadata =
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("http_code", `Int (Cohttp.Code.code_of_status status))
; ( "http_error"
, `String (Cohttp.Code.string_of_status status) )
; ("payload", json)
]
in
let extra_metadata = metadata_of_body body in
let metadata = base_metadata @ extra_metadata in
[%log error]
"After %d attempts, failed to send block with state \
hash $state_hash to uptime service at URL $url, no \
more retries"
max_attempts ~metadata
else
let base_metadata =
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("http_code", `Int (Cohttp.Code.code_of_status status))
; ( "http_error"
, `String (Cohttp.Code.string_of_status status) )
]
in
let extra_metadata = metadata_of_body body in
let metadata = base_metadata @ extra_metadata in
[%log info]
"Failure when sending block with state hash $state_hash \
to uptime service at URL $url, attempt %d of %d, \
retrying"
attempt max_attempts ~metadata ) ;
succeeded
in
match%map.Deferred Interruptible.force interruptible with
| Ok succeeded ->
succeeded
| Error _ ->
[%log error]
"In uptime service, POST of block with state hash \
$state_hash was interrupted"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ("payload", json)
] ;
(* interrupted, don't want to retry, claim success *)
true
in
let rec go attempt =
let open Deferred.Let_syntax in
let%bind succeeded = run_attempt attempt in
if succeeded then Deferred.return ()
else if attempt < max_attempts then
let%bind () = Async.after (Time.Span.of_sec attempt_pause_sec) in
go (attempt + 1)
else Deferred.unit
in
go 1))
with
| Ok () ->
()
| Error exn ->
[%log error]
"Error when sending block with state hash $state_hash to uptime \
service at URL $url"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("payload", json)
; ("error", `String (Exn.to_string exn))
]
let max_attempts = 8 in
let attempt_pause_sec = 4.0 in
(* see https://github.com/MinaProtocol/mina/blob/compatible/src/app/delegation_backend/README.md
for significance of these status codes
*)
let unrecoverable_status_codes = [ 400; 401; 411; 413 ] in
let run_attempt attempt =
let interruptible =
match%map
make_interruptible
(Monitor.try_with ~here:[%here] ~extract_exn:true (fun () ->
Cohttp_async.Client.post ~headers
~body:
(Yojson.Safe.to_string json |> Cohttp_async.Body.of_string)
url))
with
| Ok ({ status; _ }, body) ->
let status_code = Cohttp.Code.code_of_status status in
let status_string = Cohttp.Code.string_of_status status in
let unretriable =
List.mem unrecoverable_status_codes status_code ~equal:Int.equal
in
let succeeded = status_code = 200 in
( if succeeded then
[%log info]
"Sent block with state hash $state_hash to uptime service at URL \
$url"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ( "includes_snark_work"
, `Bool (Option.is_some block_data.snark_work) )
; ("is_produced_block", `Bool produced)
]
else if unretriable then
[%log error]
"Got unrecoverable response from update service backend at URL \
$url, not retrying to send block"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("http_code", `Int status_code)
; ("http_error", `String status_string)
; ("payload", json)
]
else if attempt >= max_attempts then
let base_metadata =
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("http_code", `Int status_code)
; ("http_error", `String status_string)
; ("payload", json)
]
in
let extra_metadata = metadata_of_body body in
let metadata = base_metadata @ extra_metadata in
[%log error]
"After %d attempts, failed to send block with state hash \
$state_hash to uptime service at URL $url, no more retries"
max_attempts ~metadata
else
let base_metadata =
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("http_code", `Int status_code)
; ("http_error", `String status_string)
]
in
let extra_metadata = metadata_of_body body in
let metadata = base_metadata @ extra_metadata in
[%log info]
"Failure when sending block with state hash $state_hash to \
uptime service at URL $url, attempt %d of %d, retrying"
attempt max_attempts ~metadata ) ;
succeeded || unretriable
| Error exn ->
[%log warn]
"Error when sending block with state hash $state_hash to uptime \
service at URL $url"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ("url", `String (Uri.to_string url))
; ("payload", json)
; ("error", `String (Exn.to_string exn))
] ;
(* retry *)
false
in
match%map.Deferred Interruptible.force interruptible with
| Ok succeeded ->
succeeded
| Error _ ->
[%log error]
"In uptime service, POST of block with state hash $state_hash was \
interrupted"
~metadata:
[ ("state_hash", State_hash.to_yojson state_hash)
; ("payload", json)
] ;
(* interrupted, don't want to retry, claim success *)
true
in
let rec go attempt =
let open Deferred.Let_syntax in
let%bind succeeded = run_attempt attempt in
if succeeded then Deferred.return ()
else if attempt < max_attempts then (
let%bind () = Async.after (Time.Span.of_sec attempt_pause_sec) in
[%log info]
"In uptime service, retrying to send block, attempt %d of %d attempts \
allowed"
attempt max_attempts ;
go (attempt + 1) )
else Deferred.unit
in
make_interruptible (go 1)

let block_base64_of_breadcrumb breadcrumb =
let external_transition = external_transition_of_breadcrumb breadcrumb in
Expand Down Expand Up @@ -455,6 +471,7 @@ let start ~logger ~uptime_url ~snark_worker_opt ~transition_frontier
let%bind () = wait_until_iteration_start next_block_tm in
let interruptor = register_iteration () in
(* work in Interruptible monad in "background" *)
let open Interruptible.Let_syntax in
Interruptible.don't_wait_for
( [%log trace] "Determining which action to take in uptime service" ;
match get_peer () with
Expand Down Expand Up @@ -497,6 +514,8 @@ let start ~logger ~uptime_url ~snark_worker_opt ~transition_frontier
|> Block_time.to_time
in
if Time.( <= ) next_producer_time four_slots_from_start then
(* send a block w/ SNARK work, then the produced block *)
let%bind () = send_block_and_snark_work () in
send_just_block next_producer_time
else send_block_and_snark_work () ) ) ;
Deferred.return (Block_time.add next_block_tm five_slots_span)
Expand Down