From 7eee16c2648a3e6f3cbe144b0bdef5294fefaae9 Mon Sep 17 00:00:00 2001 From: georgeee Date: Wed, 15 Dec 2021 20:18:08 +0300 Subject: [PATCH] Introduce and use Sink abstraction Problem: there are a few layers of pipes that are used between sites of pubsub message receival and pubsub message processing. These layers complicate reasoning about code and impose an unnecessary additional load onto Async scheduler. Solution: introduce a Sink abstraction and use it to handle all gossips. --- src/lib/cli_lib/default.ml | 2 - src/lib/fake_network/fake_network.ml | 5 + src/lib/gossip_net/any.ml | 27 ++- src/lib/gossip_net/fake.ml | 79 ++++--- src/lib/gossip_net/gossip_net.ml | 25 ++- src/lib/gossip_net/intf.ml | 13 +- src/lib/gossip_net/libp2p.ml | 169 ++++++++------ src/lib/gossip_net/message.ml | 78 +++++++ src/lib/mina_lib/mina_lib.ml | 166 +++++--------- src/lib/mina_net2/mina_net2.ml | 1 + src/lib/mina_net2/mina_net2.mli | 1 + src/lib/mina_net2/sink.ml | 15 ++ src/lib/mina_networking/dune | 2 +- src/lib/mina_networking/mina_networking.ml | 212 ++++++++---------- src/lib/mina_networking/mina_networking.mli | 32 +-- src/lib/mina_networking/sinks.ml | 16 ++ src/lib/network_peer/envelope.ml | 2 + src/lib/network_peer/envelope.mli | 2 + src/lib/network_pool/block_sink.ml | 58 +++++ src/lib/network_pool/block_sink.mli | 18 ++ src/lib/network_pool/intf.ml | 53 +++-- src/lib/network_pool/network_pool_base.ml | 167 +++++--------- src/lib/network_pool/pool_sink.ml | 207 +++++++++++++++++ src/lib/network_pool/snark_pool.ml | 116 ++++------ src/lib/network_pool/snark_pool.mli | 12 +- src/lib/network_pool/test.ml | 36 +-- src/lib/network_pool/transaction_pool.ml | 24 +- .../transaction_inclusion_status.ml | 20 +- 28 files changed, 889 insertions(+), 669 deletions(-) create mode 100644 src/lib/mina_net2/sink.ml create mode 100644 src/lib/mina_networking/sinks.ml create mode 100644 src/lib/network_pool/block_sink.ml create mode 100644 src/lib/network_pool/block_sink.mli create mode 100644 src/lib/network_pool/pool_sink.ml diff --git a/src/lib/cli_lib/default.ml b/src/lib/cli_lib/default.ml index 3afa0f7b3f44..38b404ce0e58 100644 --- a/src/lib/cli_lib/default.ml +++ b/src/lib/cli_lib/default.ml @@ -8,5 +8,3 @@ let validation_queue_size = 150 let conf_dir_name = ".mina-config" let stop_time = 168 - -(* 24*7 hours*) diff --git a/src/lib/fake_network/fake_network.ml b/src/lib/fake_network/fake_network.ml index 60bfd842be77..fc41c6511ec9 100644 --- a/src/lib/fake_network/fake_network.ml +++ b/src/lib/fake_network/fake_network.ml @@ -131,6 +131,11 @@ let setup (type n) ~logger ?(trust_system = Trust_system.null ()) (* TODO: merge implementations with mina_lib *) Mina_networking.create (config peer state.consensus_local_state) + ~sinks: + { sink_tx = Network_pool.Transaction_pool.Remote_sink.void + ; sink_snark_work = Network_pool.Snark_pool.Remote_sink.void + ; sink_block = Network_pool.Block_sink.void + } ~get_staged_ledger_aux_and_pending_coinbases_at_hash: state.get_staged_ledger_aux_and_pending_coinbases_at_hash ~get_some_initial_peers:state.get_some_initial_peers diff --git a/src/lib/gossip_net/any.ml b/src/lib/gossip_net/any.ml index 035435e13229..e72fad3909f7 100644 --- a/src/lib/gossip_net/any.ml +++ b/src/lib/gossip_net/any.ml @@ -12,15 +12,19 @@ module type S = sig include Intf.Gossip_net_intf with module Rpc_intf := Rpc_intf and type t := t - type 't creator = Rpc_intf.rpc_handler list -> 't Deferred.t + type sinks + + type 't creator = Rpc_intf.rpc_handler list -> sinks -> 't Deferred.t type creatable = Creatable : 't implementation * 't creator -> creatable val create : creatable -> t creator end -module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : - S with module Rpc_intf := Rpc_intf = struct +module Make + (SinksImpl : Message.Sinks) + (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : + S with module Rpc_intf := Rpc_intf with type sinks := SinksImpl.sinks = struct open Rpc_intf module type Implementation_intf = @@ -30,12 +34,12 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : type t = Any : 't implementation * 't -> t - type 't creator = rpc_handler list -> 't Deferred.t + type 't creator = rpc_handler list -> SinksImpl.sinks -> 't Deferred.t type creatable = Creatable : 't implementation * 't creator -> creatable - let create (Creatable ((module M), creator)) impls = - let%map gossip_net = creator impls in + let create (Creatable ((module M), creator)) impls sinks = + let%map gossip_net = creator impls sinks in Any ((module M), gossip_net) let peers (Any ((module M), t)) = M.peers t @@ -62,16 +66,19 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : let query_random_peers (Any ((module M), t)) = M.query_random_peers t - let broadcast (Any ((module M), t)) = M.broadcast t + let broadcast_state (Any ((module M), t)) = M.broadcast_state t + + let broadcast_transaction_pool_diff (Any ((module M), t)) = + M.broadcast_transaction_pool_diff t + + let broadcast_snark_pool_diff (Any ((module M), t)) = + M.broadcast_snark_pool_diff t let on_first_connect (Any ((module M), t)) = M.on_first_connect t let on_first_high_connectivity (Any ((module M), t)) = M.on_first_high_connectivity t - let received_message_reader (Any ((module M), t)) = - M.received_message_reader t - let ban_notification_reader (Any ((module M), t)) = M.ban_notification_reader t diff --git a/src/lib/gossip_net/fake.ml b/src/lib/gossip_net/fake.ml index da59deadcb37..3edc3b19b0c1 100644 --- a/src/lib/gossip_net/fake.ml +++ b/src/lib/gossip_net/fake.ml @@ -9,16 +9,20 @@ open Network_peer module type S = sig include Intf.Gossip_net_intf + type sinks + type network val create_network : Peer.t list -> network val create_instance : - network -> Peer.t -> Rpc_intf.rpc_handler list -> t Deferred.t + network -> Peer.t -> Rpc_intf.rpc_handler list -> sinks -> t Deferred.t end -module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : - S with module Rpc_intf := Rpc_intf = struct +module Make + (SinksImpl : Message.Sinks) + (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : + S with module Rpc_intf := Rpc_intf with type sinks := SinksImpl.sinks = struct open Intf open Rpc_intf @@ -29,14 +33,7 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : -> 'r Mina_base.Rpc_intf.rpc_response Deferred.t } - type network_interface = - { broadcast_message_writer : - ( Message.msg Envelope.Incoming.t * Mina_net2.Validation_callback.t - , Strict_pipe.crash Strict_pipe.buffered - , unit ) - Strict_pipe.Writer.t - ; rpc_hook : rpc_hook - } + type network_interface = { sinks : SinksImpl.sinks; rpc_hook : rpc_hook } type node = { peer : Peer.t; mutable interface : network_interface option } @@ -75,19 +72,27 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : Or_error.error_string "cannot call rpc on peer which was never registered" - let broadcast t ~sender msg = - Hashtbl.iter t.nodes ~f:(fun nodes -> - List.iter nodes ~f:(fun node -> - if not (Peer.equal node.peer sender) then - Or_error.iter (get_interface node) ~f:(fun intf -> + let broadcast t ~sender msg send_f = + Hashtbl.fold t.nodes ~init:Deferred.unit + ~f:(fun ~key:_ ~data:nodes prev -> + prev + >>= fun () -> + Deferred.List.iter ~how:`Sequential nodes ~f:(fun node -> + if Peer.equal node.peer sender then Deferred.unit + else + Option.fold node.interface + ~f:(fun a intf -> + a + >>= fun () -> let msg = Envelope.( Incoming.wrap ~data:msg ~sender:(Sender.Remote sender)) in - Strict_pipe.Writer.write intf.broadcast_message_writer + send_f intf.sinks ( msg , Mina_net2.Validation_callback.create_without_expiration - () )))) + () )) + ~init:Deferred.unit)) let call_rpc : type q r. @@ -120,14 +125,6 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : ; peer_table : (Peer.Id.t, Peer.t) Hashtbl.t ; initial_peers : Peer.t list ; connection_gating : Mina_net2.connection_gating ref - ; received_message_reader : - (Message.msg Envelope.Incoming.t * Mina_net2.Validation_callback.t) - Strict_pipe.Reader.t - ; received_message_writer : - ( Message.msg Envelope.Incoming.t * Mina_net2.Validation_callback.t - , Strict_pipe.crash Strict_pipe.buffered - , unit ) - Strict_pipe.Writer.t ; ban_notification_reader : ban_notification Linear_pipe.Reader.t ; ban_notification_writer : ban_notification Linear_pipe.Writer.t } @@ -164,14 +161,11 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : in Network.{ hook } - let create network me rpc_handlers = + let create network me rpc_handlers sinks = let initial_peers = Network.get_initial_peers network me.Peer.host in let peer_table = Hashtbl.create (module Peer.Id) in List.iter initial_peers ~f:(fun peer -> Hashtbl.add_exn peer_table ~key:peer.peer_id ~data:peer) ; - let received_message_reader, received_message_writer = - Strict_pipe.(create (Buffered (`Capacity 5, `Overflow Crash))) - in let ban_notification_reader, ban_notification_writer = Linear_pipe.create () in @@ -185,17 +179,13 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : ref Mina_net2. { banned_peers = []; trusted_peers = []; isolate = false } - ; received_message_reader - ; received_message_writer ; ban_notification_reader ; ban_notification_writer } in Network.( attach_interface network me - { broadcast_message_writer = received_message_writer - ; rpc_hook = rpc_hook t rpc_handlers - }) ; + { sinks; rpc_hook = rpc_hook t rpc_handlers }) ; t let peers { peer_table; _ } = Hashtbl.data peer_table |> Deferred.return @@ -232,9 +222,6 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : let on_first_high_connectivity _ ~f:_ = Deferred.never () - let received_message_reader { received_message_reader; _ } = - received_message_reader - let ban_notification_reader { ban_notification_reader; _ } = ban_notification_reader @@ -265,7 +252,17 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : let query_random_peers _ = failwith "TODO stub" - let broadcast t msg = Network.broadcast t.network ~sender:t.me msg + let broadcast_state t state = + Network.broadcast t.network ~sender:t.me state (fun sinks -> + SinksImpl.Block_sink.push sinks.sink_block) + + let broadcast_snark_pool_diff t diff = + Network.broadcast t.network ~sender:t.me diff (fun sinks -> + SinksImpl.Snark_sink.push sinks.sink_snark_work) + + let broadcast_transaction_pool_diff t diff = + Network.broadcast t.network ~sender:t.me diff (fun sinks -> + SinksImpl.Tx_sink.push sinks.sink_tx) let connection_gating t = Deferred.return !(t.connection_gating) @@ -282,6 +279,6 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : let create_network = Network.create - let create_instance network local_ip impls = - Deferred.return (Instance.create network local_ip impls) + let create_instance network local_ip impls sinks = + Deferred.return (Instance.create network local_ip impls sinks) end diff --git a/src/lib/gossip_net/gossip_net.ml b/src/lib/gossip_net/gossip_net.ml index d3e819964b6b..0eb04d9a6ae1 100644 --- a/src/lib/gossip_net/gossip_net.ml +++ b/src/lib/gossip_net/gossip_net.ml @@ -4,24 +4,33 @@ module Libp2p = Libp2p module Fake = Fake module type S = sig + type sinks + module Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf include module type of Intf module Message : module type of Message - module Any : Any.S with module Rpc_intf := Rpc_intf + module Any : Any.S with module Rpc_intf := Rpc_intf with type sinks := sinks - module Libp2p : Libp2p.S with module Rpc_intf := Rpc_intf + module Libp2p : + Libp2p.S with module Rpc_intf := Rpc_intf with type sinks := sinks - module Fake : Fake.S with module Rpc_intf := Rpc_intf + module Fake : Fake.S with module Rpc_intf := Rpc_intf with type sinks := sinks end -module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : - S with module Rpc_intf := Rpc_intf = struct +module type Sinks = Message.Sinks + +module Wrapped_sinks = Message.Wrapped_sinks + +module Make + (SinksImpl : Message.Sinks) + (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : + S with module Rpc_intf := Rpc_intf with type sinks := SinksImpl.sinks = struct include Intf module Message = Message - module Any = Any.Make (Rpc_intf) - module Fake = Fake.Make (Rpc_intf) - module Libp2p = Libp2p.Make (Rpc_intf) + module Any = Any.Make (SinksImpl) (Rpc_intf) + module Fake = Fake.Make (SinksImpl) (Rpc_intf) + module Libp2p = Libp2p.Make (SinksImpl) (Rpc_intf) end diff --git a/src/lib/gossip_net/intf.ml b/src/lib/gossip_net/intf.ml index 4657ad1191b0..024a00ac5546 100644 --- a/src/lib/gossip_net/intf.ml +++ b/src/lib/gossip_net/intf.ml @@ -67,16 +67,17 @@ module type Gossip_net_intf = sig -> 'q -> 'r rpc_response Deferred.t List.t Deferred.t - val broadcast : t -> Message.msg -> unit + val broadcast_state : t -> Message.state_msg -> unit Deferred.t + + val broadcast_transaction_pool_diff : + t -> Message.transaction_pool_diff_msg -> unit Deferred.t + + val broadcast_snark_pool_diff : + t -> Message.snark_pool_diff_msg -> unit Deferred.t val on_first_connect : t -> f:(unit -> 'a) -> 'a Deferred.t val on_first_high_connectivity : t -> f:(unit -> 'a) -> 'a Deferred.t - val received_message_reader : - t - -> (Message.msg Envelope.Incoming.t * Mina_net2.Validation_callback.t) - Strict_pipe.Reader.t - val ban_notification_reader : t -> ban_notification Linear_pipe.Reader.t end diff --git a/src/lib/gossip_net/libp2p.ml b/src/lib/gossip_net/libp2p.ml index 4ac060fd8635..6259cae22f31 100644 --- a/src/lib/gossip_net/libp2p.ml +++ b/src/lib/gossip_net/libp2p.ml @@ -43,10 +43,13 @@ end module type S = sig include Intf.Gossip_net_intf + type sinks + val create : Config.t -> pids:Child_processes.Termination.t -> Rpc_intf.rpc_handler list + -> sinks -> t Deferred.t end @@ -57,8 +60,46 @@ let download_seed_peer_list uri = let%map contents = Cohttp_async.Body.to_string body in Mina_net2.Multiaddr.of_file_contents contents -module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : - S with module Rpc_intf := Rpc_intf = struct +type publish_functions = { publish_v0 : Message.msg -> unit Deferred.t } + +let empty_publish_functions = + let unit _ = Deferred.unit in + { publish_v0 = unit } + +let validate_gossip_base ~fn my_peer_id envelope validation_callback = + (* Messages from ourselves are valid. Don't try and reingest them. *) + match Envelope.Incoming.sender envelope with + | Local -> + Mina_net2.Validation_callback.fire_if_not_already_fired + validation_callback `Accept ; + Deferred.unit + | Remote sender -> + if not (Peer.Id.equal sender.peer_id my_peer_id) then + (* Match on different cases *) + fn (envelope, validation_callback) + else ( + Mina_net2.Validation_callback.fire_if_not_already_fired + validation_callback `Accept ; + Deferred.unit ) + +let on_gossip_decode_failure (config : Config.t) envelope (err : Error.t) = + let peer = Envelope.Incoming.sender envelope |> Envelope.Sender.remote_exn in + let metadata = + [ ("sender_peer_id", `String peer.peer_id) + ; ("error", Error_json.error_to_yojson err) + ] + in + Trust_system.( + record config.trust_system config.logger peer + Actions. + (Decoding_failed, Some ("failed to decode gossip message", metadata))) + |> don't_wait_for ; + () + +module Make + (SinksImpl : Message.Sinks) + (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : + S with module Rpc_intf := Rpc_intf with type sinks := SinksImpl.sinks = struct open Rpc_intf module T = struct @@ -69,10 +110,7 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : ; first_peer_ivar : unit Ivar.t ; high_connectivity_ivar : unit Ivar.t ; ban_reader : Intf.ban_notification Linear_pipe.Reader.t - ; message_reader : - (Message.msg Envelope.Incoming.t * Mina_net2.Validation_callback.t) - Strict_pipe.Reader.t - ; subscription : Message.msg Mina_net2.Pubsub.subscription Deferred.t ref + ; publish_functions : publish_functions ref ; restart_helper : unit -> unit } @@ -138,7 +176,8 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : (* Creates just the helper, making sure to register everything BEFORE we start listening/advertise ourselves for discovery. *) let create_libp2p (config : Config.t) rpc_handlers first_peer_ivar - high_connectivity_ivar ~added_seeds ~pids ~on_unexpected_termination = + high_connectivity_ivar ~added_seeds ~pids ~on_unexpected_termination + ~(sinks : SinksImpl.sinks) = let ctr = ref 0 in let record_peer_connection () = [%log' trace config.logger] "Fired peer_connected callback" ; @@ -331,57 +370,34 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : | Ok () -> () )) in - let message_reader, message_writer = - Strict_pipe.( - create - ~name:"Gossip_net.Libp2p messages with validation callbacks" - Synchronous) - in - let%bind subscription = + let subscribe ~fn topic bin_prot = Mina_net2.Pubsub.subscribe_encode net2 - "coda/consensus-messages/0.0.1" + topic (* Fix for #4097: validation is tied into a lot of complex control flow. Instead of refactoring it to have validation up-front and decoupled, we pass along a validation callback with the message. This ends up ignoring the actual subscription message pipe, so drain it separately. *) ~handle_and_validate_incoming_message: - (fun envelope validation_callback -> - (* Messages from ourselves are valid. Don't try and reingest them. *) - match Envelope.Incoming.sender envelope with - | Local -> - Mina_net2.Validation_callback.fire_if_not_already_fired - validation_callback `Accept ; - Deferred.unit - | Remote sender -> - if not (Peer.Id.equal sender.peer_id my_peer_id) then - Strict_pipe.Writer.write message_writer - (envelope, validation_callback) - else ( - Mina_net2.Validation_callback.fire_if_not_already_fired - validation_callback `Accept ; - Deferred.unit )) - ~bin_prot:Message.Latest.T.bin_msg - ~on_decode_failure: - (`Call - (fun envelope (err : Error.t) -> - let peer = - Envelope.Incoming.sender envelope - |> Envelope.Sender.remote_exn - in - let metadata = - [ ("sender_peer_id", `String peer.peer_id) - ; ("error", Error_json.error_to_yojson err) - ] - in - Trust_system.( - record config.trust_system config.logger peer - Actions. - ( Decoding_failed - , Some ("failed to decode gossip message", metadata) - )) - |> don't_wait_for ; - ())) + (validate_gossip_base ~fn my_peer_id) + ~bin_prot + ~on_decode_failure:(`Call (on_gossip_decode_failure config)) in + let subscribe_v0_impl = + subscribe + ~fn:(fun (env, vc) -> + match Envelope.Incoming.data env with + | Message.New_state state -> + SinksImpl.Block_sink.push sinks.sink_block + (Envelope.Incoming.map ~f:(fun _ -> state) env, vc) + | Message.Transaction_pool_diff diff -> + SinksImpl.Tx_sink.push sinks.sink_tx + (Envelope.Incoming.map ~f:(fun _ -> diff) env, vc) + | Message.Snark_pool_diff diff -> + SinksImpl.Snark_sink.push sinks.sink_snark_work + (Envelope.Incoming.map ~f:(fun _ -> diff) env, vc)) + "coda/consensus-messages/0.0.1" Message.Latest.T.bin_msg + in + let%bind publish_v0 = subscribe_v0_impl >>| Pubsub.publish net2 in let%map _ = (* XXX: this ALWAYS needs to be AFTER handle_protocol/subscribe or it is possible to miss connections! *) @@ -420,11 +436,11 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : [%log' warn config.logger] "starting libp2p up failed: $error" ~metadata:[ ("error", Error_json.error_to_yojson e) ])) ; - (subscription, message_reader) + { publish_v0 } in match%map initializing_libp2p_result with - | Ok (subscription, message_reader) -> - (net2, subscription, message_reader, me) + | Ok pfs -> + (net2, pfs, me) | Error e -> fail e ) | Ok (Error e) -> @@ -436,14 +452,12 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : let bandwidth_info t = !(t.net2) >>= Mina_net2.bandwidth_info - let create (config : Config.t) ~pids rpc_handlers = + let create (config : Config.t) ~pids rpc_handlers (sinks : SinksImpl.sinks) + = let first_peer_ivar = Ivar.create () in let high_connectivity_ivar = Ivar.create () in - let message_reader, message_writer = - Strict_pipe.create ~name:"libp2p_messages" Synchronous - in let net2_ref = ref (Deferred.never ()) in - let subscription_ref = ref (Deferred.never ()) in + let pfs_ref = ref empty_publish_functions in let restarts_r, restarts_w = Strict_pipe.create ~name:"libp2p-restarts" (Strict_pipe.Buffered @@ -453,7 +467,7 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : let%bind () = let rec on_libp2p_create res = net2_ref := - Deferred.map res ~f:(fun (n, _, _, _) -> + Deferred.map res ~f:(fun (n, _, _) -> ( match Sys.getenv "MINA_LIBP2P_HELPER_RESTART_INTERVAL_BASE" with @@ -478,18 +492,21 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : | None -> () ) ; n) ; - subscription_ref := Deferred.map res ~f:(fun (_, s, _, _) -> s) ; - upon res (fun (_, _, m, me) -> + let pf_impl f msg = + let%bind _, pf, _ = res in + f pf msg + in + pfs_ref := { publish_v0 = pf_impl (fun pf -> pf.publish_v0) } ; + upon res (fun (_, _, me) -> (* This is a hack so that we keep the same keypair across restarts. *) config.keypair <- Some me ; let logger = config.logger in - [%log trace] ~metadata:[] "Successfully restarted libp2p" ; - don't_wait_for (Strict_pipe.transfer m message_writer ~f:Fn.id)) + [%log trace] ~metadata:[] "Successfully restarted libp2p") and start_libp2p () = let libp2p = create_libp2p config rpc_handlers first_peer_ivar high_connectivity_ivar ~added_seeds ~pids - ~on_unexpected_termination:restart_libp2p + ~on_unexpected_termination:restart_libp2p ~sinks in on_libp2p_create libp2p ; Deferred.ignore_m libp2p and restart_libp2p () = don't_wait_for (start_libp2p ()) in @@ -548,8 +565,7 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : ; net2 = net2_ref ; first_peer_ivar ; high_connectivity_ivar - ; subscription = subscription_ref - ; message_reader + ; publish_functions = pfs_ref ; ban_reader ; restart_helper = (fun () -> Strict_pipe.Writer.write restarts_w ()) } @@ -786,19 +802,24 @@ module Make (Rpc_intf : Mina_base.Rpc_intf.Rpc_interface_intf) : (Peer.pretty_list peers) ; List.map peers ~f:(fun peer -> query_peer t peer.peer_id rpc query) - let broadcast t msg = - don't_wait_for - (let%bind net2 = !(t.net2) in - let%bind subscription = !(t.subscription) in - Mina_net2.Pubsub.publish net2 subscription msg) + (* broadcast to new topics *) + let broadcast_state t state = + let pfs = !(t.publish_functions) in + pfs.publish_v0 (Message.New_state state) + + let broadcast_transaction_pool_diff t diff = + let pfs = !(t.publish_functions) in + pfs.publish_v0 (Message.Transaction_pool_diff diff) + + let broadcast_snark_pool_diff t diff = + let pfs = !(t.publish_functions) in + pfs.publish_v0 (Message.Snark_pool_diff diff) let on_first_connect t ~f = Deferred.map (Ivar.read t.first_peer_ivar) ~f let on_first_high_connectivity t ~f = Deferred.map (Ivar.read t.high_connectivity_ivar) ~f - let received_message_reader t = t.message_reader - let ban_notification_reader t = t.ban_reader let connection_gating t = diff --git a/src/lib/gossip_net/message.ml b/src/lib/gossip_net/message.ml index 5634c9ff2625..c9403b713fc8 100644 --- a/src/lib/gossip_net/message.ml +++ b/src/lib/gossip_net/message.ml @@ -2,6 +2,7 @@ open Async open Core_kernel open Mina_transition open Network_pool +open Network_peer module Master = struct module T = struct @@ -10,6 +11,12 @@ module Master = struct | Snark_pool_diff of Snark_pool.Resource_pool.Diff.t | Transaction_pool_diff of Transaction_pool.Resource_pool.Diff.t [@@deriving sexp, to_yojson] + + type state_msg = External_transition.t + + type snark_pool_diff_msg = Snark_pool.Resource_pool.Diff.t + + type transaction_pool_diff_msg = Transaction_pool.Resource_pool.Diff.t end let name = "message" @@ -29,6 +36,12 @@ module V1 = struct | Transaction_pool_diff of Transaction_pool.Diff_versioned.Stable.V1.t [@@deriving bin_io, sexp, version { rpc }] + type state_msg = External_transition.Stable.V1.t + + type snark_pool_diff_msg = Snark_pool.Diff_versioned.Stable.V1.t + + type transaction_pool_diff_msg = Transaction_pool.Diff_versioned.Stable.V1.t + let callee_model_of_msg = Fn.id let msg_of_caller_model = Fn.id @@ -48,3 +61,68 @@ end module Latest = V1 [%%define_locally Latest.(summary)] + +type block_sink_msg = + state_msg Envelope.Incoming.t * Mina_net2.Validation_callback.t + +type tx_sink_msg = + transaction_pool_diff_msg Envelope.Incoming.t + * Mina_net2.Validation_callback.t + +type snark_sink_msg = + snark_pool_diff_msg Envelope.Incoming.t * Mina_net2.Validation_callback.t + +type 'msg push_modifier = ('msg -> unit Deferred.t) -> 'msg -> unit Deferred.t + +module type Sinks = sig + module Block_sink : Mina_net2.Sink.S with type msg := block_sink_msg + + module Tx_sink : Mina_net2.Sink.S with type msg := tx_sink_msg + + module Snark_sink : Mina_net2.Sink.S with type msg := snark_sink_msg + + type sinks = + { sink_block : Block_sink.t + ; sink_tx : Tx_sink.t + ; sink_snark_work : Snark_sink.t + } +end + +module Wrapped_sinks (S : Sinks) = struct + module Block_sink = struct + type t = S.Block_sink.t * block_sink_msg push_modifier + + let push (t, f) = f (S.Block_sink.push t) + end + + module Tx_sink = struct + type t = S.Tx_sink.t * tx_sink_msg push_modifier + + let push (t, f) = f (S.Tx_sink.push t) + end + + module Snark_sink = struct + type t = S.Snark_sink.t * snark_sink_msg push_modifier + + let push (t, f) = f (S.Snark_sink.push t) + end + + type sinks = + { sink_block : Block_sink.t + ; sink_tx : Tx_sink.t + ; sink_snark_work : Snark_sink.t + } + + let wrap ~block_push_modifier ~tx_push_modifier ~snark_push_modifier + (sinks : S.sinks) = + { sink_block = (sinks.sink_block, block_push_modifier) + ; sink_tx = (sinks.sink_tx, tx_push_modifier) + ; sink_snark_work = (sinks.sink_snark_work, snark_push_modifier) + } + + let preprocess ~block_fn ~tx_fn ~snark_fn = + let modifier fn f msg = fn msg >>= fun () -> f msg in + wrap ~block_push_modifier:(modifier block_fn) + ~tx_push_modifier:(modifier tx_fn) + ~snark_push_modifier:(modifier snark_fn) +end diff --git a/src/lib/mina_lib/mina_lib.ml b/src/lib/mina_lib/mina_lib.ml index 27d64674ff28..48d01c32c6a4 100644 --- a/src/lib/mina_lib/mina_lib.ml +++ b/src/lib/mina_lib/mina_lib.ml @@ -76,11 +76,6 @@ type pipes = , Strict_pipe.synchronous , unit Deferred.t ) Strict_pipe.Writer.t - ; external_transitions_writer : - ( External_transition.t Envelope.Incoming.t - * Block_time.t - * Mina_net2.Validation_callback.t ) - Pipe.Writer.t ; user_command_input_writer : ( User_command_input.t list * ( ( Network_pool.Transaction_pool.Resource_pool.Diff.t @@ -95,24 +90,8 @@ type pipes = , Strict_pipe.synchronous , unit Deferred.t ) Strict_pipe.Writer.t - ; user_command_writer : - ( User_command.t list - * ( ( Network_pool.Transaction_pool.Resource_pool.Diff.t - * Network_pool.Transaction_pool.Resource_pool.Diff.Rejected.t ) - Or_error.t - -> unit) - , Strict_pipe.synchronous - , unit Deferred.t ) - Strict_pipe.Writer.t - ; local_snark_work_writer : - ( Network_pool.Snark_pool.Resource_pool.Diff.t - * ( ( Network_pool.Snark_pool.Resource_pool.Diff.t - * Network_pool.Snark_pool.Resource_pool.Diff.rejected ) - Or_error.t - -> unit) - , Strict_pipe.synchronous - , unit Deferred.t ) - Strict_pipe.Writer.t + ; tx_local_sink : Network_pool.Transaction_pool.Local_sink.t + ; snark_local_sink : Network_pool.Snark_pool.Local_sink.t } type t = @@ -880,7 +859,7 @@ let add_work t (work : Snark_worker_lib.Work.Result.t) = Work_selection_method.remove t.snark_job_state spec in ignore (Or_error.try_with (fun () -> update_metrics ()) : unit Or_error.t) ; - Strict_pipe.Writer.write t.pipes.local_snark_work_writer + Network_pool.Snark_pool.Local_sink.push t.pipes.snark_local_sink (Network_pool.Snark_pool.Resource_pool.Diff.of_result work, cb) |> Deferred.don't_wait_for @@ -913,7 +892,7 @@ let add_transactions t (uc_inputs : User_command_input.t list) = let add_full_transactions t user_command = let result_ivar = Ivar.create () in - Strict_pipe.Writer.write t.pipes.user_command_writer + Network_pool.Transaction_pool.Local_sink.push t.pipes.tx_local_sink (user_command, Ivar.fill result_ivar) |> Deferred.don't_wait_for ; Ivar.read result_ivar @@ -1382,36 +1361,6 @@ let create ?wallets (config : Config.t) = [ ("rate_limiter", Network_pool.Rate_limiter.summary rl) ] !"%s $rate_limiter" label) in - let external_transitions_reader, external_transitions_writer = - let rl = - Network_pool.Rate_limiter.create - ~capacity: - ( (* Max of 20 transitions per slot per peer. *) - 20 - , `Per - (Block_time.Span.to_time_span - consensus_constants.slot_duration_ms) ) - in - log_rate_limiter_occasionally rl ~label:"new_block" ; - let r, w = Strict_pipe.create Synchronous in - ( Strict_pipe.Reader.filter_map r ~f:(fun ((e, _, cb) as x) -> - let sender = Envelope.Incoming.sender e in - match - Network_pool.Rate_limiter.add rl sender ~now:(Time.now ()) - ~score:1 - with - | `Capacity_exceeded -> - [%log' warn config.logger] - "$sender has sent many blocks. This is very unusual." - ~metadata: - [ ("sender", Envelope.Sender.to_yojson sender) ] ; - Mina_net2.Validation_callback.fire_if_not_already_fired cb - `Reject ; - None - | `Within_capacity -> - Some x) - , w ) - in let producer_transition_reader, producer_transition_writer = Strict_pipe.create Synchronous in @@ -1585,8 +1534,47 @@ let create ?wallets (config : Config.t) = | Some net -> Mina_networking.peers net) in + let txn_pool_config = + Network_pool.Transaction_pool.Resource_pool.make_config ~verifier + ~trust_system:config.trust_system + ~pool_max_size: + config.precomputed_values.genesis_constants.txpool_max_size + in + let transaction_pool, tx_remote_sink, tx_local_sink = + trace "transaction_pool" (fun () -> + (* make transaction pool return writer for local and incoming diffs *) + Network_pool.Transaction_pool.create ~config:txn_pool_config + ~constraint_constants ~consensus_constants + ~time_controller:config.time_controller ~logger:config.logger + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r) + in + let snark_pool_config = + Network_pool.Snark_pool.Resource_pool.make_config ~verifier + ~trust_system:config.trust_system + ~disk_location:config.snark_pool_disk_location + in + let%bind snark_pool, snark_remote_sink, snark_local_sink = + trace "snark_pool" (fun () -> + Network_pool.Snark_pool.load ~config:snark_pool_config + ~constraint_constants ~consensus_constants + ~time_controller:config.time_controller ~logger:config.logger + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r) + in + let block_reader, block_sink = + Network_pool.Block_sink.create ~logger:config.logger + ~slot_duration_ms: + config.precomputed_values.consensus_constants.slot_duration_ms + ~time_controller:config.time_controller + in + let sinks = + { Mina_networking.Sinks.Unwrapped.sink_block = block_sink + ; sink_tx = tx_remote_sink + ; sink_snark_work = snark_remote_sink + } + in let%bind net = - Mina_networking.create config.net_config ~get_some_initial_peers + Mina_networking.create config.net_config ~sinks + ~get_some_initial_peers ~get_staged_ledger_aux_and_pending_coinbases_at_hash: (fun query_env -> trace_recurring @@ -1680,28 +1668,7 @@ let create ?wallets (config : Config.t) = let user_command_input_reader, user_command_input_writer = Strict_pipe.(create ~name:"local transactions" Synchronous) in - let local_txns_reader, local_txns_writer = - Strict_pipe.(create ~name:"local transactions" Synchronous) - in - let local_snark_work_reader, local_snark_work_writer = - Strict_pipe.(create ~name:"local snark work" Synchronous) - in let block_produced_bvar = Bvar.create () in - let txn_pool_config = - Network_pool.Transaction_pool.Resource_pool.make_config ~verifier - ~trust_system:config.trust_system - ~pool_max_size: - config.precomputed_values.genesis_constants.txpool_max_size - in - let transaction_pool = - trace "transaction_pool" (fun () -> - Network_pool.Transaction_pool.create ~config:txn_pool_config - ~constraint_constants ~consensus_constants - ~time_controller:config.time_controller ~logger:config.logger - ~incoming_diffs:(Mina_networking.transaction_pool_diffs net) - ~local_diffs:local_txns_reader - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r) - in (*Read from user_command_input_reader that has the user command inputs from client, infer nonce, create user command, and write it to the pipe consumed by the network pool*) Strict_pipe.Reader.iter user_command_input_reader ~f:(fun (input_list, result_cb, get_current_nonce, get_account) -> @@ -1716,8 +1683,8 @@ let create ?wallets (config : Config.t) = (Error (Error.of_string "No user commands to send")) ; Deferred.unit ) else - (*callback for the result from transaction_pool.apply_diff*) - Strict_pipe.Writer.write local_txns_writer + Network_pool.Transaction_pool.Local_sink.push tx_local_sink + (*callback for the result from transaction_pool.apply_diff*) ( List.map user_commands ~f:(fun c -> User_command.Signed_command c) , result_cb ) @@ -1746,10 +1713,9 @@ let create ?wallets (config : Config.t) = config.persistent_frontier_location ~frontier_broadcast_pipe: (frontier_broadcast_pipe_r, frontier_broadcast_pipe_w) - ~catchup_mode + ~catchup_mode (* TODO Remove pipe map form here *) ~network_transition_reader: - (Strict_pipe.Reader.map external_transitions_reader - ~f:(fun (tn, tm, cb) -> + (Strict_pipe.Reader.map block_reader ~f:(fun (tn, tm, cb) -> let lift_consensus_time = Fn.compose UInt32.to_int Consensus.Data.Consensus_time.to_uint32 @@ -1792,7 +1758,7 @@ let create ?wallets (config : Config.t) = et validation_callback ; don't_wait_for (* this will never throw since the callback was created without expiration *) - (let%map v = + (let%bind v = Mina_net2.Validation_callback.await_exn validation_callback in @@ -1802,7 +1768,8 @@ let create ?wallets (config : Config.t) = then Mina_networking.broadcast_state net (External_transition.Validation - .forget_validation_with_hash et)) ; + .forget_validation_with_hash et) + else Deferred.unit) ; breadcrumb)) ~most_recent_valid_block ~precomputed_values:config.precomputed_values) @@ -1833,8 +1800,7 @@ let create ?wallets (config : Config.t) = Network_pool.Transaction_pool.Resource_pool.Diff .max_per_15_seconds x in - Mina_networking.broadcast_transaction_pool_diff net x ; - Deferred.unit)) ; + Mina_networking.broadcast_transaction_pool_diff net x)) ; trace_task "valid_transitions_for_network broadcast loop" (fun () -> Strict_pipe.Reader.iter_without_pushback valid_transitions_for_network @@ -1902,10 +1868,6 @@ let create ?wallets (config : Config.t) = [%log' warn config.logger] ~metadata "Not rebroadcasting block $state_hash because it \ was received $timing" ))) ; - don't_wait_for - (Strict_pipe.transfer - (Mina_networking.states net) - external_transitions_writer ~f:ident) ; (* FIXME #4093: augment ban_notifications with a Peer.ID so we can implement ban_notify trace_task "ban notification loop" (fun () -> Linear_pipe.iter (Mina_networking.ban_notification_reader net) @@ -1922,20 +1884,6 @@ let create ?wallets (config : Config.t) = (Linear_pipe.iter (Mina_networking.ban_notification_reader net) ~f:(Fn.const Deferred.unit)) ; - let snark_pool_config = - Network_pool.Snark_pool.Resource_pool.make_config ~verifier - ~trust_system:config.trust_system - ~disk_location:config.snark_pool_disk_location - in - let%bind snark_pool = - trace "snark_pool" (fun () -> - Network_pool.Snark_pool.load ~config:snark_pool_config - ~constraint_constants ~consensus_constants - ~time_controller:config.time_controller ~logger:config.logger - ~incoming_diffs:(Mina_networking.snark_pool_diffs net) - ~local_diffs:local_snark_work_reader - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r) - in let snark_jobs_state = Work_selector.State.init ~reassignment_wait:config.work_reassignment_wait @@ -1963,8 +1911,7 @@ let create ?wallets (config : Config.t) = Network_pool.Snark_pool.Resource_pool.Diff .max_per_15_seconds x in - Mina_networking.broadcast_snark_pool_diff net x ; - Deferred.unit)) ; + Mina_networking.broadcast_snark_pool_diff net x)) ; Option.iter config.archive_process_location ~f:(fun archive_process_port -> [%log' info config.logger] @@ -2048,12 +1995,9 @@ let create ?wallets (config : Config.t) = ; pipes = { validated_transitions_reader = valid_transitions_for_api ; producer_transition_writer - ; external_transitions_writer = - Strict_pipe.Writer.to_linear_pipe - external_transitions_writer ; user_command_input_writer - ; user_command_writer = local_txns_writer - ; local_snark_work_writer + ; tx_local_sink + ; snark_local_sink } ; wallets ; block_production_keypairs diff --git a/src/lib/mina_net2/mina_net2.ml b/src/lib/mina_net2/mina_net2.ml index 3349dbcbf23f..d7d16e9fd87b 100644 --- a/src/lib/mina_net2/mina_net2.ml +++ b/src/lib/mina_net2/mina_net2.ml @@ -6,6 +6,7 @@ module Keypair = Keypair module Libp2p_stream = Libp2p_stream module Multiaddr = Multiaddr module Validation_callback = Validation_callback +module Sink = Sink exception Libp2p_helper_died_unexpectedly = Libp2p_helper diff --git a/src/lib/mina_net2/mina_net2.mli b/src/lib/mina_net2/mina_net2.mli index 3145af9728fd..a936684a1ec3 100644 --- a/src/lib/mina_net2/mina_net2.mli +++ b/src/lib/mina_net2/mina_net2.mli @@ -108,6 +108,7 @@ module Keypair : sig end module Validation_callback = Validation_callback +module Sink = Sink (** [create ~logger ~conf_dir] starts a new [net] storing its state in [conf_dir] * diff --git a/src/lib/mina_net2/sink.ml b/src/lib/mina_net2/sink.ml new file mode 100644 index 000000000000..8ca5e85eb019 --- /dev/null +++ b/src/lib/mina_net2/sink.ml @@ -0,0 +1,15 @@ +open Async_kernel + +module type S = sig + type t + + type msg + + val push : t -> msg -> unit Deferred.t +end + +module type S_with_void = sig + include S + + val void : t +end diff --git a/src/lib/mina_networking/dune b/src/lib/mina_networking/dune index d6a324c34230..5d345e751ab9 100644 --- a/src/lib/mina_networking/dune +++ b/src/lib/mina_networking/dune @@ -5,7 +5,7 @@ (libraries core o1trace async mina_intf gossip_net mina_base unix_timestamp perf_histograms proof_carrying_data consensus network_pool mina_transition transition_frontier staged_ledger - sync_status) + sync_status mina_net2) (inline_tests) (preprocess (pps ppx_coda ppx_compare ppx_hash ppx_version ppx_inline_test ppx_compare ppx_deriving.make ppx_deriving_yojson ppx_optcomp ppx_bin_prot ppx_sexp_conv ppx_fields_conv ppx_let ppx_register_event ppx_custom_printf)) diff --git a/src/lib/mina_networking/mina_networking.ml b/src/lib/mina_networking/mina_networking.ml index 4318b6b9a9db..b2c45f3cfffb 100644 --- a/src/lib/mina_networking/mina_networking.ml +++ b/src/lib/mina_networking/mina_networking.ml @@ -1008,7 +1008,8 @@ module Rpcs = struct None end -module Gossip_net = Gossip_net.Make (Rpcs) +module Sinks = Sinks +module Gossip_net = Gossip_net.Make (Sinks) (Rpcs) module Config = struct type log_gossip_heard = @@ -1033,19 +1034,6 @@ type t = { logger : Logger.t ; trust_system : Trust_system.t ; gossip_net : Gossip_net.Any.t - ; states : - ( External_transition.t Envelope.Incoming.t - * Block_time.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - ; transaction_pool_diffs : - ( Transaction_pool.Resource_pool.Diff.t Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - ; snark_pool_diffs : - ( Snark_pool.Resource_pool.Diff.t Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t ; online_status : [ `Offline | `Online ] Broadcast_pipe.Reader.t ; first_received_message_signal : unit Ivar.t } @@ -1063,24 +1051,26 @@ let setup_timer ~constraint_constants time_controller sync_state_broadcaster = Broadcast_pipe.Writer.write sync_state_broadcaster `Offline |> don't_wait_for) -let online_broadcaster ~constraint_constants time_controller received_messages = +let online_broadcaster ~constraint_constants time_controller = let online_reader, online_writer = Broadcast_pipe.create `Offline in let init = Block_time.Timeout.create time_controller (Block_time.Span.of_ms Int64.zero) ~f:ignore in - Strict_pipe.Reader.fold received_messages ~init ~f:(fun old_timeout _ -> - let%map () = Broadcast_pipe.Writer.write online_writer `Online in - Block_time.Timeout.cancel time_controller old_timeout () ; - setup_timer ~constraint_constants time_controller online_writer) - |> Deferred.ignore_m |> don't_wait_for ; - online_reader + let v = ref init in + let notify_online () = + let old_timeout = !v in + let%map () = Broadcast_pipe.Writer.write online_writer `Online in + Block_time.Timeout.cancel time_controller old_timeout () ; + v := setup_timer ~constraint_constants time_controller online_writer + in + (online_reader, notify_online) let wrap_rpc_data_in_envelope conn data = Envelope.Incoming.wrap_peer ~data ~sender:conn -let create (config : Config.t) +let create (config : Config.t) ~sinks ~(get_some_initial_peers : Rpcs.Get_some_initial_peers.query Envelope.Incoming.t -> Rpcs.Get_some_initial_peers.response Deferred.t) @@ -1435,8 +1425,81 @@ let create (config : Config.t) ~f:(fun (Rpc_handler { rpc; f; cost; budget }) -> Rpcs.(Rpc_handler { rpc = Consensus_rpc rpc; f; cost; budget }))) in + let first_received_message_signal = Ivar.create () in + let online_status, notify_online = + online_broadcaster ~constraint_constants:config.constraint_constants + config.time_controller + in + let wrapped_sinks = + Sinks.preprocess + ~block_fn:(fun (envelope, valid_cb) -> + Ivar.fill_if_empty first_received_message_signal () ; + Mina_metrics.(Counter.inc_one Network.gossip_messages_received) ; + let state = envelope.data in + let processing_start_time = + Block_time.(now config.time_controller |> to_time) + in + don't_wait_for + ( match%map Mina_net2.Validation_callback.await valid_cb with + | Some `Accept -> + let processing_time_span = + Time.diff + Block_time.(now config.time_controller |> to_time) + processing_start_time + in + Mina_metrics.Block_latency.( + Validation_acceptance_time.update processing_time_span) + | _ -> + () ) ; + Perf_histograms.add_span ~name:"external_transition_latency" + (Core.Time.abs_diff + Block_time.(now config.time_controller |> to_time) + ( External_transition.protocol_state state + |> Protocol_state.blockchain_state |> Blockchain_state.timestamp + |> Block_time.to_time )) ; + Mina_metrics.(Gauge.inc_one Network.new_state_received) ; + if config.log_gossip_heard.new_state then + [%str_log info] + ~metadata: + [ ("external_transition", External_transition.to_yojson state) ] + (Block_received + { state_hash = External_transition.state_hash state + ; sender = Envelope.Incoming.sender envelope + }) ; + Mina_net2.Validation_callback.set_message_type valid_cb `Block ; + Mina_metrics.(Counter.inc_one Network.Block.received) ; + notify_online ()) + ~snark_fn:(fun (envelope, valid_cb) -> + Ivar.fill_if_empty first_received_message_signal () ; + Mina_metrics.(Counter.inc_one Network.gossip_messages_received) ; + Mina_metrics.(Gauge.inc_one Network.snark_pool_diff_received) ; + let diff = envelope.data in + if config.log_gossip_heard.snark_pool_diff then + Option.iter (Snark_pool.Resource_pool.Diff.to_compact diff) + ~f:(fun work -> + [%str_log debug] + (Snark_work_received + { work; sender = Envelope.Incoming.sender envelope })) ; + Mina_metrics.(Counter.inc_one Network.Snark_work.received) ; + Mina_net2.Validation_callback.set_message_type valid_cb `Snark_work ; + notify_online ()) + ~tx_fn:(fun (envelope, valid_cb) -> + Ivar.fill_if_empty first_received_message_signal () ; + Mina_metrics.(Counter.inc_one Network.gossip_messages_received) ; + Mina_metrics.(Gauge.inc_one Network.transaction_pool_diff_received) ; + let diff = envelope.data in + + if config.log_gossip_heard.transaction_pool_diff then + [%str_log debug] + (Transactions_received + { txns = diff; sender = Envelope.Incoming.sender envelope }) ; + Mina_net2.Validation_callback.set_message_type valid_cb `Transaction ; + Mina_metrics.(Counter.inc_one Network.Transaction.received) ; + notify_online ()) + sinks + in let%map gossip_net = - Gossip_net.Any.create config.creatable_gossip_net rpc_handlers + Gossip_net.Any.create config.creatable_gossip_net rpc_handlers wrapped_sinks in (* The node status RPC is implemented directly in go, serving a string which is periodically updated. This is so that one can make this RPC on a node even @@ -1468,86 +1531,9 @@ let create (config : Config.t) For example, some things you really want to not drop (like your outgoing block announcment). *) - let received_gossips, online_notifier = - Strict_pipe.Reader.Fork.two - (Gossip_net.Any.received_message_reader gossip_net) - in - let online_status = - online_broadcaster ~constraint_constants:config.constraint_constants - config.time_controller online_notifier - in - let first_received_message_signal = Ivar.create () in - let states, snark_pool_diffs, transaction_pool_diffs = - Strict_pipe.Reader.partition_map3 received_gossips - ~f:(fun (envelope, valid_cb) -> - Ivar.fill_if_empty first_received_message_signal () ; - Mina_metrics.(Counter.inc_one Network.gossip_messages_received) ; - match Envelope.Incoming.data envelope with - | New_state state -> - let processing_start_time = - Block_time.(now config.time_controller |> to_time) - in - don't_wait_for - ( match%map Mina_net2.Validation_callback.await valid_cb with - | Some `Accept -> - let processing_time_span = - Time.diff - Block_time.(now config.time_controller |> to_time) - processing_start_time - in - Mina_metrics.Block_latency.( - Validation_acceptance_time.update processing_time_span) - | _ -> - () ) ; - Perf_histograms.add_span ~name:"external_transition_latency" - (Core.Time.abs_diff - Block_time.(now config.time_controller |> to_time) - ( External_transition.protocol_state state - |> Protocol_state.blockchain_state - |> Blockchain_state.timestamp |> Block_time.to_time )) ; - Mina_metrics.(Gauge.inc_one Network.new_state_received) ; - if config.log_gossip_heard.new_state then - [%str_log info] - ~metadata: - [ ("external_transition", External_transition.to_yojson state) - ] - (Block_received - { state_hash = External_transition.state_hash state - ; sender = Envelope.Incoming.sender envelope - }) ; - Mina_net2.Validation_callback.set_message_type valid_cb `Block ; - Mina_metrics.(Counter.inc_one Network.Block.received) ; - `Fst - ( Envelope.Incoming.map envelope ~f:(fun _ -> state) - , Block_time.now config.time_controller - , valid_cb ) - | Snark_pool_diff diff -> - Mina_metrics.(Gauge.inc_one Network.snark_pool_diff_received) ; - if config.log_gossip_heard.snark_pool_diff then - Option.iter (Snark_pool.Resource_pool.Diff.to_compact diff) - ~f:(fun work -> - [%str_log debug] - (Snark_work_received - { work; sender = Envelope.Incoming.sender envelope })) ; - Mina_metrics.(Counter.inc_one Network.Snark_work.received) ; - Mina_net2.Validation_callback.set_message_type valid_cb `Snark_work ; - `Snd (Envelope.Incoming.map envelope ~f:(fun _ -> diff), valid_cb) - | Transaction_pool_diff diff -> - Mina_metrics.(Gauge.inc_one Network.transaction_pool_diff_received) ; - if config.log_gossip_heard.transaction_pool_diff then - [%str_log debug] - (Transactions_received - { txns = diff; sender = Envelope.Incoming.sender envelope }) ; - Mina_net2.Validation_callback.set_message_type valid_cb `Transaction ; - Mina_metrics.(Counter.inc_one Network.Transaction.received) ; - `Trd (Envelope.Incoming.map envelope ~f:(fun _ -> diff), valid_cb)) - in { gossip_net ; logger = config.logger ; trust_system = config.trust_system - ; states - ; snark_pool_diffs - ; transaction_pool_diffs ; online_status ; first_received_message_signal } @@ -1609,33 +1595,33 @@ let fill_first_received_message_signal { first_received_message_signal; _ } = Ivar.fill_if_empty first_received_message_signal () (* TODO: Have better pushback behavior *) -let broadcast t ~log_msg msg = - [%str_log' trace t.logger] +let log_gossip logger ~log_msg msg = + [%str_log' trace logger] ~metadata:[ ("message", Gossip_net.Message.msg_to_yojson msg) ] - log_msg ; - Gossip_net.Any.broadcast t.gossip_net msg + log_msg let broadcast_state t state = - let msg = Gossip_net.Message.New_state (With_hash.data state) in - [%str_log' info t.logger] - ~metadata:[ ("message", Gossip_net.Message.msg_to_yojson msg) ] - (Gossip_new_state { state_hash = With_hash.hash state }) ; + let msg = With_hash.data state in + log_gossip t.logger (Gossip_net.Message.New_state msg) + ~log_msg:(Gossip_new_state { state_hash = With_hash.hash state }) ; Mina_metrics.(Gauge.inc_one Network.new_state_broadcasted) ; - Gossip_net.Any.broadcast t.gossip_net msg + Gossip_net.Any.broadcast_state t.gossip_net msg let broadcast_transaction_pool_diff t diff = + log_gossip t.logger (Gossip_net.Message.Transaction_pool_diff diff) + ~log_msg:(Gossip_transaction_pool_diff { txns = diff }) ; Mina_metrics.(Gauge.inc_one Network.transaction_pool_diff_broadcasted) ; - broadcast t (Gossip_net.Message.Transaction_pool_diff diff) - ~log_msg:(Gossip_transaction_pool_diff { txns = diff }) + Gossip_net.Any.broadcast_transaction_pool_diff t.gossip_net diff let broadcast_snark_pool_diff t diff = Mina_metrics.(Gauge.inc_one Network.snark_pool_diff_broadcasted) ; - broadcast t (Gossip_net.Message.Snark_pool_diff diff) + log_gossip t.logger (Gossip_net.Message.Snark_pool_diff diff) ~log_msg: (Gossip_snark_pool_diff { work = Option.value_exn (Snark_pool.Resource_pool.Diff.to_compact diff) - }) + }) ; + Gossip_net.Any.broadcast_snark_pool_diff t.gossip_net diff (* TODO: Don't copy and paste *) let find_map' xs ~f = diff --git a/src/lib/mina_networking/mina_networking.mli b/src/lib/mina_networking/mina_networking.mli index 57fbdf3f92c1..42322f81488c 100644 --- a/src/lib/mina_networking/mina_networking.mli +++ b/src/lib/mina_networking/mina_networking.mli @@ -157,7 +157,10 @@ module Rpcs : sig include Rpc_intf.Rpc_interface_intf with type ('q, 'r) rpc := ('q, 'r) rpc end -module Gossip_net : Gossip_net.S with module Rpc_intf := Rpcs +module Sinks : module type of Sinks + +module Gossip_net : + Gossip_net.S with module Rpc_intf := Rpcs with type sinks := Sinks.sinks module Config : sig type log_gossip_heard = @@ -180,13 +183,6 @@ end type t -val states : - t - -> ( External_transition.t Envelope.Incoming.t - * Block_time.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - val peers : t -> Network_peer.Peer.t list Deferred.t val bandwidth_info : @@ -262,25 +258,14 @@ val get_staged_ledger_aux_and_pending_coinbases_at_hash : val ban_notify : t -> Network_peer.Peer.t -> Time.t -> unit Deferred.Or_error.t -val snark_pool_diffs : - t - -> ( Snark_pool.Resource_pool.Diff.t Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - -val transaction_pool_diffs : - t - -> ( Transaction_pool.Resource_pool.Diff.t Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - val broadcast_state : - t -> (External_transition.t, State_hash.t) With_hash.t -> unit + t -> (External_transition.t, State_hash.t) With_hash.t -> unit Deferred.t -val broadcast_snark_pool_diff : t -> Snark_pool.Resource_pool.Diff.t -> unit +val broadcast_snark_pool_diff : + t -> Snark_pool.Resource_pool.Diff.t -> unit Deferred.t val broadcast_transaction_pool_diff : - t -> Transaction_pool.Resource_pool.Diff.t -> unit + t -> Transaction_pool.Resource_pool.Diff.t -> unit Deferred.t val glue_sync_ledger : t @@ -315,6 +300,7 @@ val ban_notification_reader : val create : Config.t + -> sinks:Sinks.Unwrapped.sinks -> get_some_initial_peers: ( Rpcs.Get_some_initial_peers.query Envelope.Incoming.t -> Rpcs.Get_some_initial_peers.response Deferred.t) diff --git a/src/lib/mina_networking/sinks.ml b/src/lib/mina_networking/sinks.ml new file mode 100644 index 000000000000..f778226833cc --- /dev/null +++ b/src/lib/mina_networking/sinks.ml @@ -0,0 +1,16 @@ +module Unwrapped = struct + module Tx_sink = Network_pool.Transaction_pool.Remote_sink + module Snark_sink = Network_pool.Snark_pool.Remote_sink + module Block_sink = Network_pool.Block_sink + + type sinks = + { sink_block : Block_sink.t + ; sink_tx : Tx_sink.t + ; sink_snark_work : Snark_sink.t + } +end + +include Gossip_net.Wrapped_sinks (Unwrapped) + +(* let create_test_unwrapped = + let sink_tx = Network_pool.Transaction_pool.Remote_sink.create_test *) diff --git a/src/lib/network_peer/envelope.ml b/src/lib/network_peer/envelope.ml index 6a974ac0f0f8..9af6bf76364c 100644 --- a/src/lib/network_peer/envelope.ml +++ b/src/lib/network_peer/envelope.ml @@ -110,3 +110,5 @@ module Incoming = struct let received_at = Time.now () in { data; sender; received_at } end + +type 'a gossip = Gossip of { incoming : 'a Incoming.t; topic : string } diff --git a/src/lib/network_peer/envelope.mli b/src/lib/network_peer/envelope.mli index 1a2dc3894240..f5b74b145690 100644 --- a/src/lib/network_peer/envelope.mli +++ b/src/lib/network_peer/envelope.mli @@ -28,3 +28,5 @@ module Incoming : sig val gen : 'a Quickcheck.Generator.t -> 'a t Quickcheck.Generator.t end + +type 'a gossip = Gossip of { incoming : 'a Incoming.t; topic : string } diff --git a/src/lib/network_pool/block_sink.ml b/src/lib/network_pool/block_sink.ml new file mode 100644 index 000000000000..16047d9009b0 --- /dev/null +++ b/src/lib/network_pool/block_sink.ml @@ -0,0 +1,58 @@ +open Network_peer +open Mina_transition +open Core_kernel +open Async +open Pipe_lib.Strict_pipe + +type stream_msg = + External_transition.t Envelope.Incoming.t + * Block_time.t + * Mina_net2.Validation_callback.t + +type t = + | Sink of + { writer : (stream_msg, synchronous, unit Deferred.t) Writer.t + ; rate_limiter : Rate_limiter.t + ; logger : Logger.t + ; time_controller : Block_time.Controller.t + } + | Void + +let push sink (e, cb) = + match sink with + | Void -> + Deferred.unit + | Sink { writer; rate_limiter; logger; time_controller; _ } -> ( + let sender = Envelope.Incoming.sender e in + match + Rate_limiter.add rate_limiter sender ~now:(Time.now ()) ~score:1 + with + | `Capacity_exceeded -> + [%log' warn logger] + "$sender has sent many blocks. This is very unusual." + ~metadata:[ ("sender", Envelope.Sender.to_yojson sender) ] ; + Mina_net2.Validation_callback.fire_if_not_already_fired cb `Reject ; + Deferred.unit + | `Within_capacity -> + Writer.write writer (e, Block_time.now time_controller, cb) ) + +let log_rate_limiter_occasionally rl ~logger ~label = + let t = Time.Span.of_min 1. in + every t (fun () -> + [%log' debug logger] + ~metadata:[ ("rate_limiter", Rate_limiter.summary rl) ] + !"%s $rate_limiter" label) + +let create ~logger ~slot_duration_ms ~time_controller = + let rate_limiter = + Rate_limiter.create + ~capacity: + ( (* Max of 20 transitions per slot per peer. *) + 20 + , `Per (Block_time.Span.to_time_span slot_duration_ms) ) + in + log_rate_limiter_occasionally rate_limiter ~logger ~label:"new_block" ; + let reader, writer = create Synchronous in + (reader, Sink { writer; rate_limiter; logger; time_controller }) + +let void = Void diff --git a/src/lib/network_pool/block_sink.mli b/src/lib/network_pool/block_sink.mli new file mode 100644 index 000000000000..32c43e589f71 --- /dev/null +++ b/src/lib/network_pool/block_sink.mli @@ -0,0 +1,18 @@ +open Network_peer +open Mina_transition + +include + Mina_net2.Sink.S_with_void + with type msg := + External_transition.t Envelope.Incoming.t + * Mina_net2.Validation_callback.t + +val create : + logger:Logger.t + -> slot_duration_ms:Block_time.Span.t + -> time_controller:Block_time.Controller.t + -> ( External_transition.t Envelope.Incoming.t + * Block_time.t + * Mina_net2.Validation_callback.t ) + Pipe_lib.Strict_pipe.Reader.t + * t diff --git a/src/lib/network_pool/intf.ml b/src/lib/network_pool/intf.ml index 029bb8e9a43b..2dd31873348f 100644 --- a/src/lib/network_pool/intf.ml +++ b/src/lib/network_pool/intf.ml @@ -113,6 +113,18 @@ module type Resource_pool_intf = sig t -> has_timed_out:(Time.t -> [ `Timed_out | `Ok ]) -> Diff.t list end +module type Broadcast_callback = sig + type resource_pool_diff + + type rejected_diff + + type t = + | Local of ((resource_pool_diff * rejected_diff) Or_error.t -> unit) + | External of Mina_net2.Validation_callback.t + + val drop : resource_pool_diff -> rejected_diff -> t -> unit Deferred.t +end + (** A [Network_pool_base_intf] is the core implementation of a * network pool on top of a [Resource_pool_intf]. It wraps * some [Resource_pool_intf] and provides a generic interface @@ -138,44 +150,39 @@ module type Network_pool_base_intf = sig type transition_frontier - module Broadcast_callback : sig - type t = - | Local of ((resource_pool_diff * rejected_diff) Or_error.t -> unit) - | External of Mina_net2.Validation_callback.t - end + module Local_sink : + Mina_net2.Sink.S_with_void + with type msg := + resource_pool_diff + * ((resource_pool_diff * rejected_diff) Or_error.t -> unit) + + module Remote_sink : + Mina_net2.Sink.S_with_void + with type msg := + resource_pool_diff Envelope.Incoming.t + * Mina_net2.Validation_callback.t + + module Broadcast_callback : + Broadcast_callback + with type resource_pool_diff := resource_pool_diff + and type rejected_diff := rejected_diff val create : config:config -> constraint_constants:Genesis_constants.Constraint_constants.t -> consensus_constants:Consensus.Constants.t -> time_controller:Block_time.Controller.t - -> incoming_diffs: - ( resource_pool_diff Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - -> local_diffs: - ( resource_pool_diff - * ((resource_pool_diff * rejected_diff) Or_error.t -> unit) ) - Strict_pipe.Reader.t -> frontier_broadcast_pipe: transition_frontier Option.t Broadcast_pipe.Reader.t -> logger:Logger.t - -> t + -> t * Remote_sink.t * Local_sink.t val of_resource_pool_and_diffs : resource_pool -> logger:Logger.t -> constraint_constants:Genesis_constants.Constraint_constants.t - -> incoming_diffs: - ( resource_pool_diff Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - -> local_diffs: - ( resource_pool_diff - * ((resource_pool_diff * rejected_diff) Or_error.t -> unit) ) - Strict_pipe.Reader.t -> tf_diffs:transition_frontier_diff Strict_pipe.Reader.t - -> t + -> t * Remote_sink.t * Local_sink.t val resource_pool : t -> resource_pool diff --git a/src/lib/network_pool/network_pool_base.ml b/src/lib/network_pool/network_pool_base.ml index 73d9ed14fcf0..6af0b7e6f8c7 100644 --- a/src/lib/network_pool/network_pool_base.ml +++ b/src/lib/network_pool/network_pool_base.ml @@ -18,6 +18,10 @@ end) and type config := Resource_pool.Config.t and type rejected_diff := Resource_pool.Diff.rejected = struct module Broadcast_callback = struct + type resource_pool_diff = Resource_pool.Diff.t + + type rejected_diff = Resource_pool.Diff.rejected + type t = | Local of ( (Resource_pool.Diff.t * Resource_pool.Diff.rejected) Or_error.t @@ -63,6 +67,24 @@ end) Linear_pipe.write broadcast_pipe accepted end + module Remote_sink = + Pool_sink.Remote_sink + (struct + include Resource_pool.Diff + + type pool = Resource_pool.t + end) + (Broadcast_callback) + + module Local_sink = + Pool_sink.Local_sink + (struct + include Resource_pool.Diff + + type pool = Resource_pool.t + end) + (Broadcast_callback) + type t = { resource_pool : Resource_pool.t ; logger : Logger.t @@ -117,98 +139,15 @@ end) ~metadata:[ ("rate_limiter", Rate_limiter.summary rl) ] !"%s $rate_limiter" Resource_pool.label) - let filter_verified (type a) ~log_rate_limiter (pipe : a Strict_pipe.Reader.t) - (t : t) - ~(f : - a -> Resource_pool.Diff.t Envelope.Incoming.t * Broadcast_callback.t) : - (Resource_pool.Diff.verified Envelope.Incoming.t * Broadcast_callback.t) - Strict_pipe.Reader.t = - let r, w = - Strict_pipe.create ~name:"verified network pool diffs" - (Buffered - ( `Capacity 1024 - , `Overflow - (Call - (fun (env, cb) -> - Mina_metrics.( - Counter.inc_one - Pipe.Drop_on_overflow.verified_network_pool_diffs) ; - let diff = Envelope.Incoming.data env in - [%log' warn t.logger] - "Dropping verified diff $diff due to pipe overflow" - ~metadata: - [ ("diff", Resource_pool.Diff.verified_to_yojson diff) ] ; - Broadcast_callback.drop Resource_pool.Diff.empty - (Resource_pool.Diff.reject_overloaded_diff diff) - cb)) )) - in - let rl = create_rate_limiter () in - if log_rate_limiter then log_rate_limiter_occasionally t rl ; - (*Note: This is done asynchronously to use batch verification*) - Strict_pipe.Reader.iter_without_pushback pipe ~f:(fun d -> - trace_recurring (Resource_pool.label ^ "_verification") (fun () -> - let diff, cb = f d in - if not (Broadcast_callback.is_expired cb) then ( - let summary = - `String - (Resource_pool.Diff.summary @@ Envelope.Incoming.data diff) - in - [%log' debug t.logger] "Verifying $diff from $sender" - ~metadata: - [ ("diff", summary) - ; ("sender", Envelope.Sender.to_yojson diff.sender) - ] ; - don't_wait_for - ( match - Rate_limiter.add rl diff.sender ~now:(Time.now ()) - ~score:(Resource_pool.Diff.score diff.data) - with - | `Capacity_exceeded -> - [%log' debug t.logger] - ~metadata: - [ ("sender", Envelope.Sender.to_yojson diff.sender) - ; ("diff", summary) - ] - "exceeded capacity from $sender" ; - Broadcast_callback.error - (Error.of_string "exceeded capacity") - cb - | `Within_capacity -> ( - match%bind - Resource_pool.Diff.verify t.resource_pool diff - with - | Error err -> - [%log' debug t.logger] - "Refusing to rebroadcast $diff. Verification error: \ - $error" - ~metadata: - [ ("diff", summary) - ; ("error", Error_json.error_to_yojson err) - ] ; - (*reject incoming messages*) - Broadcast_callback.error err cb - | Ok verified_diff -> ( - [%log' debug t.logger] "Verified diff: $verified_diff" - ~metadata: - [ ( "verified_diff" - , Resource_pool.Diff.verified_to_yojson - @@ Envelope.Incoming.data verified_diff ) - ; ( "sender" - , Envelope.Sender.to_yojson - @@ Envelope.Incoming.sender verified_diff ) - ] ; - match - Strict_pipe.Writer.write w (verified_diff, cb) - with - | Some r -> - r - | None -> - Deferred.unit ) ) ) ))) - |> don't_wait_for ; - r + type wrapped_t = + | Incoming of + (Resource_pool.Diff.verified Envelope.Incoming.t * Broadcast_callback.t) + | Local of + (Resource_pool.Diff.verified Envelope.Incoming.t * Broadcast_callback.t) + | Transition_frontier_extension of Resource_pool.transition_frontier_diff let of_resource_pool_and_diffs resource_pool ~logger ~constraint_constants - ~incoming_diffs ~local_diffs ~tf_diffs = + ~tf_diffs = let read_broadcasts, write_broadcasts = Linear_pipe.create () in let network_pool = { resource_pool @@ -218,33 +157,42 @@ end) ; constraint_constants } in - (*proiority: Transition frontier diffs > local diffs > incomming diffs*) + let remote_r, remote_w = + Remote_sink.create + ~wrap:(fun m -> Incoming m) + ~unwrap:(function + | Incoming m -> m | _ -> failwith "unexpected message type") + resource_pool logger + in + let local_r, local_w = + Local_sink.create + ~wrap:(fun m -> Local m) + ~unwrap:(function + | Local m -> m | _ -> failwith "unexpected message type") + resource_pool logger + in + log_rate_limiter_occasionally network_pool + (Remote_sink.rate_limiter remote_w) ; + (*priority: Transition frontier diffs > local diffs > incomming diffs*) Strict_pipe.Reader.Merge.iter [ Strict_pipe.Reader.map tf_diffs ~f:(fun diff -> - `Transition_frontier_extension diff) - ; Strict_pipe.Reader.map - (filter_verified ~log_rate_limiter:false local_diffs network_pool - ~f:(fun (diff, cb) -> - (Envelope.Incoming.local diff, Broadcast_callback.Local cb))) - ~f:(fun d -> `Local d) - ; Strict_pipe.Reader.map - (filter_verified ~log_rate_limiter:true incoming_diffs network_pool - ~f:(fun (diff, cb) -> (diff, Broadcast_callback.External cb))) - ~f:(fun d -> `Incoming d) + Transition_frontier_extension diff) + ; remote_r + ; local_r ] ~f:(fun diff_source -> match diff_source with - | `Incoming (verified_diff, cb) -> + | Incoming ((verified_diff, cb) : Remote_sink.unwrapped_t) -> apply_and_broadcast network_pool verified_diff cb - | `Local (verified_diff, cb) -> + | Local ((verified_diff, cb) : Local_sink.unwrapped_t) -> apply_and_broadcast network_pool verified_diff cb - | `Transition_frontier_extension diff -> + | Transition_frontier_extension diff -> trace_recurring (Resource_pool.label ^ "_handle_transition_frontier_diff") (fun () -> Resource_pool.handle_transition_frontier_diff diff resource_pool)) |> Deferred.don't_wait_for ; - network_pool + (network_pool, remote_w, local_w) (* Rebroadcast locally generated pool items every 10 minutes. Do so for 50 minutes - at most 5 rebroadcasts - before giving up. @@ -294,20 +242,19 @@ end) go () let create ~config ~constraint_constants ~consensus_constants ~time_controller - ~incoming_diffs ~local_diffs ~frontier_broadcast_pipe ~logger = + ~frontier_broadcast_pipe ~logger = (*Diffs from tansition frontier extensions*) let tf_diff_reader, tf_diff_writer = Strict_pipe.( create ~name:"Network pool transition frontier diffs" Synchronous) in - let t = + let t, locals, remotes = of_resource_pool_and_diffs (Resource_pool.create ~constraint_constants ~consensus_constants ~time_controller ~config ~logger ~frontier_broadcast_pipe ~tf_diff_writer) - ~constraint_constants ~incoming_diffs ~local_diffs ~logger - ~tf_diffs:tf_diff_reader + ~constraint_constants ~logger ~tf_diffs:tf_diff_reader in don't_wait_for (rebroadcast_loop t logger) ; - t + (t, locals, remotes) end diff --git a/src/lib/network_pool/pool_sink.ml b/src/lib/network_pool/pool_sink.ml new file mode 100644 index 000000000000..47c53100d8f6 --- /dev/null +++ b/src/lib/network_pool/pool_sink.ml @@ -0,0 +1,207 @@ +open Pipe_lib +open Async_kernel +open Network_peer +open Core_kernel + +module type BC_ext = sig + include Intf.Broadcast_callback + + val is_expired : t -> bool + + val error : Error.t -> t -> unit Deferred.t +end + +module type Pool_sink = sig + include Mina_net2.Sink.S + + type unwrapped_t + + type pool + + val create : + wrap:(unwrapped_t -> 'wrapped_t) + -> unwrap:('wrapped_t -> unwrapped_t) + -> pool + -> Logger.t + -> 'wrapped_t Strict_pipe.Reader.t * t + + (* Sink that ignores all messages *) + val void : t + + val rate_limiter : t -> Rate_limiter.t +end + +module Base + (Diff : Intf.Resource_pool_diff_intf) + (BC : BC_ext + with type resource_pool_diff = Diff.t + and type rejected_diff = Diff.rejected) + (Msg : sig + type raw_msg + + type raw_callback + + val convert_callback : raw_callback -> BC.t + + val convert : raw_msg -> Diff.t Envelope.Incoming.t + end) : + Pool_sink + with type pool := Diff.pool + and type unwrapped_t = Diff.verified Envelope.Incoming.t * BC.t + and type msg := Msg.raw_msg * Msg.raw_callback = struct + type unwrapped_t = Diff.verified Envelope.Incoming.t * BC.t + + type t = + | Sink : + { writer : + ( 'w + , Strict_pipe.call Strict_pipe.buffered + , unit Deferred.t option ) + Strict_pipe.Writer.t + ; logger : Logger.t + ; rate_limiter : Rate_limiter.t + ; pool : Diff.pool + ; wrap : unwrapped_t -> 'w + } + -> t + | Void + + let on_overflow ~unwrap logger m = + match unwrap m with + | env, cb -> + Mina_metrics.( + Counter.inc_one Pipe.Drop_on_overflow.verified_network_pool_diffs) ; + let diff = Envelope.Incoming.data env in + [%log' warn logger] "Dropping verified diff $diff due to pipe overflow" + ~metadata:[ ("diff", Diff.verified_to_yojson diff) ] ; + BC.drop Diff.empty (Diff.reject_overloaded_diff diff) cb + + (* keep logger in type *) + let verify_impl resource_pool rl logger env cb : + Diff.verified Envelope.Incoming.t option Deferred.t = + (* trace_recurring (Diff.label ^ "_verification") (fun () -> *) + (* let diff, cb = f d in *) + if BC.is_expired cb then Deferred.return None + else + let summary = `String (Diff.summary @@ Envelope.Incoming.data env) in + [%log' debug logger] "Verifying $diff from $sender" + ~metadata: + [ ("diff", summary) + ; ("sender", Envelope.Sender.to_yojson env.sender) + ] ; + match + Rate_limiter.add rl env.sender ~now:(Time.now ()) + ~score:(Diff.score env.data) + with + | `Capacity_exceeded -> + [%log' debug logger] + ~metadata: + [ ("sender", Envelope.Sender.to_yojson env.sender) + ; ("diff", summary) + ] + "exceeded capacity from $sender" ; + BC.error (Error.of_string "exceeded capacity") cb >>| fun _ -> None + | `Within_capacity -> ( + match%bind Diff.verify resource_pool env with + | Error err -> + [%log' debug logger] + "Refusing to rebroadcast $diff. Verification error: $error" + ~metadata: + [ ("diff", summary) + ; ("error", Error_json.error_to_yojson err) + ] ; + (*reject incoming messages*) + BC.error err cb >>| fun _ -> None + | Ok verified_diff -> + [%log' debug logger] "Verified diff: $verified_diff" + ~metadata: + [ ( "verified_diff" + , Diff.verified_to_yojson + @@ Envelope.Incoming.data verified_diff ) + ; ( "sender" + , Envelope.Sender.to_yojson + @@ Envelope.Incoming.sender verified_diff ) + ] ; + Deferred.return (Some verified_diff) ) + + let push t (msg, cb) = + match t with + | Sink { writer = w; logger; rate_limiter = rl; pool; wrap } -> ( + let env' = Msg.convert msg in + let cb' = Msg.convert_callback cb in + match%bind verify_impl pool rl logger env' cb' with + | None -> + (* TODO log unverified? *) + Deferred.unit + | Some verified_env -> + let m' = wrap (verified_env, cb') in + Option.value ~default:Deferred.unit (Strict_pipe.Writer.write w m') + ) + | Void -> + Deferred.unit + + let create ~wrap ~unwrap pool logger = + let r, writer = + Strict_pipe.create ~name:"verified network pool diffs" + (Buffered (`Capacity 1024, `Overflow (Call (on_overflow ~unwrap logger)))) + in + + let rate_limiter = + Rate_limiter.create + ~capacity:(Diff.max_per_15_seconds, `Per (Time.Span.of_sec 15.0)) + in + (r, Sink { writer; logger; rate_limiter; pool; wrap }) + + let void = Void + + let rate_limiter = function + | Sink { rate_limiter; _ } -> + rate_limiter + | Void -> + failwith "rate_limiter method not supported for void sink" +end + +module Local_sink + (Diff : Intf.Resource_pool_diff_intf) + (BC : BC_ext + with type resource_pool_diff = Diff.t + and type rejected_diff = Diff.rejected) : + Pool_sink + with type pool := Diff.pool + and type unwrapped_t = Diff.verified Envelope.Incoming.t * BC.t + and type msg := + BC.resource_pool_diff + * ((BC.resource_pool_diff * BC.rejected_diff) Or_error.t -> unit) = + Base (Diff) (BC) + (struct + type raw_msg = BC.resource_pool_diff + + type raw_callback = + (BC.resource_pool_diff * BC.rejected_diff) Or_error.t -> unit + + let convert_callback cb = BC.Local cb + + let convert m = Envelope.Incoming.local m + end) + +module Remote_sink + (Diff : Intf.Resource_pool_diff_intf) + (BC : BC_ext + with type resource_pool_diff = Diff.t + and type rejected_diff = Diff.rejected) : + Pool_sink + with type pool := Diff.pool + and type unwrapped_t = Diff.verified Envelope.Incoming.t * BC.t + and type msg := + BC.resource_pool_diff Envelope.Incoming.t + * Mina_net2.Validation_callback.t = + Base (Diff) (BC) + (struct + type raw_msg = BC.resource_pool_diff Envelope.Incoming.t + + type raw_callback = Mina_net2.Validation_callback.t + + let convert_callback cb = BC.External cb + + let convert m = m + end) diff --git a/src/lib/network_pool/snark_pool.ml b/src/lib/network_pool/snark_pool.ml index b5d778a573f2..a30f2b97620a 100644 --- a/src/lib/network_pool/snark_pool.ml +++ b/src/lib/network_pool/snark_pool.ml @@ -104,18 +104,9 @@ module type S = sig -> constraint_constants:Genesis_constants.Constraint_constants.t -> consensus_constants:Consensus.Constants.t -> time_controller:Block_time.Controller.t - -> incoming_diffs: - ( Resource_pool.Diff.t Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - -> local_diffs: - ( Resource_pool.Diff.t - * ( (Resource_pool.Diff.t * Resource_pool.Diff.rejected) Or_error.t - -> unit) ) - Strict_pipe.Reader.t -> frontier_broadcast_pipe: transition_frontier option Broadcast_pipe.Reader.t - -> t Deferred.t + -> (t * Remote_sink.t * Local_sink.t) Deferred.t end module type Transition_frontier_intf = sig @@ -671,7 +662,7 @@ struct let loaded = ref false let load ~config ~logger ~constraint_constants ~consensus_constants - ~time_controller ~incoming_diffs ~local_diffs ~frontier_broadcast_pipe = + ~time_controller ~frontier_broadcast_pipe = if !loaded then failwith "Snark_pool.load should only be called once. It has been called twice." ; @@ -680,7 +671,7 @@ struct Strict_pipe.( create ~name:"Snark pool Transition frontier diffs" Synchronous) in - let%map res = + let%map pool, r_sink, l_sink = match%map Async.Reader.load_bin_prot config.Resource_pool.Config.disk_location Snark_tables.Serializable.Stable.Latest.bin_reader_t @@ -690,20 +681,19 @@ struct Resource_pool.of_serializable snark_table ~constraint_constants ~config ~logger ~frontier_broadcast_pipe in - let network_pool = + let res = of_resource_pool_and_diffs pool ~logger ~constraint_constants - ~incoming_diffs ~local_diffs ~tf_diffs:tf_diff_reader + ~tf_diffs:tf_diff_reader in Resource_pool.listen_to_frontier_broadcast_pipe frontier_broadcast_pipe pool ~tf_diff_writer ; - network_pool + res | Error _e -> create ~config ~logger ~constraint_constants ~consensus_constants - ~time_controller ~incoming_diffs ~local_diffs - ~frontier_broadcast_pipe + ~time_controller ~frontier_broadcast_pipe in - store_periodically (resource_pool res) ; - res + store_periodically (resource_pool pool) ; + (pool, r_sink, l_sink) end (* TODO: defunctor or remove monkey patching (#3731) *) @@ -819,34 +809,25 @@ let%test_module "random set test" = in let tf = Mocks.Transition_frontier.create [] in let frontier_broadcast_pipe_r, _ = Broadcast_pipe.create (Some tf) in - let incoming_diff_r, _incoming_diff_w = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) + let open Deferred.Let_syntax in + let mock_pool, _r_sink, _l_sink = + Mock_snark_pool.create ~config ~logger ~constraint_constants + ~consensus_constants ~time_controller + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r + (* |> *) in - let local_diff_r, _local_diff_w = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) + let pool = Mock_snark_pool.resource_pool mock_pool in + (*Statements should be referenced before work for those can be included*) + let%bind () = + Mocks.Transition_frontier.refer_statements tf + (List.unzip sample_solved_work |> fst) in - let res = - let open Deferred.Let_syntax in - let resource_pool = - Mock_snark_pool.create ~config ~logger ~constraint_constants - ~consensus_constants ~time_controller - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r - ~incoming_diffs:incoming_diff_r ~local_diffs:local_diff_r - |> Mock_snark_pool.resource_pool - in - (*Statements should be referenced before work for those can be included*) - let%bind () = - Mocks.Transition_frontier.refer_statements tf - (List.unzip sample_solved_work |> fst) - in - let%map () = - Deferred.List.iter sample_solved_work ~f:(fun (work, fee) -> - let%map res = apply_diff resource_pool work fee in - assert (Result.is_ok res)) - in - (resource_pool, tf) + let%map () = + Deferred.List.iter sample_solved_work ~f:(fun (work, fee) -> + let%map res = apply_diff pool work fee in + assert (Result.is_ok res)) in - res + (pool, tf) let%test_unit "serialization" = let t, _tf = @@ -1002,19 +983,12 @@ let%test_module "random set test" = let%test_unit "Work that gets fed into apply_and_broadcast will be \ received in the pool's reader" = Async.Thread_safe.block_on_async_exn (fun () -> - let pool_reader, _pool_writer = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) - in - let local_reader, _local_writer = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) - in let frontier_broadcast_pipe_r, _ = Broadcast_pipe.create (Some (Mocks.Transition_frontier.create [])) in - let network_pool = + let network_pool, _, _ = Mock_snark_pool.create ~config ~constraint_constants - ~consensus_constants ~time_controller ~incoming_diffs:pool_reader - ~local_diffs:local_reader ~logger + ~consensus_constants ~time_controller ~logger ~frontier_broadcast_pipe:frontier_broadcast_pipe_r in let priced_proof = @@ -1077,36 +1051,29 @@ let%test_module "random set test" = } ) in let verify_unsolved_work () = - let pool_reader, pool_writer = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) + (*incomming diffs*) + let%bind () = Async.Scheduler.yield_until_no_jobs_remain () in + let frontier_broadcast_pipe_r, _ = + Broadcast_pipe.create (Some (Mocks.Transition_frontier.create [])) in - let local_reader, local_writer = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) + let network_pool, remote_sink, local_sink = + Mock_snark_pool.create ~logger ~config ~constraint_constants + ~consensus_constants ~time_controller + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r in - (*incomming diffs*) List.map (List.take works per_reader) ~f:create_work |> List.map ~f:(fun work -> ( Envelope.Incoming.local work , Mina_net2.Validation_callback.create_without_expiration () )) |> List.iter ~f:(fun diff -> - Strict_pipe.Writer.write pool_writer diff + Mock_snark_pool.Remote_sink.push remote_sink diff |> Deferred.don't_wait_for) ; (* locally generated diffs *) List.map (List.drop works per_reader) ~f:create_work |> List.iter ~f:(fun diff -> - Strict_pipe.Writer.write local_writer (diff, Fn.const ()) + Mock_snark_pool.Local_sink.push local_sink (diff, Fn.const ()) |> Deferred.don't_wait_for) ; - let%bind () = Async.Scheduler.yield_until_no_jobs_remain () in - let frontier_broadcast_pipe_r, _ = - Broadcast_pipe.create (Some (Mocks.Transition_frontier.create [])) - in - let network_pool = - Mock_snark_pool.create ~logger ~config ~constraint_constants - ~consensus_constants ~time_controller - ~incoming_diffs:pool_reader ~local_diffs:local_reader - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r - in don't_wait_for @@ Linear_pipe.iter (Mock_snark_pool.broadcasts network_pool) ~f:(fun work_command -> @@ -1128,12 +1095,6 @@ let%test_module "random set test" = verify_unsolved_work ()) let%test_unit "rebroadcast behavior" = - let pool_reader, _pool_writer = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) - in - let local_reader, _local_writer = - Strict_pipe.(create ~name:"Snark pool test" Synchronous) - in let tf = Mocks.Transition_frontier.create [] in let frontier_broadcast_pipe_r, _w = Broadcast_pipe.create (Some tf) in let stmt1, stmt2, stmt3, stmt4 = @@ -1178,10 +1139,9 @@ let%test_module "random set test" = in Async.Thread_safe.block_on_async_exn (fun () -> let open Deferred.Let_syntax in - let network_pool = + let network_pool, _, _ = Mock_snark_pool.create ~logger:(Logger.null ()) ~config ~constraint_constants ~consensus_constants ~time_controller - ~incoming_diffs:pool_reader ~local_diffs:local_reader ~frontier_broadcast_pipe:frontier_broadcast_pipe_r in let resource_pool = Mock_snark_pool.resource_pool network_pool in diff --git a/src/lib/network_pool/snark_pool.mli b/src/lib/network_pool/snark_pool.mli index f9929adddc5a..dfea1affe4e7 100644 --- a/src/lib/network_pool/snark_pool.mli +++ b/src/lib/network_pool/snark_pool.mli @@ -1,6 +1,5 @@ open Async_kernel open Pipe_lib -open Network_peer open Core_kernel module type S = sig @@ -45,18 +44,9 @@ module type S = sig -> constraint_constants:Genesis_constants.Constraint_constants.t -> consensus_constants:Consensus.Constants.t -> time_controller:Block_time.Controller.t - -> incoming_diffs: - ( Resource_pool.Diff.t Envelope.Incoming.t - * Mina_net2.Validation_callback.t ) - Strict_pipe.Reader.t - -> local_diffs: - ( Resource_pool.Diff.t - * ( (Resource_pool.Diff.t * Resource_pool.Diff.rejected) Or_error.t - -> unit) ) - Strict_pipe.Reader.t -> frontier_broadcast_pipe: transition_frontier option Broadcast_pipe.Reader.t - -> t Deferred.t + -> (t * Remote_sink.t * Local_sink.t) Deferred.t end module type Transition_frontier_intf = sig diff --git a/src/lib/network_pool/test.ml b/src/lib/network_pool/test.ml index 8fe995ab6ce2..2a2a5c738ef9 100644 --- a/src/lib/network_pool/test.ml +++ b/src/lib/network_pool/test.ml @@ -38,12 +38,6 @@ let%test_module "network pool test" = let%test_unit "Work that gets fed into apply_and_broadcast will be \ received in the pool's reader" = - let pool_reader, _pool_writer = - Strict_pipe.(create ~name:"Network pool test" Synchronous) - in - let local_reader, _local_writer = - Strict_pipe.(create ~name:"Network pool test" Synchronous) - in let tf = Mocks.Transition_frontier.create [] in let frontier_broadcast_pipe_r, _ = Broadcast_pipe.create (Some tf) in let work = @@ -61,10 +55,9 @@ let%test_module "network pool test" = } in Async.Thread_safe.block_on_async_exn (fun () -> - let network_pool = + let network_pool, _, _ = Mock_snark_pool.create ~config ~logger ~constraint_constants - ~consensus_constants ~time_controller ~incoming_diffs:pool_reader - ~local_diffs:local_reader + ~consensus_constants ~time_controller ~frontier_broadcast_pipe:frontier_broadcast_pipe_r in let%bind () = @@ -114,32 +107,25 @@ let%test_module "network pool test" = } ) in let verify_unsolved_work () = - let pool_reader, pool_writer = - Strict_pipe.(create ~name:"Network pool test" Synchronous) - in - let local_reader, local_writer = - Strict_pipe.(create ~name:"Network pool test" Synchronous) + let%bind () = Async.Scheduler.yield_until_no_jobs_remain () in + let tf = Mocks.Transition_frontier.create [] in + let frontier_broadcast_pipe_r, _ = Broadcast_pipe.create (Some tf) in + let network_pool, remote_sink, local_sink = + Mock_snark_pool.create ~config ~logger ~constraint_constants + ~consensus_constants ~time_controller + ~frontier_broadcast_pipe:frontier_broadcast_pipe_r in List.map (List.take works per_reader) ~f:create_work |> List.map ~f:(fun work -> ( Envelope.Incoming.local work , Mina_net2.Validation_callback.create_without_expiration () )) |> List.iter ~f:(fun diff -> - Strict_pipe.Writer.write pool_writer diff + Mock_snark_pool.Remote_sink.push remote_sink diff |> Deferred.don't_wait_for) ; List.map (List.drop works per_reader) ~f:create_work |> List.iter ~f:(fun diff -> - Strict_pipe.Writer.write local_writer (diff, Fn.const ()) + Mock_snark_pool.Local_sink.push local_sink (diff, Fn.const ()) |> Deferred.don't_wait_for) ; - let%bind () = Async.Scheduler.yield_until_no_jobs_remain () in - let tf = Mocks.Transition_frontier.create [] in - let frontier_broadcast_pipe_r, _ = Broadcast_pipe.create (Some tf) in - let network_pool = - Mock_snark_pool.create ~config ~logger ~constraint_constants - ~consensus_constants ~time_controller ~incoming_diffs:pool_reader - ~local_diffs:local_reader - ~frontier_broadcast_pipe:frontier_broadcast_pipe_r - in let%bind () = Mocks.Transition_frontier.refer_statements tf works in don't_wait_for @@ Linear_pipe.iter (Mock_snark_pool.broadcasts network_pool) diff --git a/src/lib/network_pool/transaction_pool.ml b/src/lib/network_pool/transaction_pool.ml index f43effcd7b88..c5dfc30a27fa 100644 --- a/src/lib/network_pool/transaction_pool.ml +++ b/src/lib/network_pool/transaction_pool.ml @@ -1542,22 +1542,15 @@ let%test_module _ = let setup_test () = let tf, best_tip_diff_w = Mock_transition_frontier.create () in let tf_pipe_r, _tf_pipe_w = Broadcast_pipe.create @@ Some tf in - let incoming_diff_r, _incoming_diff_w = - Strict_pipe.(create ~name:"Transaction pool test" Synchronous) - in - let local_diff_r, _local_diff_w = - Strict_pipe.(create ~name:"Transaction pool test" Synchronous) - in let trust_system = Trust_system.null () in let config = Test.Resource_pool.make_config ~trust_system ~pool_max_size ~verifier in - let pool = + let pool_, _, _ = Test.create ~config ~logger ~constraint_constants ~consensus_constants - ~time_controller ~incoming_diffs:incoming_diff_r - ~local_diffs:local_diff_r ~frontier_broadcast_pipe:tf_pipe_r - |> Test.resource_pool + ~time_controller ~frontier_broadcast_pipe:tf_pipe_r in + let pool = Test.resource_pool pool_ in let%map () = Async.Scheduler.yield () in ( (fun txs -> Indexed_pool.For_tests.assert_invariants pool.pool ; @@ -2001,24 +1994,17 @@ let%test_module _ = Thread_safe.block_on_async_exn (fun () -> (* Set up initial frontier *) let frontier_pipe_r, frontier_pipe_w = Broadcast_pipe.create None in - let incoming_diff_r, _incoming_diff_w = - Strict_pipe.(create ~name:"Transaction pool test" Synchronous) - in - let local_diff_r, _local_diff_w = - Strict_pipe.(create ~name:"Transaction pool test" Synchronous) - in let trust_system = Trust_system.null () in let config = Test.Resource_pool.make_config ~trust_system ~pool_max_size ~verifier in - let pool = + let pool_, _, _ = Test.create ~config ~logger ~constraint_constants ~consensus_constants ~time_controller - ~incoming_diffs:incoming_diff_r ~local_diffs:local_diff_r ~frontier_broadcast_pipe:frontier_pipe_r - |> Test.resource_pool in + let pool = Test.resource_pool pool_ in let assert_pool_txs txs = [%test_eq: User_command.t List.t] ( Test.Resource_pool.transactions ~logger pool diff --git a/src/lib/transaction_inclusion_status/transaction_inclusion_status.ml b/src/lib/transaction_inclusion_status/transaction_inclusion_status.ml index 78c37c2be32a..f81adbf377d6 100644 --- a/src/lib/transaction_inclusion_status/transaction_inclusion_status.ml +++ b/src/lib/transaction_inclusion_status/transaction_inclusion_status.ml @@ -109,23 +109,15 @@ let%test_module "transaction_status" = ~key_gen ~nonce:(Account_nonce.of_int 1) () let create_pool ~frontier_broadcast_pipe = - let pool_reader, _ = - Strict_pipe.( - create ~name:"transaction_status incomming diff" Synchronous) - in - let local_reader, local_writer = - Strict_pipe.(create ~name:"transaction_status local diff" Synchronous) - in let config = Transaction_pool.Resource_pool.make_config ~trust_system ~pool_max_size ~verifier in - let transaction_pool = + let transaction_pool, _, local_sink = Transaction_pool.create ~config ~constraint_constants:precomputed_values.constraint_constants ~consensus_constants:precomputed_values.consensus_constants - ~time_controller ~incoming_diffs:pool_reader ~logger - ~local_diffs:local_reader ~frontier_broadcast_pipe + ~time_controller ~logger ~frontier_broadcast_pipe in don't_wait_for @@ Linear_pipe.iter (Transaction_pool.broadcasts transaction_pool) @@ -141,7 +133,7 @@ let%test_module "transaction_status" = Deferred.unit) ; (* Need to wait for transaction_pool to see the transition_frontier *) let%map () = Async.Scheduler.yield_until_no_jobs_remain () in - (transaction_pool, local_writer) + (transaction_pool, local_sink) let%test_unit "If the transition frontier currently doesn't exist, the \ status of a sent transaction will be unknown" = @@ -153,7 +145,7 @@ let%test_module "transaction_status" = create_pool ~frontier_broadcast_pipe in let%bind () = - Strict_pipe.Writer.write local_diffs_writer + Transaction_pool.Local_sink.push local_diffs_writer ([ Signed_command user_command ], Fn.const ()) in let%map () = Async.Scheduler.yield_until_no_jobs_remain () in @@ -177,7 +169,7 @@ let%test_module "transaction_status" = create_pool ~frontier_broadcast_pipe in let%bind () = - Strict_pipe.Writer.write local_diffs_writer + Transaction_pool.Local_sink.push local_diffs_writer ([ Signed_command user_command ], Fn.const ()) in let%map () = Async.Scheduler.yield_until_no_jobs_remain () in @@ -214,7 +206,7 @@ let%test_module "transaction_status" = Non_empty_list.uncons user_commands in let%bind () = - Strict_pipe.Writer.write local_diffs_writer + Transaction_pool.Local_sink.push local_diffs_writer ( List.map pool_user_commands ~f:(fun x -> User_command.Signed_command x) , Fn.const () )