Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(derive): Batch Queue Logging #86

Merged
merged 8 commits into from
Apr 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides batches for the [BatchQueue] stage.
/// Provides [Batch]es for the [BatchQueue] stage.
#[async_trait]
pub trait BatchQueueProvider {
/// Returns the next batch in the [ChannelReader] stage, if the stage is not complete.
/// Returns the next [Batch] in the [ChannelReader] stage, if the stage is not complete.
/// This function can only be called once while the stage is in progress, and will return
/// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
/// complete and the batch has been consumed, a [StageError::Eof] error is returned.
Expand Down Expand Up @@ -117,8 +117,10 @@ where

// Get the epoch
let epoch = self.l1_blocks[0];
// TODO: log that the next batch is being derived.
// TODO: record a metric for the time it takes to derive the next batch.
self.telemetry.write(
Bytes::from(alloc::format!("Deriving next batch for epoch: {}", epoch.number)),
LogLevel::Info,
);

// Note: epoch origin can now be one block ahead of the L2 Safe Head
// This is in the case where we auto generate all batches in an epoch & advance the epoch
Expand Down Expand Up @@ -149,9 +151,14 @@ where
remaining.push(batch.clone());
}
BatchValidity::Drop => {
// TODO: Log the drop reason with WARN level.
// batch.log_context(self.log).warn("Dropping batch", "parent", parent.id(),
// "parent_time", parent.info.time);
self.telemetry.write(
Bytes::from(alloc::format!(
"Dropping batch: {:?}, parent: {}",
batch.batch,
parent.block_info
)),
LogLevel::Warning,
);
continue;
}
BatchValidity::Accept => {
Expand All @@ -171,7 +178,10 @@ where
self.batches = remaining;

if let Some(nb) = next_batch {
// TODO: log that the next batch is found.
self.telemetry.write(
Bytes::from(alloc::format!("Next batch found: {:?}", nb.batch)),
LogLevel::Info,
);
return Ok(nb.batch);
}

Expand All @@ -182,15 +192,22 @@ where
expiry_epoch < parent.l1_origin.number;
let first_of_epoch = epoch.number == parent.l1_origin.number + 1;

// TODO: Log the empty batch generation.

// If the sequencer window did not expire,
// there is still room to receive batches for the current epoch.
// No need to force-create empty batch(es) towards the next epoch yet.
if !force_empty_batches {
return Err(StageError::Eof);
}

self.telemetry.write(
Bytes::from(alloc::format!(
"Generating empty batches. Epoch: {}, Parent: {}",
epoch.number,
parent.l1_origin.number
)),
LogLevel::Info,
);

// The next L1 block is needed to proceed towards the next epoch.
if self.l1_blocks.len() < 2 {
return Err(StageError::Eof);
Expand All @@ -202,7 +219,10 @@ where
// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always
// generate a batch to ensure that we at least have one batch per epoch.
if next_timestamp < next_epoch.timestamp || first_of_epoch {
// TODO: log next batch generation.
self.telemetry.write(
Bytes::from(alloc::format!("Generating empty batch for epoch: {}", epoch.number)),
LogLevel::Info,
);
return Ok(Batch::Single(SingleBatch {
parent_hash: parent.block_info.hash,
epoch_num: epoch.number,
Expand All @@ -214,17 +234,24 @@ where

// At this point we have auto generated every batch for the current epoch
// that we can, so we can advance to the next epoch.
// TODO: log that the epoch is advanced.
// bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp,
// "next_epoch_time", nextEpoch.Time)
self.telemetry.write(
Bytes::from(alloc::format!(
"Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}",
next_epoch.number,
next_timestamp,
next_epoch.timestamp
)),
LogLevel::Info,
);
self.l1_blocks.remove(0);
Err(StageError::Eof)
}

/// Adds a batch to the queue.
pub fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
if self.l1_blocks.is_empty() {
// TODO: log that the batch cannot be added without an origin
self.telemetry
.write(Bytes::from("Cannot add batch without an origin"), LogLevel::Error);
panic!("Cannot add batch without an origin");
}
let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?;
Expand Down Expand Up @@ -259,9 +286,14 @@ where
// Parent block does not match the next batch.
// Means the previously returned batch is invalid.
// Drop cached batches and find another batch.
self.telemetry.write(
Bytes::from(alloc::format!(
"Parent block does not match the next batch. Dropping {} cached batches.",
self.next_spans.len()
)),
LogLevel::Warning,
);
self.next_spans.clear();
// TODO: log that the provided parent block does not match the next batch.
// TODO: record a metric for the internal batch drop.
}

// If the epoch is advanced, update the l1 blocks.
Expand All @@ -273,8 +305,7 @@ where
for (i, block) in self.l1_blocks.iter().enumerate() {
if parent.l1_origin.number == block.number {
self.l1_blocks.drain(0..i);
self.telemetry
.write(Bytes::from("Advancing internal L1 blocks"), LogLevel::Info);
self.telemetry.write(Bytes::from("Adancing epoch"), LogLevel::Info);
break;
}
}
Expand Down Expand Up @@ -303,7 +334,10 @@ where
// reset is called, the origin behind is false.
self.l1_blocks.clear();
}
// TODO: log batch queue origin advancement.
self.telemetry.write(
Bytes::from(alloc::format!("Batch queue advanced origin: {:?}", self.origin)),
LogLevel::Info,
);
}

// Load more data into the batch queue.
Expand All @@ -313,7 +347,8 @@ where
if !origin_behind {
self.add_batch(b, parent).ok();
} else {
// TODO: record a metric when the batch is dropped because the origin is behind.
self.telemetry
.write(Bytes::from("[Batch Dropped]: Origin is behind"), LogLevel::Warning);
}
}
Err(StageError::Eof) => out_of_data = true,
Expand Down Expand Up @@ -445,6 +480,7 @@ mod tests {

// TODO(refcell): The batch reader here loops forever.
// Maybe the cursor isn't being used?
// UPDATE: the batch data is not valid
// #[tokio::test]
// async fn test_next_batch_succeeds() {
// let mut reader = new_batch_reader();
Expand Down
Loading