diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 080be2fafd5f44..74440cdd0c0a8f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2154,6 +2154,8 @@ impl Blockstore { } if highest_primary_index_slot.is_some() { self.set_highest_primary_index_slot(highest_primary_index_slot); + } else { + self.db.set_clean_slot_0(true); } Ok(()) } @@ -2167,6 +2169,9 @@ impl Blockstore { self.transaction_status_index_cf.delete(1)?; } } + if w_highest_primary_index_slot.is_none() { + self.db.set_clean_slot_0(true); + } Ok(()) } diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index d643dd2c7ef075..9669f8bd305a00 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -990,4 +990,109 @@ pub mod tests { } assert_eq!(count, max_slot - (oldest_slot - 1)); } + + #[test] + fn test_purge_transaction_memos_compaction_filter() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + let oldest_slot = 5; + + fn random_signature() -> Signature { + use rand::Rng; + + let mut key = [0u8; 64]; + rand::thread_rng().fill(&mut key[..]); + Signature::from(key) + } + + // Insert some deprecated TransactionMemos + blockstore + .transaction_memos_cf + .put_deprecated(random_signature(), &"this is a memo".to_string()) + .unwrap(); + blockstore + .transaction_memos_cf + .put_deprecated(random_signature(), &"another memo".to_string()) + .unwrap(); + // Set clean_slot_0 to false, since we have deprecated memos + blockstore.db.set_clean_slot_0(false); + + // Insert some current TransactionMemos + blockstore + .transaction_memos_cf + .put( + (random_signature(), oldest_slot - 1), + &"this is a new memo in slot 4".to_string(), + ) + .unwrap(); + blockstore + .transaction_memos_cf + .put( + (random_signature(), oldest_slot), + &"this is a memo in slot 5 ".to_string(), + ) + .unwrap(); + + let first_index = { + let mut memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + memos_iterator.next().unwrap().unwrap().0 + }; + let last_index = { + let mut memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::End); + memos_iterator.next().unwrap().unwrap().0 + }; + + // Purge at slot 0 should not affect any memos + blockstore.db.set_oldest_slot(0); + blockstore + .db + .compact_range_cf::(&first_index, &last_index); + let memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for item in memos_iterator { + let _item = item.unwrap(); + count += 1; + } + assert_eq!(count, 4); + + // Purge at oldest_slot without clean_slot_0 only purges the current memo at slot 4 + blockstore.db.set_oldest_slot(oldest_slot); + blockstore + .db + .compact_range_cf::(&first_index, &last_index); + let memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for item in memos_iterator { + let (key, _value) = item.unwrap(); + let slot = ::index(&key).1; + assert!(slot == 0 || slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 3); + + // Purge at oldest_slot with clean_slot_0 purges deprecated memos + blockstore.db.set_clean_slot_0(true); + blockstore + .db + .compact_range_cf::(&first_index, &last_index); + let memos_iterator = blockstore + .transaction_memos_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for item in memos_iterator { + let (key, _value) = item.unwrap(); + let slot = ::index(&key).1; + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 1); + } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index f9c87ce397d434..b65df82ee00c9e 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -40,7 +40,7 @@ use { marker::PhantomData, path::Path, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, }, @@ -348,7 +348,10 @@ pub mod columns { } #[derive(Default, Clone, Debug)] -struct OldestSlot(Arc); +struct OldestSlot { + slot: Arc, + clean_slot_0: Arc, +} impl OldestSlot { pub fn set(&self, oldest_slot: Slot) { @@ -356,7 +359,7 @@ impl OldestSlot { // also, compaction_filters are created via its factories, creating short-lived copies of // this atomic value for the single job of compaction. So, Relaxed store can be justified // in total - self.0.store(oldest_slot, Ordering::Relaxed); + self.slot.store(oldest_slot, Ordering::Relaxed); } pub fn get(&self) -> Slot { @@ -365,7 +368,15 @@ impl OldestSlot { // requirement at the moment // also eventual propagation (very Relaxed) load is Ok, because compaction by nature doesn't // require strictly synchronized semantics in this regard - self.0.load(Ordering::Relaxed) + self.slot.load(Ordering::Relaxed) + } + + pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) { + self.clean_slot_0.store(clean_slot_0, Ordering::Relaxed); + } + + pub(crate) fn get_clean_slot_0(&self) -> bool { + self.clean_slot_0.load(Ordering::Relaxed) } } @@ -1427,6 +1438,10 @@ impl Database { self.backend.oldest_slot.set(oldest_slot); } + pub(crate) fn set_clean_slot_0(&self, clean_slot_0: bool) { + self.backend.oldest_slot.set_clean_slot_0(clean_slot_0); + } + pub fn live_files_metadata(&self) -> Result> { self.backend.live_files_metadata() } @@ -1835,6 +1850,10 @@ impl<'a> WriteBatch<'a> { struct PurgedSlotFilter { /// The oldest slot to keep; any slot < oldest_slot will be removed oldest_slot: Slot, + /// Whether to preserve keys that return slot 0, even when oldest_slot > 0. + // This is used to delete old column data that wasn't keyed with a Slot, and so always returns + // `C::slot() == 0` + clean_slot_0: bool, name: CString, _phantom: PhantomData, } @@ -1844,7 +1863,7 @@ impl CompactionFilter for PurgedSlotFilter { use rocksdb::CompactionDecision::*; let slot_in_key = C::slot(C::index(key)); - if slot_in_key >= self.oldest_slot { + if slot_in_key >= self.oldest_slot || (slot_in_key == 0 && !self.clean_slot_0) { Keep } else { Remove @@ -1867,8 +1886,10 @@ impl CompactionFilterFactory for PurgedSlotFilterFactory fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter { let copied_oldest_slot = self.oldest_slot.get(); + let copied_clean_slot_0 = self.oldest_slot.get_clean_slot_0(); PurgedSlotFilter:: { oldest_slot: copied_oldest_slot, + clean_slot_0: copied_clean_slot_0, name: CString::new(format!( "purged_slot_filter({}, {:?})", C::NAME, @@ -2113,6 +2134,7 @@ pub mod tests { is_manual_compaction: true, }; let oldest_slot = OldestSlot::default(); + oldest_slot.set_clean_slot_0(true); let mut factory = PurgedSlotFilterFactory:: { oldest_slot: oldest_slot.clone(),