This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

structured rlp encoding in journaldb #8047

Merged
merged 2 commits on Mar 15, 2018
5 changes: 3 additions & 2 deletions util/journaldb/src/archivedb.rs
@@ -19,7 +19,7 @@
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use rlp::*;
use rlp::{encode, decode};
use hashdb::*;
use super::memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
@@ -46,7 +46,8 @@ pub struct ArchiveDB {
impl ArchiveDB {
/// Create a new instance from a key-value db.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> ArchiveDB {
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.").map(|val| decode::<u64>(&val));
let latest_era = backing.get(col, &LATEST_ERA_KEY).expect("Low-level database error.")
.map(|val| decode::<u64>(&val));
ArchiveDB {
overlay: MemoryDB::new(),
backing: backing,
85 changes: 38 additions & 47 deletions util/journaldb/src/earlymergedb.rs
@@ -21,7 +21,7 @@ use std::collections::hash_map::Entry;
use std::sync::Arc;
use parking_lot::RwLock;
use heapsize::HeapSizeOf;
use rlp::*;
use rlp::{encode, decode};
use hashdb::*;
use memorydb::*;
use super::{DB_PREFIX_LEN, LATEST_ERA_KEY};
@@ -30,6 +30,7 @@ use kvdb::{KeyValueDB, DBTransaction};
use ethereum_types::H256;
use error::{BaseDataError, UtilError};
use bytes::Bytes;
use util::{DatabaseKey, DatabaseValueView, DatabaseValueRef};

#[derive(Debug, Clone, PartialEq, Eq)]
struct RefInfo {
@@ -111,8 +112,6 @@ pub struct EarlyMergeDB {
column: Option<u32>,
}

const PADDING : [u8; 10] = [ 0u8; 10 ];

impl EarlyMergeDB {
/// Create a new instance from file
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> EarlyMergeDB {
@@ -267,20 +266,17 @@ impl EarlyMergeDB {
let mut era = decode::<u64>(&val);
latest_era = Some(era);
loop {
let mut index = 0usize;
while let Some(rlp_data) = db.get(col, {
let mut r = RlpStream::new_list(3);
r.append(&era);
r.append(&index);
r.append(&&PADDING[..]);
&r.drain()
}).expect("Low-level database error.") {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.list_at(1);
//let mut index = 0usize;
Review comment (Contributor), on the commented-out line above: This should be removed.

let mut db_key = DatabaseKey {
era,
index: 0usize,
};
while let Some(rlp_data) = db.get(col, &encode(&db_key)).expect("Low-level database error.") {
let inserts = DatabaseValueView::new(&rlp_data).inserts().expect("rlp read from db; qed");
Self::replay_keys(&inserts, db, col, &mut refs);
index += 1;
db_key.index += 1;
};
if index == 0 || era == 0 {
if db_key.index == 0 || era == 0 {
break;
}
era -= 1;
@@ -373,18 +369,17 @@ impl JournalDB for EarlyMergeDB {
};

{
let mut index = 0usize;
let mut db_key = DatabaseKey {
era: now,
index: 0usize,
};
let mut last;

while self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&now);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
last = encode(&db_key);
&last
})?.is_some() {
index += 1;
db_key.index += 1;
}

let drained = self.overlay.drain();
@@ -403,28 +398,25 @@

// TODO: check all removes are in the db.

let mut r = RlpStream::new_list(3);
r.append(id);

// Process the new inserts.
// We use the inserts for three things. For each:
// - we place into the backing DB or increment the counter if already in;
// - we note in the backing db that it was already in;
// - we write the key into our journal for this block;

r.begin_list(inserts.len());
for &(k, _) in &inserts {
r.append(&k);
}
r.append_list(&removes);
Self::insert_keys(&inserts, &*self.backing, self.column, &mut refs, batch);

let ins = inserts.iter().map(|&(k, _)| k).collect::<Vec<_>>();
let value_ref = DatabaseValueRef {
id,
inserts: &ins,
deletes: &removes,
};

trace!(target: "jdb.ops", " Deletes: {:?}", removes);
trace!(target: "jdb.ops", " Inserts: {:?}", ins);

batch.put(self.column, &last, r.as_raw());
batch.put(self.column, &last, &encode(&value_ref));
if self.latest_era.map_or(true, |e| now > e) {
batch.put(self.column, &LATEST_ERA_KEY, &encode(&now));
self.latest_era = Some(now);
@@ -438,23 +430,22 @@
let mut refs = self.refs.as_ref().unwrap().write();

// apply old commits' details
let mut index = 0usize;
let mut db_key = DatabaseKey {
era: end_era,
index: 0usize,
};
let mut last;

while let Some(rlp_data) = self.backing.get(self.column, {
let mut r = RlpStream::new_list(3);
r.append(&end_era);
r.append(&index);
r.append(&&PADDING[..]);
last = r.drain();
&last
})? {
let rlp = Rlp::new(&rlp_data);
let inserts: Vec<H256> = rlp.list_at(1);

if canon_id == &rlp.val_at::<H256>(0) {
while let Some(rlp_data) = {
last = encode(&db_key);
self.backing.get(self.column, &last)
}? {
let view = DatabaseValueView::new(&rlp_data);
let inserts = view.inserts().expect("rlp read from db; qed");

if canon_id == &view.id().expect("rlp read from db; qed") {
// Collect keys to be removed. Canon block - remove the (enacted) deletes.
let deletes: Vec<H256> = rlp.list_at(2);
let deletes = view.deletes().expect("rlp read from db; qed");
trace!(target: "jdb.ops", " Expunging: {:?}", deletes);
Self::remove_keys(&deletes, &mut refs, batch, self.column, RemoveFrom::Archive);

@@ -488,10 +479,10 @@
}

batch.delete(self.column, &last);
index += 1;
db_key.index += 1;
}

trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, index, canon_id);
trace!(target: "jdb", "EarlyMergeDB: delete journal for time #{}.{}, (canon was {})", end_era, db_key.index, canon_id);
trace!(target: "jdb", "OK: {:?}", &*refs);

Ok(0)
1 change: 1 addition & 0 deletions util/journaldb/src/lib.rs
@@ -46,6 +46,7 @@ mod archivedb;
mod earlymergedb;
mod overlayrecentdb;
mod refcounteddb;
mod util;

pub mod overlaydb;

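Note: the new util module registered above is not shown in this diff view. What follows is a minimal sketch of what it plausibly contains, inferred only from the call sites in earlymergedb.rs; the field and method names come from those call sites, while the exact RLP layout (including whether the legacy 10-byte PADDING is retained for compatibility with existing journals) is an assumption.

use ethereum_types::H256;
use rlp::{RlpStream, Encodable, UntrustedRlp, DecoderError};

// Sketch only — the real util.rs is not part of the visible diff.
const PADDING: [u8; 10] = [0u8; 10];

/// Journal key: one entry per (era, index) pair.
pub struct DatabaseKey {
	pub era: u64,
	pub index: usize,
}

impl Encodable for DatabaseKey {
	fn rlp_append(&self, s: &mut RlpStream) {
		// Assumed to keep the old 3-item layout [era, index, padding]
		// so previously written journal entries stay readable.
		s.begin_list(3);
		s.append(&self.era);
		s.append(&self.index);
		s.append(&&PADDING[..]);
	}
}

/// Journal value written at commit time: [id, inserts, deletes].
pub struct DatabaseValueRef<'a> {
	pub id: &'a H256,
	pub inserts: &'a [H256],
	pub deletes: &'a [H256],
}

impl<'a> Encodable for DatabaseValueRef<'a> {
	fn rlp_append(&self, s: &mut RlpStream) {
		s.begin_list(3);
		s.append(self.id);
		s.append_list(self.inserts);
		s.append_list(self.deletes);
	}
}

/// Read-only view over a journal entry fetched from the backing database.
pub struct DatabaseValueView<'a> {
	rlp: UntrustedRlp<'a>,
}

impl<'a> DatabaseValueView<'a> {
	pub fn new(data: &'a [u8]) -> Self {
		DatabaseValueView { rlp: UntrustedRlp::new(data) }
	}

	pub fn id(&self) -> Result<H256, DecoderError> {
		self.rlp.val_at(0)
	}

	pub fn inserts(&self) -> Result<Vec<H256>, DecoderError> {
		self.rlp.list_at(1)
	}

	pub fn deletes(&self) -> Result<Vec<H256>, DecoderError> {
		self.rlp.list_at(2)
	}
}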
70 changes: 48 additions & 22 deletions util/journaldb/src/overlaydb.rs
@@ -21,7 +21,7 @@ use std::collections::HashMap;
use std::collections::hash_map::Entry;
use error::{Result, BaseDataError};
use ethereum_types::H256;
use rlp::*;
use rlp::{UntrustedRlp, RlpStream, Encodable, DecoderError, Decodable, encode, decode};
use hashdb::*;
use memorydb::*;
use kvdb::{KeyValueDB, DBTransaction};
@@ -41,6 +41,39 @@ pub struct OverlayDB {
column: Option<u32>,
}

struct Payload {
count: u32,
value: DBValue,
}

impl Payload {
fn new(count: u32, value: DBValue) -> Self {
Payload {
count,
value,
}
}
}

impl Encodable for Payload {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(2);
s.append(&self.count);
s.append(&&*self.value);
}
}

impl Decodable for Payload {
fn decode(rlp: &UntrustedRlp) -> ::std::result::Result<Self, DecoderError> {
let payload = Payload {
count: rlp.val_at(0)?,
value: DBValue::from_slice(rlp.at(1)?.data()?),
};

Ok(payload)
}
}

impl OverlayDB {
/// Create a new instance of OverlayDB given a `backing` database.
pub fn new(backing: Arc<KeyValueDB>, col: Option<u32>) -> OverlayDB {
@@ -71,18 +104,19 @@ impl OverlayDB {
if rc != 0 {
match self.payload(&key) {
Some(x) => {
let (back_value, back_rc) = x;
let total_rc: i32 = back_rc as i32 + rc;
let total_rc: i32 = x.count as i32 + rc;
if total_rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
deletes += if self.put_payload_in_batch(batch, &key, (back_value, total_rc as u32)) {1} else {0};
let payload = Payload::new(total_rc as u32, x.value);
deletes += if self.put_payload_in_batch(batch, &key, &payload) {1} else {0};
}
None => {
if rc < 0 {
return Err(From::from(BaseDataError::NegativelyReferencedHash(key)));
}
self.put_payload_in_batch(batch, &key, (value, rc as u32));
let payload = Payload::new(rc as u32, value);
self.put_payload_in_batch(batch, &key, &payload);
}
};
ret += 1;
@@ -100,22 +134,16 @@
pub fn commit_refs(&self, key: &H256) -> i32 { self.overlay.raw(key).map_or(0, |(_, refs)| refs) }

/// Get the refs and value of the given key.
fn payload(&self, key: &H256) -> Option<(DBValue, u32)> {
fn payload(&self, key: &H256) -> Option<Payload> {
self.backing.get(self.column, key)
.expect("Low-level database error. Some issue with your hard disk?")
.map(|d| {
let r = Rlp::new(&d);
(DBValue::from_slice(r.at(1).data()), r.at(0).as_val())
})
.map(|d| decode(&d))
}

/// Put the refs and value of the given key, possibly deleting it from the db.
fn put_payload_in_batch(&self, batch: &mut DBTransaction, key: &H256, payload: (DBValue, u32)) -> bool {
if payload.1 > 0 {
let mut s = RlpStream::new_list(2);
s.append(&payload.1);
s.append(&&*payload.0);
batch.put(self.column, key, s.as_raw());
fn put_payload_in_batch(&self, batch: &mut DBTransaction, key: &H256, payload: &Payload) -> bool {
if payload.count > 0 {
batch.put(self.column, key, &encode(payload));
false
} else {
batch.delete(self.column, key);
Expand All @@ -129,7 +157,7 @@ impl HashDB for OverlayDB {
let mut ret: HashMap<H256, i32> = self.backing.iter(self.column)
.map(|(key, _)| {
let h = H256::from_slice(&*key);
let r = self.payload(&h).unwrap().1;
let r = self.payload(&h).unwrap().count;
(h, r as i32)
})
.collect();
@@ -161,9 +189,8 @@ impl HashDB for OverlayDB {
};
match self.payload(key) {
Some(x) => {
let (d, rc) = x;
if rc as i32 + memrc > 0 {
Some(d)
if x.count as i32 + memrc > 0 {
Some(x.value)
}
else {
None
@@ -185,8 +212,7 @@ impl HashDB for OverlayDB {
let memrc = k.map_or(0, |(_, rc)| rc);
match self.payload(key) {
Some(x) => {
let (_, rc) = x;
rc as i32 + memrc > 0
x.count as i32 + memrc > 0
}
// Replace above match arm with this once https://github.com/rust-lang/rust/issues/15287 is done.
//Some((d, rc)) if rc + memrc > 0 => true,
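Note: with Payload implementing both Encodable and Decodable, the reference count and value now round-trip through RLP as a single typed unit instead of being packed and unpacked by hand. A test-style sketch of that round trip follows (illustrative only, not part of the PR; it assumes placement inside overlaydb.rs, where Payload, DBValue, encode and decode are already in scope).

#[test]
fn payload_rlp_roundtrip() {
	// A refcount of 3 plus a small value, encoded as the two-item RLP list [count, value].
	let payload = Payload::new(3, DBValue::from_slice(b"node data"));
	let bytes = encode(&payload);

	// `decode` is the panicking variant already used elsewhere in this module.
	let restored: Payload = decode(&bytes);
	assert_eq!(restored.count, 3);
	assert_eq!(&*restored.value, &b"node data"[..]);
}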