From e69a772c16309a6dfb3985031b126ee371c2627f Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 24 Jan 2019 16:25:20 +1100 Subject: [PATCH] Implement #868 for gossipsub. --- protocols/gossipsub/src/handler.rs | 257 ---------------------------- protocols/gossipsub/src/layer.rs | 42 ++++- protocols/gossipsub/src/lib.rs | 2 - protocols/gossipsub/src/protocol.rs | 245 ++++++++++++++------------ 4 files changed, 168 insertions(+), 378 deletions(-) delete mode 100644 protocols/gossipsub/src/handler.rs diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs deleted file mode 100644 index d466ae8c83c..00000000000 --- a/protocols/gossipsub/src/handler.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//TODO: Remove this handler and replace with OneShotHandler once PR #868 is merged - -use crate::protocol::{GossipsubCodec, GossipsubRpc, ProtocolConfig}; -use futures::prelude::*; -use libp2p_core::{ - protocols_handler::ProtocolsHandlerUpgrErr, - upgrade::{InboundUpgrade, OutboundUpgrade}, - ProtocolsHandler, ProtocolsHandlerEvent, -}; -use smallvec::SmallVec; -use std::{fmt, io}; -use tokio_codec::Framed; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// Protocol handler that handles communication with the remote for the gossipsub protocol. -/// -/// The handler will automatically open a substream with the remote for each request we make. -/// -/// It also handles requests made by the remote. -pub struct GossipsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Configuration for the gossipsub protocol. - config: ProtocolConfig, - - /// If true, we are trying to shut down the existing gossipsub substream and should refuse any - /// incoming connection. - shutting_down: bool, - - /// The active substreams. - // TODO: add a limit to the number of allowed substreams - substreams: Vec>, - - /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[GossipsubRpc; 16]>, -} - -/// State of an active substream, opened either by us or by the remote. -enum SubstreamState -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Waiting for a message from the remote. - WaitingInput(Framed), - /// Waiting to send a message to the remote. - PendingSend(Framed, GossipsubRpc), - /// Waiting to flush the substream so that the data arrives to the remote. - PendingFlush(Framed), - /// The substream is being closed. - Closing(Framed), -} - -impl SubstreamState -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Consumes this state and produces the substream. - fn into_substream(self) -> Framed { - match self { - SubstreamState::WaitingInput(substream) => substream, - SubstreamState::PendingSend(substream, _) => substream, - SubstreamState::PendingFlush(substream) => substream, - SubstreamState::Closing(substream) => substream, - } - } -} - -impl GossipsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Builds a new `GossipsubHandler`. - pub fn new() -> Self { - GossipsubHandler { - config: ProtocolConfig::new(), - shutting_down: false, - substreams: Vec::new(), - send_queue: SmallVec::new(), - } - } -} - -impl ProtocolsHandler for GossipsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - type InEvent = GossipsubRpc; - type OutEvent = GossipsubRpc; - type Error = io::Error; - type Substream = TSubstream; - type InboundProtocol = ProtocolConfig; - type OutboundProtocol = ProtocolConfig; - type OutboundOpenInfo = GossipsubRpc; - - #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { - self.config.clone() - } - - fn inject_fully_negotiated_inbound( - &mut self, - protocol: >::Output, - ) { - if self.shutting_down { - return (); - } - self.substreams.push(SubstreamState::WaitingInput(protocol)) - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: >::Output, - message: Self::OutboundOpenInfo, - ) { - if self.shutting_down { - return (); - } - self.substreams - .push(SubstreamState::PendingSend(protocol, message)) - } - - #[inline] - fn inject_event(&mut self, message: GossipsubRpc) { - self.send_queue.push(message); - } - - #[inline] - fn inject_inbound_closed(&mut self) {} - - #[inline] - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - _: ProtocolsHandlerUpgrErr< - >::Error, - >, - ) { - } - - #[inline] - fn connection_keep_alive(&self) -> bool { - !self.substreams.is_empty() - } - - #[inline] - fn shutdown(&mut self) { - self.shutting_down = true; - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); - self.substreams - .push(SubstreamState::Closing(substream.into_substream())); - } - } - - fn poll( - &mut self, - ) -> Poll< - ProtocolsHandlerEvent, - io::Error, - > { - if !self.send_queue.is_empty() { - let message = self.send_queue.remove(0); - return Ok(Async::Ready( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - info: message, - upgrade: self.config.clone(), - }, - )); - } - - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); - loop { - substream = match substream { - SubstreamState::WaitingInput(mut substream) => match substream.poll() { - Ok(Async::Ready(Some(message))) => { - self.substreams - .push(SubstreamState::WaitingInput(substream)); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(message))); - } - Ok(Async::Ready(None)) => SubstreamState::Closing(substream), - Ok(Async::NotReady) => { - self.substreams - .push(SubstreamState::WaitingInput(substream)); - return Ok(Async::NotReady); - } - Err(_) => SubstreamState::Closing(substream), - }, - SubstreamState::PendingSend(mut substream, message) => { - match substream.start_send(message)? { - AsyncSink::Ready => SubstreamState::PendingFlush(substream), - AsyncSink::NotReady(message) => { - self.substreams - .push(SubstreamState::PendingSend(substream, message)); - return Ok(Async::NotReady); - } - } - } - SubstreamState::PendingFlush(mut substream) => { - match substream.poll_complete()? { - Async::Ready(()) => SubstreamState::Closing(substream), - Async::NotReady => { - self.substreams - .push(SubstreamState::PendingFlush(substream)); - return Ok(Async::NotReady); - } - } - } - SubstreamState::Closing(mut substream) => match substream.close() { - Ok(Async::Ready(())) => break, - Ok(Async::NotReady) => { - self.substreams.push(SubstreamState::Closing(substream)); - return Ok(Async::NotReady); - } - Err(_) => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)), - }, - } - } - } - - Ok(Async::NotReady) - } -} - -impl fmt::Debug for GossipsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - f.debug_struct("GossipsubHandler") - .field("shutting_down", &self.shutting_down) - .field("substreams", &self.substreams.len()) - .field("send_queue", &self.send_queue.len()) - .finish() - } -} diff --git a/protocols/gossipsub/src/layer.rs b/protocols/gossipsub/src/layer.rs index 105aa28331e..916f9ebcaec 100644 --- a/protocols/gossipsub/src/layer.rs +++ b/protocols/gossipsub/src/layer.rs @@ -23,16 +23,18 @@ use cuckoofilter::CuckooFilter; use futures::prelude::*; -use handler::GossipsubHandler; use libp2p_core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; -use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId}; +use libp2p_core::{ + protocols_handler::{OneShotHandler, ProtocolsHandler}, + PeerId, +}; use libp2p_floodsub::{Topic, TopicHash}; use mcache::MessageCache; use protocol::{ GossipsubControlAction, GossipsubMessage, GossipsubRpc, GossipsubSubscription, - GossipsubSubscriptionAction, + GossipsubSubscriptionAction, ProtocolConfig, }; use rand; use rand::{seq::SliceRandom, thread_rng}; @@ -968,11 +970,11 @@ impl NetworkBehaviour for Gossipsub; + type ProtocolsHandler = OneShotHandler; type OutEvent = GossipsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - GossipsubHandler::new() + Default::default() } fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { @@ -1071,7 +1073,12 @@ where debug_assert!(was_in.is_some()); } - fn inject_node_event(&mut self, propagation_source: PeerId, event: GossipsubRpc) { + fn inject_node_event(&mut self, propagation_source: PeerId, event: InnerMessage) { + // ignore successful sends event + let event = match event { + InnerMessage::Rx(event) => event, + InnerMessage::Sent => return, + }; // Handle subscriptions // Update connected peers topics self.handle_received_subscriptions(event.subscriptions, &propagation_source); @@ -1135,6 +1142,29 @@ where Async::NotReady } } + +/// Transmission between the `OneShotHandler` and the `GossipsubRpc`. +pub enum InnerMessage { + /// We received an RPC from a remote. + Rx(GossipsubRpc), + /// We successfully sent an RPC request. + Sent, +} + +impl From for InnerMessage { + #[inline] + fn from(rpc: GossipsubRpc) -> InnerMessage { + InnerMessage::Rx(rpc) + } +} + +impl From<()> for InnerMessage { + #[inline] + fn from(_: ()) -> InnerMessage { + InnerMessage::Sent + } +} + /// Event that can happen on the gossipsub behaviour. #[derive(Debug)] pub enum GossipsubEvent { diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 96fcffedc2c..a5f283f6bd1 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -36,13 +36,11 @@ extern crate tokio_io; extern crate tokio_timer; extern crate unsigned_varint; -pub mod handler; pub mod protocol; mod layer; mod mcache; mod rpc_proto; -pub use self::handler::GossipsubHandler; pub use self::layer::{Gossipsub, GossipsubConfig, GossipsubEvent}; pub use self::protocol::*; diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 7f2d876b4f3..6101655e875 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -20,18 +20,18 @@ use crate::rpc_proto; use byteorder::{BigEndian, ByteOrder}; -use bytes::{BufMut, BytesMut}; -use futures::future; +use bytes::BytesMut; +use futures::{future, stream, Future, Stream}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; use libp2p_floodsub::TopicHash; use protobuf::Message as ProtobufMessage; use std::{io, iter}; -use tokio_codec::{Decoder, Encoder, Framed}; +use tokio_codec::{Decoder, FramedRead}; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec; /// Implementation of the `ConnectionUpgrade` for the Gossipsub protocol. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ProtocolConfig {} impl ProtocolConfig { @@ -54,39 +54,35 @@ impl UpgradeInfo for ProtocolConfig { impl InboundUpgrade for ProtocolConfig where - TSocket: AsyncRead + AsyncWrite, + TSocket: AsyncRead, { - type Output = Framed; + type Output = GossipsubRpc; type Error = io::Error; - type Future = future::FutureResult; + type Future = future::MapErr< + future::AndThen< + stream::StreamFuture>, + Result)>, + fn( + (Option, FramedRead), + ) + -> Result)>, + >, + fn((io::Error, FramedRead)) -> io::Error, + >; #[inline] fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - future::ok(Framed::new( + FramedRead::new( socket, GossipsubCodec { length_prefix: Default::default(), }, - )) - } -} - -impl OutboundUpgrade for ProtocolConfig -where - TSocket: AsyncRead + AsyncWrite, -{ - type Output = Framed; - type Error = io::Error; - type Future = future::FutureResult; - - #[inline] - fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - future::ok(Framed::new( - socket, - GossipsubCodec { - length_prefix: Default::default(), - }, - )) + ) + .into_future() + .and_then:: _, _>(|(val, socket)| { + val.ok_or_else(move || (io::ErrorKind::UnexpectedEof.into(), socket)) + }) + .map_err(|(err, _)| err) } } @@ -96,90 +92,6 @@ pub struct GossipsubCodec { length_prefix: codec::UviBytes, } -impl Encoder for GossipsubCodec { - type Item = GossipsubRpc; - type Error = io::Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - let mut proto = rpc_proto::RPC::new(); - - for message in item.messages.into_iter() { - let mut msg = rpc_proto::Message::new(); - msg.set_from(message.source.into_bytes()); - msg.set_data(message.data); - msg.set_seqno(message.sequence_number); - msg.set_topicIDs( - message - .topics - .into_iter() - .map(TopicHash::into_string) - .collect(), - ); - proto.mut_publish().push(msg); - } - - for subscription in item.subscriptions.into_iter() { - let mut rpc_subscription = rpc_proto::RPC_SubOpts::new(); - rpc_subscription - .set_subscribe(subscription.action == GossipsubSubscriptionAction::Subscribe); - rpc_subscription.set_topicid(subscription.topic_hash.into_string()); - proto.mut_subscriptions().push(rpc_subscription); - } - - // gossipsub control messages - let mut control_msg = rpc_proto::ControlMessage::new(); - - for action in item.control_msgs { - match action { - // collect all ihave messages - GossipsubControlAction::IHave { - topic_hash, - message_ids, - } => { - let mut rpc_ihave = rpc_proto::ControlIHave::new(); - rpc_ihave.set_topicID(topic_hash.into_string()); - for msg_id in message_ids { - rpc_ihave.mut_messageIDs().push(msg_id); - } - control_msg.mut_ihave().push(rpc_ihave); - } - GossipsubControlAction::IWant { message_ids } => { - let mut rpc_iwant = rpc_proto::ControlIWant::new(); - for msg_id in message_ids { - rpc_iwant.mut_messageIDs().push(msg_id); - } - control_msg.mut_iwant().push(rpc_iwant); - } - GossipsubControlAction::Graft { topic_hash } => { - let mut rpc_graft = rpc_proto::ControlGraft::new(); - rpc_graft.set_topicID(topic_hash.into_string()); - control_msg.mut_graft().push(rpc_graft); - } - GossipsubControlAction::Prune { topic_hash } => { - let mut rpc_prune = rpc_proto::ControlPrune::new(); - rpc_prune.set_topicID(topic_hash.into_string()); - control_msg.mut_prune().push(rpc_prune); - } - } - } - - proto.set_control(control_msg); - - let msg_size = proto.compute_size(); - // Reserve enough space for the data and the length. The length has a maximum of 32 bits, - // which means that 5 bytes is enough for the variable-length integer. - dst.reserve(msg_size as usize + 5); - - proto - .write_length_delimited_to_writer(&mut dst.by_ref().writer()) - .expect( - "there is no situation in which the protobuf message can be invalid, and \ - writing to a BytesMut never fails as we reserved enough space beforehand", - ); - Ok(()) - } -} - impl Decoder for GossipsubCodec { type Item = GossipsubRpc; type Error = io::Error; @@ -203,7 +115,7 @@ impl Decoder for GossipsubCodec { topics: publish .take_topicIDs() .into_iter() - .map(|topic| TopicHash::from_raw(topic)) + .map(TopicHash::from_raw) .collect(), }); } @@ -285,6 +197,113 @@ pub struct GossipsubRpc { pub control_msgs: Vec, } +impl UpgradeInfo for GossipsubRpc { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + #[inline] + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/meshsub/1.0.0") + } +} + +impl OutboundUpgrade for GossipsubRpc +where + TSocket: AsyncWrite, +{ + type Output = (); + type Error = io::Error; + type Future = future::Map< + future::AndThen< + tokio_io::io::WriteAll>, + tokio_io::io::Shutdown, + fn((TSocket, Vec)) -> tokio_io::io::Shutdown, + >, + fn(TSocket) -> (), + >; + + #[inline] + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let bytes = self.into_length_delimited_bytes(); + tokio_io::io::write_all(socket, bytes) + .and_then:: _, _>(|(socket, _)| tokio_io::io::shutdown(socket)) + .map(|_| ()) + } +} + +impl GossipsubRpc { + /// Turns this `GossipsubRpc` into a message that can be sent to a substream. + fn into_length_delimited_bytes(self) -> Vec { + let mut proto = rpc_proto::RPC::new(); + + for message in self.messages.into_iter() { + let mut msg = rpc_proto::Message::new(); + msg.set_from(message.source.into_bytes()); + msg.set_data(message.data); + msg.set_seqno(message.sequence_number); + msg.set_topicIDs( + message + .topics + .into_iter() + .map(TopicHash::into_string) + .collect(), + ); + proto.mut_publish().push(msg); + } + + for subscription in self.subscriptions.into_iter() { + let mut rpc_subscription = rpc_proto::RPC_SubOpts::new(); + rpc_subscription + .set_subscribe(subscription.action == GossipsubSubscriptionAction::Subscribe); + rpc_subscription.set_topicid(subscription.topic_hash.into_string()); + proto.mut_subscriptions().push(rpc_subscription); + } + + // gossipsub control messages + let mut control_msg = rpc_proto::ControlMessage::new(); + + for action in self.control_msgs { + match action { + // collect all ihave messages + GossipsubControlAction::IHave { + topic_hash, + message_ids, + } => { + let mut rpc_ihave = rpc_proto::ControlIHave::new(); + rpc_ihave.set_topicID(topic_hash.into_string()); + for msg_id in message_ids { + rpc_ihave.mut_messageIDs().push(msg_id); + } + control_msg.mut_ihave().push(rpc_ihave); + } + GossipsubControlAction::IWant { message_ids } => { + let mut rpc_iwant = rpc_proto::ControlIWant::new(); + for msg_id in message_ids { + rpc_iwant.mut_messageIDs().push(msg_id); + } + control_msg.mut_iwant().push(rpc_iwant); + } + GossipsubControlAction::Graft { topic_hash } => { + let mut rpc_graft = rpc_proto::ControlGraft::new(); + rpc_graft.set_topicID(topic_hash.into_string()); + control_msg.mut_graft().push(rpc_graft); + } + GossipsubControlAction::Prune { topic_hash } => { + let mut rpc_prune = rpc_proto::ControlPrune::new(); + rpc_prune.set_topicID(topic_hash.into_string()); + control_msg.mut_prune().push(rpc_prune); + } + } + } + + proto.set_control(control_msg); + + proto + .write_length_delimited_to_bytes() + .expect("there is no situation in which the protobuf message can be invalid") + } +} + /// A message received by the gossipsub system. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct GossipsubMessage {