diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ca23fb5a1b3..0b51dacd3a3 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -594,19 +594,35 @@ impl Client { } } - /// Take a snapshot. - pub fn take_snapshot(&self, writer: W) -> Result<(), ::error::Error> { + /// Take a snapshot at the given block. + /// If the ID given is "latest", this will default to 1000 blocks behind. + pub fn take_snapshot(&self, writer: W, at: BlockID, p: &snapshot::Progress) -> Result<(), ::error::Error> { let db = self.state_db.lock().boxed_clone(); let best_block_number = self.chain_info().best_block_number; - let start_block_number = if best_block_number > 1000 { - best_block_number - 1000 - } else { - 0 + let block_number = try!(self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at))); + + if best_block_number > HISTORY + block_number && db.is_pruned() { + return Err(snapshot::Error::OldBlockPrunedDB.into()); + } + + let start_hash = match at { + BlockID::Latest => { + let start_num = if best_block_number > 1000 { + best_block_number - 1000 + } else { + 0 + }; + + self.block_hash(BlockID::Number(start_num)) + .expect("blocks within HISTORY are always stored.") + } + _ => match self.block_hash(at) { + Some(hash) => hash, + None => return Err(snapshot::Error::InvalidStartingBlock(at).into()), + }, }; - let start_hash = self.block_hash(BlockID::Number(start_block_number)) - .expect("blocks within HISTORY are always stored."); - try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer)); + try!(snapshot::take_snapshot(&self.chain, start_hash, db.as_hashdb(), writer, p)); Ok(()) } diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index 98f906ec541..be4ed39d2d0 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -18,6 +18,8 @@ use std::fmt; +use ids::BlockID; + use util::H256; use util::trie::TrieError; use util::rlp::DecoderError; @@ -26,9 +28,13 @@ use util::rlp::DecoderError; #[derive(Debug)] pub enum Error { /// Invalid starting block for snapshot. - InvalidStartingBlock(H256), + InvalidStartingBlock(BlockID), /// Block not found. BlockNotFound(H256), + /// Incomplete chain. + IncompleteChain, + /// Old starting block in a pruned database. + OldBlockPrunedDB, /// Trie error. Trie(TrieError), /// Decoder error. @@ -40,8 +46,11 @@ pub enum Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Error::InvalidStartingBlock(ref hash) => write!(f, "Invalid starting block hash: {}", hash), + Error::InvalidStartingBlock(ref id) => write!(f, "Invalid starting block: {:?}", id), Error::BlockNotFound(ref hash) => write!(f, "Block not found in chain: {}", hash), + Error::IncompleteChain => write!(f, "Cannot create snapshot due to incomplete chain."), + Error::OldBlockPrunedDB => write!(f, "Attempted to create a snapshot at an old block while using \ + a pruned database. Please re-run with the --pruning archive flag."), Error::Io(ref err) => err.fmt(f), Error::Decoder(ref err) => err.fmt(f), Error::Trie(ref err) => err.fmt(f), diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index 5784ed93667..9dbbf1d9a46 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -18,10 +18,12 @@ use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use account_db::{AccountDB, AccountDBMut}; use blockchain::{BlockChain, BlockProvider}; use engines::Engine; +use ids::BlockID; use views::BlockView; use util::{Bytes, Hashable, HashDB, snappy, TrieDB, TrieDBMut, TrieMut}; @@ -58,9 +60,49 @@ const PREFERRED_CHUNK_SIZE: usize = 4 * 1024 * 1024; // How many blocks to include in a snapshot, starting from the head of the chain. const SNAPSHOT_BLOCKS: u64 = 30000; +/// A progress indicator for snapshots. +#[derive(Debug)] +pub struct Progress { + accounts: AtomicUsize, + blocks: AtomicUsize, + size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes. + done: AtomicBool, +} + +impl Progress { + /// Create a new progress indicator. + pub fn new() -> Self { + Progress { + accounts: AtomicUsize::new(0), + blocks: AtomicUsize::new(0), + size: AtomicUsize::new(0), + done: AtomicBool::new(false), + } + } + + /// Get the number of accounts snapshotted thus far. + pub fn accounts(&self) -> usize { self.accounts.load(Ordering::Relaxed) } + + /// Get the number of blocks snapshotted thus far. + pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Relaxed) } + + /// Get the written size of the snapshot in bytes. + pub fn size(&self) -> usize { self.size.load(Ordering::Relaxed) } + + /// Whether the snapshot is complete. + pub fn done(&self) -> bool { self.done.load(Ordering::SeqCst) } + +} /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. -pub fn take_snapshot(chain: &BlockChain, start_block_hash: H256, state_db: &HashDB, writer: W) -> Result<(), Error> { - let start_header = try!(chain.block_header(&start_block_hash).ok_or(Error::InvalidStartingBlock(start_block_hash))); +pub fn take_snapshot( + chain: &BlockChain, + block_at: H256, + state_db: &HashDB, + writer: W, + p: &Progress +) -> Result<(), Error> { + let start_header = try!(chain.block_header(&block_at) + .ok_or(Error::InvalidStartingBlock(BlockID::Hash(block_at)))); let state_root = start_header.state_root(); let number = start_header.number(); @@ -68,8 +110,8 @@ pub fn take_snapshot(chain: &BlockChain, start_block_h let writer = Mutex::new(writer); let (state_hashes, block_hashes) = try!(scope(|scope| { - let block_guard = scope.spawn(|| chunk_blocks(chain, (number, start_block_hash), &writer)); - let state_res = chunk_state(state_db, state_root, &writer); + let block_guard = scope.spawn(|| chunk_blocks(chain, (number, block_at), &writer, p)); + let state_res = chunk_state(state_db, state_root, &writer, p); state_res.and_then(|state_hashes| { block_guard.join().map(|block_hashes| (state_hashes, block_hashes)) @@ -83,11 +125,13 @@ pub fn take_snapshot(chain: &BlockChain, start_block_h block_hashes: block_hashes, state_root: *state_root, block_number: number, - block_hash: start_block_hash, + block_hash: block_at, }; try!(writer.into_inner().finish(manifest_data)); + p.done.store(true, Ordering::SeqCst); + Ok(()) } @@ -100,6 +144,7 @@ struct BlockChunker<'a> { hashes: Vec, snappy_buffer: Vec, writer: &'a Mutex, + progress: &'a Progress, } impl<'a> BlockChunker<'a> { @@ -162,7 +207,8 @@ impl<'a> BlockChunker<'a> { let parent_total_difficulty = parent_details.total_difficulty; - let mut rlp_stream = RlpStream::new_list(3 + self.rlps.len()); + let num_entries = self.rlps.len(); + let mut rlp_stream = RlpStream::new_list(3 + num_entries); rlp_stream.append(&parent_number).append(&parent_hash).append(&parent_total_difficulty); for pair in self.rlps.drain(..) { @@ -178,6 +224,9 @@ impl<'a> BlockChunker<'a> { try!(self.writer.lock().write_block_chunk(hash, compressed)); trace!(target: "snapshot", "wrote block chunk. hash: {}, size: {}, uncompressed size: {}", hash.hex(), size, raw_data.len()); + self.progress.size.fetch_add(size, Ordering::SeqCst); + self.progress.blocks.fetch_add(num_entries, Ordering::SeqCst); + self.hashes.push(hash); Ok(()) } @@ -189,7 +238,7 @@ impl<'a> BlockChunker<'a> { /// The path parameter is the directory to store the block chunks in. /// This function assumes the directory exists already. /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis. -pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex) -> Result, Error> { +pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), writer: &Mutex, progress: &'a Progress) -> Result, Error> { let (start_number, start_hash) = start_block_info; let first_hash = if start_number < SNAPSHOT_BLOCKS { @@ -197,8 +246,7 @@ pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), wr chain.genesis_hash() } else { let first_num = start_number - SNAPSHOT_BLOCKS; - chain.block_hash(first_num) - .expect("number before best block number; whole chain is stored; qed") + try!(chain.block_hash(first_num).ok_or(Error::IncompleteChain)) }; let mut chunker = BlockChunker { @@ -208,6 +256,7 @@ pub fn chunk_blocks<'a>(chain: &'a BlockChain, start_block_info: (u64, H256), wr hashes: Vec::new(), snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], writer: writer, + progress: progress, }; try!(chunker.chunk_all(first_hash)); @@ -222,6 +271,7 @@ struct StateChunker<'a> { cur_size: usize, snappy_buffer: Vec, writer: &'a Mutex, + progress: &'a Progress, } impl<'a> StateChunker<'a> { @@ -249,7 +299,8 @@ impl<'a> StateChunker<'a> { // Write out the buffer to disk, pushing the created chunk's hash to // the list. fn write_chunk(&mut self) -> Result<(), Error> { - let mut stream = RlpStream::new_list(self.rlps.len()); + let num_entries = self.rlps.len(); + let mut stream = RlpStream::new_list(num_entries); for rlp in self.rlps.drain(..) { stream.append_raw(&rlp, 1); } @@ -263,6 +314,9 @@ impl<'a> StateChunker<'a> { try!(self.writer.lock().write_state_chunk(hash, compressed)); trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len()); + self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst); + self.progress.size.fetch_add(compressed_size, Ordering::SeqCst); + self.hashes.push(hash); self.cur_size = 0; @@ -275,7 +329,7 @@ impl<'a> StateChunker<'a> { /// /// Returns a list of hashes of chunks created, or any error it may /// have encountered. -pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex) -> Result, Error> { +pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> { let account_trie = try!(TrieDB::new(db, &root)); let mut chunker = StateChunker { @@ -284,10 +338,9 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex, pub flag_jitvm: bool, pub flag_log_file: Option, diff --git a/parity/configuration.rs b/parity/configuration.rs index 18d54f91c2e..17a0bef29f9 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -171,6 +171,7 @@ impl Configuration { file_path: self.args.arg_file.clone(), wal: wal, kind: snapshot::Kind::Take, + block_at: try!(to_block_id(&self.args.flag_at)), }; Cmd::Snapshot(snapshot_cmd) } else if self.args.cmd_restore { @@ -186,6 +187,7 @@ impl Configuration { file_path: self.args.arg_file.clone(), wal: wal, kind: snapshot::Kind::Restore, + block_at: try!(to_block_id("latest")), // unimportant. }; Cmd::Snapshot(restore_cmd) } else { diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 790e001e600..70abf8fc1d2 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -19,12 +19,15 @@ use std::time::Duration; use std::path::{Path, PathBuf}; use std::sync::Arc; + use ethcore_logger::{setup_log, Config as LogConfig}; -use ethcore::snapshot::{RestorationStatus, SnapshotService}; +use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService}; use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter}; use ethcore::service::ClientService; use ethcore::client::{Mode, DatabaseCompactionProfile, Switch, VMType}; use ethcore::miner::Miner; +use ethcore::ids::BlockID; + use cache::CacheConfig; use params::{SpecType, Pruning}; use helpers::{to_client_config, execute_upgrades}; @@ -56,6 +59,7 @@ pub struct SnapshotCommand { pub file_path: Option, pub wal: bool, pub kind: Kind, + pub block_at: BlockID, } impl SnapshotCommand { @@ -168,6 +172,7 @@ impl SnapshotCommand { pub fn take_snapshot(self) -> Result<(), String> { let file_path = try!(self.file_path.clone().ok_or("No file path provided.".to_owned())); let file_path: PathBuf = file_path.into(); + let block_at = self.block_at.clone(); let (service, _panic_handler) = try!(self.start_service()); warn!("Snapshots are currently experimental. File formats may be subject to change."); @@ -175,11 +180,35 @@ impl SnapshotCommand { let writer = try!(PackedWriter::new(&file_path) .map_err(|e| format!("Failed to open snapshot writer: {}", e))); - if let Err(e) = service.client().take_snapshot(writer) { + let progress = Arc::new(Progress::new()); + let p = progress.clone(); + let informant_handle = ::std::thread::spawn(move || { + ::std::thread::sleep(Duration::from_secs(5)); + + let mut last_size = 0; + while !p.done() { + let cur_size = p.size(); + if cur_size != last_size { + last_size = cur_size; + info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size()); + } else { + info!("Snapshot: No progress since last update."); + } + + ::std::thread::sleep(Duration::from_secs(5)); + } + }); + + if let Err(e) = service.client().take_snapshot(writer, block_at, &*progress) { let _ = ::std::fs::remove_file(&file_path); return Err(format!("Encountered fatal error while creating snapshot: {}", e)); } + info!("snapshot creation complete"); + + assert!(progress.done()); + try!(informant_handle.join().map_err(|_| "failed to join logger thread")); + Ok(()) } }