From 350460f44c749f8d6d3ca8aabe9886f0c9be75f6 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 24 Sep 2024 21:21:42 -0400 Subject: [PATCH] feat(derive): holocene channel bank checks --- crates/derive/src/metrics.rs | 8 ++++++++ crates/derive/src/stages/channel_bank.rs | 24 +++++++++++++++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/crates/derive/src/metrics.rs b/crates/derive/src/metrics.rs index 589642f19..3f9d4b943 100644 --- a/crates/derive/src/metrics.rs +++ b/crates/derive/src/metrics.rs @@ -48,6 +48,14 @@ lazy_static! { &["error"] ).expect("Batch Reader Errors failed to register"); + /// Tracks the number of times the channel queue was detected + /// non-empty during a frame ingestion, and new channel creation + /// was attempted post-holocene. + pub static ref CHANNEL_QUEUE_NON_EMPTY: IntGauge = register_int_gauge!( + "kona_derive_channel_queue_non_empty", + "Number of times a channel was attempted to be created in the channel bank, but the queue is non-empty post-holocene." + ).expect("Channel Queue Non Empty failed to register"); + /// Tracks the compression ratio of batches. pub static ref BATCH_COMPRESSION_RATIO: IntGauge = register_int_gauge!( "kona_derive_batch_compression_ratio", diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index a9a5d24bc..82c699d2c 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -83,11 +83,25 @@ where let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; // Get the channel for the frame, or create a new one if it doesn't exist. - let current_channel = self.channels.entry(frame.id).or_insert_with(|| { - let channel = Channel::new(frame.id, origin); - self.channel_queue.push_back(frame.id); - channel - }); + let current_channel = match self.channels.get_mut(&frame.id) { + Some(c) => c, + None => { + if self.cfg.is_holocene_active(origin.timestamp) && !self.channel_queue.is_empty() { + // In holocene, channels are strictly ordered. + // If the previous frame is not the last in the channel + // and a starting frame for the next channel arrives, + // the previous channel/frames are removed and a new channel is created. + self.channel_queue.clear(); + + trace!(target: "channel-bank", "[holocene active] clearing non-empty channel queue"); + crate::inc!(CHANNEL_QUEUE_NON_EMPTY); + } + let channel = Channel::new(frame.id, origin); + self.channel_queue.push_back(frame.id); + self.channels.insert(frame.id, channel); + self.channels.get_mut(&frame.id).expect("Channel must be in queue") + } + }; // Check if the channel is not timed out. If it has, ignore the frame. if current_channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) <