This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Fix Erasure Index #7319

Merged · 4 commits · Dec 9, 2019
ledger/src/blocktree.rs: 295 changes (278 additions, 17 deletions)
@@ -356,10 +356,20 @@ impl Blocktree {
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}

pub fn slot_coding_iterator<'a>(
&'a self,
slot: Slot,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
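// Walk the coding column family starting at (slot, 0); the take_while below
// stops the iteration once keys cross into the next slot.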
let slot_iterator = self
.db
.iter::<cf::ShredCode>(IteratorMode::From((slot, 0), IteratorDirection::Forward))?;
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}

fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
-index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
+index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
) -> Vec<Shred> {
@@ -384,8 +394,8 @@ impl Blocktree {
);
};

-let index_meta_entry = index_working_set.get(&slot).expect("Index");
-let index = &index_meta_entry.index;
+let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
+let index = &mut index_meta_entry.index;
match erasure_meta.status(&index) {
ErasureMetaStatus::CanRecover => {
// Find shreds for this erasure set and try recovery
@@ -412,8 +422,17 @@
});
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
-if let Some(shred) =
-prev_inserted_codes.remove(&(slot, i)).or_else(|| {
+if let Some(shred) = prev_inserted_codes
+.remove(&(slot, i))
+.map(|s| {
+// Remove from the index so it doesn't get committed. We know
+// this is safe to do because everything in
+// `prev_inserted_codes` does not yet exist in blocktree
+// (guaranteed by `check_cache_coding_shred`)
+index.coding_mut().set_present(i, false);
+s
+})
+.or_else(|| {
if index.coding().is_present(i) {
let some_code = code_cf
.get_bytes((slot, i))
@@ -449,8 +468,14 @@
ErasureMetaStatus::DataFull => {
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
-// Remove saved coding shreds. We don't need these for future recovery
-let _ = prev_inserted_codes.remove(&(slot, i));
+// Remove saved coding shreds. We don't need these for future recovery.
+if prev_inserted_codes.remove(&(slot, i)).is_some() {
+// Remove from the index so it doesn't get committed. We know
+// this is safe to do because everything in
+// `prev_inserted_codes` does not yet exist in blocktree
+// (guaranteed by `check_cache_coding_shred`)
+index.coding_mut().set_present(i, false);
+}
},
);
submit_metrics(false, "complete".into(), 0);
@@ -523,7 +548,7 @@ impl Blocktree {
let recovered_data = Self::try_shred_recovery(
&db,
&erasure_metas,
-&index_working_set,
+&mut index_working_set,
&mut just_inserted_data_shreds,
&mut just_inserted_coding_shreds,
);
@@ -680,6 +705,13 @@ impl Blocktree {
);
}

// Should be safe to modify index_meta here. Two cases:
// 1) Recovery happens: all inserted coding shreds are removed
// from just_received_coding_shreds, and nothing will be committed by
// `check_insert_coding_shred`, so the coding index meta will not be
// committed
// 2) Recovery doesn't happen: the coding shred stays in
// just_received_coding_shreds and is committed to blocktree, so the
// index entry set here is accurate
index_meta.coding_mut().set_present(shred_index, true);

just_received_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
@@ -2109,10 +2141,12 @@ pub mod tests {
use crate::{
entry::{next_entry, next_entry_mut},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, LeaderSchedule},
shred::{max_ticks_per_n_shreds, DataShredHeader},
};
use itertools::Itertools;
use rand::{seq::SliceRandom, thread_rng};
use solana_runtime::bank::Bank;
use solana_sdk::{
hash::{self, Hash},
instruction::CompiledInstruction,
@@ -2124,11 +2158,7 @@ pub mod tests {
use std::{iter::FromIterator, time::Duration};

// used for tests only
-fn make_slot_entries_with_transactions(
-slot: Slot,
-parent_slot: Slot,
-num_entries: u64,
-) -> (Vec<Shred>, Vec<Entry>) {
+fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> {
let mut entries: Vec<Entry> = Vec::new();
for _ in 0..num_entries {
let transaction = Transaction::new_with_compiled_instructions(
@@ -2142,8 +2172,7 @@ pub mod tests {
let mut tick = create_ticks(1, 0, Hash::default());
entries.append(&mut tick);
}
-let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true, 0);
-(shreds, entries)
+entries
}

#[test]
@@ -4187,8 +4216,8 @@ pub mod tests {
#[test]
fn test_get_confirmed_block() {
let slot = 0;
-let (shreds, entries) = make_slot_entries_with_transactions(slot, 0, 100);
+let entries = make_slot_entries_with_transactions(100);
+let shreds = entries_to_test_shreds(entries.clone(), slot, 0, true, 0);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blocktree::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
@@ -4376,4 +4405,236 @@
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}

#[test]
fn test_recovery() {
let slot = 1;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
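// Insert only the coding shreds. With an FEC rate of 1.0 there should be
// one coding shred per data shred, so the coding shreds alone are enough
// to recover every data shred in the slot.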
blocktree
.insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
.unwrap();
let shred_bufs: Vec<_> = data_shreds
.iter()
.map(|shred| shred.payload.clone())
.collect();

// Check all the data shreds were recovered
for (s, buf) in data_shreds.iter().zip(shred_bufs) {
assert_eq!(
blocktree
.get_data_shred(s.slot(), s.index() as u64)
.unwrap()
.unwrap(),
buf
);
}

verify_index_integrity(&blocktree, slot);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}

#[test]
fn test_index_integrity() {
let slot = 1;
let num_entries = 100;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, num_entries, 1.0);
assert!(data_shreds.len() > 3);
assert!(coding_shreds.len() > 3);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Test inserting all the shreds
let all_shreds: Vec<_> = data_shreds
.iter()
.cloned()
.chain(coding_shreds.iter().cloned())
.collect();
blocktree
.insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
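// Purge between scenarios so each insertion pattern below starts from an
// empty slot.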

// Test inserting just the codes, enough for recovery
blocktree
.insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));

// Test inserting some codes, but not enough for recovery
blocktree
.insert_shreds(
coding_shreds[..coding_shreds.len() - 1].to_vec(),
Some(&leader_schedule_cache),
false,
)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));

// Test inserting most of the codes and data, enough for recovery
let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() - 1].iter().cloned())
.collect();
blocktree
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));

// Test inserting some codes and some data, enough for recovery
let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
blocktree
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));

// Test inserting all shreds in 2 rounds, make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..]
.iter()
.cloned()
.chain(coding_shreds[coding_shreds.len() / 2 - 1..].iter().cloned())
.collect();
blocktree
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blocktree
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));

// Test inserting not all, but enough, data and coding shreds in 2 rounds to
// trigger recovery; make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..data_shreds.len() / 2]
.iter()
.cloned()
.chain(
coding_shreds[coding_shreds.len() / 2 - 1..coding_shreds.len() / 2]
.iter()
.cloned(),
)
.collect();
blocktree
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blocktree
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));

// Test inserting shreds in 2 rounds, but not enough to trigger
// recovery; make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 2]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 2].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 2..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(
coding_shreds[coding_shreds.len() / 2 - 2..coding_shreds.len() / 2 - 1]
.iter()
.cloned(),
)
.collect();
blocktree
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blocktree
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}

fn setup_erasure_shreds(
slot: u64,
parent_slot: u64,
num_entries: u64,
erasure_rate: f32,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(
slot,
parent_slot,
erasure_rate,
leader_keypair.clone(),
0,
0,
)
.expect("Failed in creating shredder");
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);

let genesis_config = create_genesis_config(2).genesis_config;
let bank = Arc::new(Bank::new(&genesis_config));
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
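// Pin the leader schedule to `leader_keypair` so the shreds signed above
// pass leader verification when inserted into the blocktree.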
let fixed_schedule = FixedSchedule {
leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey()
])),
start_epoch: 0,
};
leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));

(data_shreds, coding_shreds, Arc::new(leader_schedule_cache))
}

fn verify_index_integrity(blocktree: &Blocktree, slot: u64) {
let index = blocktree.get_index(slot).unwrap().unwrap();
// Test the set of data shreds in the index and in the data column
// family are the same
let data_iter = blocktree.slot_data_iterator(slot).unwrap();
let mut num_data = 0;
for ((slot, index), _) in data_iter {
num_data += 1;
assert!(blocktree.get_data_shred(slot, index).unwrap().is_some());
}

// Test the data index doesn't have anything extra
let num_data_in_index = index.data().num_data();
assert_eq!(num_data_in_index, num_data);

// Test the set of coding shreds in the index and in the coding column
// family are the same
let coding_iter = blocktree.slot_coding_iterator(slot).unwrap();
let mut num_coding = 0;
for ((slot, index), _) in coding_iter {
num_coding += 1;
assert!(blocktree.get_coding_shred(slot, index).unwrap().is_some());
}

// Test the coding index doesn't have anything extra
let num_coding_in_index = index.coding().num_coding();
assert_eq!(num_coding_in_index, num_coding);
}
}
ledger/src/blocktree_meta.rs: 8 changes (8 additions, 0 deletions)
@@ -97,6 +97,10 @@ impl Index {
}

impl CodingIndex {
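// Number of coding shreds marked present in this index.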
pub fn num_coding(&self) -> usize {
self.index.len()
}

pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
self.index.range(bounds).count()
}
@@ -121,6 +125,10 @@ impl CodingIndex {
}

impl DataIndex {
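// Number of data shreds marked present in this index.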
pub fn num_data(&self) -> usize {
self.index.len()
}

pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
self.index.range(bounds).count()
}