accounts-db: reopen mmap as file-backed storage after shrink_progress drop (#1871)

* reopen after shrink_progress drop

* add test

* allow deadcode

* document the whole fn

---------

Co-authored-by: HaoranYi <[email protected]>
Co-authored-by: HaoranYi <[email protected]>
3 people authored Aug 22, 2024
1 parent 0da63a3 commit 8ae52fb
Showing 2 changed files with 107 additions and 76 deletions.
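
The core of the fix is an ordering constraint: the slot's storage can only be reopened as read-only, file-backed storage once the ShrinkInProgress guard has been dropped, because dropping that guard is what switches the storage map over to the newly written storage. Below is a minimal, hypothetical Rust sketch of that pattern; Storage, ShrinkGuard, and finish_shrink are illustrative stand-ins, not the actual accounts-db API.

use std::collections::HashMap;

/// Illustrative stand-in for a slot's storage: either an appendable mmap
/// or a read-only, file-backed reopen of the same data.
#[derive(Debug, PartialEq)]
enum Storage {
    Mmap,
    FileBacked,
}

/// Illustrative stand-in for `ShrinkInProgress`: while it is alive it owns
/// the new storage; only on drop does it install that storage into the map.
struct ShrinkGuard<'a> {
    slot: u64,
    new_storage: Option<Storage>,
    map: &'a mut HashMap<u64, Storage>,
}

impl Drop for ShrinkGuard<'_> {
    fn drop(&mut self) {
        // The storage map switches to the new storage only here.
        if let Some(storage) = self.new_storage.take() {
            self.map.insert(self.slot, storage);
        }
    }
}

fn finish_shrink(map: &mut HashMap<u64, Storage>, slot: u64) {
    let guard = ShrinkGuard {
        slot,
        new_storage: Some(Storage::Mmap),
        map: &mut *map,
    };

    // Not safe to reopen yet: the map may still point at the old storage
    // until `guard` is dropped. Just remember that a reopen is needed.
    let reopen = true;

    drop(guard); // the map now points at the new (mmap) storage

    if reopen {
        // Safe now: swap the freshly written mmap for a read-only,
        // file-backed view so the mmap can be released.
        map.insert(slot, Storage::FileBacked);
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert(7, Storage::Mmap);
    finish_shrink(&mut map, 7);
    assert_eq!(map.get(&7), Some(&Storage::FileBacked));
}
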
5 changes: 5 additions & 0 deletions accounts-db/src/accounts_db.rs
@@ -3979,6 +3979,11 @@ impl AccountsDb {
}
}

#[cfg(feature = "dev-context-only-utils")]
pub fn set_storage_access(&mut self, storage_access: StorageAccess) {
self.storage_access = storage_access;
}

/// Sort `accounts` by pubkey and removes all but the *last* of consecutive
/// accounts in the vector with the same pubkey.
///
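
The new setter above is gated behind the dev-context-only-utils feature so tests can switch an AccountsDb between mmap-backed and file-backed storage access. A condensed, hypothetical test fragment showing the intended usage (get_sample_storages is the helper used by the ancient_append_vecs tests below):

#[test]
fn drives_the_scenario_over_both_storage_backends() {
    use crate::accounts_file::StorageAccess;

    for storage_access in [StorageAccess::Mmap, StorageAccess::File] {
        // Build a test AccountsDb, then pick the storage backend for this run.
        let (mut db, _storages, _slots, _infos) = get_sample_storages(1, None);
        db.set_storage_access(storage_access);
        // ... exercise shrinking / ancient packing under this backend ...
    }
}
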
178 changes: 102 additions & 76 deletions accounts-db/src/ancient_append_vecs.rs
@@ -696,11 +696,20 @@ impl AccountsDb {
let slot = shrink_collect.slot;

let shrink_in_progress = write_ancient_accounts.shrinks_in_progress.remove(&slot);

let mut reopen = false;
if shrink_in_progress.is_none() {
dropped_roots.push(slot);
} else {
self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);
// Remember that we need to 'reopen' the storage for this
// 'slot'. Note that it is *not* safe to reopen the storage for
// the 'slot' here, because 'shrink_in_progress' is still alive.
// The storage map may still point to the old storage and will
// only be updated to point to the new storage after we drop
// 'shrink_in_progress'.
reopen = true;
}

self.remove_old_stores_shrink(
&shrink_collect,
&self.shrink_ancient_stats.shrink_stats,
@@ -710,6 +719,11 @@

// If the slot is dead, remove the need to shrink the storage as the storage entries will be purged.
self.shrink_candidate_slots.lock().unwrap().remove(&slot);

if reopen {
// 'shrink_in_progress' is dead now. We can safely 'reopen' the new storage for 'slot'.
self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);
}
}
self.handle_dropped_roots_for_ancient(dropped_roots.into_iter());
metrics.accumulate(&write_ancient_accounts.metrics);
@@ -1184,6 +1198,7 @@ pub mod tests {
},
ShrinkCollectRefs,
},
accounts_file::StorageAccess,
accounts_hash::AccountHash,
accounts_index::UpsertReclaim,
append_vec::{
@@ -1591,89 +1606,100 @@ pub mod tests {
// or all slots shrunk so no roots or storages should be removed
for in_shrink_candidate_slots in [false, true] {
for all_slots_shrunk in [false, true] {
for num_slots in 0..3 {
let (db, storages, slots, infos) = get_sample_storages(num_slots, None);
let mut accounts_per_storage = infos
.iter()
.zip(
storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store)),
)
.collect::<Vec<_>>();

let alive_bytes = 1000;
let accounts_to_combine = db.calc_accounts_to_combine(
&mut accounts_per_storage,
&default_tuning(),
alive_bytes,
IncludeManyRefSlots::Include,
);
let mut stats = ShrinkStatsSub::default();
let mut write_ancient_accounts = WriteAncientAccounts::default();
for storage_access in [StorageAccess::Mmap, StorageAccess::File] {
for num_slots in 0..3 {
let (mut db, storages, slots, infos) = get_sample_storages(num_slots, None);
db.set_storage_access(storage_access);
let mut accounts_per_storage = infos
.iter()
.zip(
storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store)),
)
.collect::<Vec<_>>();

slots.clone().for_each(|slot| {
db.add_root(slot);
let storage = db.storage.get_slot_storage_entry(slot);
assert!(storage.is_some());
if in_shrink_candidate_slots {
db.shrink_candidate_slots.lock().unwrap().insert(slot);
}
});
let alive_bytes = 1000;
let accounts_to_combine = db.calc_accounts_to_combine(
&mut accounts_per_storage,
&default_tuning(),
alive_bytes,
IncludeManyRefSlots::Include,
);
let mut stats = ShrinkStatsSub::default();
let mut write_ancient_accounts = WriteAncientAccounts::default();

let roots = db
.accounts_index
.roots_tracker
.read()
.unwrap()
.alive_roots
.get_all();
assert_eq!(roots, slots.clone().collect::<Vec<_>>());

if all_slots_shrunk {
// make it look like each of the slots was shrunk
slots.clone().for_each(|slot| {
write_ancient_accounts
.shrinks_in_progress
.insert(slot, db.get_store_for_shrink(slot, 1));
db.add_root(slot);
let storage = db.storage.get_slot_storage_entry(slot);
assert!(storage.is_some());
if in_shrink_candidate_slots {
db.shrink_candidate_slots.lock().unwrap().insert(slot);
}
});
}

db.finish_combine_ancient_slots_packed_internal(
accounts_to_combine,
write_ancient_accounts,
&mut stats,
);

slots.clone().for_each(|slot| {
assert!(!db.shrink_candidate_slots.lock().unwrap().contains(&slot));
});

let roots_after = db
.accounts_index
.roots_tracker
.read()
.unwrap()
.alive_roots
.get_all();
let roots = db
.accounts_index
.roots_tracker
.read()
.unwrap()
.alive_roots
.get_all();
assert_eq!(roots, slots.clone().collect::<Vec<_>>());

assert_eq!(
roots_after,
if all_slots_shrunk {
slots.clone().collect::<Vec<_>>()
} else {
vec![]
},
"all_slots_shrunk: {all_slots_shrunk}"
);
slots.for_each(|slot| {
let storage = db.storage.get_slot_storage_entry(slot);
if all_slots_shrunk {
assert!(storage.is_some());
} else {
assert!(storage.is_none());
// make it look like each of the slots was shrunk
slots.clone().for_each(|slot| {
write_ancient_accounts
.shrinks_in_progress
.insert(slot, db.get_store_for_shrink(slot, 1));
});
}
});

db.finish_combine_ancient_slots_packed_internal(
accounts_to_combine,
write_ancient_accounts,
&mut stats,
);

slots.clone().for_each(|slot| {
assert!(!db.shrink_candidate_slots.lock().unwrap().contains(&slot));
});

let roots_after = db
.accounts_index
.roots_tracker
.read()
.unwrap()
.alive_roots
.get_all();

assert_eq!(
roots_after,
if all_slots_shrunk {
slots.clone().collect::<Vec<_>>()
} else {
vec![]
},
"all_slots_shrunk: {all_slots_shrunk}"
);
slots.for_each(|slot| {
let storage = db.storage.get_slot_storage_entry(slot);
if all_slots_shrunk {
assert!(storage.is_some());
// Here we use can_append() as a proxy to assert the backing storage type of the accounts after shrinking.
// When storage_access is set to `File`, the backing storage should be reopened as a read-only file
// after shrinking an ancient slot, which means can_append() will return false.
// When storage_access is set to `Mmap`, the backing storage is still an mmap, and can_append() will return true.
assert_eq!(
storage.unwrap().accounts.can_append(),
storage_access == StorageAccess::Mmap
);
} else {
assert!(storage.is_none());
}
});
}
}
}
}
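
The final assertion above uses can_append() as a proxy for the backing storage type: once the deferred reopen has run, a db configured with StorageAccess::File holds a read-only, file-backed storage for the shrunk slot (can_append() returns false), while an Mmap-configured db keeps an appendable mmap. A hedged restatement of that check as a standalone helper; the helper name is illustrative, Slot is assumed to be in scope as in the surrounding test module, and the real test reaches this state through finish_combine_ancient_slots_packed_internal:

// Illustrative helper condensing the test's final assertion; assumes the
// same test setup as above.
fn assert_backing_storage(db: &AccountsDb, slot: Slot, storage_access: StorageAccess) {
    let storage = db
        .storage
        .get_slot_storage_entry(slot)
        .expect("a shrunk slot should still have a storage");
    // File-configured storages were reopened read-only after the shrink, so
    // they can no longer be appended to; Mmap-configured storages remain
    // appendable.
    assert_eq!(
        storage.accounts.can_append(),
        storage_access == StorageAccess::Mmap
    );
}
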
