Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Gossipsub] Prevent non-published messages being added to caches #1930

Merged
merged 2 commits into from
Jan 20, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,15 +591,11 @@ where

// check that the size doesn't exceed the max transmission size
if event.encoded_len() > self.config.max_transmit_size() {
// NOTE: The size limit can be reached by excessive topics or an excessive message.
// This is an estimate that should be within 10% of the true encoded value. It is
// possible to have a message that exceeds the RPC limit and is not caught here. A
// warning log will be emitted in this case.
return Err(PublishError::MessageTooLarge);
}

// Add published message to the duplicate cache.
if !self.duplicate_cache.insert(msg_id.clone()) {
// Check the if the message has been published before
if self.duplicate_cache.contains(&msg_id) {
// This message has already been seen. We don't re-publish messages that have already
// been published on the network.
warn!(
Expand All @@ -609,24 +605,13 @@ where
return Err(PublishError::Duplicate);
}

// If the message isn't a duplicate add it to the memcache.
self.mcache.put(&msg_id, raw_message.clone());

debug!("Publishing message: {:?}", msg_id);

// If the message is anonymous or has a random author add it to the published message ids
// cache.
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
if !self.config.allow_self_origin() {
self.published_message_ids.insert(msg_id.clone());
}
}

let topic_hash = raw_message.topic.clone();

// If we are not flood publishing forward the message to mesh peers.
let mesh_peers_sent =
!self.config.flood_publish() && self.forward_msg(&msg_id, raw_message, None)?;
!self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;

let mut recipient_peers = HashSet::new();
if let Some(set) = self.topic_peers.get(&topic_hash) {
Expand Down Expand Up @@ -702,6 +687,19 @@ where
return Err(PublishError::InsufficientPeers);
}

// If the message isn't a duplicate and we have sent it to some peers add it to the
// duplicate cache and memcache.
self.duplicate_cache.insert(msg_id.clone());
self.mcache.put(&msg_id, raw_message);

// If the message is anonymous or has a random author add it to the published message ids
// cache.
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
if !self.config.allow_self_origin() {
self.published_message_ids.insert(msg_id.clone());
}
}

// Send to peers we know are subscribed to the topic.
for peer_id in recipient_peers.iter() {
debug!("Sending message to peer: {:?}", peer_id);
Expand Down