Skip to content

Commit

Permalink
feat(derive): BatchStream buffering (#590)
Browse files Browse the repository at this point in the history
* feat(derive): `BatchStream` buffering

* tests

* rebase

* lint
  • Loading branch information
clabby authored Sep 30, 2024
1 parent ed6f8f9 commit 40a47f0
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 32 deletions.
2 changes: 1 addition & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait BatchQueueProvider {
async fn next_batch(
&mut self,
parent: L2BlockInfo,
origins: &[BlockInfo],
l1_origins: &[BlockInfo],
) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
Expand Down
171 changes: 140 additions & 31 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
//! This module contains the `BatchStream` stage.
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::trace;

use crate::{
batch::{Batch, SingleBatch, SpanBatch},
errors::{PipelineError, PipelineResult},
errors::{PipelineEncodingError, PipelineError, PipelineResult},
stages::BatchQueueProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::trace;

/// Provides [Batch]es for the [BatchStream] stage.
#[async_trait]
Expand Down Expand Up @@ -43,7 +42,7 @@ where
/// There can only be a single staged span batch.
span: Option<SpanBatch>,
/// A buffer of single batches derived from the [SpanBatch].
buffer: Vec<SingleBatch>,
buffer: VecDeque<SingleBatch>,
/// A reference to the rollup config, used to check
/// if the [BatchStream] stage should be activated.
config: Arc<RollupConfig>,
Expand All @@ -55,7 +54,7 @@ where
{
/// Create a new [BatchStream] stage.
pub const fn new(prev: P, config: Arc<RollupConfig>) -> Self {
Self { prev, span: None, buffer: Vec::new(), config }
Self { prev, span: None, buffer: VecDeque::new(), config }
}

/// Returns if the [BatchStream] stage is active based on the
Expand All @@ -66,9 +65,32 @@ where
}

/// Gets a [SingleBatch] from the in-memory buffer.
pub fn get_single_batch(&mut self) -> Option<SingleBatch> {
pub fn get_single_batch(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<SingleBatch> {
trace!(target: "batch_span", "Attempting to get a SingleBatch from buffer len: {}", self.buffer.len());
unimplemented!()

self.try_hydrate_buffer(parent, l1_origins)?;
self.buffer.pop_front().ok_or_else(|| PipelineError::NotEnoughData.temp())
}

/// Hydrates the buffer with single batches derived from the span batch, if there is one
/// queued up.
pub fn try_hydrate_buffer(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<()> {
if let Some(span) = self.span.take() {
self.buffer.extend(
span.get_singular_batches(l1_origins, parent).map_err(|e| {
PipelineError::BadEncoding(PipelineEncodingError::from(e)).crit()
})?,
);
}
Ok(())
}
}

Expand All @@ -83,34 +105,37 @@ where
}
}

async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult<Batch> {
async fn next_batch(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<Batch> {
// If the stage is not active, "pass" the next batch
// through this stage to the BatchQueue stage.
if !self.is_active()? {
trace!(target: "batch_span", "BatchStream stage is inactive, pass-through.");
return self.prev.next_batch().await;
}

// First, attempt to pull a SinguleBatch out of the buffer.
if let Some(b) = self.get_single_batch() {
return Ok(Batch::Single(b));
}

// Safety: bubble up any errors from the batch reader.
let batch = self.prev.next_batch().await?;

// If the next batch is a singular batch, it is immediately
// forwarded to the `BatchQueue` stage.
let Batch::Span(b) = batch else {
return Ok(batch);
};
// If the buffer is empty, attempt to pull a batch from the previous stage.
if self.buffer.is_empty() {
// Safety: bubble up any errors from the batch reader.
let batch = self.prev.next_batch().await?;

// Set the current span batch.
self.span = Some(b);
// If the next batch is a singular batch, it is immediately
// forwarded to the `BatchQueue` stage. Otherwise, we buffer
// the span batch in this stage.
match batch {
Batch::Single(b) => return Ok(Batch::Single(b)),
Batch::Span(b) => {
// TODO: New span batch prefix checks.
self.span = Some(b)
}
}
}

// Attempt to pull a SingleBatch out of the SpanBatch.
self.get_single_batch()
.map_or_else(|| Err(PipelineError::NotEnoughData.temp()), |b| Ok(Batch::Single(b)))
self.get_single_batch(parent, l1_origins).map(Batch::Single)
}
}

Expand Down Expand Up @@ -150,7 +175,7 @@ where
mod test {
use super::*;
use crate::{
batch::SingleBatch,
batch::{SingleBatch, SpanBatchElement},
stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -177,4 +202,88 @@ mod test {
assert_eq!(logs.len(), 1);
assert!(logs[0].contains("BatchStream stage is inactive, pass-through."));
}

#[tokio::test]
async fn test_span_buffer() {
let mock_batch = SpanBatch {
batches: vec![
SpanBatchElement { epoch_num: 10, timestamp: 10, ..Default::default() },
SpanBatchElement { epoch_num: 10, timestamp: 12, ..Default::default() },
],
..Default::default()
};
let mock_origins = [BlockInfo { number: 10, timestamp: 12, ..Default::default() }];

let data = vec![Ok(Batch::Span(mock_batch.clone()))];
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());

// The stage should be active.
assert!(stream.is_active().unwrap());

// The next batches should be single batches derived from the span batch.
let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 10);
} else {
panic!("Wrong batch type");
}

let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 12);
} else {
panic!("Wrong batch type");
}

let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
assert_eq!(stream.buffer.len(), 0);
assert!(stream.span.is_none());

// Add more data into the provider, see if the buffer is re-hydrated.
stream.prev.batches.push(Ok(Batch::Span(mock_batch)));

// The next batches should be single batches derived from the span batch.
let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 10);
} else {
panic!("Wrong batch type");
}

let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 12);
} else {
panic!("Wrong batch type");
}

let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
assert_eq!(stream.buffer.len(), 0);
assert!(stream.span.is_none());
}

#[tokio::test]
async fn test_single_batch_pass_through() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());

// The stage should be active.
assert!(stream.is_active().unwrap());

// The next batch should be passed through to the [BatchQueue] stage.
let batch = stream.next_batch(Default::default(), &[]).await.unwrap();
assert!(matches!(batch, Batch::Single(_)));
assert_eq!(stream.buffer.len(), 0);
assert!(stream.span.is_none());
}
}

0 comments on commit 40a47f0

Please sign in to comment.