Skip to content

Commit

Permalink
feat(derive): wire up the batch span stage
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Sep 26, 2024
1 parent a610b16 commit dac37f6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 11 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use kona_derive::{
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{BlobProvider, ChainProvider, L2ChainProvider, OriginProvider},
};
Expand Down Expand Up @@ -46,8 +46,10 @@ pub type OracleAttributesBuilder<O> =
/// An oracle-backed attributes queue for the derivation pipeline.
pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
BatchQueue<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
>,
>,
OracleL2ChainProvider<O>,
>,
Expand Down
9 changes: 7 additions & 2 deletions crates/derive/src/online/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use op_alloy_protocol::BlockInfo;

// Pipeline internal stages aren't re-exported at the module-level.
use crate::stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
};

/// An online derivation pipeline.
Expand All @@ -32,7 +33,11 @@ pub type OnlineAttributesBuilder =
/// An `online` attributes queue for the derivation pipeline.
pub type OnlineAttributesQueue<DAP> = AttributesQueue<
BatchQueue<
ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>>,
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>,
>,
>,
AlloyL2ChainProvider,
>,
OnlineAttributesBuilder,
Expand Down
9 changes: 6 additions & 3 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::{
AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, L2ChainProvider,
};
use crate::stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
};
use alloc::sync::Arc;
use core::fmt::Debug;
Expand All @@ -16,7 +17,8 @@ type L1RetrievalStage<DAP, P> = L1Retrieval<DAP, L1TraversalStage<P>>;
type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelBankStage<DAP, P> = ChannelBank<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelBankStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<ChannelReaderStage<DAP, P>, T>;
type BatchStreamStage<DAP, P> = BatchStream<ChannelReaderStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;

/// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern.
Expand Down Expand Up @@ -132,8 +134,9 @@ where
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config));
let batch_stream = BatchStream::new(channel_reader, rollup_config.clone());

Check warning on line 137 in crates/derive/src/pipeline/builder.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/pipeline/builder.rs#L137

Added line #L137 was not covered by tests
let batch_queue =
BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider.clone());
BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());

Check warning on line 139 in crates/derive/src/pipeline/builder.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/pipeline/builder.rs#L139

Added line #L139 was not covered by tests
let attributes =
AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder);

Expand Down

0 comments on commit dac37f6

Please sign in to comment.