From 9e9c002dd18f1215a8ee16808a7787c71eb7d8f8 Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 30 Sep 2024 18:34:20 -0400 Subject: [PATCH] feat(derive): Span batch prefix checks (#592) * prefix checks * fix * flush prev * fix test --- bin/client/src/l1/driver.rs | 1 + crates/derive/src/batch/mod.rs | 2 +- crates/derive/src/batch/span_batch/batch.rs | 136 ++++++++++++++++++-- crates/derive/src/pipeline/builder.rs | 7 +- crates/derive/src/stages/batch_stream.rs | 80 ++++++++---- crates/providers-alloy/src/pipeline.rs | 1 + 6 files changed, 191 insertions(+), 36 deletions(-) diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 30b462b2c..2495371df 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -50,6 +50,7 @@ pub type OracleAttributesQueue = AttributesQueue< ChannelReader< ChannelBank>>>>, >, + OracleL2ChainProvider, >, OracleL2ChainProvider, >, diff --git a/crates/derive/src/batch/mod.rs b/crates/derive/src/batch/mod.rs index 86fc9dc67..8dad61531 100644 --- a/crates/derive/src/batch/mod.rs +++ b/crates/derive/src/batch/mod.rs @@ -75,7 +75,7 @@ impl Batch { pub fn timestamp(&self) -> u64 { match self { Self::Single(sb) => sb.timestamp, - Self::Span(sb) => sb.timestamp(), + Self::Span(sb) => sb.starting_timestamp(), } } diff --git a/crates/derive/src/batch/span_batch/batch.rs b/crates/derive/src/batch/span_batch/batch.rs index 1d13842b5..04dde5662 100644 --- a/crates/derive/src/batch/span_batch/batch.rs +++ b/crates/derive/src/batch/span_batch/batch.rs @@ -37,12 +37,26 @@ pub struct SpanBatch { } impl SpanBatch { - /// Returns the timestamp for the first batch in the span. - pub fn timestamp(&self) -> u64 { + /// Returns the starting timestamp for the first batch in the span. + /// + /// ## Safety + /// Panics if [Self::batches] is empty. + pub fn starting_timestamp(&self) -> u64 { self.batches[0].timestamp } + /// Returns the final timestamp for the last batch in the span. + /// + /// ## Safety + /// Panics if [Self::batches] is empty. + pub fn final_timestamp(&self) -> u64 { + self.batches[self.batches.len() - 1].timestamp + } + /// Returns the epoch number for the first batch in the span. + /// + /// ## Safety + /// Panics if [Self::batches] is empty. pub fn starting_epoch_num(&self) -> u64 { self.batches[0].epoch_num } @@ -97,10 +111,10 @@ impl SpanBatch { // Skip out of order batches. let next_timestamp = l2_safe_head.block_info.timestamp + cfg.block_time; - if self.timestamp() > next_timestamp { + if self.starting_timestamp() > next_timestamp { warn!( "received out-of-order batch for future processing after next batch ({} > {})", - self.timestamp(), + self.starting_timestamp(), next_timestamp ); return BatchValidity::Future; @@ -116,18 +130,19 @@ impl SpanBatch { // safe head. let mut parent_num = l2_safe_head.block_info.number; let mut parent_block = l2_safe_head; - if self.timestamp() < next_timestamp { - if self.timestamp() > l2_safe_head.block_info.timestamp { + if self.starting_timestamp() < next_timestamp { + if self.starting_timestamp() > l2_safe_head.block_info.timestamp { // Batch timestamp cannot be between safe head and next timestamp. warn!("batch has misaligned timestamp, block time is too short"); return BatchValidity::Drop; } - if (l2_safe_head.block_info.timestamp - self.timestamp()) % cfg.block_time != 0 { + if (l2_safe_head.block_info.timestamp - self.starting_timestamp()) % cfg.block_time != 0 + { warn!("batch has misaligned timestamp, not overlapped exactly"); return BatchValidity::Drop; } parent_num = l2_safe_head.block_info.number - - (l2_safe_head.block_info.timestamp - self.timestamp()) / cfg.block_time - + (l2_safe_head.block_info.timestamp - self.starting_timestamp()) / cfg.block_time - 1; parent_block = match fetcher.l2_block_info_by_number(parent_num).await { Ok(block) => block, @@ -275,7 +290,7 @@ impl SpanBatch { } // Check overlapped blocks - if self.timestamp() < next_timestamp { + if self.starting_timestamp() < next_timestamp { for i in 0..(l2_safe_head.block_info.number - parent_num) { let safe_block_num = parent_num + i + 1; let safe_block_payload = match fetcher.block_by_number(safe_block_num).await { @@ -326,6 +341,107 @@ impl SpanBatch { BatchValidity::Accept } + /// Checks the validity of the batch's prefix. This function is used in the [BatchStream] + /// post-Holocene as a batch is being loaded in. + /// + /// [BatchStream]: crate::stages::BatchStream + pub async fn check_batch_prefix( + &self, + cfg: &RollupConfig, + l1_origins: &[BlockInfo], + l2_safe_head: BlockInfo, + fetcher: &mut BF, + ) -> BatchValidity { + if l1_origins.is_empty() { + warn!("missing L1 block input, cannot proceed with batch checking"); + return BatchValidity::Undecided; + } + if self.batches.is_empty() { + warn!("empty span batch, cannot proceed with batch checking"); + return BatchValidity::Undecided; + } + + let next_timestamp = l2_safe_head.timestamp + cfg.block_time; + + // Find the parent block of the span batch. + // If the span batch does not overlap the current safe chain, parent block should be the L2 + // safe head. + let mut parent_num = l2_safe_head.number; + let mut parent_block = l2_safe_head; + if self.starting_timestamp() < next_timestamp { + if self.starting_timestamp() > l2_safe_head.timestamp { + // Batch timestamp cannot be between safe head and next timestamp. + warn!("batch has misaligned timestamp, block time is too short"); + return BatchValidity::Drop; + } + if (l2_safe_head.timestamp - self.starting_timestamp()) % cfg.block_time != 0 { + warn!("batch has misaligned timestamp, not overlapped exactly"); + return BatchValidity::Drop; + } + parent_num = l2_safe_head.number - + (l2_safe_head.timestamp - self.starting_timestamp()) / cfg.block_time - + 1; + parent_block = match fetcher.l2_block_info_by_number(parent_num).await { + Ok(block) => block.block_info, + Err(e) => { + warn!("failed to fetch L2 block number {parent_num}: {e}"); + // Unable to validate the batch for now. Retry later. + return BatchValidity::Undecided; + } + }; + } + if !self.check_parent_hash(parent_block.hash) { + warn!( + "parent block mismatch, expected: {parent_num}, received: {}. parent hash: {}, parent hash check: {}", + parent_block.number, + parent_block.hash, + self.parent_check, + ); + return BatchValidity::Drop; + } + + // Verify the l1 origin hash for each l1 block. + // SAFETY: `Self::batches` is not empty, so the last element is guaranteed to exist. + let end_epoch_num = self.batches.last().unwrap().epoch_num; + let mut origin_checked = false; + // l1Blocks is supplied from batch queue and its length is limited to SequencerWindowSize. + for l1_block in l1_origins { + if l1_block.number == end_epoch_num { + if !self.check_origin_hash(l1_block.hash) { + warn!( + "batch is for different L1 chain, epoch hash does not match, expected: {}", + l1_block.hash + ); + return BatchValidity::Drop; + } + origin_checked = true; + break; + } + } + if !origin_checked { + info!("need more l1 blocks to check entire origins of span batch"); + return BatchValidity::Undecided; + } + + // Drop the batch if it is out of order. Post-Holocene, gaps are disallowed. + if self.starting_timestamp() > next_timestamp { + warn!( + "received out-of-order batch for future processing after next batch ({} > {})", + self.starting_timestamp(), + next_timestamp + ); + return BatchValidity::Drop; + } + + // Drop the batch if it has no new blocks after the safe head. + if self.final_timestamp() < next_timestamp { + warn!("span batch has no new blocks after safe head"); + return BatchValidity::Drop; + } + + BatchValidity::Accept + } + /// Converts all [SpanBatchElement]s after the L2 safe head to [SingleBatch]es. The resulting /// [SingleBatch]es do not contain a parent hash, as it is populated by the Batch Queue /// stage. @@ -437,7 +553,7 @@ mod tests { let first_element = SpanBatchElement { timestamp, ..Default::default() }; let batch = SpanBatch { batches: vec![first_element, Default::default()], ..Default::default() }; - assert_eq!(batch.timestamp(), timestamp); + assert_eq!(batch.starting_timestamp(), timestamp); } #[test] diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index f51556fe2..6c853ba58 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -17,8 +17,8 @@ type L1RetrievalStage = L1Retrieval>; type FrameQueueStage = FrameQueue>; type ChannelBankStage = ChannelBank>; type ChannelReaderStage = ChannelReader>; -type BatchStreamStage = BatchStream>; -type BatchQueueStage = BatchQueue, T>; +type BatchStreamStage = BatchStream, T>; +type BatchQueueStage = BatchQueue, T>; type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. @@ -134,7 +134,8 @@ where let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config)); let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config)); - let batch_stream = BatchStream::new(channel_reader, rollup_config.clone()); + let batch_stream = + BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); let batch_queue = BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); let attributes = diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index a622d3141..0d8722151 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -1,8 +1,9 @@ //! This module contains the `BatchStream` stage. use crate::{ - batch::{Batch, SingleBatch, SpanBatch}, + batch::{Batch, BatchValidity, SingleBatch, SpanBatch}, errors::{PipelineEncodingError, PipelineError, PipelineResult}, + pipeline::L2ChainProvider, stages::BatchQueueProvider, traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; @@ -33,9 +34,10 @@ pub trait BatchStreamProvider { /// [ChannelReader]: crate::stages::ChannelReader /// [BatchQueue]: crate::stages::BatchQueue #[derive(Debug)] -pub struct BatchStream

+pub struct BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + BF: L2ChainProvider + Debug, { /// The previous stage in the derivation pipeline. prev: P, @@ -46,15 +48,18 @@ where /// A reference to the rollup config, used to check /// if the [BatchStream] stage should be activated. config: Arc, + /// Used to validate the batches. + fetcher: BF, } -impl

BatchStream

+impl BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + BF: L2ChainProvider + Debug, { /// Create a new [BatchStream] stage. - pub const fn new(prev: P, config: Arc) -> Self { - Self { prev, span: None, buffer: VecDeque::new(), config } + pub const fn new(prev: P, config: Arc, fetcher: BF) -> Self { + Self { prev, span: None, buffer: VecDeque::new(), config, fetcher } } /// Returns if the [BatchStream] stage is active based on the @@ -95,12 +100,15 @@ where } #[async_trait] -impl

BatchQueueProvider for BatchStream

+impl BatchQueueProvider for BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, { fn flush(&mut self) { if self.is_active().unwrap_or(false) { + self.prev.flush(); + self.span = None; self.buffer.clear(); } } @@ -124,12 +132,31 @@ where // If the next batch is a singular batch, it is immediately // forwarded to the `BatchQueue` stage. Otherwise, we buffer - // the span batch in this stage. + // the span batch in this stage if it passes the validity checks. match batch { Batch::Single(b) => return Ok(Batch::Single(b)), Batch::Span(b) => { - // TODO: New span batch prefix checks. - self.span = Some(b) + let validity = b + .check_batch_prefix( + self.config.as_ref(), + l1_origins, + parent.block_info, + &mut self.fetcher, + ) + .await; + + match validity { + BatchValidity::Accept => self.span = Some(b), + BatchValidity::Drop => { + // Flush the stage. + self.flush(); + + return Err(PipelineError::Eof.temp()); + } + BatchValidity::Undecided | BatchValidity::Future => { + return Err(PipelineError::NotEnoughData.temp()) + } + } } } } @@ -140,18 +167,20 @@ where } #[async_trait] -impl

OriginAdvancer for BatchStream

+impl OriginAdvancer for BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await } } -impl

OriginProvider for BatchStream

+impl OriginProvider for BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + BF: L2ChainProvider + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -159,9 +188,10 @@ where } #[async_trait] -impl

ResettableStage for BatchStream

+impl ResettableStage for BatchStream where P: BatchStreamProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + BF: L2ChainProvider + Send + Debug, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { self.prev.reset(base, cfg).await?; @@ -177,6 +207,7 @@ mod test { use crate::{ batch::{SingleBatch, SpanBatchElement}, stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage}, + traits::test_utils::TestL2ChainProvider, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -189,7 +220,7 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() }); let prev = MockBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone()); + let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); // The stage should not be active. assert!(!stream.is_active().unwrap()); @@ -207,17 +238,22 @@ mod test { async fn test_span_buffer() { let mock_batch = SpanBatch { batches: vec![ - SpanBatchElement { epoch_num: 10, timestamp: 10, ..Default::default() }, - SpanBatchElement { epoch_num: 10, timestamp: 12, ..Default::default() }, + SpanBatchElement { epoch_num: 10, timestamp: 2, ..Default::default() }, + SpanBatchElement { epoch_num: 10, timestamp: 4, ..Default::default() }, ], ..Default::default() }; let mock_origins = [BlockInfo { number: 10, timestamp: 12, ..Default::default() }]; let data = vec![Ok(Batch::Span(mock_batch.clone()))]; - let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); + let config = Arc::new(RollupConfig { + holocene_time: Some(0), + block_time: 2, + ..RollupConfig::default() + }); let prev = MockBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone()); + let provider = TestL2ChainProvider::default(); + let mut stream = BatchStream::new(prev, config.clone(), provider); // The stage should be active. assert!(stream.is_active().unwrap()); @@ -226,7 +262,7 @@ mod test { let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); if let Batch::Single(single) = batch { assert_eq!(single.epoch_num, 10); - assert_eq!(single.timestamp, 10); + assert_eq!(single.timestamp, 2); } else { panic!("Wrong batch type"); } @@ -234,7 +270,7 @@ mod test { let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); if let Batch::Single(single) = batch { assert_eq!(single.epoch_num, 10); - assert_eq!(single.timestamp, 12); + assert_eq!(single.timestamp, 4); } else { panic!("Wrong batch type"); } @@ -251,7 +287,7 @@ mod test { let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); if let Batch::Single(single) = batch { assert_eq!(single.epoch_num, 10); - assert_eq!(single.timestamp, 10); + assert_eq!(single.timestamp, 2); } else { panic!("Wrong batch type"); } @@ -259,7 +295,7 @@ mod test { let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); if let Batch::Single(single) = batch { assert_eq!(single.epoch_num, 10); - assert_eq!(single.timestamp, 12); + assert_eq!(single.timestamp, 4); } else { panic!("Wrong batch type"); } @@ -275,7 +311,7 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); let prev = MockBatchStreamProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone()); + let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default()); // The stage should be active. assert!(stream.is_active().unwrap()); diff --git a/crates/providers-alloy/src/pipeline.rs b/crates/providers-alloy/src/pipeline.rs index 4b76e5f73..881c24cde 100644 --- a/crates/providers-alloy/src/pipeline.rs +++ b/crates/providers-alloy/src/pipeline.rs @@ -40,6 +40,7 @@ pub type OnlineAttributesQueue = AttributesQueue< ChannelReader< ChannelBank>>>, >, + AlloyL2ChainProvider, >, AlloyL2ChainProvider, >,