Skip to content

Commit

Permalink
Capture the state sync data size in compressed/uncompressed form
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Aug 13, 2024
1 parent b78d795 commit da93360
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ sp-consensus = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-consensus-grandpa = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
zstd = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
22 changes: 19 additions & 3 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ pub struct SyncingEngine<B: BlockT, Client> {

/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,

/// Total bytes of the state sync, if compressed.
total_bytes_compressed: usize,

/// Total bytes of the state sync, uncompressed (the status-quo).
total_bytes_uncompressed: usize,
}

impl<B: BlockT, Client> SyncingEngine<B, Client>
Expand Down Expand Up @@ -487,6 +493,8 @@ where
state_request_protocol_name,
warp_sync_protocol_name,
import_queue,
total_bytes_compressed: 0,
total_bytes_uncompressed: 0,
},
SyncingService::new(tx, num_connected, is_major_syncing),
block_announce_config,
Expand Down Expand Up @@ -1207,11 +1215,14 @@ where
Ok(request.encode_to_vec())
}

fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> {
fn decode_state_response(response: &[u8]) -> Result<(usize, usize, OpaqueStateResponse), String> {
let compressed_data = zstd::stream::encode_all(response, 0).expect("Failed to compress state response");
let compressed_size = compressed_data.len();
let uncompressed_size = response.len();
let response = StateResponse::decode(response)
.map_err(|error| format!("Failed to decode state response: {error}"))?;

Ok(OpaqueStateResponse(Box::new(response)))
Ok((compressed_size, uncompressed_size, OpaqueStateResponse(Box::new(response))))
}

fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
Expand Down Expand Up @@ -1251,7 +1262,7 @@ where
}
},
PeerRequest::State => {
let response = match Self::decode_state_response(&resp[..]) {
let (compressed_size, uncompressed_size, response) = match Self::decode_state_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
Expand All @@ -1267,6 +1278,11 @@ where
},
};

self.total_bytes_compressed += compressed_size;
self.total_bytes_uncompressed += uncompressed_size;

log::info!("==== total_bytes_compressed: {}, total_bytes_uncompressed: {}", self.total_bytes_compressed, self.total_bytes_uncompressed);

self.strategy.on_state_response(peer_id, key, response);
},
PeerRequest::WarpProof => {
Expand Down

0 comments on commit da93360

Please sign in to comment.