From da93360c9a59c29409061789c598d8f4e55d7856 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Wed, 14 Aug 2024 07:32:11 +0800 Subject: [PATCH] Capture the state sync data size in compressed/uncompressed form --- Cargo.lock | 1 + substrate/client/network/sync/Cargo.toml | 1 + substrate/client/network/sync/src/engine.rs | 22 ++++++++++++++++++--- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60be41563299..75e43d4d5d47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17919,6 +17919,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "zstd 0.12.4", ] [[package]] diff --git a/substrate/client/network/sync/Cargo.toml b/substrate/client/network/sync/Cargo.toml index b29a9ccaaf1a..f28fec37c036 100644 --- a/substrate/client/network/sync/Cargo.toml +++ b/substrate/client/network/sync/Cargo.toml @@ -48,6 +48,7 @@ sp-consensus = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } sp-consensus-grandpa = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } +zstd = { workspace = true } [dev-dependencies] mockall = { workspace = true } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index ee7576c22f16..d29213fd8e8d 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -287,6 +287,12 @@ pub struct SyncingEngine { /// Handle to import queue. import_queue: Box>, + + /// Total bytes of the state sync, if compressed. + total_bytes_compressed: usize, + + /// Total bytes of the state sync, uncompressed (the status-quo). + total_bytes_uncompressed: usize, } impl SyncingEngine @@ -487,6 +493,8 @@ where state_request_protocol_name, warp_sync_protocol_name, import_queue, + total_bytes_compressed: 0, + total_bytes_uncompressed: 0, }, SyncingService::new(tx, num_connected, is_major_syncing), block_announce_config, @@ -1207,11 +1215,14 @@ where Ok(request.encode_to_vec()) } - fn decode_state_response(response: &[u8]) -> Result { + fn decode_state_response(response: &[u8]) -> Result<(usize, usize, OpaqueStateResponse), String> { + let compressed_data = zstd::stream::encode_all(response, 0).expect("Failed to compress state response"); + let compressed_size = compressed_data.len(); + let uncompressed_size = response.len(); let response = StateResponse::decode(response) .map_err(|error| format!("Failed to decode state response: {error}"))?; - Ok(OpaqueStateResponse(Box::new(response))) + Ok((compressed_size, uncompressed_size, OpaqueStateResponse(Box::new(response)))) } fn process_response_event(&mut self, response_event: ResponseEvent) { @@ -1251,7 +1262,7 @@ where } }, PeerRequest::State => { - let response = match Self::decode_state_response(&resp[..]) { + let (compressed_size, uncompressed_size, response) = match Self::decode_state_response(&resp[..]) { Ok(proto) => proto, Err(e) => { debug!( @@ -1267,6 +1278,11 @@ where }, }; + self.total_bytes_compressed += compressed_size; + self.total_bytes_uncompressed += uncompressed_size; + + log::info!("==== total_bytes_compressed: {}, total_bytes_uncompressed: {}", self.total_bytes_compressed, self.total_bytes_uncompressed); + self.strategy.on_state_response(peer_id, key, response); }, PeerRequest::WarpProof => {