diff --git a/network-albatross/src/network.rs b/network-albatross/src/network.rs index 322189a31f..62d73f9e48 100644 --- a/network-albatross/src/network.rs +++ b/network-albatross/src/network.rs @@ -429,7 +429,7 @@ impl NetworkInterface for Network { unimplemented!() } - async fn subscribe(&self, _topic: &T) -> Box)> + Send> + async fn subscribe(&self, _topic: &T) -> Box::Id)> + Send> where T: Topic + Sync, { diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index 06321818ad..b1ca695bd5 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -71,7 +71,7 @@ pub trait Network: Send + Sync + 'static { ReceiveFromAll::new(self) } - async fn subscribe(&self, topic: &T) -> Box)> + Send> + async fn subscribe(&self, topic: &T) -> Box::Id)> + Send> where T: Topic + Sync; diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index bacd151410..0ad6558923 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -299,9 +299,8 @@ impl Network { GossipsubEvent::Message(peer_id, msg_id, msg) => { log::trace!("Received message {:?} from peer {:?}: {:?}", msg_id, peer_id, msg); for topic in msg.topics.iter() { - if let Some(output) = state.gossip_topics.get(&topic) { - // let peer = Self::get_peer(peer_id).unwrap(); - output.send((msg, peer)); + if let Some(output) = state.gossip_topics.get_mut(&topic) { + output.send((msg.clone(), peer_id.clone())).await.ok(); } else { log::warn!("Unknown topic hash: {:?}", topic); } @@ -401,7 +400,7 @@ impl NetworkInterface for Network { self.events_tx.subscribe() } - async fn subscribe(&self, topic: &T) -> Box)> + Send> + async fn subscribe(&self, topic: &T) -> Box + Send> where T: Topic + Sync, { @@ -417,9 +416,9 @@ impl NetworkInterface for Network { .await .expect("Couldn't subscribe to pubsub topic"); - Box::new(rx.map(|(msg, peer)| { - let item: ::Item = Deserialize::deserialize_from_vec(&msg.data); - (item, peer) + Box::new(rx.map(|(msg, peer_id)| { + let item: ::Item = Deserialize::deserialize_from_vec(&msg.data).unwrap(); + (item, peer_id) })) } diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index 4a8cb71ca2..65d64210ad 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -236,7 +236,7 @@ impl Network for MockNetwork { self.event_tx.subscribe() } - async fn subscribe(&self, _topic: &T) -> Box)> + Send> + async fn subscribe(&self, _topic: &T) -> Box + Send> where T: Topic + Sync, {