From 3a7e48b1a6c698c99aaf1010a48fa41894eba848 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 11 Aug 2016 17:03:10 +0200 Subject: [PATCH] Save nodes removed from backing_overlay until commit --- util/src/journaldb/overlayrecentdb.rs | 33 ++++++++++++++++++++++----- util/src/journaldb/traits.rs | 8 ++++++- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/util/src/journaldb/overlayrecentdb.rs b/util/src/journaldb/overlayrecentdb.rs index 9de2799eeb2..5143b9999bd 100644 --- a/util/src/journaldb/overlayrecentdb.rs +++ b/util/src/journaldb/overlayrecentdb.rs @@ -66,7 +66,8 @@ pub struct OverlayRecentDB { #[derive(PartialEq)] struct JournalOverlay { - backing_overlay: MemoryDB, + backing_overlay: MemoryDB, // Nodes added in the history period + pending_overlay: H256FastMap, // Nodes being transfered from backing_overlay to backing db journal: HashMap>, latest_era: Option, } @@ -173,7 +174,11 @@ impl OverlayRecentDB { } } trace!("Recovered {} overlay entries, {} journal entries", count, journal.len()); - JournalOverlay { backing_overlay: overlay, journal: journal, latest_era: latest_era } + JournalOverlay { + backing_overlay: overlay, + pending_overlay: HashMap::default(), + journal: journal, + latest_era: latest_era } } } @@ -194,6 +199,7 @@ impl JournalDB for OverlayRecentDB { let mut mem = self.transaction_overlay.mem_used(); let overlay = self.journal_overlay.read(); mem += overlay.backing_overlay.mem_used(); + mem += overlay.pending_overlay.heap_size_of_children(); mem += overlay.journal.heap_size_of_children(); mem } @@ -209,14 +215,19 @@ impl JournalDB for OverlayRecentDB { fn latest_era(&self) -> Option { self.journal_overlay.read().latest_era } fn state(&self, key: &H256) -> Option { - let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec()); - v.or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec())) + let journal_overlay = self.journal_overlay.read(); + let key = to_short_key(key); + journal_overlay.backing_overlay.get(&key).map(|v| v.to_vec()) + .or_else(|| journal_overlay.pending_overlay.get(&key).map(|v| v.clone())) + .or_else(|| self.backing.get_by_prefix(self.column, &key[0..DB_PREFIX_LEN]).map(|b| b.to_vec())) } fn commit(&mut self, batch: &DBTransaction, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // record new commit's details. trace!("commit: #{} ({}), end era: {:?}", now, id, end); let mut journal_overlay = self.journal_overlay.write(); + // flush previous changes + journal_overlay.pending_overlay.clear(); { let mut r = RlpStream::new_list(3); let mut tx = self.transaction_overlay.drain(); @@ -280,7 +291,8 @@ impl JournalDB for OverlayRecentDB { } // apply canon inserts first for (k, v) in canon_insertions { - try!(batch.put_vec(self.column, &k, v)); + try!(batch.put(self.column, &k, &v)); + journal_overlay.pending_overlay.insert(to_short_key(&k), v); } // update the overlay for k in overlay_deletions { @@ -298,6 +310,10 @@ impl JournalDB for OverlayRecentDB { Ok(0) } + fn flush(&self) { + self.journal_overlay.write().pending_overlay.clear(); + } + fn inject(&mut self, batch: &DBTransaction) -> Result { let mut ops = 0; for (key, (value, rc)) in self.transaction_overlay.drain() { @@ -345,7 +361,12 @@ impl HashDB for OverlayRecentDB { match k { Some((d, rc)) if rc > 0 => Some(d), _ => { - let v = self.journal_overlay.read().backing_overlay.get(&to_short_key(key)).map(|v| v.to_vec()); + let v = { + let journal_overlay = self.journal_overlay.read(); + let key = to_short_key(key); + journal_overlay.backing_overlay.get(&key).map(|v| v.to_vec()) + .or_else(|| journal_overlay.pending_overlay.get(&key).map(|v| v.clone())) + }; match v { Some(x) => { Some(self.transaction_overlay.denote(key, x).0) diff --git a/util/src/journaldb/traits.rs b/util/src/journaldb/traits.rs index 2d3047a2224..96715604e4c 100644 --- a/util/src/journaldb/traits.rs +++ b/util/src/journaldb/traits.rs @@ -57,12 +57,18 @@ pub trait JournalDB: HashDB { /// Get backing database. fn backing(&self) -> &Arc; + /// Clear internal strucutres. This should called after changes have been written + /// to the backing strage + fn flush(&self) {} + /// Commit all changes in a single batch #[cfg(test)] fn commit_batch(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { let batch = self.backing().transaction(); let res = try!(self.commit(&batch, now, id, end)); - self.backing().write(batch).map(|_| res).map_err(Into::into) + let result = self.backing().write(batch).map(|_| res).map_err(Into::into); + self.flush(); + result } /// Inject all changes in a single batch.