Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
take snapshot at specified block and slightly better informants (#1873)
Browse files Browse the repository at this point in the history
* prettier informant for snapshot creation

* allow taking snapshot at a given block

* minor tweaks

* elaborate on cli
  • Loading branch information
rphmeier authored and arkpar committed Aug 9, 2016
1 parent c33cb7f commit b636e5e
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 30 deletions.
34 changes: 25 additions & 9 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,19 +594,35 @@ impl Client {
}
}

/// Take a snapshot.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W) -> Result<(), ::error::Error> {
/// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), ::error::Error> {
let db = self.state_db.lock().boxed_clone();
let best_block_number = self.chain_info().best_block_number;
let start_block_number = if best_block_number > 1000 {
best_block_number - 1000
} else {
0
let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at)));

if best_block_number > HISTORY + block_number && db.is_pruned() {
return Err(snapshot::Error::OldBlockPrunedDB.into());
}

let start_hash = match at {
BlockID::Latest => {
let start_num = if best_block_number > 1000 {
best_block_number - 1000
} else {
0
};

self.block_hash(BlockID::Number(start_num))
.expect("blocks within HISTORY are always stored.")
}
_ => match self.block_hash(at) {
Some(hash) => hash,
None => return Err(snapshot::Error::InvalidStartingBlock(at).into()),
},
};
let start_hash = self.block_hash(BlockID::Number(start_block_number))
.expect("blocks within HISTORY are always stored.");

try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer));
try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer, p));

Ok(())
}
Expand Down
13 changes: 11 additions & 2 deletions ethcore/src/snapshot/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use std::fmt;

use ids::BlockID;

use util::H256;
use util::trie::TrieError;
use util::rlp::DecoderError;
Expand All @@ -26,9 +28,13 @@ use util::rlp::DecoderError;
#[derive(Debug)]
pub enum Error {
/// Invalid starting block for snapshot.
InvalidStartingBlock(H256),
InvalidStartingBlock(BlockID),
/// Block not found.
BlockNotFound(H256),
/// Incomplete chain.
IncompleteChain,
/// Old starting block in a pruned database.
OldBlockPrunedDB,
/// Trie error.
Trie(TrieError),
/// Decoder error.
Expand All @@ -40,8 +46,11 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::InvalidStartingBlock(ref hash) => write!(f, "Invalid starting block hash: {}", hash),
Error::InvalidStartingBlock(ref id) => write!(f, "Invalid starting block: {:?}", id),
Error::BlockNotFound(ref hash) => write!(f, "Block not found in chain: {}", hash),
Error::IncompleteChain => write!(f, "Cannot create snapshot due to incomplete chain."),
Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \
a pruned database. Please re-run with the --pruning archive flag."),
Error::Io(ref err) => err.fmt(f),
Error::Decoder(ref err) => err.fmt(f),
Error::Trie(ref err) => err.fmt(f),
Expand Down
80 changes: 67 additions & 13 deletions ethcore/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use account_db::{AccountDB, AccountDBMut};
use blockchain::{BlockChain, BlockProvider};
use engines::Engine;
use ids::BlockID;
use views::BlockView;

use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut};
Expand Down Expand Up @@ -58,18 +60,58 @@ const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024;
// How many blocks to include in a snapshot, starting from the head of the chain.
const SNAPSHOT_BLOCKS: u64 = 30000;

/// A progress indicator for snapshots.
#[derive(Debug)]
pub struct Progress {
accounts: AtomicUsize,
blocks: AtomicUsize,
size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes.
done: AtomicBool,
}

impl Progress {
/// Create a new progress indicator.
pub fn new() -> Self {
Progress {
accounts: AtomicUsize::new(0),
blocks: AtomicUsize::new(0),
size: AtomicUsize::new(0),
done: AtomicBool::new(false),
}
}

/// Get the number of accounts snapshotted thus far.
pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) }

/// Get the number of blocks snapshotted thus far.
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) }

/// Get the written size of the snapshot in bytes.
pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) }

/// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) }

}
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
pub fn take_snapshot<W: SnapshotWriter + Send>(chain: &BlockChain, start_block_hash: H256, state_db: &HashDB, writer: W) -> Result<(), Error> {
let start_header = try!(chain.block_header(&start_block_hash).ok_or(Error::InvalidStartingBlock(start_block_hash)));
pub fn take_snapshot<W: SnapshotWriter + Send>(
chain: &BlockChain,
block_at: H256,
state_db: &HashDB,
writer: W,
p: &Progress
) -> Result<(), Error> {
let start_header = try!(chain.block_header(&block_at)
.ok_or(Error::InvalidStartingBlock(BlockID::Hash(block_at))));
let state_root = start_header.state_root();
let number = start_header.number();

info!("Taking snapshot starting at block {}", number);

let writer = Mutex::new(writer);
let (state_hashes, block_hashes) = try!(scope(|scope| {
let block_guard = scope.spawn(|| chunk_blocks(chain, (number, start_block_hash), &writer));
let state_res = chunk_state(state_db, state_root, &writer);
let block_guard = scope.spawn(|| chunk_blocks(chain, (number, block_at), &writer, p));
let state_res = chunk_state(state_db, state_root, &writer, p);

state_res.and_then(|state_hashes| {
block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
Expand All @@ -83,11 +125,13 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(chain: &BlockChain, start_block_h
block_hashes: block_hashes,
state_root: *state_root,
block_number: number,
block_hash: start_block_hash,
block_hash: block_at,
};

try!(writer.into_inner().finish(manifest_data));

p.done.store(true, Ordering::SeqCst);

Ok(())
}

Expand All @@ -100,6 +144,7 @@ struct BlockChunker<'a> {
hashes: Vec<H256>,
snappy_buffer: Vec<u8>,
writer: &'a Mutex<SnapshotWriter + 'a>,
progress: &'a Progress,
}

impl<'a> BlockChunker<'a> {
Expand Down Expand Up @@ -162,7 +207,8 @@ impl<'a> BlockChunker<'a> {

let parent_total_difficulty = parent_details.total_difficulty;

let mut rlp_stream = RlpStream::new_list(3 + self.rlps.len());
let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty);

for pair in self.rlps.drain(..) {
Expand All @@ -178,6 +224,9 @@ impl<'a> BlockChunker<'a> {
try!(self.writer.lock().write_block_chunk(hash, compressed));
trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len());

self.progress.size.fetch_add(size, Ordering::SeqCst);
self.progress.blocks.fetch_add(num_entries, Ordering::SeqCst);

self.hashes.push(hash);
Ok(())
}
Expand All @@ -189,16 +238,15 @@ impl<'a> BlockChunker<'a> {
/// The path parameter is the directory to store the block chunks in.
/// This function assumes the directory exists already.
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex<SnapshotWriter + 'a>) -> Result<Vec<H256>, Error> {
pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
let (start_number, start_hash) = start_block_info;

let first_hash = if start_number < SNAPSHOT_BLOCKS {
// use the genesis hash.
chain.genesis_hash()
} else {
let first_num = start_number - SNAPSHOT_BLOCKS;
chain.block_hash(first_num)
.expect("number before best block number; whole chain is stored; qed")
try!(chain.block_hash(first_num).ok_or(Error::IncompleteChain))
};

let mut chunker = BlockChunker {
Expand All @@ -208,6 +256,7 @@ pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), wr
hashes: Vec::new(),
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
writer: writer,
progress: progress,
};

try!(chunker.chunk_all(first_hash));
Expand All @@ -222,6 +271,7 @@ struct StateChunker<'a> {
cur_size: usize,
snappy_buffer: Vec<u8>,
writer: &'a Mutex<SnapshotWriter + 'a>,
progress: &'a Progress,
}

impl<'a> StateChunker<'a> {
Expand Down Expand Up @@ -249,7 +299,8 @@ impl<'a> StateChunker<'a> {
// Write out the buffer to disk, pushing the created chunk's hash to
// the list.
fn write_chunk(&mut self) -> Result<(), Error> {
let mut stream = RlpStream::new_list(self.rlps.len());
let num_entries = self.rlps.len();
let mut stream = RlpStream::new_list(num_entries);
for rlp in self.rlps.drain(..) {
stream.append_raw(&rlp, 1);
}
Expand All @@ -263,6 +314,9 @@ impl<'a> StateChunker<'a> {
try!(self.writer.lock().write_state_chunk(hash, compressed));
trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());

self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size, Ordering::SeqCst);

self.hashes.push(hash);
self.cur_size = 0;

Expand All @@ -275,7 +329,7 @@ impl<'a> StateChunker<'a> {
///
/// Returns a list of hashes of chunks created, or any error it may
/// have encountered.
pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter + 'a>) -> Result<Vec<H256>, Error> {
pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
let account_trie = try!(TrieDB::new(db, &root));

let mut chunker = StateChunker {
Expand All @@ -284,10 +338,9 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex<SnapshotWriter +
cur_size: 0,
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
writer: writer,
progress: progress,
};

trace!(target: "snapshot", "beginning state chunking");

// account_key here is the address' hash.
for (account_key, account_data) in account_trie.iter() {
let account = Account::from_thin_rlp(account_data);
Expand Down Expand Up @@ -383,6 +436,7 @@ impl StateRebuilder {
let chunk_size = account_fat_rlps.len() / ::num_cpus::get() + 1;

// build account tries in parallel.
// Todo [rob] keep a thread pool around so we don't do this per-chunk.
try!(scope(|scope| {
let mut handles = Vec::new();
for (account_chunk, out_pairs_chunk) in account_fat_rlps.chunks(chunk_size).zip(pairs.chunks_mut(chunk_size)) {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/snapshot/tests/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use devtools::RandomTempPath;

use blockchain::generator::{ChainGenerator, ChainIterator, BlockFinalizer};
use blockchain::BlockChain;
use snapshot::{chunk_blocks, BlockRebuilder};
use snapshot::{chunk_blocks, BlockRebuilder, Progress};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};

use util::{Mutex, snappy};
Expand Down Expand Up @@ -55,7 +55,7 @@ fn chunk_and_restore(amount: u64) {

// snapshot it.
let writer = Mutex::new(PackedWriter::new(&snapshot_path).unwrap());
let block_hashes = chunk_blocks(&bc, (amount, best_hash), &writer).unwrap();
let block_hashes = chunk_blocks(&bc, (amount, best_hash), &writer, &Progress::new()).unwrap();
writer.into_inner().finish(::snapshot::ManifestData {
state_hashes: Vec::new(),
block_hashes: block_hashes,
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/snapshot/tests/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! State snapshotting tests.
use snapshot::{chunk_state, StateRebuilder};
use snapshot::{chunk_state, Progress, StateRebuilder};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
use super::helpers::{compare_dbs, StateProducer};

Expand Down Expand Up @@ -48,7 +48,7 @@ fn snap_and_restore() {
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());

let state_hashes = chunk_state(&old_db, &state_root, &writer).unwrap();
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new()).unwrap();

writer.into_inner().finish(::snapshot::ManifestData {
state_hashes: state_hashes,
Expand Down
7 changes: 7 additions & 0 deletions parity/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ Import/Export Options:
--format FORMAT For import/export in given format. FORMAT must be
one of 'hex' and 'binary'.
Snapshot Options:
--at BLOCK Take a snapshot at the given block, which may be an
index, hash, or 'latest'. Note that taking snapshots at
non-recent blocks will only work with --pruning archive
[default: latest]
Virtual Machine Options:
--jitvm Enable the JIT VM.
Expand Down Expand Up @@ -365,6 +371,7 @@ pub struct Args {
pub flag_version: bool,
pub flag_from: String,
pub flag_to: String,
pub flag_at: String,
pub flag_format: Option<String>,
pub flag_jitvm: bool,
pub flag_log_file: Option<String>,
Expand Down
2 changes: 2 additions & 0 deletions parity/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Configuration {
file_path: self.args.arg_file.clone(),
wal: wal,
kind: snapshot::Kind::Take,
block_at: try!(to_block_id(&self.args.flag_at)),
};
Cmd::Snapshot(snapshot_cmd)
} else if self.args.cmd_restore {
Expand All @@ -186,6 +187,7 @@ impl Configuration {
file_path: self.args.arg_file.clone(),
wal: wal,
kind: snapshot::Kind::Restore,
block_at: try!(to_block_id("latest")), // unimportant.
};
Cmd::Snapshot(restore_cmd)
} else {
Expand Down
Loading

0 comments on commit b636e5e

Please sign in to comment.