Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More tracing for release/1.2.0 #9390

Merged
merged 2 commits into from
Aug 24, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 182 additions & 166 deletions src/lib/mina_lib/mina_lib.ml
Original file line number Diff line number Diff line change
@@ -417,6 +417,7 @@ let create_sync_status_observer ~logger ~is_seed ~demo_mode ~net
(fun () ->
[%log info]
"Offline for too long; restarting libp2p_helper" ;
trace_event "libp2p_helper restart" ;
Mina_networking.restart_helper net ;
next_helper_restart := None ;
match !offline_shutdown with
@@ -1171,7 +1172,7 @@ let create ?wallets (config : Config.t) =
let consensus_constants = config.precomputed_values.consensus_constants in
let monitor = Option.value ~default:(Monitor.create ()) config.monitor in
Async.Scheduler.within' ~monitor (fun () ->
trace "coda" (fun () ->
trace "mina_lib" (fun () ->
let%bind prover =
Monitor.try_with ~here:[%here]
~rest:
@@ -1317,143 +1318,151 @@ let create ?wallets (config : Config.t) =
config.initial_block_production_keypairs
in
let get_node_status _env =
let node_ip_addr =
config.gossip_net_params.addrs_and_ports.external_ip
in
let peer_opt = config.gossip_net_params.addrs_and_ports.peer in
let node_peer_id =
Option.value_map peer_opt ~default:"<UNKNOWN>" ~f:(fun peer ->
peer.peer_id )
in
if config.disable_node_status then
Deferred.return
@@ Error
(Error.of_string
(sprintf
!"Node with IP address=%{sexp: Unix.Inet_addr.t}, \
peer ID=%s, node status is disabled"
node_ip_addr node_peer_id))
else
match !net_ref with
| None ->
(* should be unreachable; without a network, we wouldn't receive this RPC call *)
[%log' info config.logger]
"Network not instantiated when node status requested" ;
trace_recurring "get_node_status" (fun () ->
let node_ip_addr =
config.gossip_net_params.addrs_and_ports.external_ip
in
let peer_opt = config.gossip_net_params.addrs_and_ports.peer in
let node_peer_id =
Option.value_map peer_opt ~default:"<UNKNOWN>"
~f:(fun peer -> peer.peer_id)
in
if config.disable_node_status then
Deferred.return
@@ Error
(Error.of_string
(sprintf
!"Node with IP address=%{sexp: \
Unix.Inet_addr.t}, peer ID=%s, network not \
instantiated when node status requested"
Unix.Inet_addr.t}, peer ID=%s, node status is \
disabled"
node_ip_addr node_peer_id))
| Some net ->
let ( protocol_state_hash
, best_tip_opt
, k_block_hashes_and_timestamps ) =
match
Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r
with
| None ->
( config.precomputed_values.protocol_state_with_hash
.hash
, None
, [] )
| Some frontier ->
let tip = Transition_frontier.best_tip frontier in
let protocol_state_hash =
let state =
Transition_frontier.Breadcrumb.protocol_state tip
in
Mina_state.Protocol_state.hash state
in
let k_breadcrumbs =
Transition_frontier.root frontier
:: Transition_frontier.best_tip_path frontier
in
let k_block_hashes_and_timestamps =
List.map k_breadcrumbs ~f:(fun bc ->
( Transition_frontier.Breadcrumb.state_hash bc
, Option.value_map
(Transition_frontier.Breadcrumb
.transition_receipt_time bc)
~default:"no timestamp available"
~f:
(Time.to_string_iso8601_basic
~zone:Time.Zone.utc) ) )
in
( protocol_state_hash
, Some tip
, k_block_hashes_and_timestamps )
in
let%bind peers = Mina_networking.peers net in
let open Deferred.Or_error.Let_syntax in
let%map sync_status =
match !sync_status_ref with
| None ->
Deferred.return (Ok `Offline)
| Some status ->
Deferred.return
(Mina_incremental.Status.Observer.value status)
in
let block_producers =
let public_keys, _ = Agent.get block_production_keypairs in
Public_key.Compressed.Set.map public_keys ~f:snd
|> Set.to_list
in
let ban_statuses =
Trust_system.Peer_trust.peer_statuses config.trust_system
in
let git_commit = Mina_version.commit_id_short in
let uptime_minutes =
let now = Time.now () in
let minutes_float =
Time.diff now config.start_time |> Time.Span.to_min
in
(* if rounding fails, just convert *)
Option.value_map
(Float.iround_nearest minutes_float)
~f:Fn.id
~default:(Float.to_int minutes_float)
in
let block_height_opt =
match best_tip_opt with
| None ->
None
| Some tip ->
let state =
Transition_frontier.Breadcrumb.protocol_state tip
else
match !net_ref with
| None ->
(* should be unreachable; without a network, we wouldn't receive this RPC call *)
[%log' info config.logger]
"Network not instantiated when node status requested" ;
Deferred.return
@@ Error
(Error.of_string
(sprintf
!"Node with IP address=%{sexp: \
Unix.Inet_addr.t}, peer ID=%s, network not \
instantiated when node status requested"
node_ip_addr node_peer_id))
| Some net ->
let ( protocol_state_hash
, best_tip_opt
, k_block_hashes_and_timestamps ) =
match
Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r
with
| None ->
( config.precomputed_values.protocol_state_with_hash
.hash
, None
, [] )
| Some frontier ->
let tip = Transition_frontier.best_tip frontier in
let protocol_state_hash =
let state =
Transition_frontier.Breadcrumb.protocol_state
tip
in
Mina_state.Protocol_state.hash state
in
let k_breadcrumbs =
Transition_frontier.root frontier
:: Transition_frontier.best_tip_path frontier
in
let k_block_hashes_and_timestamps =
List.map k_breadcrumbs ~f:(fun bc ->
( Transition_frontier.Breadcrumb.state_hash bc
, Option.value_map
(Transition_frontier.Breadcrumb
.transition_receipt_time bc)
~default:"no timestamp available"
~f:
(Time.to_string_iso8601_basic
~zone:Time.Zone.utc) ) )
in
( protocol_state_hash
, Some tip
, k_block_hashes_and_timestamps )
in
let%bind peers = Mina_networking.peers net in
let open Deferred.Or_error.Let_syntax in
let%map sync_status =
match !sync_status_ref with
| None ->
Deferred.return (Ok `Offline)
| Some status ->
Deferred.return
(Mina_incremental.Status.Observer.value status)
in
let block_producers =
let public_keys, _ =
Agent.get block_production_keypairs
in
let consensus_state =
state |> Mina_state.Protocol_state.consensus_state
Public_key.Compressed.Set.map public_keys ~f:snd
|> Set.to_list
in
let ban_statuses =
Trust_system.Peer_trust.peer_statuses
config.trust_system
in
let git_commit = Mina_version.commit_id_short in
let uptime_minutes =
let now = Time.now () in
let minutes_float =
Time.diff now config.start_time |> Time.Span.to_min
in
Some
( Mina_numbers.Length.to_int
@@ Consensus.Data.Consensus_state.blockchain_length
consensus_state )
in
Mina_networking.Rpcs.Get_node_status.Node_status.
{ node_ip_addr
; node_peer_id
; sync_status
; peers
; block_producers
; protocol_state_hash
; ban_statuses
; k_block_hashes_and_timestamps
; git_commit
; uptime_minutes
; block_height_opt }
(* if rounding fails, just convert *)
Option.value_map
(Float.iround_nearest minutes_float)
~f:Fn.id
~default:(Float.to_int minutes_float)
in
let block_height_opt =
match best_tip_opt with
| None ->
None
| Some tip ->
let state =
Transition_frontier.Breadcrumb.protocol_state tip
in
let consensus_state =
state
|> Mina_state.Protocol_state.consensus_state
in
Some
( Mina_numbers.Length.to_int
@@ Consensus.Data.Consensus_state
.blockchain_length consensus_state )
in
Mina_networking.Rpcs.Get_node_status.Node_status.
{ node_ip_addr
; node_peer_id
; sync_status
; peers
; block_producers
; protocol_state_hash
; ban_statuses
; k_block_hashes_and_timestamps
; git_commit
; uptime_minutes
; block_height_opt } )
in
let get_some_initial_peers _ =
match !net_ref with
| None ->
(* should be unreachable; without a network, we wouldn't receive this RPC call *)
[%log' error config.logger]
"Network not instantiated when initial peers requested" ;
Deferred.return []
| Some net ->
Mina_networking.peers net
trace_recurring "get_some_initial_peers" (fun () ->
match !net_ref with
| None ->
(* should be unreachable; without a network, we wouldn't receive this RPC call *)
[%log' error config.logger]
"Network not instantiated when initial peers requested" ;
Deferred.return []
| Some net ->
Mina_networking.peers net )
in
let%bind net =
Mina_networking.create config.net_config ~get_some_initial_peers
@@ -1535,14 +1544,15 @@ let create ?wallets (config : Config.t) =
(handle_request "get_transition_chain"
~f:Sync_handler.get_transition_chain)
~get_transition_knowledge:(fun _q ->
return
( match
Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r
with
| None ->
[]
| Some frontier ->
Sync_handler.best_tip_path ~frontier ) )
trace_recurring "get_transition_knowledge" (fun () ->
return
( match
Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r
with
| None ->
[]
| Some frontier ->
Sync_handler.best_tip_path ~frontier ) ) )
in
(* tie the first knot *)
net_ref := Some net ;
@@ -1563,12 +1573,13 @@ let create ?wallets (config : Config.t) =
config.precomputed_values.genesis_constants.txpool_max_size
in
let transaction_pool =
Network_pool.Transaction_pool.create ~config:txn_pool_config
~constraint_constants ~consensus_constants
~time_controller:config.time_controller ~logger:config.logger
~incoming_diffs:(Mina_networking.transaction_pool_diffs net)
~local_diffs:local_txns_reader
~frontier_broadcast_pipe:frontier_broadcast_pipe_r
trace "transaction_pool" (fun () ->
Network_pool.Transaction_pool.create ~config:txn_pool_config
~constraint_constants ~consensus_constants
~time_controller:config.time_controller ~logger:config.logger
~incoming_diffs:(Mina_networking.transaction_pool_diffs net)
~local_diffs:local_txns_reader
~frontier_broadcast_pipe:frontier_broadcast_pipe_r )
in
(* Read the user command inputs sent by the client from user_command_input_reader, infer the nonce, create the user command, and write it to the pipe consumed by the network pool. *)
Strict_pipe.Reader.iter user_command_input_reader
@@ -1785,12 +1796,13 @@ let create ?wallets (config : Config.t) =
~disk_location:config.snark_pool_disk_location
in
let%bind snark_pool =
Network_pool.Snark_pool.load ~config:snark_pool_config
~constraint_constants ~consensus_constants
~time_controller:config.time_controller ~logger:config.logger
~incoming_diffs:(Mina_networking.snark_pool_diffs net)
~local_diffs:local_snark_work_reader
~frontier_broadcast_pipe:frontier_broadcast_pipe_r
trace "snark_pool" (fun () ->
Network_pool.Snark_pool.load ~config:snark_pool_config
~constraint_constants ~consensus_constants
~time_controller:config.time_controller ~logger:config.logger
~incoming_diffs:(Mina_networking.snark_pool_diffs net)
~local_diffs:local_snark_work_reader
~frontier_broadcast_pipe:frontier_broadcast_pipe_r )
in
let snark_jobs_state =
Work_selector.State.init
@@ -1841,12 +1853,14 @@ let create ?wallets (config : Config.t) =
, if config.log_precomputed_blocks then Some `Log else None )
in
let subscriptions =
Coda_subscriptions.create ~logger:config.logger
~constraint_constants ~new_blocks ~wallets
~transition_frontier:frontier_broadcast_pipe_r
~is_storing_all:config.is_archive_rocksdb
~upload_blocks_to_gcloud:config.upload_blocks_to_gcloud
~time_controller:config.time_controller ~precomputed_block_writer
trace "coda_subscriptions" (fun () ->
Coda_subscriptions.create ~logger:config.logger
~constraint_constants ~new_blocks ~wallets
~transition_frontier:frontier_broadcast_pipe_r
~is_storing_all:config.is_archive_rocksdb
~upload_blocks_to_gcloud:config.upload_blocks_to_gcloud
~time_controller:config.time_controller
~precomputed_block_writer )
in
let open Mina_incremental.Status in
let transition_frontier_incr =
@@ -1864,18 +1878,20 @@ let create ?wallets (config : Config.t) =
return None
in
let sync_status =
create_sync_status_observer ~logger:config.logger ~net
~is_seed:config.is_seed ~demo_mode:config.demo_mode
~transition_frontier_and_catchup_signal_incr
~online_status_incr:
( Var.watch @@ of_broadcast_pipe
@@ Mina_networking.online_status net )
~first_connection_incr:
( Var.watch @@ of_deferred
@@ Mina_networking.on_first_connect net ~f:Fn.id )
~first_message_incr:
( Var.watch @@ of_deferred
@@ Mina_networking.on_first_received_message net ~f:Fn.id )
trace "sync_status_observer" (fun () ->
create_sync_status_observer ~logger:config.logger ~net
~is_seed:config.is_seed ~demo_mode:config.demo_mode
~transition_frontier_and_catchup_signal_incr
~online_status_incr:
( Var.watch @@ of_broadcast_pipe
@@ Mina_networking.online_status net )
~first_connection_incr:
( Var.watch @@ of_deferred
@@ Mina_networking.on_first_connect net ~f:Fn.id )
~first_message_incr:
( Var.watch @@ of_deferred
@@ Mina_networking.on_first_received_message net ~f:Fn.id
) )
in
(* tie other knot *)
sync_status_ref := Some sync_status ;
171 changes: 92 additions & 79 deletions src/lib/network_pool/network_pool_base.ml
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ open Async_kernel
open Core_kernel
open Pipe_lib
open Network_peer
open O1trace

module Make (Transition_frontier : sig
type t
@@ -95,16 +96,17 @@ end)
(Resource_pool.Diff.summary diff') ;
forward t.write_broadcasts diff' rejected cb )
in
match%bind Resource_pool.Diff.unsafe_apply t.resource_pool diff with
| Ok res ->
rebroadcast res
| Error (`Locally_generated res) ->
rebroadcast res
| Error (`Other e) ->
[%log' debug t.logger]
"Refusing to rebroadcast. Pool diff apply feedback: $error"
~metadata:[("error", Error_json.error_to_yojson e)] ;
Broadcast_callback.error e cb
trace_recurring (Resource_pool.label ^ "_apply_and_broadcast") (fun () ->
match%bind Resource_pool.Diff.unsafe_apply t.resource_pool diff with
| Ok res ->
rebroadcast res
| Error (`Locally_generated res) ->
rebroadcast res
| Error (`Other e) ->
[%log' debug t.logger]
"Refusing to rebroadcast. Pool diff apply feedback: $error"
~metadata:[("error", Error_json.error_to_yojson e)] ;
Broadcast_callback.error e cb )

let log_rate_limiter_occasionally t rl =
let time = Time_ns.Span.of_min 1. in
@@ -140,54 +142,60 @@ end)
if log_rate_limiter then log_rate_limiter_occasionally t rl ;
(*Note: This is done asynchronously to use batch verification*)
Strict_pipe.Reader.iter_without_pushback pipe ~f:(fun d ->
let diff, cb = f d in
if not (Broadcast_callback.is_expired cb) then (
let summary =
`String (Resource_pool.Diff.summary @@ Envelope.Incoming.data diff)
in
[%log' debug t.logger] "Verifying $diff from $sender"
~metadata:
[ ("diff", summary)
; ("sender", Envelope.Sender.to_yojson diff.sender) ] ;
don't_wait_for
( match
Rate_limiter.add rl diff.sender ~now:(Time.now ())
~score:(Resource_pool.Diff.score diff.data)
with
| `Capacity_exceeded ->
[%log' debug t.logger]
~metadata:
[ ("sender", Envelope.Sender.to_yojson diff.sender)
; ("diff", summary) ]
"exceeded capacity from $sender" ;
Broadcast_callback.error
(Error.of_string "exceeded capacity")
cb
| `Within_capacity -> (
match%bind Resource_pool.Diff.verify t.resource_pool diff with
| Error err ->
trace_recurring (Resource_pool.label ^ "_verification") (fun () ->
let diff, cb = f d in
if not (Broadcast_callback.is_expired cb) then (
let summary =
`String
(Resource_pool.Diff.summary @@ Envelope.Incoming.data diff)
in
[%log' debug t.logger] "Verifying $diff from $sender"
~metadata:
[ ("diff", summary)
; ("sender", Envelope.Sender.to_yojson diff.sender) ] ;
don't_wait_for
( match
Rate_limiter.add rl diff.sender ~now:(Time.now ())
~score:(Resource_pool.Diff.score diff.data)
with
| `Capacity_exceeded ->
[%log' debug t.logger]
"Refusing to rebroadcast $diff. Verification error: \
$error"
~metadata:
[ ("diff", summary)
; ("error", Error_json.error_to_yojson err) ] ;
(*reject incoming messages*)
Broadcast_callback.error err cb
| Ok verified_diff -> (
[%log' debug t.logger] "Verified diff: $verified_diff"
~metadata:
[ ( "verified_diff"
, Resource_pool.Diff.verified_to_yojson
@@ Envelope.Incoming.data verified_diff )
; ( "sender"
, Envelope.Sender.to_yojson
@@ Envelope.Incoming.sender verified_diff ) ] ;
match Strict_pipe.Writer.write w (verified_diff, cb) with
| Some r ->
r
| None ->
Deferred.unit ) ) ) ) )
[ ("sender", Envelope.Sender.to_yojson diff.sender)
; ("diff", summary) ]
"exceeded capacity from $sender" ;
Broadcast_callback.error
(Error.of_string "exceeded capacity")
cb
| `Within_capacity -> (
match%bind
Resource_pool.Diff.verify t.resource_pool diff
with
| Error err ->
[%log' debug t.logger]
"Refusing to rebroadcast $diff. Verification error: \
$error"
~metadata:
[ ("diff", summary)
; ("error", Error_json.error_to_yojson err) ] ;
(*reject incoming messages*)
Broadcast_callback.error err cb
| Ok verified_diff -> (
[%log' debug t.logger] "Verified diff: $verified_diff"
~metadata:
[ ( "verified_diff"
, Resource_pool.Diff.verified_to_yojson
@@ Envelope.Incoming.data verified_diff )
; ( "sender"
, Envelope.Sender.to_yojson
@@ Envelope.Incoming.sender verified_diff ) ] ;
match
Strict_pipe.Writer.write w (verified_diff, cb)
with
| Some r ->
r
| None ->
Deferred.unit ) ) ) ) ) )
|> don't_wait_for ;
r

@@ -221,7 +229,11 @@ end)
| `Local (verified_diff, cb) ->
apply_and_broadcast network_pool verified_diff cb
| `Transition_frontier_extension diff ->
Resource_pool.handle_transition_frontier_diff diff resource_pool )
trace_recurring
(Resource_pool.label ^ "_handle_transition_frontier_diff")
(fun () ->
Resource_pool.handle_transition_frontier_diff diff
resource_pool ) )
|> Deferred.don't_wait_for ;
network_pool

@@ -245,28 +257,29 @@ end)
if Time.(add time rebroadcast_window < now ()) then `Timed_out else `Ok
in
let rec go () =
let rebroadcastable =
Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out
in
if List.is_empty rebroadcastable then
[%log trace] "Nothing to rebroadcast"
else
[%log debug]
"Preparing to rebroadcast locally generated resource pool diffs \
$diffs"
~metadata:
[ ("count", `Int (List.length rebroadcastable))
; ( "diffs"
, `List
(List.map
~f:(fun d -> `String (Resource_pool.Diff.summary d))
rebroadcastable) ) ] ;
let%bind () =
Deferred.List.iter rebroadcastable
~f:(Linear_pipe.write t.write_broadcasts)
in
let%bind () = Async.after rebroadcast_interval in
go ()
trace_recurring (Resource_pool.label ^ "_rebroadcast_loop") (fun () ->
let rebroadcastable =
Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out
in
if List.is_empty rebroadcastable then
[%log trace] "Nothing to rebroadcast"
else
[%log debug]
"Preparing to rebroadcast locally generated resource pool diffs \
$diffs"
~metadata:
[ ("count", `Int (List.length rebroadcastable))
; ( "diffs"
, `List
(List.map
~f:(fun d -> `String (Resource_pool.Diff.summary d))
rebroadcastable) ) ] ;
let%bind () =
Deferred.List.iter rebroadcastable
~f:(Linear_pipe.write t.write_broadcasts)
in
let%bind () = Async.after rebroadcast_interval in
go () )
in
go ()