Skip to content

Commit

Permalink
Benchmark all stream components together and add log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
iamalwaysuncomfortable committed Apr 26, 2022
1 parent d5ab9cc commit 84525e1
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 79 deletions.
4 changes: 3 additions & 1 deletion ledger/db/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright (c) 2018-2022 The MobileCoin Foundation

pub mod mock_ledger;
pub use mock_ledger::{get_mock_ledger, get_test_ledger_blocks, MockLedger};
pub use mock_ledger::{
get_custom_test_ledger_blocks, get_mock_ledger, get_test_ledger_blocks, MockLedger,
};
1 change: 1 addition & 0 deletions ledger/streaming/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ cargo-emit = "0.2"

[dev-dependencies]
mc-common = { path = "../../../common", features = ["loggers"] }
mc-ledger-db = { path = "../../../ledger/db", features = ["test_utils"] }
mc-ledger-streaming-api = { path = "../api", features = ["test_utils"] }
mc-util-from-random = { path = "../../../util/from-random" }

Expand Down
6 changes: 5 additions & 1 deletion ledger/streaming/api/src/convert/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ mod tests {

#[test]
fn test_roundtrip() {
let contents = BlockContents::new(vec![], vec![]);
let contents = BlockContents {
key_images: vec![],
outputs: vec![],
..Default::default()
};
let block = Block::new_origin_block(&[]);
let block_data = BlockData::new(block, contents, None);
let quorum_set = make_quorum_set();
Expand Down
37 changes: 33 additions & 4 deletions ledger/streaming/client/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ use test::Bencher;
fn bench_ledger_sink_for_1000_blocks(b: &mut Bencher) {
let logger = mc_common::logger::create_test_logger("benchmark:sink_1000_blocks".into());
let upstream_producer = stream::mock_stream_with_custom_block_contents(1, 3, 1000, 2);
let ledger = get_mock_ledger(0);
let mut downstream_producer =
DbStream::new(upstream_producer.clone(), ledger, true, logger.clone());

b.iter(|| {
let ledger = get_mock_ledger(0);
let downstream_producer =
DbStream::new(upstream_producer.clone(), ledger, true, logger.clone());
let producer_ref = &mut downstream_producer;
producer_ref.reinitialize_ledger(get_mock_ledger(0));
let mut stream = producer_ref.get_block_stream(0).unwrap();
block_on(async move {
let mut stream = downstream_producer.get_block_stream(0).unwrap();
while let Some(_) = stream.next().await {
// Benchmark stream
}
Expand Down Expand Up @@ -73,3 +75,30 @@ fn bench_validation_for_1000_blocks(b: &mut Bencher) {
});
});
}

#[bench]
fn bench_integrated_components(b: &mut Bencher) {
    let logger =
        mc_common::logger::create_test_logger("benchmark:integrated_validation_1000_blocks".into());
    let quorum_set = make_quorum_set();
    // Upstream mock producer emits 1000 blocks; this drives the full pipeline:
    // SCP validation -> block validation -> ledger sink.
    let upstream_producer = stream::mock_stream_with_custom_block_contents(1, 3, 1000, 2);
    let mut upstreams = HashMap::new();
    let ledger = get_mock_ledger(0);
    // Nine peer nodes all serving the same mock stream; node 10 is the local node.
    for i in 0..9 {
        upstreams.insert(test_node_id(i), upstream_producer.clone());
    }
    let scp_validator = SCPValidator::new(upstreams, logger.clone(), test_node_id(10), quorum_set);
    let block_validator = BlockValidator::new(scp_validator, Some(ledger.clone()), logger.clone());
    let mut ledger_sink = DbStream::new(block_validator, ledger, true, logger.clone());

    b.iter(|| {
        let producer_ref = &mut ledger_sink;
        // Reset the sink's ledger so each iteration syncs from an empty DB
        // instead of measuring a no-op against already-synced state.
        producer_ref.reinitialize_ledger(get_mock_ledger(0));
        let mut stream = producer_ref.get_block_stream(0).unwrap();
        block_on(async move {
            // Drain the stream to completion; `is_some()` replaces the
            // redundant `while let Some(_) = ...` pattern match.
            while stream.next().await.is_some() {
                // Benchmark stream
            }
        });
    });
}
143 changes: 78 additions & 65 deletions ledger/streaming/client/src/block_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,82 +50,96 @@ impl<US: BlockStream + 'static, L: Ledger + Clone + 'static> BlockStream for Blo
log::info!(self.logger, "Creating block validation stream");
let stream = self.upstream.get_block_stream(starting_height)?;

Ok(stream.scan(
(
ledger,
prev_block_id,
additional_key_images,
starting_height,
),
|state, component| {
match component {
Ok(component) => {
let (ledger, prev_block_id, additional_key_images, starting_height) = state;
Ok(
stream.scan(
(
ledger,
prev_block_id,
additional_key_images,
starting_height,
self.logger.clone(),
),
|state, component| {
match component {
Ok(component) => {
let (
ledger,
prev_block_id,
additional_key_images,
starting_height,
logger,
) = state;

let block = component.block_data.block();
let block_contents = component.block_data.contents();
let block = component.block_data.block();
let block_contents = component.block_data.contents();

if *starting_height == block.index && ledger.is_none() && block.index > 0 {
*prev_block_id = block.parent_id.clone();
}
if *starting_height == block.index
&& ledger.is_none()
&& block.index > 0
{
*prev_block_id = block.parent_id.clone();
}

// Check if parent block matches last block seen
if &block.parent_id != prev_block_id {
return future::ready(Some(Err(StreamError::BlockValidation(
"Block parent ID doesn't match".to_string(),
))));
}
// Check if parent block matches last block seen
if &block.parent_id != prev_block_id {
return future::ready(Some(Err(StreamError::BlockValidation(
"Block parent ID doesn't match".to_string(),
))));
}

// Check if key images already in ledger
if let Some(ledger) = ledger {
for key_image in &block_contents.key_images {
// Check if the key image is already in the local ledger.
match ledger.contains_key_image(key_image) {
Ok(contains_key_image) => {
if contains_key_image
|| additional_key_images.contains(key_image)
{
// Check if key images already in ledger
if let Some(ledger) = ledger {
for key_image in &block_contents.key_images {
// Check if the key image is already in the local ledger.
match ledger.contains_key_image(key_image) {
Ok(contains_key_image) => {
if contains_key_image
|| additional_key_images.contains(key_image)
{
return future::ready(Some(Err(
StreamError::BlockValidation(
"Contains spent key image".to_string(),
),
)));
}
}
Err(err) => {
return future::ready(Some(Err(
StreamError::BlockValidation(
"Contains spent key image".to_string(),
),
StreamError::DBAccess(err.to_string()),
)));
}
}
Err(err) => {
return future::ready(Some(Err(StreamError::DBAccess(
err.to_string(),
))));
}
additional_key_images.insert(*key_image);
}
additional_key_images.insert(*key_image);
}
}

// Compute the hash of the block
let derived_block_id = compute_block_id(
block.version,
&block.parent_id,
block.index,
block.cumulative_txo_count,
&block.root_element,
&block_contents.hash(),
);

// The block's ID must agree with the merkle hash of its transactions.
if block.id != derived_block_id {
return future::ready(Some(Err(StreamError::BlockValidation(
"Hash of transactions don't match claimed block id".to_string(),
))));
}
// Compute the hash of the block
let derived_block_id = compute_block_id(
block.version,
&block.parent_id,
block.index,
block.cumulative_txo_count,
&block.root_element,
&block_contents.hash(),
);

// The block's ID must agree with the merkle hash of its transactions.
if block.id != derived_block_id {
return future::ready(Some(Err(StreamError::BlockValidation(
"Hash of transactions don't match claimed block id".to_string(),
))));
}

log::debug!(logger, "Block {} validated", block.index);
*prev_block_id = block.id.clone();

*prev_block_id = block.id.clone();
future::ready(Some(Ok(component)))
future::ready(Some(Ok(component)))
}
Err(err) => future::ready(Some(Err(err))),
}
Err(err) => future::ready(Some(Err(err))),
}
},
))
},
),
)
}
}

Expand Down Expand Up @@ -205,12 +219,11 @@ mod tests {
let mock_ledger = get_mock_ledger(4);
let ledger = Some(mock_ledger);
let upstream = mock_stream_from_components(make_components(4));
let block_validator = BlockValidator::new(upstream, ledger, logger.clone());
let block_validator = BlockValidator::new(upstream, ledger, logger);

futures::executor::block_on(async move {
let mut stream = block_validator.get_block_stream(2).unwrap();
if let Some(data) = stream.next().await {
log::info!(logger, "{:?}", data);
assert!(matches!(data, Err(StreamError::BlockValidation(_))));
}
});
Expand Down
35 changes: 31 additions & 4 deletions ledger/streaming/client/src/ledger_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,22 @@ impl<US: BlockStream + 'static, L: Ledger + Clone + 'static> BlockStream for DbS
};

// Get the upstream, start our thread, and initialize our sink management object
log::info!(self.logger, "creating ledger sink stream & sink thread");
let (tx, rcv) = start_sink_thread(self.ledger.clone(), self.logger.clone());
let manager = SinkManager::new(tx, rcv, 0, 0, sync_start_height, self.logger.clone());
let stream = Box::pin(self.upstream.get_block_stream(starting_height).unwrap());

// Create the stream
let output_stream =
futures::stream::unfold((stream, manager), |(mut stream, mut manager)| async move {
let output_stream = futures::stream::unfold(
(stream, manager),
|(mut stream, mut manager)| async move {
if let Some(component) = stream.next().await {
if let Ok(component) = component {
log::debug!(
manager.logger,
"component {} received",
component.block_data.block().index
);
manager.last_block_received = component.block_data.block().index;
if manager.can_start_sync() {
// If we're above what's in the ledger, starting syncing the blocks
Expand Down Expand Up @@ -154,19 +161,29 @@ impl<US: BlockStream + 'static, L: Ledger + Clone + 'static> BlockStream for DbS
"upstream terminated, waiting for the rest of the blocks to sync"
);
} else {
log::warn!(manager.logger, "upstream stopped, ending stream");
log::warn!(manager.logger,
"upstream ended, ending downstream - blocks received: {}, blocks synced: {}",
manager.last_block_received,
manager.last_block_synced,
);
return None;
}
}
if let Some(component) = manager.receiver.recv().await {
log::debug!(
manager.logger,
"block {} synced",
component.block_data.block().index
);
manager.last_block_synced = component.block_data.block().index;
Some((Ok(component), (stream, manager)))
} else {
// TODO: Discuss whether we want to heal the stream or not
log::error!(manager.logger, "sink thread stopped, ending stream");
None
}
});
},
);
Ok(Box::pin(output_stream))
}
}
Expand All @@ -181,6 +198,11 @@ impl<US: BlockStream + 'static, L: Ledger + Clone + 'static> DbStream<US, L> {
logger,
}
}

    /// Replace the sink's backing ledger with `ledger`.
    ///
    /// Intended for benchmarks/tests that need to reset the sink to a fresh
    /// ledger between iterations without rebuilding the whole stream pipeline
    /// (see the `bench_integrated_components` benchmark, which calls this
    /// inside `b.iter`).
    pub fn reinitialize_ledger(&mut self, ledger: L) {
        self.ledger = ledger;
    }
}

fn start_sink_thread(
Expand All @@ -199,6 +221,7 @@ fn start_sink_thread(

// Launch ledger sink thread
std::thread::spawn(move || {
log::debug!(logger, "starting ledger sink thread");
while let Some(component) = rcv_in.blocking_recv() {
let signature = component.block_data.signature().as_ref().cloned();

Expand Down Expand Up @@ -227,6 +250,10 @@ fn start_sink_thread(
break;
}
}
log::debug!(
logger,
"upstream receiver channel ended, ending ledger sink thread"
);
});
(send_in, rcv_out)
}
Expand Down
Loading

0 comments on commit 84525e1

Please sign in to comment.