Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): remove control pool #559

Merged
merged 8 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 63 additions & 99 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
Expand All @@ -65,6 +61,16 @@ use crate::{
config::{Config, ValidationMode},
types::RpcOut,
};
use crate::{gossip_promises::GossipPromises, types::Graft};
use crate::{
handler::{Handler, HandlerEvent, HandlerIn},
types::Prune,
};
use crate::{mcache::MessageCache, types::IWant};
use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -247,9 +253,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<ToSwarm<Event, HandlerIn>>,

/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<ControlAction>>,

/// Information used for publishing messages.
publish_config: PublishConfig,

Expand Down Expand Up @@ -317,10 +320,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Counts the number of `IWANT` that we sent the each peer since the last heartbeat.
count_sent_iwant: HashMap<PeerId, usize>,

/// Keeps track of IWANT messages that we are awaiting to send.
/// This is used to prevent sending duplicate IWANT messages for the same message.
pending_iwant_msgs: HashSet<MessageId>,

/// Short term cache for published message ids. This is used for penalizing peers sending
/// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache<MessageId>,
Expand Down Expand Up @@ -445,7 +444,6 @@ where
Ok(Behaviour {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
Expand All @@ -471,7 +469,6 @@ where
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
pending_iwant_msgs: HashSet::new(),
connected_peers: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
Expand Down Expand Up @@ -1027,13 +1024,14 @@ where
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
Self::control_pool_add(
&mut self.control_pool,
peer_id,
ControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
let sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist");

sender.graft(Graft {
topic_hash: topic_hash.clone(),
});

// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
Expand Down Expand Up @@ -1061,7 +1059,7 @@ where
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> ControlAction {
) -> Prune {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
}
Expand All @@ -1072,7 +1070,7 @@ where
}
Some(PeerKind::Gossipsub) => {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return ControlAction::Prune {
return Prune {
topic_hash: topic_hash.clone(),
peers: Vec::new(),
backoff: None,
Expand Down Expand Up @@ -1109,7 +1107,7 @@ where
// update backoff
self.backoffs.update_backoff(topic_hash, peer, backoff);

ControlAction::Prune {
Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(backoff.as_secs()),
Expand All @@ -1129,9 +1127,13 @@ where
// Send a PRUNE control message
tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer");
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);
let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.prune(prune);

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -1230,10 +1232,6 @@ where
return false;
}

if self.pending_iwant_msgs.contains(id) {
return false;
}

self.peer_score
.as_ref()
.map(|(_, _, _, promises)| !promises.contains(id))
Expand Down Expand Up @@ -1284,11 +1282,6 @@ where
iwant_ids_vec.truncate(iask);
*iasked += iask;

for message_id in &iwant_ids_vec {
// Add all messages to the pending list
self.pending_iwant_msgs.insert(message_id.clone());
}

if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
*peer_id,
Expand All @@ -1302,13 +1295,14 @@ where
iwant_ids_vec
);

Self::control_pool_add(
&mut self.control_pool,
*peer_id,
ControlAction::IWant {
message_ids: iwant_ids_vec,
},
);
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.iwant(IWant {
message_ids: iwant_ids_vec,
});
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
Expand Down Expand Up @@ -1512,11 +1506,11 @@ where
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
for prune in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
{
sender.control(action);
sender.prune(prune);
}
// Send the prune messages to the peer
tracing::debug!(
Expand Down Expand Up @@ -2016,11 +2010,8 @@ where
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
{
sender.control(action);
for topic_hash in topics_to_graft.into_iter() {
sender.graft(Graft { topic_hash });
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2438,9 +2429,6 @@ where
self.send_graft_prune(to_graft, to_prune, no_px);
}

// piggyback pooled control messages
self.flush_control_pool();

// shift the memcache
self.mcache.shift();

Expand Down Expand Up @@ -2507,14 +2495,14 @@ where
}

// send an IHAVE message
Self::control_pool_add(
&mut self.control_pool,
peer,
ControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");
sender.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
});
}
}
}
Expand Down Expand Up @@ -2546,9 +2534,6 @@ where
&self.connected_peers,
);
}
let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
});

// If there are prunes associated with the same peer add them.
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
Expand Down Expand Up @@ -2576,8 +2561,14 @@ where
)
});

for msg in control_msgs.chain(prunes) {
sender.control(msg);
for topic_hash in topics {
sender.graft(Graft {
topic_hash: topic_hash.clone(),
});
}

for prune in prunes {
sender.prune(prune);
}
}

Expand All @@ -2597,7 +2588,7 @@ where
.expect("Peerid should exist")
.clone();

sender.control(prune);
sender.prune(prune);

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -2776,32 +2767,6 @@ where
}
}

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<ControlAction>>,
peer: PeerId,
control: ControlAction,
) {
control_pool.entry(peer).or_default().push(control);
}

/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
Expand Down Expand Up @@ -3205,21 +3170,21 @@ where
let mut prune_msgs = vec![];
for control_msg in rpc.control_msgs {
match control_msg {
ControlAction::IHave {
ControlAction::IHave(IHave {
topic_hash,
message_ids,
} => {
}) => {
ihave_msgs.push((topic_hash, message_ids));
}
ControlAction::IWant { message_ids } => {
ControlAction::IWant(IWant { message_ids }) => {
self.handle_iwant(&propagation_source, message_ids)
}
ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
ControlAction::Prune {
ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
} => prune_msgs.push((topic_hash, peers, backoff)),
}) => prune_msgs.push((topic_hash, peers, backoff)),
}
}
if !ihave_msgs.is_empty() {
Expand Down Expand Up @@ -3442,7 +3407,6 @@ impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F
f.debug_struct("Behaviour")
.field("config", &self.config)
.field("events", &self.events.len())
.field("control_pool", &self.control_pool)
.field("publish_config", &self.publish_config)
.field("topic_peers", &self.topic_peers)
.field("peer_topics", &self.peer_topics)
Expand Down
Loading
Loading