diff --git a/src/lib/mina_lib/mina_lib.ml b/src/lib/mina_lib/mina_lib.ml index be89ade237b..87f7a476cdc 100644 --- a/src/lib/mina_lib/mina_lib.ml +++ b/src/lib/mina_lib/mina_lib.ml @@ -417,6 +417,7 @@ let create_sync_status_observer ~logger ~is_seed ~demo_mode ~net (fun () -> [%log info] "Offline for too long; restarting libp2p_helper" ; + trace_event "libp2p_helper restart" ; Mina_networking.restart_helper net ; next_helper_restart := None ; match !offline_shutdown with @@ -1171,7 +1172,7 @@ let create ?wallets (config : Config.t) = let consensus_constants = config.precomputed_values.consensus_constants in let monitor = Option.value ~default:(Monitor.create ()) config.monitor in Async.Scheduler.within' ~monitor (fun () -> - trace "coda" (fun () -> + trace "mina_lib" (fun () -> let%bind prover = Monitor.try_with ~here:[%here] ~rest: @@ -1317,143 +1318,151 @@ let create ?wallets (config : Config.t) = config.initial_block_production_keypairs in let get_node_status _env = - let node_ip_addr = - config.gossip_net_params.addrs_and_ports.external_ip - in - let peer_opt = config.gossip_net_params.addrs_and_ports.peer in - let node_peer_id = - Option.value_map peer_opt ~default:"" ~f:(fun peer -> - peer.peer_id ) - in - if config.disable_node_status then - Deferred.return - @@ Error - (Error.of_string - (sprintf - !"Node with IP address=%{sexp: Unix.Inet_addr.t}, \ - peer ID=%s, node status is disabled" - node_ip_addr node_peer_id)) - else - match !net_ref with - | None -> - (* should be unreachable; without a network, we wouldn't receive this RPC call *) - [%log' info config.logger] - "Network not instantiated when node status requested" ; + trace_recurring "get_node_status" (fun () -> + let node_ip_addr = + config.gossip_net_params.addrs_and_ports.external_ip + in + let peer_opt = config.gossip_net_params.addrs_and_ports.peer in + let node_peer_id = + Option.value_map peer_opt ~default:"" + ~f:(fun peer -> peer.peer_id) + in + if config.disable_node_status then Deferred.return @@ Error (Error.of_string (sprintf !"Node with IP address=%{sexp: \ - Unix.Inet_addr.t}, peer ID=%s, network not \ - instantiated when node status requested" + Unix.Inet_addr.t}, peer ID=%s, node status is \ + disabled" node_ip_addr node_peer_id)) - | Some net -> - let ( protocol_state_hash - , best_tip_opt - , k_block_hashes_and_timestamps ) = - match - Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r - with - | None -> - ( config.precomputed_values.protocol_state_with_hash - .hash - , None - , [] ) - | Some frontier -> - let tip = Transition_frontier.best_tip frontier in - let protocol_state_hash = - let state = - Transition_frontier.Breadcrumb.protocol_state tip - in - Mina_state.Protocol_state.hash state - in - let k_breadcrumbs = - Transition_frontier.root frontier - :: Transition_frontier.best_tip_path frontier - in - let k_block_hashes_and_timestamps = - List.map k_breadcrumbs ~f:(fun bc -> - ( Transition_frontier.Breadcrumb.state_hash bc - , Option.value_map - (Transition_frontier.Breadcrumb - .transition_receipt_time bc) - ~default:"no timestamp available" - ~f: - (Time.to_string_iso8601_basic - ~zone:Time.Zone.utc) ) ) - in - ( protocol_state_hash - , Some tip - , k_block_hashes_and_timestamps ) - in - let%bind peers = Mina_networking.peers net in - let open Deferred.Or_error.Let_syntax in - let%map sync_status = - match !sync_status_ref with - | None -> - Deferred.return (Ok `Offline) - | Some status -> - Deferred.return - (Mina_incremental.Status.Observer.value status) - in - let block_producers = - let public_keys, _ = Agent.get block_production_keypairs in - Public_key.Compressed.Set.map public_keys ~f:snd - |> Set.to_list - in - let ban_statuses = - Trust_system.Peer_trust.peer_statuses config.trust_system - in - let git_commit = Mina_version.commit_id_short in - let uptime_minutes = - let now = Time.now () in - let minutes_float = - Time.diff now config.start_time |> Time.Span.to_min - in - (* if rounding fails, just convert *) - Option.value_map - (Float.iround_nearest minutes_float) - ~f:Fn.id - ~default:(Float.to_int minutes_float) - in - let block_height_opt = - match best_tip_opt with - | None -> - None - | Some tip -> - let state = - Transition_frontier.Breadcrumb.protocol_state tip + else + match !net_ref with + | None -> + (* should be unreachable; without a network, we wouldn't receive this RPC call *) + [%log' info config.logger] + "Network not instantiated when node status requested" ; + Deferred.return + @@ Error + (Error.of_string + (sprintf + !"Node with IP address=%{sexp: \ + Unix.Inet_addr.t}, peer ID=%s, network not \ + instantiated when node status requested" + node_ip_addr node_peer_id)) + | Some net -> + let ( protocol_state_hash + , best_tip_opt + , k_block_hashes_and_timestamps ) = + match + Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r + with + | None -> + ( config.precomputed_values.protocol_state_with_hash + .hash + , None + , [] ) + | Some frontier -> + let tip = Transition_frontier.best_tip frontier in + let protocol_state_hash = + let state = + Transition_frontier.Breadcrumb.protocol_state + tip + in + Mina_state.Protocol_state.hash state + in + let k_breadcrumbs = + Transition_frontier.root frontier + :: Transition_frontier.best_tip_path frontier + in + let k_block_hashes_and_timestamps = + List.map k_breadcrumbs ~f:(fun bc -> + ( Transition_frontier.Breadcrumb.state_hash bc + , Option.value_map + (Transition_frontier.Breadcrumb + .transition_receipt_time bc) + ~default:"no timestamp available" + ~f: + (Time.to_string_iso8601_basic + ~zone:Time.Zone.utc) ) ) + in + ( protocol_state_hash + , Some tip + , k_block_hashes_and_timestamps ) + in + let%bind peers = Mina_networking.peers net in + let open Deferred.Or_error.Let_syntax in + let%map sync_status = + match !sync_status_ref with + | None -> + Deferred.return (Ok `Offline) + | Some status -> + Deferred.return + (Mina_incremental.Status.Observer.value status) + in + let block_producers = + let public_keys, _ = + Agent.get block_production_keypairs in - let consensus_state = - state |> Mina_state.Protocol_state.consensus_state + Public_key.Compressed.Set.map public_keys ~f:snd + |> Set.to_list + in + let ban_statuses = + Trust_system.Peer_trust.peer_statuses + config.trust_system + in + let git_commit = Mina_version.commit_id_short in + let uptime_minutes = + let now = Time.now () in + let minutes_float = + Time.diff now config.start_time |> Time.Span.to_min in - Some - ( Mina_numbers.Length.to_int - @@ Consensus.Data.Consensus_state.blockchain_length - consensus_state ) - in - Mina_networking.Rpcs.Get_node_status.Node_status. - { node_ip_addr - ; node_peer_id - ; sync_status - ; peers - ; block_producers - ; protocol_state_hash - ; ban_statuses - ; k_block_hashes_and_timestamps - ; git_commit - ; uptime_minutes - ; block_height_opt } + (* if rounding fails, just convert *) + Option.value_map + (Float.iround_nearest minutes_float) + ~f:Fn.id + ~default:(Float.to_int minutes_float) + in + let block_height_opt = + match best_tip_opt with + | None -> + None + | Some tip -> + let state = + Transition_frontier.Breadcrumb.protocol_state tip + in + let consensus_state = + state + |> Mina_state.Protocol_state.consensus_state + in + Some + ( Mina_numbers.Length.to_int + @@ Consensus.Data.Consensus_state + .blockchain_length consensus_state ) + in + Mina_networking.Rpcs.Get_node_status.Node_status. + { node_ip_addr + ; node_peer_id + ; sync_status + ; peers + ; block_producers + ; protocol_state_hash + ; ban_statuses + ; k_block_hashes_and_timestamps + ; git_commit + ; uptime_minutes + ; block_height_opt } ) in let get_some_initial_peers _ = - match !net_ref with - | None -> - (* should be unreachable; without a network, we wouldn't receive this RPC call *) - [%log' error config.logger] - "Network not instantiated when initial peers requested" ; - Deferred.return [] - | Some net -> - Mina_networking.peers net + trace_recurring "get_some_initial_peers" (fun () -> + match !net_ref with + | None -> + (* should be unreachable; without a network, we wouldn't receive this RPC call *) + [%log' error config.logger] + "Network not instantiated when initial peers requested" ; + Deferred.return [] + | Some net -> + Mina_networking.peers net ) in let%bind net = Mina_networking.create config.net_config ~get_some_initial_peers @@ -1535,14 +1544,15 @@ let create ?wallets (config : Config.t) = (handle_request "get_transition_chain" ~f:Sync_handler.get_transition_chain) ~get_transition_knowledge:(fun _q -> - return - ( match - Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r - with - | None -> - [] - | Some frontier -> - Sync_handler.best_tip_path ~frontier ) ) + trace_recurring "get_transition_knowledge" (fun () -> + return + ( match + Broadcast_pipe.Reader.peek frontier_broadcast_pipe_r + with + | None -> + [] + | Some frontier -> + Sync_handler.best_tip_path ~frontier ) ) ) in (* tie the first knot *) net_ref := Some net ; @@ -1563,12 +1573,13 @@ let create ?wallets (config : Config.t) = config.precomputed_values.genesis_constants.txpool_max_size in let transaction_pool = - Network_pool.Transaction_pool.create ~config:txn_pool_config - ~constraint_constants ~consensus_constants - ~time_controller:config.time_controller ~logger:config.logger - ~incoming_diffs:(Mina_networking.transaction_pool_diffs net) - ~local_diffs:local_txns_reader - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r + trace "transaction_pool" (fun () -> + Network_pool.Transaction_pool.create ~config:txn_pool_config + ~constraint_constants ~consensus_constants + ~time_controller:config.time_controller ~logger:config.logger + ~incoming_diffs:(Mina_networking.transaction_pool_diffs net) + ~local_diffs:local_txns_reader + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r ) in (*Read from user_command_input_reader that has the user command inputs from client, infer nonce, create user command, and write it to the pipe consumed by the network pool*) Strict_pipe.Reader.iter user_command_input_reader @@ -1785,12 +1796,13 @@ let create ?wallets (config : Config.t) = ~disk_location:config.snark_pool_disk_location in let%bind snark_pool = - Network_pool.Snark_pool.load ~config:snark_pool_config - ~constraint_constants ~consensus_constants - ~time_controller:config.time_controller ~logger:config.logger - ~incoming_diffs:(Mina_networking.snark_pool_diffs net) - ~local_diffs:local_snark_work_reader - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r + trace "snark_pool" (fun () -> + Network_pool.Snark_pool.load ~config:snark_pool_config + ~constraint_constants ~consensus_constants + ~time_controller:config.time_controller ~logger:config.logger + ~incoming_diffs:(Mina_networking.snark_pool_diffs net) + ~local_diffs:local_snark_work_reader + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r ) in let snark_jobs_state = Work_selector.State.init @@ -1841,12 +1853,14 @@ let create ?wallets (config : Config.t) = , if config.log_precomputed_blocks then Some `Log else None ) in let subscriptions = - Coda_subscriptions.create ~logger:config.logger - ~constraint_constants ~new_blocks ~wallets - ~transition_frontier:frontier_broadcast_pipe_r - ~is_storing_all:config.is_archive_rocksdb - ~upload_blocks_to_gcloud:config.upload_blocks_to_gcloud - ~time_controller:config.time_controller ~precomputed_block_writer + trace "coda_subscriptions" (fun () -> + Coda_subscriptions.create ~logger:config.logger + ~constraint_constants ~new_blocks ~wallets + ~transition_frontier:frontier_broadcast_pipe_r + ~is_storing_all:config.is_archive_rocksdb + ~upload_blocks_to_gcloud:config.upload_blocks_to_gcloud + ~time_controller:config.time_controller + ~precomputed_block_writer ) in let open Mina_incremental.Status in let transition_frontier_incr = @@ -1864,18 +1878,20 @@ let create ?wallets (config : Config.t) = return None in let sync_status = - create_sync_status_observer ~logger:config.logger ~net - ~is_seed:config.is_seed ~demo_mode:config.demo_mode - ~transition_frontier_and_catchup_signal_incr - ~online_status_incr: - ( Var.watch @@ of_broadcast_pipe - @@ Mina_networking.online_status net ) - ~first_connection_incr: - ( Var.watch @@ of_deferred - @@ Mina_networking.on_first_connect net ~f:Fn.id ) - ~first_message_incr: - ( Var.watch @@ of_deferred - @@ Mina_networking.on_first_received_message net ~f:Fn.id ) + trace "sync_status_observer" (fun () -> + create_sync_status_observer ~logger:config.logger ~net + ~is_seed:config.is_seed ~demo_mode:config.demo_mode + ~transition_frontier_and_catchup_signal_incr + ~online_status_incr: + ( Var.watch @@ of_broadcast_pipe + @@ Mina_networking.online_status net ) + ~first_connection_incr: + ( Var.watch @@ of_deferred + @@ Mina_networking.on_first_connect net ~f:Fn.id ) + ~first_message_incr: + ( Var.watch @@ of_deferred + @@ Mina_networking.on_first_received_message net ~f:Fn.id + ) ) in (* tie other knot *) sync_status_ref := Some sync_status ; diff --git a/src/lib/network_pool/network_pool_base.ml b/src/lib/network_pool/network_pool_base.ml index 497e316d48b..6fb6ef37354 100644 --- a/src/lib/network_pool/network_pool_base.ml +++ b/src/lib/network_pool/network_pool_base.ml @@ -2,6 +2,7 @@ open Async_kernel open Core_kernel open Pipe_lib open Network_peer +open O1trace module Make (Transition_frontier : sig type t @@ -95,16 +96,17 @@ end) (Resource_pool.Diff.summary diff') ; forward t.write_broadcasts diff' rejected cb ) in - match%bind Resource_pool.Diff.unsafe_apply t.resource_pool diff with - | Ok res -> - rebroadcast res - | Error (`Locally_generated res) -> - rebroadcast res - | Error (`Other e) -> - [%log' debug t.logger] - "Refusing to rebroadcast. Pool diff apply feedback: $error" - ~metadata:[("error", Error_json.error_to_yojson e)] ; - Broadcast_callback.error e cb + trace_recurring (Resource_pool.label ^ "_apply_and_broadcast") (fun () -> + match%bind Resource_pool.Diff.unsafe_apply t.resource_pool diff with + | Ok res -> + rebroadcast res + | Error (`Locally_generated res) -> + rebroadcast res + | Error (`Other e) -> + [%log' debug t.logger] + "Refusing to rebroadcast. Pool diff apply feedback: $error" + ~metadata:[("error", Error_json.error_to_yojson e)] ; + Broadcast_callback.error e cb ) let log_rate_limiter_occasionally t rl = let time = Time_ns.Span.of_min 1. in @@ -140,54 +142,60 @@ end) if log_rate_limiter then log_rate_limiter_occasionally t rl ; (*Note: This is done asynchronously to use batch verification*) Strict_pipe.Reader.iter_without_pushback pipe ~f:(fun d -> - let diff, cb = f d in - if not (Broadcast_callback.is_expired cb) then ( - let summary = - `String (Resource_pool.Diff.summary @@ Envelope.Incoming.data diff) - in - [%log' debug t.logger] "Verifying $diff from $sender" - ~metadata: - [ ("diff", summary) - ; ("sender", Envelope.Sender.to_yojson diff.sender) ] ; - don't_wait_for - ( match - Rate_limiter.add rl diff.sender ~now:(Time.now ()) - ~score:(Resource_pool.Diff.score diff.data) - with - | `Capacity_exceeded -> - [%log' debug t.logger] - ~metadata: - [ ("sender", Envelope.Sender.to_yojson diff.sender) - ; ("diff", summary) ] - "exceeded capacity from $sender" ; - Broadcast_callback.error - (Error.of_string "exceeded capacity") - cb - | `Within_capacity -> ( - match%bind Resource_pool.Diff.verify t.resource_pool diff with - | Error err -> + trace_recurring (Resource_pool.label ^ "_verification") (fun () -> + let diff, cb = f d in + if not (Broadcast_callback.is_expired cb) then ( + let summary = + `String + (Resource_pool.Diff.summary @@ Envelope.Incoming.data diff) + in + [%log' debug t.logger] "Verifying $diff from $sender" + ~metadata: + [ ("diff", summary) + ; ("sender", Envelope.Sender.to_yojson diff.sender) ] ; + don't_wait_for + ( match + Rate_limiter.add rl diff.sender ~now:(Time.now ()) + ~score:(Resource_pool.Diff.score diff.data) + with + | `Capacity_exceeded -> [%log' debug t.logger] - "Refusing to rebroadcast $diff. Verification error: \ - $error" - ~metadata: - [ ("diff", summary) - ; ("error", Error_json.error_to_yojson err) ] ; - (*reject incoming messages*) - Broadcast_callback.error err cb - | Ok verified_diff -> ( - [%log' debug t.logger] "Verified diff: $verified_diff" ~metadata: - [ ( "verified_diff" - , Resource_pool.Diff.verified_to_yojson - @@ Envelope.Incoming.data verified_diff ) - ; ( "sender" - , Envelope.Sender.to_yojson - @@ Envelope.Incoming.sender verified_diff ) ] ; - match Strict_pipe.Writer.write w (verified_diff, cb) with - | Some r -> - r - | None -> - Deferred.unit ) ) ) ) ) + [ ("sender", Envelope.Sender.to_yojson diff.sender) + ; ("diff", summary) ] + "exceeded capacity from $sender" ; + Broadcast_callback.error + (Error.of_string "exceeded capacity") + cb + | `Within_capacity -> ( + match%bind + Resource_pool.Diff.verify t.resource_pool diff + with + | Error err -> + [%log' debug t.logger] + "Refusing to rebroadcast $diff. Verification error: \ + $error" + ~metadata: + [ ("diff", summary) + ; ("error", Error_json.error_to_yojson err) ] ; + (*reject incoming messages*) + Broadcast_callback.error err cb + | Ok verified_diff -> ( + [%log' debug t.logger] "Verified diff: $verified_diff" + ~metadata: + [ ( "verified_diff" + , Resource_pool.Diff.verified_to_yojson + @@ Envelope.Incoming.data verified_diff ) + ; ( "sender" + , Envelope.Sender.to_yojson + @@ Envelope.Incoming.sender verified_diff ) ] ; + match + Strict_pipe.Writer.write w (verified_diff, cb) + with + | Some r -> + r + | None -> + Deferred.unit ) ) ) ) ) ) |> don't_wait_for ; r @@ -221,7 +229,11 @@ end) | `Local (verified_diff, cb) -> apply_and_broadcast network_pool verified_diff cb | `Transition_frontier_extension diff -> - Resource_pool.handle_transition_frontier_diff diff resource_pool ) + trace_recurring + (Resource_pool.label ^ "_handle_transition_frontier_diff") + (fun () -> + Resource_pool.handle_transition_frontier_diff diff + resource_pool ) ) |> Deferred.don't_wait_for ; network_pool @@ -245,28 +257,29 @@ end) if Time.(add time rebroadcast_window < now ()) then `Timed_out else `Ok in let rec go () = - let rebroadcastable = - Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out - in - if List.is_empty rebroadcastable then - [%log trace] "Nothing to rebroadcast" - else - [%log debug] - "Preparing to rebroadcast locally generated resource pool diffs \ - $diffs" - ~metadata: - [ ("count", `Int (List.length rebroadcastable)) - ; ( "diffs" - , `List - (List.map - ~f:(fun d -> `String (Resource_pool.Diff.summary d)) - rebroadcastable) ) ] ; - let%bind () = - Deferred.List.iter rebroadcastable - ~f:(Linear_pipe.write t.write_broadcasts) - in - let%bind () = Async.after rebroadcast_interval in - go () + trace_recurring (Resource_pool.label ^ "_rebroadcast_loop") (fun () -> + let rebroadcastable = + Resource_pool.get_rebroadcastable t.resource_pool ~has_timed_out + in + if List.is_empty rebroadcastable then + [%log trace] "Nothing to rebroadcast" + else + [%log debug] + "Preparing to rebroadcast locally generated resource pool diffs \ + $diffs" + ~metadata: + [ ("count", `Int (List.length rebroadcastable)) + ; ( "diffs" + , `List + (List.map + ~f:(fun d -> `String (Resource_pool.Diff.summary d)) + rebroadcastable) ) ] ; + let%bind () = + Deferred.List.iter rebroadcastable + ~f:(Linear_pipe.write t.write_broadcasts) + in + let%bind () = Async.after rebroadcast_interval in + go () ) in go ()