This repository has been archived by the owner on Jan 22, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
down samples outgoing gossip pull requests #33719
Merged
behzadnouri
merged 1 commit into
solana-labs:master
from
behzadnouri:reduce-pull-packets-early
Oct 18, 2023
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,8 +53,6 @@ use { | |
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; | ||
// Retention period of hashes of received outdated values. | ||
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; | ||
// Maximum number of pull requests to send out each time around. | ||
const MAX_NUM_PULL_REQUESTS: usize = 1024; | ||
pub const FALSE_RATE: f64 = 0.1f64; | ||
pub const KEYS: f64 = 8f64; | ||
|
||
|
@@ -143,19 +141,26 @@ impl CrdsFilter { | |
|
||
/// A vector of crds filters that together hold a complete set of Hashes. | ||
struct CrdsFilterSet { | ||
filters: Vec<AtomicBloom<Hash>>, | ||
filters: Vec<Option<AtomicBloom<Hash>>>, | ||
mask_bits: u32, | ||
} | ||
|
||
impl CrdsFilterSet { | ||
fn new(num_items: usize, max_bytes: usize) -> Self { | ||
fn new<R: Rng>(rng: &mut R, num_items: usize, max_bytes: usize) -> Self { | ||
const SAMPLE_RATE: usize = 8; | ||
const MAX_NUM_FILTERS: usize = 1024; | ||
let max_bits = (max_bytes * 8) as f64; | ||
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS); | ||
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items); | ||
let filters = | ||
repeat_with(|| Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into()) | ||
.take(1 << mask_bits) | ||
.collect(); | ||
let mut filters: Vec<_> = repeat_with(|| None).take(1usize << mask_bits).collect(); | ||
let mut indices: Vec<_> = (0..filters.len()).collect(); | ||
let size = (filters.len() + SAMPLE_RATE - 1) / SAMPLE_RATE; | ||
for _ in 0..MAX_NUM_FILTERS.min(size) { | ||
let k = rng.gen_range(0..indices.len()); | ||
let k = indices.swap_remove(k); | ||
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); | ||
filters[k] = Some(AtomicBloom::<Hash>::from(filter)); | ||
} | ||
Comment on lines
+155
to
+163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you just sending fewer bloom filters to get populated by your neighbor? now
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, that is basically what the commit is doing. |
||
Self { filters, mask_bits } | ||
} | ||
|
||
|
@@ -167,7 +172,9 @@ impl CrdsFilterSet { | |
.unwrap_or_default(), | ||
) | ||
.unwrap(); | ||
self.filters[index].add(&hash_value); | ||
if let Some(filter) = &self.filters[index] { | ||
filter.add(&hash_value); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -177,10 +184,12 @@ impl From<CrdsFilterSet> for Vec<CrdsFilter> { | |
cfs.filters | ||
.into_iter() | ||
.enumerate() | ||
.map(|(seed, filter)| CrdsFilter { | ||
filter: filter.into(), | ||
mask: CrdsFilter::compute_mask(seed as u64, mask_bits), | ||
mask_bits, | ||
.filter_map(|(seed, filter)| { | ||
Some(CrdsFilter { | ||
filter: Bloom::<Hash>::from(filter?), | ||
mask: CrdsFilter::compute_mask(seed as u64, mask_bits), | ||
mask_bits, | ||
}) | ||
}) | ||
.collect() | ||
} | ||
|
@@ -269,14 +278,7 @@ impl CrdsGossipPull { | |
if nodes.is_empty() { | ||
return Err(CrdsGossipError::NoPeers); | ||
} | ||
let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size); | ||
if filters.len() > MAX_NUM_PULL_REQUESTS { | ||
for i in 0..MAX_NUM_PULL_REQUESTS { | ||
let j = rng.gen_range(i..filters.len()); | ||
filters.swap(i, j); | ||
} | ||
filters.truncate(MAX_NUM_PULL_REQUESTS); | ||
} | ||
let filters = self.build_crds_filters(thread_pool, crds, bloom_size); | ||
// Associate each pull-request filter with a randomly selected peer. | ||
let dist = WeightedIndex::new(weights).unwrap(); | ||
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone()); | ||
|
@@ -427,7 +429,7 @@ impl CrdsGossipPull { | |
let crds = crds.read().unwrap(); | ||
let num_items = crds.len() + crds.num_purged() + failed_inserts.len(); | ||
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); | ||
let filters = CrdsFilterSet::new(num_items, bloom_size); | ||
let filters = CrdsFilterSet::new(&mut rand::thread_rng(), num_items, bloom_size); | ||
thread_pool.install(|| { | ||
crds.par_values() | ||
.with_min_len(PAR_MIN_LENGTH) | ||
|
@@ -673,45 +675,61 @@ pub(crate) mod tests { | |
|
||
#[test] | ||
fn test_crds_filter_set_add() { | ||
let crds_filter_set = | ||
CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196); | ||
let hash_values: Vec<_> = repeat_with(Hash::new_unique).take(1024).collect(); | ||
let mut rng = rand::thread_rng(); | ||
let crds_filter_set = CrdsFilterSet::new( | ||
&mut rng, /*num_items=*/ 59672788, /*max_bytes=*/ 8196, | ||
); | ||
let hash_values: Vec<_> = repeat_with(|| { | ||
let buf: [u8; 32] = rng.gen(); | ||
solana_sdk::hash::hashv(&[&buf]) | ||
}) | ||
.take(1024) | ||
.collect(); | ||
assert_eq!(crds_filter_set.filters.len(), 8192); | ||
for hash_value in &hash_values { | ||
crds_filter_set.add(*hash_value); | ||
} | ||
let filters: Vec<CrdsFilter> = crds_filter_set.into(); | ||
let mut num_hits = 0; | ||
assert_eq!(filters.len(), 1024); | ||
for hash_value in hash_values { | ||
let mut num_hits = 0; | ||
let mut hit = false; | ||
let mut false_positives = 0; | ||
for filter in &filters { | ||
if filter.test_mask(&hash_value) { | ||
num_hits += 1; | ||
assert!(!hit); | ||
hit = true; | ||
assert!(filter.contains(&hash_value)); | ||
assert!(filter.filter.contains(&hash_value)); | ||
} else if filter.filter.contains(&hash_value) { | ||
false_positives += 1; | ||
} | ||
} | ||
assert_eq!(num_hits, 1); | ||
assert!(false_positives < 5); | ||
} | ||
assert!(num_hits > 96, "num_hits: {num_hits}"); | ||
} | ||
|
||
#[test] | ||
fn test_crds_filter_set_new() { | ||
// Validates invariances required by CrdsFilterSet::get in the | ||
// vector of filters generated by CrdsFilterSet::new. | ||
let filters: Vec<CrdsFilter> = | ||
CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into(); | ||
assert_eq!(filters.len(), 16384); | ||
let filters = CrdsFilterSet::new( | ||
&mut rand::thread_rng(), | ||
55345017, // num_items | ||
4098, // max_bytes | ||
); | ||
assert_eq!(filters.filters.len(), 16384); | ||
let filters = Vec::<CrdsFilter>::from(filters); | ||
assert_eq!(filters.len(), 1024); | ||
let mask_bits = filters[0].mask_bits; | ||
let right_shift = 64 - mask_bits; | ||
let ones = !0u64 >> mask_bits; | ||
for (i, filter) in filters.iter().enumerate() { | ||
for filter in &filters { | ||
// Check that all mask_bits are equal. | ||
assert_eq!(mask_bits, filter.mask_bits); | ||
assert_eq!(i as u64, filter.mask >> right_shift); | ||
assert!((0..16384).contains(&(filter.mask >> right_shift))); | ||
assert_eq!(ones, ones & filter.mask); | ||
} | ||
} | ||
|
@@ -744,7 +762,7 @@ pub(crate) mod tests { | |
let crds = RwLock::new(crds); | ||
assert!(num_inserts > 30_000, "num inserts: {num_inserts}"); | ||
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); | ||
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32)); | ||
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4)); | ||
let crds = crds.read().unwrap(); | ||
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect()); | ||
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect(); | ||
|
@@ -755,21 +773,24 @@ pub(crate) mod tests { | |
"hash_values.len(): {}", | ||
hash_values.len() | ||
); | ||
let mut num_hits = 0; | ||
let mut false_positives = 0; | ||
for hash_value in hash_values { | ||
let mut num_hits = 0; | ||
let mut hit = false; | ||
for filter in &filters { | ||
if filter.test_mask(&hash_value) { | ||
num_hits += 1; | ||
assert!(!hit); | ||
hit = true; | ||
assert!(filter.contains(&hash_value)); | ||
assert!(filter.filter.contains(&hash_value)); | ||
} else if filter.filter.contains(&hash_value) { | ||
false_positives += 1; | ||
} | ||
} | ||
assert_eq!(num_hits, 1); | ||
} | ||
assert!(false_positives < 150_000, "fp: {false_positives}"); | ||
assert!(num_hits > 4000, "num_hits: {num_hits}"); | ||
assert!(false_positives < 20_000, "fp: {false_positives}"); | ||
} | ||
|
||
#[test] | ||
|
@@ -1313,7 +1334,8 @@ pub(crate) mod tests { | |
} | ||
#[test] | ||
fn test_crds_filter_complete_set_add_mask() { | ||
let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into(); | ||
let mut filters = | ||
Vec::<CrdsFilter>::from(CrdsFilterSet::new(&mut rand::thread_rng(), 1000, 10)); | ||
assert!(filters.iter().all(|f| f.mask_bits > 0)); | ||
let mut h: Hash = Hash::default(); | ||
// rev to make the hash::default() miss on the first few test_masks | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do this refactor and filter sampling instead of reducing number of
MAX_NUM_PULL_REQUESTS
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
MAX_NUM_PULL_REQUESTS
is just an upper bound on the number of outgoing requests regardless of crds table size and it is not proportional. The sampling is proportional to the number of requests going out originally.