Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
adds rollout path to for chained Merkle shreds
Browse files Browse the repository at this point in the history
The commit adds should_chain_merkle_shreds to incrementally roll out
chained Merkle shreds to clusters.
  • Loading branch information
behzadnouri committed Feb 6, 2024
1 parent 99760e5 commit bf98394
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 22 deletions.
2 changes: 1 addition & 1 deletion ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ pub mod layout {
Ok(flags & ShredFlags::SHRED_TICK_REFERENCE_MASK.bits())
}

pub(crate) fn get_merkle_root(shred: &[u8]) -> Option<Hash> {
pub fn get_merkle_root(shred: &[u8]) -> Option<Hash> {
match get_shred_variant(shred).ok()? {
ShredVariant::LegacyCode | ShredVariant::LegacyData => None,
ShredVariant::MerkleCode(proof_size, chained) => {
Expand Down
9 changes: 9 additions & 0 deletions ledger/src/shred/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct ProcessShredsStats {
num_data_shreds_hist: [usize; 5],
// If the blockstore already has shreds for the broadcast slot.
pub num_extant_slots: u64,
// When looking up chained merkle root from parent slot fails.
pub err_unknown_chained_merkle_root: u64,
pub(crate) data_buffer_residual: usize,
pub num_merkle_data_shreds: usize,
pub num_merkle_coding_shreds: usize,
Expand Down Expand Up @@ -89,6 +91,11 @@ impl ProcessShredsStats {
("sign_coding_time", self.sign_coding_elapsed, i64),
("coding_send_time", self.coding_send_elapsed, i64),
("num_extant_slots", self.num_extant_slots, i64),
(
"err_unknown_chained_merkle_root",
self.err_unknown_chained_merkle_root,
i64
),
("data_buffer_residual", self.data_buffer_residual, i64),
("num_data_shreds_07", self.num_data_shreds_hist[0], i64),
("num_data_shreds_15", self.num_data_shreds_hist[1], i64),
Expand Down Expand Up @@ -161,6 +168,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
coalesce_elapsed,
num_data_shreds_hist,
num_extant_slots,
err_unknown_chained_merkle_root,
data_buffer_residual,
num_merkle_data_shreds,
num_merkle_coding_shreds,
Expand All @@ -175,6 +183,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
self.coalesce_elapsed += coalesce_elapsed;
self.num_extant_slots += num_extant_slots;
self.err_unknown_chained_merkle_root += err_unknown_chained_merkle_root;
self.data_buffer_residual += data_buffer_residual;
self.num_merkle_data_shreds += num_merkle_data_shreds;
self.num_merkle_coding_shreds += num_merkle_coding_shreds;
Expand Down
8 changes: 8 additions & 0 deletions turbine/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub enum Error {
Blockstore(#[from] solana_ledger::blockstore::BlockstoreError),
#[error(transparent)]
ClusterInfo(#[from] solana_gossip::cluster_info::ClusterInfoError),
#[error("Invalid Merkle root, slot: {slot}, index: {index}")]
InvalidMerkleRoot { slot: Slot, index: u64 },
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Expand All @@ -76,8 +78,14 @@ pub enum Error {
Send,
#[error(transparent)]
Serialize(#[from] std::boxed::Box<bincode::ErrorKind>),
#[error("Shred not found, slot: {slot}, index: {index}")]
ShredNotFound { slot: Slot, index: u64 },
#[error(transparent)]
TransportError(#[from] solana_sdk::transport::TransportError),
#[error("Unknown last index, slot: {0}")]
UnknownLastIndex(Slot),
#[error("Unknown slot meta, slot: {0}")]
UnknownSlotMeta(Slot),
}

type Result<T> = std::result::Result<T, Error>;
Expand Down
38 changes: 35 additions & 3 deletions turbine/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use {
super::Result,
super::{Error, Result},
bincode::serialized_size,
crossbeam_channel::Receiver,
solana_entry::entry::Entry,
solana_ledger::shred::ShredData,
solana_ledger::{
blockstore::Blockstore,
shred::{self, ShredData},
},
solana_poh::poh_recorder::WorkingBankEntry,
solana_runtime::bank::Bank,
solana_sdk::clock::Slot,
solana_sdk::{clock::Slot, hash::Hash},
std::{
sync::Arc,
time::{Duration, Instant},
Expand All @@ -25,6 +28,7 @@ pub(super) struct ReceiveResults {

#[derive(Clone)]
pub struct UnfinishedSlotInfo {
pub(super) chained_merkle_root: Hash,
pub next_shred_index: u32,
pub(crate) next_code_index: u32,
pub slot: Slot,
Expand Down Expand Up @@ -96,6 +100,34 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
})
}

// Returns the Merkle root of the last erasure batch of the parent slot.
pub(super) fn get_chained_merkle_root_from_parent(
slot: Slot,
parent: Slot,
blockstore: &Blockstore,
) -> Result<Hash> {
if slot == parent {
debug_assert_eq!(slot, 0u64);
return Ok(Hash::default());
}
debug_assert!(parent < slot, "parent: {parent} >= slot: {slot}");
let index = blockstore
.meta(parent)?
.ok_or_else(|| Error::UnknownSlotMeta(parent))?
.last_index
.ok_or_else(|| Error::UnknownLastIndex(parent))?;
let shred = blockstore
.get_data_shred(parent, index)?
.ok_or(Error::ShredNotFound {
slot: parent,
index,
})?;
shred::layout::get_merkle_root(&shred).ok_or(Error::InvalidMerkleRoot {
slot: parent,
index,
})
}

#[cfg(test)]
mod tests {
use {
Expand Down
Loading

0 comments on commit bf98394

Please sign in to comment.