Skip to content

Commit

Permalink
feat(gossipsub): remove fast_message_id_fn
Browse files Browse the repository at this point in the history
Resolves #4138.

Pull-Request: #4285.
  • Loading branch information
StemCll authored Oct 20, 2023
1 parent 0181e86 commit a044b88
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 217 deletions.
2 changes: 2 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.46.0 - unreleased

- Remove `fast_message_id_fn` mechanism from `Config`.
See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285).
- Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`.
See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642).
- Return typed error from config builder.
Expand Down
58 changes: 9 additions & 49 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
use crate::time_cache::{DuplicateCache, TimeCache};
use crate::time_cache::DuplicateCache;
use crate::topic::{Hasher, Topic, TopicHash};
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::{
ControlAction, FastMessageId, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage,
Subscription, SubscriptionAction,
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, Rpc};
use crate::{rpc_proto::proto, TopicScoreParams};
Expand Down Expand Up @@ -323,9 +323,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache<MessageId>,

/// Short term cache for fast message ids mapping them to the real message ids
fast_message_id_cache: TimeCache<FastMessageId, MessageId>,

/// The filter used to handle message subscriptions.
subscription_filter: F,

Expand Down Expand Up @@ -446,7 +443,6 @@ where
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
peer_topics: HashMap::new(),
explicit_peers: HashSet::new(),
Expand Down Expand Up @@ -1755,31 +1751,6 @@ where
metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
}

let fast_message_id = self.config.fast_message_id(&raw_message);

if let Some(fast_message_id) = fast_message_id.as_ref() {
if let Some(msg_id) = self.fast_message_id_cache.get(fast_message_id) {
let msg_id = msg_id.clone();
// Report the duplicate
if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.duplicated_message(
propagation_source,
&msg_id,
&raw_message.topic,
);
}
// Update the cache, informing that we have received a duplicate from another peer.
// The peers in this cache are used to prevent us forwarding redundant messages onto
// these peers.
self.mcache.observe_duplicate(&msg_id, propagation_source);
}

// This message has been seen previously. Ignore it
return;
}
}

// Try and perform the data transform to the message. If it fails, consider it invalid.
let message = match self.data_transform.inbound_transform(raw_message.clone()) {
Ok(message) => message,
Expand All @@ -1805,14 +1776,6 @@ where
return;
}

// Add the message to the duplicate caches
if let Some(fast_message_id) = fast_message_id {
// add id to cache
self.fast_message_id_cache
.entry(fast_message_id)
.or_insert_with(|| msg_id.clone());
}

if !self.duplicate_cache.insert(msg_id.clone()) {
debug!("Message already received, ignoring. Message: {}", msg_id);
if let Some((peer_score, ..)) = &mut self.peer_score {
Expand Down Expand Up @@ -1887,20 +1850,17 @@ where
metrics.register_invalid_message(&raw_message.topic);
}

let fast_message_id_cache = &self.fast_message_id_cache;
if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) {
let message_id = self.config.message_id(&message);

if let Some(msg_id) = self
.config
.fast_message_id(raw_message)
.and_then(|id| fast_message_id_cache.get(&id))
{
peer_score.reject_message(
propagation_source,
msg_id,
&raw_message.topic,
&message_id,
&message.topic,
reject_reason,
);
gossip_promises.reject_message(msg_id, &reject_reason);

gossip_promises.reject_message(&message_id, &reject_reason);
} else {
// The message is invalid, we reject it ignoring any gossip promises. If a peer is
// advertising this message via an IHAVE and it's invalid it will be double
Expand Down
87 changes: 1 addition & 86 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ use super::*;
use crate::protocol::ProtocolConfig;
use crate::subscription_filter::WhitelistSubscriptionFilter;
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::FastMessageId;
use crate::ValidationError;
use crate::{
config::Config, config::ConfigBuilder, IdentTopic as Topic, Message, TopicScoreParams,
};
use crate::{config::Config, config::ConfigBuilder, IdentTopic as Topic, TopicScoreParams};
use async_std::net::Ipv4Addr;
use byteorder::{BigEndian, ByteOrder};
use libp2p_core::{ConnectedPoint, Endpoint};
use rand::Rng;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::thread::sleep;
use std::time::Duration;

Expand Down Expand Up @@ -5064,86 +5059,6 @@ fn test_public_api() {
);
}

#[test]
fn test_msg_id_fn_only_called_once_with_fast_message_ids() {
struct Pointers {
slow_counter: u32,
fast_counter: u32,
}

let mut counters = Pointers {
slow_counter: 0,
fast_counter: 0,
};

let counters_pointer: *mut Pointers = &mut counters;

let counters_address = counters_pointer as u64;

macro_rules! get_counters_pointer {
($m: expr) => {{
let mut address_bytes: [u8; 8] = Default::default();
address_bytes.copy_from_slice($m.as_slice());
let address = u64::from_be_bytes(address_bytes);
address as *mut Pointers
}};
}

macro_rules! get_counters_and_hash {
($m: expr) => {{
let mut hasher = DefaultHasher::new();
$m.hash(&mut hasher);
let id = hasher.finish().to_be_bytes().into();
(id, get_counters_pointer!($m))
}};
}

let message_id_fn = |m: &Message| -> MessageId {
let (mut id, counters_pointer): (MessageId, *mut Pointers) =
get_counters_and_hash!(&m.data);
unsafe {
(*counters_pointer).slow_counter += 1;
}
id.0.reverse();
id
};
let fast_message_id_fn = |m: &RawMessage| -> FastMessageId {
let (id, counters_pointer) = get_counters_and_hash!(&m.data);
unsafe {
(*counters_pointer).fast_counter += 1;
}
id
};
let config = ConfigBuilder::default()
.message_id_fn(message_id_fn)
.fast_message_id_fn(fast_message_id_fn)
.build()
.unwrap();
let (mut gs, _, topic_hashes) = inject_nodes1()
.peer_no(0)
.topics(vec![String::from("topic1")])
.to_subscribe(true)
.gs_config(config)
.create_network();

let message = RawMessage {
source: None,
data: counters_address.to_be_bytes().to_vec(),
sequence_number: None,
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
};

for _ in 0..5 {
gs.handle_received_message(message.clone(), &PeerId::random());
}

assert_eq!(counters.fast_counter, 5);
assert_eq!(counters.slow_counter, 1);
}

#[test]
fn test_subscribe_to_invalid_topic() {
let t1 = Topic::new("t1");
Expand Down
34 changes: 1 addition & 33 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Duration;

use crate::error::ConfigBuilderError;
use crate::protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL};
use crate::types::{FastMessageId, Message, MessageId, PeerKind, RawMessage};
use crate::types::{Message, MessageId, PeerKind};

use libp2p_identity::PeerId;
use libp2p_swarm::StreamProtocol;
Expand Down Expand Up @@ -78,7 +78,6 @@ pub struct Config {
duplicate_cache_time: Duration,
validate_messages: bool,
message_id_fn: Arc<dyn Fn(&Message) -> MessageId + Send + Sync + 'static>,
fast_message_id_fn: Option<Arc<dyn Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static>>,
allow_self_origin: bool,
do_px: bool,
prune_peers: usize,
Expand Down Expand Up @@ -218,20 +217,6 @@ impl Config {
(self.message_id_fn)(message)
}

/// A user-defined optional function that computes fast ids from raw messages. This can be used
/// to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always
/// have different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for them.
///
/// The function takes a [`RawMessage`] as input and outputs a String to be
/// interpreted as the fast message id. Default is None.
pub fn fast_message_id(&self, message: &RawMessage) -> Option<FastMessageId> {
self.fast_message_id_fn
.as_ref()
.map(|fast_message_id_fn| fast_message_id_fn(message))
}

/// By default, gossipsub will reject messages that are sent to us that have the same message
/// source as we have specified locally. Enabling this, allows these messages and prevents
/// penalizing the peer that sent us the message. Default is false.
Expand Down Expand Up @@ -415,7 +400,6 @@ impl Default for ConfigBuilder {
.push_str(&message.sequence_number.unwrap_or_default().to_string());
MessageId::from(source_string)
}),
fast_message_id_fn: None,
allow_self_origin: false,
do_px: false,
prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
Expand Down Expand Up @@ -634,22 +618,6 @@ impl ConfigBuilder {
self
}

/// A user-defined optional function that computes fast ids from raw messages. This can be used
/// to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always
/// have different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for them.
///
/// The function takes a [`Message`] as input and outputs a String to be interpreted
/// as the fast message id. Default is None.
pub fn fast_message_id_fn<F>(&mut self, fast_id_fn: F) -> &mut Self
where
F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static,
{
self.config.fast_message_id_fn = Some(Arc::new(fast_id_fn));
self
}

/// Enables Peer eXchange. This should be enabled in bootstrappers and other well
/// connected/trusted nodes. The default is false.
///
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub use self::subscription_filter::{
};
pub use self::topic::{Hasher, Topic, TopicHash};
pub use self::transform::{DataTransform, IdentityTransform};
pub use self::types::{FastMessageId, Message, MessageAcceptance, MessageId, RawMessage, Rpc};
pub use self::types::{Message, MessageAcceptance, MessageId, RawMessage, Rpc};

pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;
10 changes: 0 additions & 10 deletions protocols/gossipsub/src/time_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ impl<'a, K: 'a, V: 'a> Entry<'a, K, V>
where
K: Eq + std::hash::Hash + Clone,
{
pub(crate) fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> &'a mut V {
match self {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(default()),
}
}
pub(crate) fn or_default(self) -> &'a mut V
where
V: Default,
Expand Down Expand Up @@ -159,10 +153,6 @@ where
pub(crate) fn contains_key(&self, key: &Key) -> bool {
self.map.contains_key(key)
}

pub(crate) fn get(&self, key: &Key) -> Option<&Value> {
self.map.get(key).map(|e| &e.element)
}
}

pub(crate) struct DuplicateCache<Key>(TimeCache<Key, ()>);
Expand Down
60 changes: 22 additions & 38 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,33 @@ pub enum MessageAcceptance {
Ignore,
}

/// Macro for declaring message id types
macro_rules! declare_message_id_type {
($name: ident, $name_string: expr) => {
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct $name(pub Vec<u8>);

impl $name {
pub fn new(value: &[u8]) -> Self {
Self(value.to_vec())
}
}

impl<T: Into<Vec<u8>>> From<T> for $name {
fn from(value: T) -> Self {
Self(value.into())
}
}
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub Vec<u8>);

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex_fmt::HexFmt(&self.0))
}
}
impl MessageId {
pub fn new(value: &[u8]) -> Self {
Self(value.to_vec())
}
}

impl std::fmt::Debug for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}({})", $name_string, hex_fmt::HexFmt(&self.0))
}
}
};
impl<T: Into<Vec<u8>>> From<T> for MessageId {
fn from(value: T) -> Self {
Self(value.into())
}
}

// A type for gossipsub message ids.
declare_message_id_type!(MessageId, "MessageId");
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex_fmt::HexFmt(&self.0))
}
}

// A type for gossipsub fast messsage ids, not to confuse with "real" message ids.
//
// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On
// high intensive networks with lots of messages, where the message_id is based on the result of
// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and
// filter duplicates quickly without performing the overhead of decompression.
declare_message_id_type!(FastMessageId, "FastMessageId");
impl std::fmt::Debug for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PeerConnections {
Expand Down

0 comments on commit a044b88

Please sign in to comment.