diff --git a/gossip/benches/crds_gossip_pull.rs b/gossip/benches/crds_gossip_pull.rs index eaed9b671166ef..35be66b4bad97c 100644 --- a/gossip/benches/crds_gossip_pull.rs +++ b/gossip/benches/crds_gossip_pull.rs @@ -52,6 +52,6 @@ fn bench_build_crds_filters(bencher: &mut Bencher) { let crds = RwLock::new(crds); bencher.iter(|| { let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); - assert_eq!(filters.len(), 128); + assert_eq!(filters.len(), 16); }); } diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 3e69192f2ac7e9..0d8ecb258b2fc5 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -53,8 +53,6 @@ use { pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // Retention period of hashes of received outdated values. const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; -// Maximum number of pull requests to send out each time around. -const MAX_NUM_PULL_REQUESTS: usize = 1024; pub const FALSE_RATE: f64 = 0.1f64; pub const KEYS: f64 = 8f64; @@ -143,19 +141,26 @@ impl CrdsFilter { /// A vector of crds filters that together hold a complete set of Hashes. struct CrdsFilterSet { - filters: Vec>, + filters: Vec>>, mask_bits: u32, } impl CrdsFilterSet { - fn new(num_items: usize, max_bytes: usize) -> Self { + fn new(rng: &mut R, num_items: usize, max_bytes: usize) -> Self { + const SAMPLE_RATE: usize = 8; + const MAX_NUM_FILTERS: usize = 1024; let max_bits = (max_bytes * 8) as f64; let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS); let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items); - let filters = - repeat_with(|| Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into()) - .take(1 << mask_bits) - .collect(); + let mut filters: Vec<_> = repeat_with(|| None).take(1usize << mask_bits).collect(); + let mut indices: Vec<_> = (0..filters.len()).collect(); + let size = (filters.len() + SAMPLE_RATE - 1) / SAMPLE_RATE; + for _ in 0..MAX_NUM_FILTERS.min(size) { + let k = rng.gen_range(0..indices.len()); + let k = indices.swap_remove(k); + let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); + filters[k] = Some(AtomicBloom::::from(filter)); + } Self { filters, mask_bits } } @@ -167,7 +172,9 @@ impl CrdsFilterSet { .unwrap_or_default(), ) .unwrap(); - self.filters[index].add(&hash_value); + if let Some(filter) = &self.filters[index] { + filter.add(&hash_value); + } } } @@ -177,10 +184,12 @@ impl From for Vec { cfs.filters .into_iter() .enumerate() - .map(|(seed, filter)| CrdsFilter { - filter: filter.into(), - mask: CrdsFilter::compute_mask(seed as u64, mask_bits), - mask_bits, + .filter_map(|(seed, filter)| { + Some(CrdsFilter { + filter: Bloom::::from(filter?), + mask: CrdsFilter::compute_mask(seed as u64, mask_bits), + mask_bits, + }) }) .collect() } @@ -269,14 +278,7 @@ impl CrdsGossipPull { if nodes.is_empty() { return Err(CrdsGossipError::NoPeers); } - let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size); - if filters.len() > MAX_NUM_PULL_REQUESTS { - for i in 0..MAX_NUM_PULL_REQUESTS { - let j = rng.gen_range(i..filters.len()); - filters.swap(i, j); - } - filters.truncate(MAX_NUM_PULL_REQUESTS); - } + let filters = self.build_crds_filters(thread_pool, crds, bloom_size); // Associate each pull-request filter with a randomly selected peer. let dist = WeightedIndex::new(weights).unwrap(); let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone()); @@ -427,7 +429,7 @@ impl CrdsGossipPull { let crds = crds.read().unwrap(); let num_items = crds.len() + crds.num_purged() + failed_inserts.len(); let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); - let filters = CrdsFilterSet::new(num_items, bloom_size); + let filters = CrdsFilterSet::new(&mut rand::thread_rng(), num_items, bloom_size); thread_pool.install(|| { crds.par_values() .with_min_len(PAR_MIN_LENGTH) @@ -673,45 +675,61 @@ pub(crate) mod tests { #[test] fn test_crds_filter_set_add() { - let crds_filter_set = - CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196); - let hash_values: Vec<_> = repeat_with(Hash::new_unique).take(1024).collect(); + let mut rng = rand::thread_rng(); + let crds_filter_set = CrdsFilterSet::new( + &mut rng, /*num_items=*/ 59672788, /*max_bytes=*/ 8196, + ); + let hash_values: Vec<_> = repeat_with(|| { + let buf: [u8; 32] = rng.gen(); + solana_sdk::hash::hashv(&[&buf]) + }) + .take(1024) + .collect(); + assert_eq!(crds_filter_set.filters.len(), 8192); for hash_value in &hash_values { crds_filter_set.add(*hash_value); } let filters: Vec = crds_filter_set.into(); + let mut num_hits = 0; assert_eq!(filters.len(), 1024); for hash_value in hash_values { - let mut num_hits = 0; + let mut hit = false; let mut false_positives = 0; for filter in &filters { if filter.test_mask(&hash_value) { num_hits += 1; + assert!(!hit); + hit = true; assert!(filter.contains(&hash_value)); assert!(filter.filter.contains(&hash_value)); } else if filter.filter.contains(&hash_value) { false_positives += 1; } } - assert_eq!(num_hits, 1); assert!(false_positives < 5); } + assert!(num_hits > 96, "num_hits: {num_hits}"); } #[test] fn test_crds_filter_set_new() { // Validates invariances required by CrdsFilterSet::get in the // vector of filters generated by CrdsFilterSet::new. - let filters: Vec = - CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into(); - assert_eq!(filters.len(), 16384); + let filters = CrdsFilterSet::new( + &mut rand::thread_rng(), + 55345017, // num_items + 4098, // max_bytes + ); + assert_eq!(filters.filters.len(), 16384); + let filters = Vec::::from(filters); + assert_eq!(filters.len(), 1024); let mask_bits = filters[0].mask_bits; let right_shift = 64 - mask_bits; let ones = !0u64 >> mask_bits; - for (i, filter) in filters.iter().enumerate() { + for filter in &filters { // Check that all mask_bits are equal. assert_eq!(mask_bits, filter.mask_bits); - assert_eq!(i as u64, filter.mask >> right_shift); + assert!((0..16384).contains(&(filter.mask >> right_shift))); assert_eq!(ones, ones & filter.mask); } } @@ -744,7 +762,7 @@ pub(crate) mod tests { let crds = RwLock::new(crds); assert!(num_inserts > 30_000, "num inserts: {num_inserts}"); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); - assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32)); + assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4)); let crds = crds.read().unwrap(); let purged: Vec<_> = thread_pool.install(|| crds.purged().collect()); let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect(); @@ -755,21 +773,24 @@ pub(crate) mod tests { "hash_values.len(): {}", hash_values.len() ); + let mut num_hits = 0; let mut false_positives = 0; for hash_value in hash_values { - let mut num_hits = 0; + let mut hit = false; for filter in &filters { if filter.test_mask(&hash_value) { num_hits += 1; + assert!(!hit); + hit = true; assert!(filter.contains(&hash_value)); assert!(filter.filter.contains(&hash_value)); } else if filter.filter.contains(&hash_value) { false_positives += 1; } } - assert_eq!(num_hits, 1); } - assert!(false_positives < 150_000, "fp: {false_positives}"); + assert!(num_hits > 4000, "num_hits: {num_hits}"); + assert!(false_positives < 20_000, "fp: {false_positives}"); } #[test] @@ -1313,7 +1334,8 @@ pub(crate) mod tests { } #[test] fn test_crds_filter_complete_set_add_mask() { - let mut filters: Vec = CrdsFilterSet::new(1000, 10).into(); + let mut filters = + Vec::::from(CrdsFilterSet::new(&mut rand::thread_rng(), 1000, 10)); assert!(filters.iter().all(|f| f.mask_bits > 0)); let mut h: Hash = Hash::default(); // rev to make the hash::default() miss on the first few test_masks