
take snapshot at specified block and slightly better informants #1873

Merged 4 commits on Aug 8, 2016
34 changes: 25 additions & 9 deletions ethcore/src/client/client.rs
@@ -593,19 +593,35 @@ impl Client {
 		}
 	}
 
-	/// Take a snapshot.
-	pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W) -> Result<(), ::error::Error> {
+	/// Take a snapshot at the given block.
+	/// If the ID given is "latest", this will default to 1000 blocks behind.
+	pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), ::error::Error> {
 		let db = self.state_db.lock().boxed_clone();
 		let best_block_number = self.chain_info().best_block_number;
-		let start_block_number = if best_block_number > 1000 {
-			best_block_number - 1000
-		} else {
-			0
+		let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));
+
+		if best_block_number > HISTORY + block_number && db.is_pruned() {
+			return Err(snapshot::Error::OldBlockPrunedDB.into());
+		}
+
+		let start_hash = match at {
+			BlockID::Latest => {
+				let start_num = if best_block_number > 1000 {
+					best_block_number - 1000
+				} else {
+					0
+				};
+
+				self.block_hash(BlockID::Number(start_num))
+					.expect("blocks within HISTORY are always stored.")
+			}
+			_ => match self.block_hash(at) {
+				Some(hash) => hash,
+				None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
+			},
 		};
-		let start_hash = self.block_hash(BlockID::Number(start_block_number))
-			.expect("blocks within HISTORY are always stored.");
 
-		try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer));
+		try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer, p));
 
 		Ok(())
 	}
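With this change, snapshot callers choose the block explicitly and pass in a shared progress handle. A minimal usage sketch (the `client` and `writer` setup and the enclosing function are illustrative assumptions, not part of this PR; `PackedWriter` is the writer the PR's own tests use):

```rust
use std::sync::Arc;

// Minimal sketch, assuming an `Arc<Client>` and a snapshot writer
// constructed elsewhere.
fn snapshot_at_block<W: snapshot_io::SnapshotWriter + Send>(
	client: Arc<Client>,
	writer: W,
	at: BlockID,
) -> Result<(), ::error::Error> {
	let progress = snapshot::Progress::new();
	// Fails fast with `OldBlockPrunedDB` if `at` is more than HISTORY
	// blocks behind the head and the database is pruned.
	try!(client.take_snapshot(writer, at, &progress));
	assert!(progress.done());
	Ok(())
}
```

Note the pruning guard: a request older than HISTORY blocks on a pruned database is now rejected up front rather than failing partway through the state traversal.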
13 changes: 11 additions & 2 deletions ethcore/src/snapshot/error.rs
@@ -18,6 +18,8 @@
 
 use std::fmt;
 
+use ids::BlockID;
+
 use util::H256;
 use util::trie::TrieError;
 use util::rlp::DecoderError;
@@ -26,9 +28,13 @@ use util::rlp::DecoderError;
 #[derive(Debug)]
 pub enum Error {
 	/// Invalid starting block for snapshot.
-	InvalidStartingBlock(H256),
+	InvalidStartingBlock(BlockID),
 	/// Block not found.
 	BlockNotFound(H256),
+	/// Incomplete chain.
+	IncompleteChain,
+	/// Old starting block in a pruned database.
+	OldBlockPrunedDB,
 	/// Trie error.
 	Trie(TrieError),
 	/// Decoder error.
@@ -40,8 +46,11 @@ pub enum Error {
 impl fmt::Display for Error {
 	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
 		match *self {
-			Error::InvalidStartingBlock(ref hash) => write!(f, "Invalid starting block hash: {}", hash),
+			Error::InvalidStartingBlock(ref id) => write!(f, "Invalid starting block: {:?}", id),
 			Error::BlockNotFound(ref hash) => write!(f, "Block not found in chain: {}", hash),
+			Error::IncompleteChain => write!(f, "Cannot create snapshot due to incomplete chain."),
+			Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \
+				a pruned database. Please re-run with the --pruning archive flag."),
 			Error::Io(ref err) => err.fmt(f),
 			Error::Decoder(ref err) => err.fmt(f),
 			Error::Trie(ref err) => err.fmt(f),
80 changes: 67 additions & 13 deletions ethcore/src/snapshot/mod.rs
@@ -18,10 +18,12 @@
 
 use std::collections::VecDeque;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 
 use account_db::{AccountDB, AccountDBMut};
 use blockchain::{BlockChain, BlockProvider};
 use engines::Engine;
+use ids::BlockID;
 use views::BlockView;
 
 use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut};
@@ -58,18 +60,58 @@ const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;
 // How many blocks to include in a snapshot, starting from the head of the chain.
 const SNAPSHOT_BLOCKS: u64 = 30000;
 
+/// A progress indicator for snapshots.
+#[derive(Debug)]
+pub struct Progress {
+	accounts: AtomicUsize,
+	blocks: AtomicUsize,
+	size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes.
+	done: AtomicBool,
+}
+
+impl Progress {
+	/// Create a new progress indicator.
+	pub fn new() -> Self {
+		Progress {
+			accounts: AtomicUsize::new(0),
+			blocks: AtomicUsize::new(0),
+			size: AtomicUsize::new(0),
+			done: AtomicBool::new(false),
+		}
+	}
+
+	/// Get the number of accounts snapshotted thus far.
+	pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) }
+
+	/// Get the number of blocks snapshotted thus far.
+	pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) }
+
+	/// Get the written size of the snapshot in bytes.
+	pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) }
+
+	/// Whether the snapshot is complete.
+	pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) }
+
+}
 /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
-pub fn take_snapshot<W: SnapshotWriter + Send>(chain: &BlockChain, start_block_hash: H256, state_db: &HashDB, writer: W) -> Result<(), Error> {
-	let start_header = try!(chain.block_header(&start_block_hash).ok_or(Error::InvalidStartingBlock(start_block_hash)));
+pub fn take_snapshot<W: SnapshotWriter + Send>(
+	chain: &BlockChain,
+	block_at: H256,
+	state_db: &HashDB,
+	writer: W,
+	p: &Progress
+) -> Result<(), Error> {
+	let start_header = try!(chain.block_header(&block_at)
+		.ok_or(Error::InvalidStartingBlock(BlockID::Hash(block_at))));
 	let state_root = start_header.state_root();
 	let number = start_header.number();
 
 	info!("Taking snapshot starting at block {}", number);
 
 	let writer = Mutex::new(writer);
 	let (state_hashes, block_hashes) = try!(scope(|scope| {
-		let block_guard = scope.spawn(|| chunk_blocks(chain, (number, start_block_hash), &writer));
-		let state_res = chunk_state(state_db, state_root, &writer);
+		let block_guard = scope.spawn(|| chunk_blocks(chain, (number, block_at), &writer, p));
+		let state_res = chunk_state(state_db, state_root, &writer, p);
 
 		state_res.and_then(|state_hashes| {
 			block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
@@ -83,11 +125,13 @@ pub fn take_snapshot
 		block_hashes: block_hashes,
 		state_root: *state_root,
 		block_number: number,
-		block_hash: start_block_hash,
+		block_hash: block_at,
 	};
 
 	try!(writer.into_inner().finish(manifest_data));
 
+	p.done.store(true, Ordering::SeqCst);
+
 	Ok(())
 }
@@ -100,6 +144,7 @@ struct BlockChunker<'a> {
 	hashes: Vec<H256>,
 	snappy_buffer: Vec<u8>,
 	writer: &'a Mutex<SnapshotWriter + 'a>,
+	progress: &'a Progress,
 }
 
 impl<'a> BlockChunker<'a> {
@@ -162,7 +207,8 @@ impl<'a> BlockChunker<'a> {
 
 		let parent_total_difficulty = parent_details.total_difficulty;
 
-		let mut rlp_stream = RlpStream::new_list(3 + self.rlps.len());
+		let num_entries = self.rlps.len();
+		let mut rlp_stream = RlpStream::new_list(3 + num_entries);
 		rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty);
 
 		for pair in self.rlps.drain(..) {
@@ -178,6 +224,9 @@ impl<'a> BlockChunker<'a> {
 		try!(self.writer.lock().write_block_chunk(hash, compressed));
 		trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len());
 
+		self.progress.size.fetch_add(size, Ordering::SeqCst);
+		self.progress.blocks.fetch_add(num_entries, Ordering::SeqCst);
+
 		self.hashes.push(hash);
 		Ok(())
 	}
@@ -189,16 +238,15 @@ impl<'a> BlockChunker<'a> {
 /// The path parameter is the directory to store the block chunks in.
 /// This function assumes the directory exists already.
 /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
-pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex<SnapshotWriter + 'a>) -> Result<Vec<H256>, Error> {
+pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
 	let (start_number, start_hash) = start_block_info;
 
 	let first_hash = if start_number < SNAPSHOT_BLOCKS {
 		// use the genesis hash.
 		chain.genesis_hash()
 	} else {
 		let first_num = start_number - SNAPSHOT_BLOCKS;
-		chain.block_hash(first_num)
-			.expect("number before best block number; whole chain is stored; qed")
+		try!(chain.block_hash(first_num).ok_or(Error::IncompleteChain))
 	};
 
 	let mut chunker = BlockChunker {
@@ -208,6 +256,7 @@ pub fn chunk_blocks
 		hashes: Vec::new(),
 		snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
 		writer: writer,
+		progress: progress,
 	};
 
 	try!(chunker.chunk_all(first_hash));
@@ -222,6 +271,7 @@ struct StateChunker<'a> {
 	cur_size: usize,
 	snappy_buffer: Vec<u8>,
 	writer: &'a Mutex<SnapshotWriter + 'a>,
+	progress: &'a Progress,
 }
 
 impl<'a> StateChunker<'a> {
@@ -249,7 +299,8 @@ impl<'a> StateChunker<'a> {
 	// Write out the buffer to disk, pushing the created chunk's hash to
 	// the list.
 	fn write_chunk(&mut self) -> Result<(), Error> {
-		let mut stream = RlpStream::new_list(self.rlps.len());
+		let num_entries = self.rlps.len();
+		let mut stream = RlpStream::new_list(num_entries);
 		for rlp in self.rlps.drain(..) {
 			stream.append_raw(&rlp, 1);
 		}
@@ -263,6 +314,9 @@ impl<'a> StateChunker<'a> {
 		try!(self.writer.lock().write_state_chunk(hash, compressed));
 		trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());
 
+		self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
+		self.progress.size.fetch_add(compressed_size, Ordering::SeqCst);
+
 		self.hashes.push(hash);
 		self.cur_size = 0;
 
@@ -275,7 +329,7 @@ impl<'a> StateChunker<'a> {
 ///
 /// Returns a list of hashes of chunks created, or any error it may
 /// have encountered.
-pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter + 'a>) -> Result<Vec<H256>, Error> {
+pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
 	let account_trie = try!(TrieDB::new(db, &root));
 
 	let mut chunker = StateChunker {
@@ -284,10 +338,9 @@ pub fn chunk_state
 		cur_size: 0,
 		snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
 		writer: writer,
+		progress: progress,
 	};
 
-	trace!(target: "snapshot", "beginning state chunking");
-
 	// account_key here is the address' hash.
 	for (account_key, account_data) in account_trie.iter() {
 		let account = Account::from_thin_rlp(account_data);
@@ -383,6 +436,7 @@ impl StateRebuilder {
 		let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1;
 
 		// build account tries in parallel.
+		// Todo [rob] keep a thread pool around so we don't do this per-chunk.
 		try!(scope(|scope| {
 			let mut handles = Vec::new();
 			for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) {
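The `Progress` counters are what enable the "slightly better informants" of the PR title: the chunking threads bump the atomics while another thread polls them. A sketch of such a poller (hypothetical; the real informant wiring lives in the parity CLI, and sharing via `Arc` is an assumption here, since `take_snapshot` itself only borrows `&Progress` from scoped threads):

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical monitor thread: report counters once a second until done.
fn spawn_informant(progress: Arc<Progress>) -> thread::JoinHandle<()> {
	thread::spawn(move || {
		while !progress.done() {
			println!(
				"Snapshot progress: {} accounts, {} blocks, {} bytes written",
				progress.accounts(),
				progress.blocks(),
				progress.size(),
			);
			thread::sleep(Duration::from_secs(1));
		}
	})
}
```

Relaxed loads suffice for the counters because they are purely advisory; `done` uses SeqCst ordering so the final store in `take_snapshot` is reliably visible before a poller exits.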
4 changes: 2 additions & 2 deletions ethcore/src/snapshot/tests/blocks.rs
@@ -20,7 +20,7 @@ use devtools::RandomTempPath;
 
 use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer};
 use blockchain::BlockChain;
-use snapshot::{chunk_blocks, BlockRebuilder};
+use snapshot::{chunk_blocks, BlockRebuilder, Progress};
 use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
 
 use util::{Mutex, snappy};
@@ -55,7 +55,7 @@ fn chunk_and_restore(amount: u64) {
 
 	// snapshot it.
 	let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
-	let block_hashes = chunk_blocks(&bc, (amount, best_hash), &writer).unwrap();
+	let block_hashes = chunk_blocks(&bc, (amount, best_hash), &writer, &Progress::new()).unwrap();
 	writer.into_inner().finish(::snapshot::ManifestData {
 		state_hashes: Vec::new(),
 		block_hashes: block_hashes,
4 changes: 2 additions & 2 deletions ethcore/src/snapshot/tests/state.rs
@@ -16,7 +16,7 @@
 
 //! State snapshotting tests.
 
-use snapshot::{chunk_state, StateRebuilder};
+use snapshot::{chunk_state, Progress, StateRebuilder};
 use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
 use super::helpers::{compare_dbs, StateProducer};
 
@@ -48,7 +48,7 @@ fn snap_and_restore() {
 	let state_root = producer.state_root();
 	let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
 
-	let state_hashes = chunk_state(&old_db, &state_root, &writer).unwrap();
+	let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new()).unwrap();
 
 	writer.into_inner().finish(::snapshot::ManifestData {
 		state_hashes: state_hashes,
7 changes: 7 additions & 0 deletions parity/cli.rs
@@ -231,6 +231,12 @@ Import/Export Options:
 	--format FORMAT          For import/export in given format. FORMAT must be
 	                         one of 'hex' and 'binary'.
 
+Snapshot Options:
+	--at BLOCK               Take a snapshot at the given block, which may be an
+	                         index, hash, or 'latest'. Note that taking snapshots at
+	                         non-recent blocks will only work with --pruning archive
+	                         [default: latest]
+
 Virtual Machine Options:
 	--jitvm                  Enable the JIT VM.

Inline review comment on this hunk:

Contributor: is latest the latest - 1000?

Author: yep

@@ -365,6 +371,7 @@ pub struct Args {
 	pub flag_version: bool,
 	pub flag_from: String,
 	pub flag_to: String,
+	pub flag_at: String,
 	pub flag_format: Option<String>,
 	pub flag_jitvm: bool,
 	pub flag_log_file: Option<String>,
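With the flag wired through `Args`, an invocation would look something like `parity snapshot --at 1200000 snapshot.file` (block number and file name are hypothetical; the `snapshot` subcommand and its `<file>` argument come from the surrounding CLI, not this diff). Omitting `--at` keeps the previous behavior via the `latest` default.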
2 changes: 2 additions & 0 deletions parity/configuration.rs
@@ -171,6 +171,7 @@ impl Configuration {
 			file_path: self.args.arg_file.clone(),
 			wal: wal,
 			kind: snapshot::Kind::Take,
+			block_at: try!(to_block_id(&self.args.flag_at)),
 		};
 		Cmd::Snapshot(snapshot_cmd)
 	} else if self.args.cmd_restore {
@@ -186,6 +187,7 @@
 			file_path: self.args.arg_file.clone(),
 			wal: wal,
 			kind: snapshot::Kind::Restore,
+			block_at: try!(to_block_id("latest")), // unimportant.
 		};
 		Cmd::Snapshot(restore_cmd)
 	} else {

Inline review comment on the `block_at: try!(to_block_id("latest"))` line:

Contributor: shouldn't even be here, really...

Author: i can refactor the common fields out into a struct of its own, but then you end up with ~5 extra lines of boilerplate as opposed to one.
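`to_block_id` is an existing helper in parity's CLI layer that this PR reuses for `--at`; roughly, it has to map the three documented forms (index, hash, 'latest') onto `BlockID`. A simplified sketch of that mapping (illustrative only, not the helper's actual source):

```rust
use util::H256;

// Simplified sketch: accept "latest", a decimal block index, or a block hash.
fn to_block_id(s: &str) -> Result<BlockID, String> {
	if s == "latest" {
		return Ok(BlockID::Latest);
	}
	if let Ok(num) = s.parse::<u64>() {
		return Ok(BlockID::Number(num));
	}
	if let Ok(hash) = s.parse::<H256>() {
		return Ok(BlockID::Hash(hash));
	}
	Err(format!("Invalid block identifier given: {}", s))
}
```

For the restore path the value is never consulted, hence the hard-coded "latest" that the review comment above calls out.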