diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index 09fcae0c49..e63350be9b 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -8,7 +8,7 @@ license.workspace = true
 publish = false
 
 [features]
-default = ["replicated-loglet"]
+default = []
 options_schema = ["dep:schemars"]
 replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
 test-util = []
diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index d8923bc511..6724d2c2c4 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -24,10 +24,11 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
 use restate_types::storage::StorageCodec;
 use restate_types::Version;
 
-use crate::loglet::{LogletBase, LogletWrapper};
+use crate::loglet::{AppendError, LogletBase, LogletProvider};
+use crate::loglet_wrapper::LogletWrapper;
 use crate::watchdog::WatchdogSender;
 use crate::{
-    Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result, TailState,
+    Error, FindTailAttributes, LogReadStream, LogRecord, Result, TailState,
     SMALL_BATCH_THRESHOLD_COUNT,
 };
 
@@ -48,7 +49,7 @@ impl Bifrost {
     #[cfg(any(test, feature = "test-util"))]
     pub async fn init_in_memory(metadata: Metadata) -> Self {
-        use crate::loglets::memory_loglet;
+        use crate::providers::memory_loglet;
 
         Self::init_with_factory(metadata, memory_loglet::Factory::default()).await
     }
@@ -75,7 +76,7 @@ impl Bifrost {
     #[cfg(any(test, feature = "test-util"))]
     pub async fn init_with_factory(
         metadata: Metadata,
-        factory: impl crate::LogletProviderFactory,
+        factory: impl crate::loglet::LogletProviderFactory,
     ) -> Self {
         use crate::BifrostService;
 
@@ -256,7 +257,14 @@ impl BifrostInner {
         let loglet = self.writeable_loglet(log_id).await?;
         let mut buf = BytesMut::default();
         StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
-        loglet.append(buf.freeze()).await
+
+        let res = loglet.append(buf.freeze()).await;
+        // todo: Handle retries, segment seals and other recoverable errors.
+        res.map_err(|e| match e {
+            AppendError::Sealed => todo!(),
+            AppendError::Shutdown(e) => Error::Shutdown(e),
+            AppendError::Other(e) => Error::LogletError(e),
+        })
     }
 
     pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
@@ -270,7 +278,13 @@
                 buf.freeze()
             })
             .collect();
-        loglet.append_batch(&raw_payloads).await
+        let res = loglet.append_batch(&raw_payloads).await;
+        // todo: Handle retries, segment seals and other recoverable errors.
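+        // Note: `AppendError::Sealed` currently panics via the todo!() below; per
+        // the todo above, seals are meant to become a recoverable condition that is
+        // retried internally rather than surfaced to callers.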
+        res.map_err(|e| match e {
+            AppendError::Sealed => todo!(),
+            AppendError::Shutdown(e) => Error::Shutdown(e),
+            AppendError::Other(e) => Error::LogletError(e),
+        })
     }
 
     pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
@@ -378,8 +392,7 @@ impl BifrostInner {
         self.fail_if_shutting_down()?;
         self.metadata
             .sync(MetadataKind::Logs, TargetVersion::Latest)
-            .await
-            .map_err(Arc::new)?;
+            .await?;
         Ok(())
     }
 
@@ -436,7 +449,7 @@ mod tests {
 
     use super::*;
 
-    use crate::loglets::memory_loglet::{self};
+    use crate::providers::memory_loglet::{self};
     use googletest::prelude::*;
 
     use crate::{Record, TrimGap};
diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs
index 3789fc3730..3c3377e601 100644
--- a/crates/bifrost/src/error.rs
+++ b/crates/bifrost/src/error.rs
@@ -13,13 +13,13 @@ use std::sync::Arc;
 
 use restate_types::logs::{LogId, Lsn};
 
-use crate::loglets::local_loglet::LogStoreError;
+use crate::loglet::{LogletError, OperationError};
 use crate::types::SealReason;
 
 /// Result type for bifrost operations.
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-#[derive(thiserror::Error, Debug, Clone)]
+#[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("log '{0}' is sealed")]
     LogSealed(LogId, SealReason),
@@ -30,18 +30,19 @@ pub enum Error {
     #[error("operation failed due to an ongoing shutdown")]
     Shutdown(#[from] ShutdownError),
     #[error(transparent)]
-    LogStoreError(#[from] LogStoreError),
+    LogletError(#[from] Arc<dyn LogletError + Send + Sync>),
     #[error("failed syncing logs metadata: {0}")]
-    // unfortunately, we have to use Arc here, because the SyncError is not Clone.
-    MetadataSync(#[from] Arc<SyncError>),
+    MetadataSync(#[from] SyncError),
     /// Provider is unknown or disabled
     #[error("bifrost provider '{0}' is disabled or unrecognized")]
     Disabled(String),
 }
 
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum ProviderError {
-    Shutdown(#[from] ShutdownError),
-    Other(#[from] anyhow::Error),
+impl From<OperationError> for Error {
+    fn from(value: OperationError) -> Self {
+        match value {
+            OperationError::Shutdown(e) => Error::Shutdown(e),
+            OperationError::Other(e) => Error::LogletError(e),
+        }
+    }
 }
diff --git a/crates/bifrost/src/lib.rs b/crates/bifrost/src/lib.rs
index b57757a38a..d4d024665f 100644
--- a/crates/bifrost/src/lib.rs
+++ b/crates/bifrost/src/lib.rs
@@ -10,11 +10,9 @@
 
 mod bifrost;
 mod error;
-mod loglet;
-#[cfg(test)]
-mod loglet_tests;
-pub mod loglets;
-mod provider;
+pub mod loglet;
+mod loglet_wrapper;
+pub mod providers;
 mod read_stream;
 mod record;
 mod service;
@@ -22,8 +20,7 @@ mod types;
 mod watchdog;
 
 pub use bifrost::Bifrost;
-pub use error::{Error, ProviderError, Result};
-pub use provider::*;
+pub use error::{Error, Result};
 pub use read_stream::LogReadStream;
 pub use record::*;
 pub use service::BifrostService;
diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs
new file mode 100644
index 0000000000..69abeed3bd
--- /dev/null
+++ b/crates/bifrost/src/loglet/error.rs
@@ -0,0 +1,112 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::fmt::{Debug, Display};
+use std::sync::Arc;
+
+use restate_core::ShutdownError;
+
+#[derive(Debug, Clone, thiserror::Error)]
+pub enum AppendError {
+    #[error("Loglet is sealed")]
+    Sealed,
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+    #[error(transparent)]
+    Other(Arc<dyn LogletError + Send + Sync>),
+}
+
+impl AppendError {
+    pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(RetryableError(error)))
+    }
+
+    pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(TerminalError(error)))
+    }
+
+    pub fn other<E: LogletError + Send + Sync>(error: E) -> Self {
+        Self::Other(Arc::new(error))
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum OperationError {
+    #[error(transparent)]
+    Shutdown(#[from] ShutdownError),
+    #[error(transparent)]
+    Other(Arc<dyn LogletError + Send + Sync>),
+}
+
+impl OperationError {
+    pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(RetryableError(error)))
+    }
+
+    pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
+        Self::Other(Arc::new(TerminalError(error)))
+    }
+
+    pub fn other<E: LogletError + Send + Sync>(error: E) -> Self {
+        Self::Other(Arc::new(error))
+    }
+}
+
+// -- Helper Types --
+
+/// Represents a type-erased error from the loglet provider.
+pub trait LogletError: std::error::Error + Send + Sync + Debug + Display + 'static {
+    /// Signal upper layers whether this error should be retried or not.
+    fn retryable(&self) -> bool {
+        false
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+struct RetryableError<T>(#[source] T);
+
+impl<T> Display for RetryableError<T>
+where
+    T: Debug + Display + Send + Sync + std::error::Error + 'static,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "[retryable] {}", self.0)
+    }
+}
+
+impl<T> LogletError for RetryableError<T>
+where
+    T: Debug + Display + Send + Sync + std::error::Error + 'static,
+{
+    fn retryable(&self) -> bool {
+        true
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+struct TerminalError<T>(#[source] T);
+
+impl<T> LogletError for TerminalError<T>
+where
+    T: Debug + Display + Send + Sync + std::error::Error + 'static,
+{
+    fn retryable(&self) -> bool {
+        false
+    }
+}
+
+impl<T> Display for TerminalError<T>
+where
+    T: Debug + Display + Send + Sync + std::error::Error + 'static,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "[terminal] {}", self.0)
+    }
+}
diff --git a/crates/bifrost/src/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs
similarity index 71%
rename from crates/bifrost/src/loglet_tests.rs
rename to crates/bifrost/src/loglet/loglet_tests.rs
index 31db38a954..c8f635da5b 100644
--- a/crates/bifrost/src/loglet_tests.rs
+++ b/crates/bifrost/src/loglet/loglet_tests.rs
@@ -8,21 +8,24 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::collections::BTreeSet;
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 use std::time::Duration;
 
 use bytes::Bytes;
-use futures::StreamExt;
 use googletest::prelude::*;
-use tokio::task::JoinHandle;
+use tokio::sync::Barrier;
+use tokio::task::{JoinHandle, JoinSet};
 
 use restate_test_util::let_assert;
 use restate_types::logs::SequenceNumber;
+use tokio_stream::StreamExt;
 use tracing::info;
 
-use crate::loglet::{Loglet, LogletOffset};
-use crate::{LogRecord, Record, TrimGap};
+use super::{Loglet, LogletOffset};
+use crate::loglet::AppendError;
+use crate::{LogRecord, Record, TailState, TrimGap};
 
 fn setup() {
     // Make sure that panics exits the process.
@@ -216,7 +219,10 @@ pub async fn single_loglet_readstream_test(loglet: Arc<dyn Loglet>) -> googletes
     setup();
 
     let read_from_offset = LogletOffset::from(6);
-    let mut reader = loglet.clone().create_read_stream(read_from_offset).await?;
+    let mut reader = loglet
+        .clone()
+        .create_read_stream(read_from_offset, None)
+        .await?;
 
     {
         // no records have been written yet.
@@ -307,7 +313,7 @@ pub async fn single_loglet_readstream_test_with_trims(
 
     let mut read_stream = loglet
         .clone()
-        .create_read_stream(LogletOffset::OLDEST)
+        .create_read_stream(LogletOffset::OLDEST, None)
         .await?;
 
     let record = read_stream.next().await.unwrap()?;
@@ -393,3 +399,150 @@ pub async fn single_loglet_readstream_test_with_trims(
 
     Ok(())
 }
+
+/// Validates that appends fail after the loglet has been sealed
+pub async fn loglet_test_append_after_seal(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
+    setup();
+
+    assert_eq!(None, loglet.get_trim_point().await?);
+    {
+        let tail = loglet.find_tail().await?;
+        assert_eq!(LogletOffset::OLDEST, tail.offset());
+        assert!(!tail.is_sealed());
+    }
+
+    // append 5 records. Offsets [1..5]
+    for i in 1..=5 {
+        loglet.append(Bytes::from(format!("record{}", i))).await?;
+    }
+
+    loglet.seal().await?;
+
+    // attempt to append 5 records. Offsets [6..10]. Expected to fail since seal happened on the same client.
+    for i in 6..=10 {
+        let res = loglet.append(Bytes::from(format!("record{}", i))).await;
+        assert_that!(res, err(pat!(AppendError::Sealed)));
+    }
+
+    let tail = loglet.find_tail().await?;
+    // Seal must be applied after commit index 5 since it has been acknowledged (tail is 6 or higher)
+    assert_that!(tail, pat!(TailState::Sealed(gt(LogletOffset::from(5)))));
+
+    Ok(())
+}
+
+/// Validates that concurrent appenders all observe the seal and stop appending
+pub async fn loglet_test_append_after_seal_concurrent(
+    loglet: Arc<dyn Loglet>,
+) -> googletest::Result<()> {
+    use futures::TryStreamExt as _;
+
+    const WARMUP_APPENDS: usize = 1000;
+    const CONCURRENT_APPENDERS: usize = 20;
+
+    setup();
+
+    assert_eq!(None, loglet.get_trim_point().await?);
+    {
+        let tail = loglet.find_tail().await?;
+        assert_eq!(LogletOffset::OLDEST, tail.offset());
+        assert!(!tail.is_sealed());
+    }
+    // +1 for the main task waiting on all concurrent appenders
+    let append_barrier = Arc::new(Barrier::new(CONCURRENT_APPENDERS + 1));
+
+    let mut appenders: JoinSet<googletest::Result<Vec<LogletOffset>>> = JoinSet::new();
+    for appender_id in 0..CONCURRENT_APPENDERS {
+        appenders.spawn({
+            let loglet = loglet.clone();
+            let append_barrier = append_barrier.clone();
+            async move {
+                let mut i = 1;
+                let mut committed = Vec::new();
+                let mut warmup = true;
+                loop {
+                    let res = loglet
+                        .append(Bytes::from(format!("appender-{}-record{}", appender_id, i)))
+                        .await;
+                    i += 1;
+                    if i > WARMUP_APPENDS && warmup {
+                        println!("appender({}) - warmup complete....", appender_id);
+                        append_barrier.wait().await;
+                        warmup = false;
+                    }
+                    match res {
+                        Ok(offset) => {
+                            committed.push(offset);
+                        }
+                        Err(AppendError::Sealed) => {
+                            break;
+                        }
+                        Err(e) => fail!("unexpected error: {}", e)?,
+                    }
+                    // give a chance to other tasks to work
+                    tokio::task::yield_now().await;
+                }
+                Ok(committed)
+            }
+        });
+    }
+
+    // Wait for some warmup appends
+    println!(
+        "Awaiting all appenders to reach at least {} appends",
+        WARMUP_APPENDS
+    );
+    append_barrier.wait().await;
+    // Go places and do other things.
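+    // (The yields below give the appenders a chance to race ahead before the
+    // seal is issued.)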
+    for _ in 0..5 {
+        tokio::task::yield_now().await;
+    }
+
+    loglet.seal().await?;
+    // fails immediately
+    assert_that!(
+        loglet.append(Bytes::from_static(b"failed-record")).await,
+        err(pat!(AppendError::Sealed))
+    );
+
+    let tail = loglet.find_tail().await?;
+    assert!(tail.is_sealed());
+    println!("Sealed tail: {:?}", tail);
+
+    let mut all_committed = BTreeSet::new();
+    while let Some(handle) = appenders.join_next().await {
+        let mut committed = handle??;
+        assert!(!committed.is_empty());
+        let committed_len = committed.len();
+        assert!(committed_len >= WARMUP_APPENDS);
+        let tail_record = committed.pop().unwrap();
+        // tail must be beyond seal point
+        assert!(tail.offset() > tail_record);
+        println!(
+            "Committed len: {}, last appended was {}",
+            committed_len, tail_record
+        );
+        // ensure that all committed records are unique
+        assert!(all_committed.insert(tail_record));
+        for offset in committed {
+            assert!(all_committed.insert(offset));
+        }
+    }
+
+    let reader = loglet
+        .clone()
+        .create_read_stream(LogletOffset::OLDEST, Some(tail.offset().prev()))
+        .await?;
+
+    let records: BTreeSet<LogletOffset> = reader
+        .try_filter_map(|x| std::future::ready(Ok(Some(x.offset))))
+        .try_collect()
+        .await?;
+
+    // every record committed must be observed in readstream, and it's acceptable for the
+    // readstream to include more records.
+    assert!(all_committed.len() <= records.len());
+    assert!(all_committed.is_subset(&records));
+
+    Ok(())
+}
diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet/mod.rs
similarity index 51%
rename from crates/bifrost/src/loglet.rs
rename to crates/bifrost/src/loglet/mod.rs
index b600e7b58d..7e9f3067d8 100644
--- a/crates/bifrost/src/loglet.rs
+++ b/crates/bifrost/src/loglet/mod.rs
@@ -8,10 +8,19 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+mod error;
+#[cfg(test)]
+pub mod loglet_tests;
+mod provider;
+pub(crate) mod util;
+
+// exports
+pub use error::*;
+pub use provider::{LogletProvider, LogletProviderFactory};
+
 use std::ops::Add;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Poll;
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -19,7 +28,7 @@ use futures::Stream;
 
 use restate_types::logs::{Lsn, SequenceNumber};
 
-use crate::{LogRecord, LsnExt};
+use crate::LogRecord;
 use crate::{Result, TailState};
 
 // Inner loglet offset
@@ -79,6 +88,7 @@ impl SequenceNumber for LogletOffset {
 /// ^ Last Committed
 ///   ^ -- Last released - can be delivered to readers
 ///
+///
 /// An empty loglet. A log is empty when trim_point.next() == tail.prev()
 ///
 /// Semantics of offsets
@@ -94,59 +104,26 @@
 pub trait Loglet: LogletBase<Offset = LogletOffset> {}
 impl<T> Loglet for T where T: LogletBase<Offset = LogletOffset> {}
 
-/// Wraps loglets with the base LSN of the segment
-#[derive(Clone, Debug)]
-pub struct LogletWrapper {
-    /// The offset of the first record in the segment (if exists).
-    /// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this
-    /// record exists. It only means that we want to offset the loglet offsets by base_lsn -
-    /// Loglet::Offset::OLDEST.
-    pub(crate) base_lsn: Lsn,
-    loglet: Arc<dyn Loglet>,
-}
-
-impl LogletWrapper {
-    pub fn new(base_lsn: Lsn, loglet: Arc<dyn Loglet>) -> Self {
-        Self { base_lsn, loglet }
-    }
-
-    pub async fn create_wrapped_read_stream(
-        self,
-        start_lsn: Lsn,
-    ) -> Result<LogletReadStreamWrapper> {
-        // Translates LSN to loglet offset
-        Ok(LogletReadStreamWrapper::new(
-            self.loglet
-                .create_read_stream(start_lsn.into_offset(self.base_lsn))
-                .await?,
-            self.base_lsn,
-        ))
-    }
-}
-
-impl PartialEq for LogletWrapper {
-    fn eq(&self, other: &Self) -> bool {
-        self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet)
-    }
-}
-
 #[async_trait]
 pub trait LogletBase: Send + Sync + std::fmt::Debug {
     type Offset: SequenceNumber;
 
     /// Create a read stream that streams records from a single loglet instance.
     ///
+    /// `to`: The offset of the last record to be read (inclusive). If `None`, the
+    /// stream is an open-ended tailing read stream.
     async fn create_read_stream(
        self: Arc<Self>,
        from: Self::Offset,
+        to: Option<Self::Offset>,
     ) -> Result<SendableLogletReadStream<Self::Offset>>;
 
     /// Append a record to the loglet.
-    async fn append(&self, data: Bytes) -> Result<Self::Offset>;
+    async fn append(&self, data: Bytes) -> Result<Self::Offset, AppendError>;
 
     /// Append a batch of records to the loglet. The returned offset (on success) is the offset of
     /// the first record in the batch.
-    async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset>;
+    async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset, AppendError>;
 
     /// The tail is *the first unwritten position* in the loglet.
     ///
@@ -155,14 +132,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
     /// after the next `append()` call.
     ///
     /// If the loglet is empty, the loglet should return TailState::Open(Offset::OLDEST).
-    /// This should never return Err(Error::LogSealed). Sealed state is represented as
-    /// TailState::Sealed(..)
-    async fn find_tail(&self) -> Result<TailState<Self::Offset>>;
+    async fn find_tail(&self) -> Result<TailState<Self::Offset>, OperationError>;
 
     /// The offset of the slot **before** the first readable record (if it exists), or the offset
     /// before the next slot that will be written to. Must not return Self::INVALID. If the loglet
     /// is never trimmed, this must return `None`.
-    async fn get_trim_point(&self) -> Result<Option<Self::Offset>>;
+    async fn get_trim_point(&self) -> Result<Option<Self::Offset>, OperationError>;
 
     /// Trim the loglet prefix up to and including the `trim_point`.
     /// If trim_point is equal to or higher than the loglet tail, the loglet trims its data until the tail.
     ///
@@ -172,124 +147,37 @@
     ///
     /// Passing `Offset::INVALID` is a no-op. (success)
     /// Passing `Offset::OLDEST` trims the first record in the loglet (if exists).
-    async fn trim(&self, trim_point: Self::Offset) -> Result<()>;
+    async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError>;
+
+    /// Seal the loglet. This operation is idempotent.
+    ///
+    /// Appends **SHOULD NOT** succeed after a `seal()` call is successful, and appends **MUST
+    /// NOT** succeed after the offset returned by the *first* TailState::Sealed() response.
+    async fn seal(&self) -> Result<(), OperationError>;
 
     /// Read or wait for the record at `from` offset, or the next available record if `from` isn't
     /// defined for the loglet.
-    async fn read_next_single(&self, from: Self::Offset) -> Result<LogRecord<Self::Offset>>;
+    async fn read_next_single(
+        &self,
+        from: Self::Offset,
+    ) -> Result<LogRecord<Self::Offset>, OperationError>;
 
     /// Read the next record if it's been committed, otherwise, return None without waiting.
     async fn read_next_single_opt(
         &self,
         from: Self::Offset,
-    ) -> Result<Option<LogRecord<Self::Offset>>>;
+    ) -> Result<Option<LogRecord<Self::Offset>>, OperationError>;
 }
 
 /// A stream of log records from a single loglet. Loglet streams are _always_ tailing streams.
-pub trait LogletReadStream<S: SequenceNumber>: Stream<Item = Result<LogRecord<S>>> {
+pub trait LogletReadStream<S: SequenceNumber>:
+    Stream<Item = Result<LogRecord<S>, OperationError>>
+{
     /// Current read pointer. This points to the next offset to be read.
     fn read_pointer(&self) -> S;
+    /// Returns true if the stream is terminated.
     fn is_terminated(&self) -> bool;
 }
 
 pub type SendableLogletReadStream<S = Lsn> = Pin<Box<dyn LogletReadStream<S> + Send>>;
-
-#[async_trait]
-impl LogletBase for LogletWrapper {
-    type Offset = Lsn;
-
-    /// This should never be used directly. Use `create_wrapped_read_stream()` instead.
-    async fn create_read_stream(
-        self: Arc<Self>,
-        _after: Self::Offset,
-    ) -> Result<SendableLogletReadStream<Self::Offset>> {
-        unreachable!("create_read_stream on LogletWrapper should never be used directly")
-    }
-
-    async fn append(&self, data: Bytes) -> Result<Lsn> {
-        let offset = self.loglet.append(data).await?;
-        // Return the LSN given the loglet offset.
-        Ok(self.base_lsn.offset_by(offset))
-    }
-
-    async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn> {
-        let offset = self.loglet.append_batch(payloads).await?;
-        Ok(self.base_lsn.offset_by(offset))
-    }
-
-    async fn find_tail(&self) -> Result<TailState<Lsn>> {
-        Ok(self
-            .loglet
-            .find_tail()
-            .await?
-            .map(|o| self.base_lsn.offset_by(o)))
-    }
-
-    async fn get_trim_point(&self) -> Result<Option<Lsn>> {
-        let offset = self.loglet.get_trim_point().await?;
-        Ok(offset.map(|o| self.base_lsn.offset_by(o)))
-    }
-
-    // trim_point is inclusive.
-    async fn trim(&self, trim_point: Self::Offset) -> Result<()> {
-        // trimming to INVALID is no-op
-        if trim_point == Self::Offset::INVALID {
-            return Ok(());
-        }
-        let trim_point = trim_point.into_offset(self.base_lsn);
-        self.loglet.trim(trim_point).await
-    }
-
-    async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn>> {
-        // convert LSN to loglet offset
-        let offset = from.into_offset(self.base_lsn);
-        self.loglet
-            .read_next_single(offset)
-            .await
-            .map(|record| record.with_base_lsn(self.base_lsn))
-    }
-
-    async fn read_next_single_opt(
-        &self,
-        from: Self::Offset,
-    ) -> Result<Option<LogRecord<Lsn>>> {
-        let offset = from.into_offset(self.base_lsn);
-        self.loglet
-            .read_next_single_opt(offset)
-            .await
-            .map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn)))
-    }
-}
-
-/// Wraps loglet read streams with the base LSN of the segment
-pub struct LogletReadStreamWrapper {
-    pub(crate) base_lsn: Lsn,
-    inner: SendableLogletReadStream<LogletOffset>,
-}
-
-impl LogletReadStreamWrapper {
-    pub fn new(inner: SendableLogletReadStream<LogletOffset>, base_lsn: Lsn) -> Self {
-        Self { inner, base_lsn }
-    }
-}
-
-impl Stream for LogletReadStreamWrapper {
-    type Item = Result<LogRecord<Lsn>>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        match self.inner.as_mut().poll_next(cx) {
-            Poll::Ready(Some(Ok(record))) => {
-                Poll::Ready(Some(Ok(record.with_base_lsn(self.base_lsn))))
-            }
-            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
-            Poll::Ready(None) => Poll::Ready(None),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-static_assertions::assert_impl_all!(LogletWrapper: Send, Sync, Clone);
diff --git a/crates/bifrost/src/provider.rs b/crates/bifrost/src/loglet/provider.rs
similarity index 88%
rename from crates/bifrost/src/provider.rs
rename to crates/bifrost/src/loglet/provider.rs
index 9ef0326225..acfac51ebb 100644
--- a/crates/bifrost/src/provider.rs
+++ b/crates/bifrost/src/loglet/provider.rs
@@ -14,8 +14,7 @@ use async_trait::async_trait;
 
 use restate_types::logs::metadata::{LogletParams, ProviderKind};
 
-use crate::loglet::Loglet;
-use crate::ProviderError;
+use super::{Loglet, OperationError};
 use crate::Result;
 
 #[async_trait]
@@ -24,7 +23,7 @@ pub trait LogletProviderFactory: Send + 'static {
     /// Factory creates providers of `kind`.
     fn kind(&self) -> ProviderKind;
     /// Initialize provider.
-    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, ProviderError>;
+    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError>;
 }
 
 #[async_trait]
@@ -36,7 +35,7 @@ pub trait LogletProvider: Send + Sync {
     async fn post_start(&self) {}
 
     /// Hook for handling graceful shutdown
-    async fn shutdown(&self) -> Result<(), ProviderError> {
+    async fn shutdown(&self) -> Result<(), OperationError> {
         Ok(())
     }
 }
diff --git a/crates/bifrost/src/loglets/util.rs b/crates/bifrost/src/loglet/util.rs
similarity index 97%
rename from crates/bifrost/src/loglets/util.rs
rename to crates/bifrost/src/loglet/util.rs
index ac2d595ced..e3d19e859f 100644
--- a/crates/bifrost/src/loglets/util.rs
+++ b/crates/bifrost/src/loglet/util.rs
@@ -13,7 +13,7 @@ use tokio_stream::wrappers::WatchStream;
 
 use restate_core::ShutdownError;
 
-use crate::loglet::LogletOffset;
+use super::LogletOffset;
 
 #[derive(Debug)]
 pub struct OffsetWatch {
diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs
new file mode 100644
index 0000000000..a2256441fa
--- /dev/null
+++ b/crates/bifrost/src/loglet_wrapper.rs
@@ -0,0 +1,166 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::task::Poll;
+
+use std::pin::Pin;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::Stream;
+
+use restate_types::logs::{Lsn, SequenceNumber};
+
+use crate::loglet::{
+    AppendError, Loglet, LogletBase, LogletOffset, OperationError, SendableLogletReadStream,
+};
+use crate::{LogRecord, LsnExt};
+use crate::{Result, TailState};
+
+/// Wraps loglets with the base LSN of the segment
+#[derive(Clone, Debug)]
+pub struct LogletWrapper {
+    /// The offset of the first record in the segment (if exists).
+    /// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this
+    /// record exists. It only means that we want to offset the loglet offsets by base_lsn -
+    /// Loglet::Offset::OLDEST.
+    pub(crate) base_lsn: Lsn,
+    loglet: Arc<dyn Loglet>,
+}
+
+impl LogletWrapper {
+    pub fn new(base_lsn: Lsn, loglet: Arc<dyn Loglet>) -> Self {
+        Self { base_lsn, loglet }
+    }
+
+    pub async fn create_wrapped_read_stream(
+        self,
+        start_lsn: Lsn,
+    ) -> Result<LogletReadStreamWrapper> {
+        // Translates LSN to loglet offset
+        Ok(LogletReadStreamWrapper::new(
+            self.loglet
+                .create_read_stream(start_lsn.into_offset(self.base_lsn), None)
+                .await?,
+            self.base_lsn,
+        ))
+    }
+}
+
+impl PartialEq for LogletWrapper {
+    fn eq(&self, other: &Self) -> bool {
+        self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet)
+    }
+}
+
+#[async_trait]
+impl LogletBase for LogletWrapper {
+    type Offset = Lsn;
+
+    /// This should never be used directly. Use `create_wrapped_read_stream()` instead.
+    async fn create_read_stream(
+        self: Arc<Self>,
+        _after: Self::Offset,
+        _to: Option<Self::Offset>,
+    ) -> Result<SendableLogletReadStream<Self::Offset>> {
+        unreachable!("create_read_stream on LogletWrapper should never be used directly")
+    }
+
+    async fn append(&self, data: Bytes) -> Result<Lsn, AppendError> {
+        let offset = self.loglet.append(data).await?;
+        // Return the LSN given the loglet offset.
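+        // (e.g. with base_lsn = 100, loglet offset 1 (= OLDEST) maps to LSN 100,
+        // offset 2 to LSN 101, and so on; see the base_lsn doc comment above.)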
+        Ok(self.base_lsn.offset_by(offset))
+    }
+
+    async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn, AppendError> {
+        let offset = self.loglet.append_batch(payloads).await?;
+        Ok(self.base_lsn.offset_by(offset))
+    }
+
+    async fn find_tail(&self) -> Result<TailState<Lsn>, OperationError> {
+        Ok(self
+            .loglet
+            .find_tail()
+            .await?
+            .map(|o| self.base_lsn.offset_by(o)))
+    }
+
+    async fn get_trim_point(&self) -> Result<Option<Lsn>, OperationError> {
+        let offset = self.loglet.get_trim_point().await?;
+        Ok(offset.map(|o| self.base_lsn.offset_by(o)))
+    }
+
+    // trim_point is inclusive.
+    async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError> {
+        // trimming to INVALID is no-op
+        if trim_point == Self::Offset::INVALID {
+            return Ok(());
+        }
+        let trim_point = trim_point.into_offset(self.base_lsn);
+        self.loglet.trim(trim_point).await
+    }
+
+    async fn seal(&self) -> Result<(), OperationError> {
+        self.loglet.seal().await
+    }
+
+    async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn>, OperationError> {
+        // convert LSN to loglet offset
+        let offset = from.into_offset(self.base_lsn);
+        self.loglet
+            .read_next_single(offset)
+            .await
+            .map(|record| record.with_base_lsn(self.base_lsn))
+    }
+
+    async fn read_next_single_opt(
+        &self,
+        from: Self::Offset,
+    ) -> Result<Option<LogRecord<Lsn>>, OperationError> {
+        let offset = from.into_offset(self.base_lsn);
+        self.loglet
+            .read_next_single_opt(offset)
+            .await
+            .map(|maybe_record| maybe_record.map(|record| record.with_base_lsn(self.base_lsn)))
+    }
+}
+
+/// Wraps loglet read streams with the base LSN of the segment
+pub struct LogletReadStreamWrapper {
+    pub(crate) base_lsn: Lsn,
+    inner: SendableLogletReadStream<LogletOffset>,
+}
+
+impl LogletReadStreamWrapper {
+    pub fn new(inner: SendableLogletReadStream<LogletOffset>, base_lsn: Lsn) -> Self {
+        Self { inner, base_lsn }
+    }
+}
+
+impl Stream for LogletReadStreamWrapper {
+    type Item = Result<LogRecord<Lsn>, OperationError>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        match self.inner.as_mut().poll_next(cx) {
+            Poll::Ready(Some(Ok(record))) => {
+                Poll::Ready(Some(Ok(record.with_base_lsn(self.base_lsn))))
+            }
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+static_assertions::assert_impl_all!(LogletWrapper: Send, Sync, Clone);
diff --git a/crates/bifrost/src/loglets/local_loglet/keys.rs b/crates/bifrost/src/providers/local_loglet/keys.rs
similarity index 100%
rename from crates/bifrost/src/loglets/local_loglet/keys.rs
rename to crates/bifrost/src/providers/local_loglet/keys.rs
diff --git a/crates/bifrost/src/loglets/local_loglet/log_state.rs b/crates/bifrost/src/providers/local_loglet/log_state.rs
similarity index 93%
rename from crates/bifrost/src/loglets/local_loglet/log_state.rs
rename to crates/bifrost/src/providers/local_loglet/log_state.rs
index 31a9c21e00..a07ef6de6e 100644
--- a/crates/bifrost/src/loglets/local_loglet/log_state.rs
+++ b/crates/bifrost/src/providers/local_loglet/log_state.rs
@@ -8,8 +8,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::sync::Arc;
-
 use bytes::{BufMut, Bytes, BytesMut};
 use restate_types::flexbuffers_storage_encode_decode;
 use restate_types::storage::StorageCodec;
@@ -18,8 +16,8 @@ use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
 use tracing::{error, trace, warn};
 
+use super::keys::{MetadataKey, MetadataKind};
 use crate::loglet::LogletOffset;
-use crate::loglets::local_loglet::keys::{MetadataKey, MetadataKind};
 use crate::SealReason;
 
 use super::LogStoreError;
@@ -75,11 +73,11 @@ impl LogStateUpdates {
     }
 
     pub fn encode<B: BufMut>(&self, buf: &mut B) -> Result<(), LogStoreError> {
-        StorageCodec::encode(self, buf).map_err(|err| LogStoreError::Encode(Arc::new(err)))
+        Ok(StorageCodec::encode(self, buf)?)
     }
 
     pub fn from_slice(mut data: &[u8]) -> Result<Self, LogStoreError> {
-        StorageCodec::decode(&mut data).map_err(|err| LogStoreError::Decode(Arc::new(err)))
+        Ok(StorageCodec::decode(&mut data)?)
     }
 }
@@ -95,12 +93,12 @@ pub struct LogState {
 impl LogState {
     pub fn to_bytes(&self) -> Result<Bytes, LogStoreError> {
         let mut buf = BytesMut::default();
-        StorageCodec::encode(self, &mut buf).map_err(Arc::new)?;
+        StorageCodec::encode(self, &mut buf)?;
         Ok(buf.freeze())
     }
 
     pub fn from_slice(mut data: &[u8]) -> Result<Self, LogStoreError> {
-        StorageCodec::decode(&mut data).map_err(|err| LogStoreError::Decode(Arc::new(err)))
+        Ok(StorageCodec::decode(&mut data)?)
     }
 }
diff --git a/crates/bifrost/src/loglets/local_loglet/log_store.rs b/crates/bifrost/src/providers/local_loglet/log_store.rs
similarity index 94%
rename from crates/bifrost/src/loglets/local_loglet/log_store.rs
rename to crates/bifrost/src/providers/local_loglet/log_store.rs
index 7176f763ce..7652350656 100644
--- a/crates/bifrost/src/loglets/local_loglet/log_store.rs
+++ b/crates/bifrost/src/providers/local_loglet/log_store.rs
@@ -19,6 +19,8 @@ use restate_types::storage::{StorageDecodeError, StorageEncodeError};
 use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};
 use static_assertions::const_assert;
 
+use crate::loglet::LogletError;
+
 use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH};
 use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState};
 use super::log_store_writer::LogStoreWriter;
@@ -33,20 +35,29 @@ const DATA_CF_BUDGET_RATIO: f64 = 0.85;
 
 const_assert!(DATA_CF_BUDGET_RATIO < 1.0);
 
-#[derive(Debug, Clone, thiserror::Error)]
+#[derive(Debug, thiserror::Error)]
 pub enum LogStoreError {
     #[error(transparent)]
-    // unfortunately, we have to use Arc here, because the storage encode error is not Clone.
-    Encode(#[from] Arc<StorageEncodeError>),
+    Encode(#[from] StorageEncodeError),
     #[error(transparent)]
-    // unfortunately, we have to use Arc here, because the storage decode error is not Clone.
-    Decode(#[from] Arc<StorageDecodeError>),
+    Decode(#[from] StorageDecodeError),
     #[error(transparent)]
     Rocksdb(#[from] rocksdb::Error),
     #[error(transparent)]
     RocksDbManager(#[from] RocksError),
 }
 
+impl LogletError for LogStoreError {
+    fn retryable(&self) -> bool {
+        match self {
+            LogStoreError::Encode(_) => false,
+            LogStoreError::Decode(_) => false,
+            LogStoreError::Rocksdb(_) => true,
+            LogStoreError::RocksDbManager(_) => false,
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct RocksDbLogStore {
     rocksdb: Arc<RocksDb>,
diff --git a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs
similarity index 97%
rename from crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
rename to crates/bifrost/src/providers/local_loglet/log_store_writer.rs
index 0d88e5e7a8..04c290d278 100644
--- a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
+++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs
@@ -27,18 +27,17 @@ use restate_types::config::LocalLogletOptions;
 use restate_types::live::BoxedLiveLoad;
 use restate_types::logs::SequenceNumber;
 
-use crate::loglet::LogletOffset;
-use crate::{Error, SMALL_BATCH_THRESHOLD_COUNT};
-
 use super::keys::{MetadataKey, MetadataKind, RecordKey};
 use super::log_state::LogStateUpdates;
 use super::log_store::{DATA_CF, METADATA_CF};
 use super::metric_definitions::{
     BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES,
 };
+use crate::loglet::{AppendError, LogletOffset};
+use crate::SMALL_BATCH_THRESHOLD_COUNT;
 
-type Ack = oneshot::Sender<Result<(), Error>>;
-type AckRecv = oneshot::Receiver<Result<(), Error>>;
+type Ack = oneshot::Sender<Result<(), AppendError>>;
+type AckRecv = oneshot::Receiver<Result<(), AppendError>>;
 
 pub struct LogStoreWriteCommand {
     log_id: u64,
@@ -267,14 +266,14 @@ impl LogStoreWriter {
 
         if let Err(e) = result {
             error!("Failed to commit local loglet write batch: {}", e);
-            self.send_acks(Err(Error::LogStoreError(e.into())));
+            self.send_acks(Err(AppendError::terminal(e)));
             return;
         }
 
         self.send_acks(Ok(()));
     }
 
-    fn send_acks(&mut self, result: Result<(), Error>) {
+    fn send_acks(&mut self, result: Result<(), AppendError>) {
         self.batch_acks_buf.drain(..).for_each(|a| {
             let _ = a.send(result.clone());
         });
diff --git a/crates/bifrost/src/loglets/local_loglet/metric_definitions.rs b/crates/bifrost/src/providers/local_loglet/metric_definitions.rs
similarity index 100%
rename from crates/bifrost/src/loglets/local_loglet/metric_definitions.rs
rename to crates/bifrost/src/providers/local_loglet/metric_definitions.rs
diff --git a/crates/bifrost/src/loglets/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs
similarity index 92%
rename from crates/bifrost/src/loglets/local_loglet/mod.rs
rename to crates/bifrost/src/providers/local_loglet/mod.rs
index 982b4dbf52..5281a31274 100644
--- a/crates/bifrost/src/loglets/local_loglet/mod.rs
+++ b/crates/bifrost/src/providers/local_loglet/mod.rs
@@ -29,18 +29,20 @@ use tracing::{debug, warn};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
-use crate::loglet::{LogletBase, LogletOffset, SendableLogletReadStream};
-use crate::loglets::local_loglet::metric_definitions::{
+use crate::loglet::{
+    AppendError, LogletBase, LogletOffset, OperationError, SendableLogletReadStream,
+};
+use crate::providers::local_loglet::metric_definitions::{
     BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH,
 };
-use crate::{Error, LogRecord, Result, SealReason, TailState};
+use crate::{LogRecord, Result, SealReason, TailState};
 
 use self::keys::RecordKey;
 use self::log_store::RocksDbLogStore;
 use self::log_store_writer::RocksDbLogWriterHandle;
 use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATION};
 use self::read_stream::LocalLogletReadStream;
-use super::util::OffsetWatch;
+use crate::loglet::util::OffsetWatch;
 
 struct LocalLoglet {
     log_id: u64,
@@ -76,9 +78,11 @@ impl LocalLoglet {
         log_id: u64,
         log_store: RocksDbLogStore,
         log_writer: RocksDbLogWriterHandle,
-    ) -> Result<Self> {
+    ) -> Result<Self, OperationError> {
         // Fetch the log metadata from the store
-        let log_state = log_store.get_log_state(log_id)?;
+        let log_state = log_store
+            .get_log_state(log_id)
+            .map_err(OperationError::other)?;
         let log_state = log_state.unwrap_or_default();
 
         let trim_point_offset = AtomicU64::new(log_state.trim_point);
@@ -120,7 +124,7 @@ impl LocalLoglet {
     fn read_from(
         &self,
         from_offset: LogletOffset,
-    ) -> Result<Option<LogRecord<LogletOffset>>> {
+    ) -> Result<Option<LogRecord<LogletOffset>>, OperationError> {
         debug_assert_ne!(LogletOffset::INVALID, from_offset);
         let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
         let head_offset = trim_point.next();
@@ -144,7 +148,10 @@ impl LocalLoglet {
             read_opts,
             rocksdb::IteratorMode::From(&key.to_bytes(), rocksdb::Direction::Forward),
         );
-        let record = iter.next().transpose().map_err(LogStoreError::Rocksdb)?;
+        let record = iter
+            .next()
+            .transpose()
+            .map_err(|e| OperationError::other(LogStoreError::Rocksdb(e)))?;
         let Some(record) = record else {
            let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
            // we might not have been able to read the next record because of a concurrent trim operation
@@ -188,11 +195,14 @@ impl LogletBase for LocalLoglet {
     async fn create_read_stream(
         self: Arc<Self>,
         from: Self::Offset,
+        to: Option<Self::Offset>,
     ) -> Result<SendableLogletReadStream<Self::Offset>> {
-        Ok(Box::pin(LocalLogletReadStream::create(self, from).await?))
+        Ok(Box::pin(
+            LocalLogletReadStream::create(self, from, to).await?,
+        ))
     }
 
-    async fn append(&self, payload: Bytes) -> Result<LogletOffset> {
+    async fn append(&self, payload: Bytes) -> Result<LogletOffset, AppendError> {
         counter!(BIFROST_LOCAL_APPEND).increment(1);
         let start_time = std::time::Instant::now();
         // We hold the lock to ensure that offsets are enqueued in the order of
@@ -217,7 +227,7 @@ impl LogletBase for LocalLoglet {
 
         let _ = receiver.await.unwrap_or_else(|_| {
             warn!("Unsure if the local loglet record was written, the ack channel was dropped");
-            Err(Error::Shutdown(ShutdownError))
+            Err(ShutdownError.into())
         })?;
 
         self.last_committed_offset
@@ -227,7 +237,7 @@ impl LogletBase for LocalLoglet {
         Ok(offset)
     }
 
-    async fn append_batch(&self, payloads: &[Bytes]) -> Result<LogletOffset> {
+    async fn append_batch(&self, payloads: &[Bytes]) -> Result<LogletOffset, AppendError> {
         let num_payloads = payloads.len();
         counter!(BIFROST_LOCAL_APPEND).increment(num_payloads as u64);
         let start_time = std::time::Instant::now();
@@ -251,7 +261,7 @@ impl LogletBase for LocalLoglet {
 
         let _ = receiver.await.unwrap_or_else(|_| {
             warn!("Unsure if the local loglet record was written, the ack channel was dropped");
-            Err(Error::Shutdown(ShutdownError))
+            Err(ShutdownError.into())
         })?;
 
         self.last_committed_offset
@@ -261,7 +271,7 @@ impl LogletBase for LocalLoglet {
         Ok(offset)
     }
 
-    async fn find_tail(&self) -> Result<TailState<LogletOffset>> {
+    async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
         let last_committed =
             LogletOffset::from(self.last_committed_offset.load(Ordering::Relaxed)).next();
         Ok(if self.seal.is_some() {
@@ -271,7 +281,7 @@ impl LogletBase for LocalLoglet {
         })
     }
 
-    async fn get_trim_point(&self) -> Result<Option<LogletOffset>> {
+    async fn get_trim_point(&self) -> Result<Option<LogletOffset>, OperationError> {
         let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
 
         if current_trim_point == LogletOffset::INVALID {
@@ -283,7 +293,7 @@ impl LogletBase for LocalLoglet {
 
     /// Trim the log to the minimum of new_trim_point and last_committed_offset
     /// new_trim_point is inclusive (will be trimmed)
-    async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), Error> {
+    async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> {
         let effective_trim_point = new_trim_point.min(LogletOffset(
             self.last_committed_offset.load(Ordering::Relaxed),
         ));
@@ -318,7 +328,14 @@ impl LogletBase for LocalLoglet {
         Ok(())
     }
 
-    async fn read_next_single(&self, from: Self::Offset) -> Result<LogRecord<LogletOffset>> {
+    async fn seal(&self) -> Result<(), OperationError> {
+        todo!()
+    }
+
+    async fn read_next_single(
+        &self,
+        from: Self::Offset,
+    ) -> Result<LogRecord<LogletOffset>, OperationError> {
         loop {
             let next_record = self.read_from(from)?;
             if let Some(next_record) = next_record {
@@ -332,7 +349,7 @@ impl LogletBase for LocalLoglet {
     async fn read_next_single_opt(
         &self,
         from: Self::Offset,
-    ) -> Result<Option<LogRecord<LogletOffset>>> {
+    ) -> Result<Option<LogRecord<LogletOffset>>, OperationError> {
         self.read_from(from)
     }
 }
@@ -345,7 +362,7 @@ mod tests {
 
     use restate_types::live::Live;
     use restate_types::logs::metadata::{LogletParams, ProviderKind};
 
-    use crate::loglet_tests::*;
+    use crate::loglet::loglet_tests::*;
 
     use super::*;
 
diff --git a/crates/bifrost/src/loglets/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs
similarity index 92%
rename from crates/bifrost/src/loglets/local_loglet/provider.rs
rename to crates/bifrost/src/providers/local_loglet/provider.rs
index 995b745eed..ede27e6d55 100644
--- a/crates/bifrost/src/loglets/local_loglet/provider.rs
+++ b/crates/bifrost/src/providers/local_loglet/provider.rs
@@ -11,7 +11,6 @@
 use std::collections::{hash_map, HashMap};
 use std::sync::Arc;
 
-use anyhow::Context;
 use async_trait::async_trait;
 use tokio::sync::Mutex as AsyncMutex;
 use tracing::debug;
@@ -23,9 +22,8 @@ use restate_types::logs::metadata::{LogletParams, ProviderKind};
 use super::log_store::RocksDbLogStore;
 use super::log_store_writer::RocksDbLogWriterHandle;
 use super::{metric_definitions, LocalLoglet};
-use crate::loglet::{Loglet, LogletOffset};
-use crate::ProviderError;
-use crate::{Error, LogletProvider};
+use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError};
+use crate::Error;
 
 pub struct Factory {
     options: BoxedLiveLoad<LocalLogletOptions>,
@@ -45,12 +43,12 @@ impl Factory {
 }
 
 #[async_trait]
-impl crate::LogletProviderFactory for Factory {
+impl LogletProviderFactory for Factory {
     fn kind(&self) -> ProviderKind {
         ProviderKind::Local
     }
 
-    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, ProviderError> {
+    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
         metric_definitions::describe_metrics();
         let Factory {
             mut options,
@@ -60,7 +58,7 @@ impl LogletProviderFactory for Factory {
         let opts = options.live_load();
         let log_store = RocksDbLogStore::create(opts, rocksdb_opts)
             .await
-            .context("RocksDb LogStore")?;
+            .map_err(OperationError::other)?;
         let log_writer = log_store.create_writer().start(options)?;
         debug!("Started a bifrost local loglet provider");
         Ok(Arc::new(LocalLogletProvider {
@@ -111,7 +109,7 @@ impl LogletProvider for LocalLogletProvider {
         Ok(loglet as Arc<dyn Loglet>)
     }
 
-    async fn shutdown(&self) -> Result<(), ProviderError> {
+    async fn shutdown(&self) -> Result<(), OperationError> {
         Ok(())
     }
 }
diff --git a/crates/bifrost/src/loglets/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs
similarity index 91%
rename from crates/bifrost/src/loglets/local_loglet/read_stream.rs
rename to crates/bifrost/src/providers/local_loglet/read_stream.rs
index 8f20f3b134..b5dde70d4a 100644
--- a/crates/bifrost/src/loglets/local_loglet/read_stream.rs
+++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs
@@ -24,9 +24,9 @@ use restate_core::ShutdownError;
 use restate_rocksdb::RocksDbPerfGuard;
 use restate_types::logs::SequenceNumber;
 
-use crate::loglet::{LogletOffset, LogletReadStream};
-use crate::loglets::local_loglet::LogStoreError;
-use crate::{Error, LogRecord, Result};
+use crate::loglet::{LogletOffset, LogletReadStream, OperationError};
+use crate::providers::local_loglet::LogStoreError;
+use crate::{LogRecord, Result};
 
 use super::keys::RecordKey;
 use super::LocalLoglet;
@@ -38,6 +38,8 @@ pub(crate) struct LocalLogletReadStream {
     // the next record this stream will attempt to read
     read_pointer: LogletOffset,
     release_pointer: LogletOffset,
+    /// Last offset to read before terminating the stream. None means "tailing" reader.
+    read_to: Option<LogletOffset>,
     #[pin]
     iterator: DBRawIteratorWithThreadMode<'static, DB>,
     #[pin]
@@ -62,6 +64,7 @@ impl LocalLogletReadStream {
     pub(crate) async fn create(
         loglet: Arc<LocalLoglet>,
         from_offset: LogletOffset,
+        to: Option<LogletOffset>,
     ) -> Result<Self> {
         // Reading from INVALID resets to OLDEST.
         let from_offset = from_offset.max(LogletOffset::OLDEST);
@@ -111,6 +114,7 @@ impl LocalLogletReadStream {
             terminated: false,
             release_watch,
             release_pointer,
+            read_to: to,
         })
     }
 }
@@ -127,7 +131,7 @@
 }
 
 impl Stream for LocalLogletReadStream {
-    type Item = Result<LogRecord<LogletOffset>>;
+    type Item = Result<LogRecord<LogletOffset>, OperationError>;
 
     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
@@ -143,6 +147,11 @@ impl Stream for LocalLogletReadStream {
         loop {
             let mut this = self.as_mut().project();
 
+            // We have reached the limit we are allowed to read
+            if this.read_to.is_some_and(|read_to| next_offset > read_to) {
+                this.terminated.set(true);
+                return Poll::Ready(None);
+            }
             // Are we reading after commit offset?
             // We are at tail. We need to wait until new records have been released.
             if next_offset > *this.release_pointer {
@@ -162,7 +171,7 @@ impl Stream for LocalLogletReadStream {
                     None => {
                         // system shutdown. Or that the loglet has been unexpectedly shutdown.
                         this.terminated.set(true);
-                        return Poll::Ready(Some(Err(Error::Shutdown(ShutdownError))));
+                        return Poll::Ready(Some(Err(OperationError::Shutdown(ShutdownError))));
                     }
                 }
             }
@@ -199,7 +208,7 @@ impl Stream for LocalLogletReadStream {
             // todo: If status is not ok(), we should retry
             if let Err(e) = this.iterator.status() {
                 this.terminated.set(true);
-                return Poll::Ready(Some(Err(Error::LogStoreError(LogStoreError::Rocksdb(e)))));
+                return Poll::Ready(Some(Err(OperationError::other(LogStoreError::Rocksdb(e)))));
             }
 
             if !this.iterator.valid() || this.iterator.key().is_none() {
diff --git a/crates/bifrost/src/loglets/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs
similarity index 78%
rename from crates/bifrost/src/loglets/memory_loglet.rs
rename to crates/bifrost/src/providers/memory_loglet.rs
index 7249d648f5..27e62dec96 100644
--- a/crates/bifrost/src/loglets/memory_loglet.rs
+++ b/crates/bifrost/src/providers/memory_loglet.rs
@@ -27,11 +27,13 @@ use tracing::{debug, info};
 use restate_types::logs::metadata::{LogletParams, ProviderKind};
 use restate_types::logs::SequenceNumber;
 
-use crate::loglet::{Loglet, LogletBase, LogletOffset, LogletReadStream, SendableLogletReadStream};
-use crate::{Error, LogRecord, LogletProvider, TailState};
-use crate::{ProviderError, Result};
-
-use super::util::OffsetWatch;
+use crate::loglet::util::OffsetWatch;
+use crate::loglet::{
+    AppendError, Loglet, LogletBase, LogletOffset, LogletProvider, LogletProviderFactory,
+    LogletReadStream, OperationError, SendableLogletReadStream,
+};
+use crate::Result;
+use crate::{LogRecord, TailState};
 
 #[derive(Default)]
 pub struct Factory {
@@ -39,6 +41,7 @@ pub struct Factory {
 }
 
 impl Factory {
+    #[cfg(test)]
     pub fn with_init_delay(init_delay: Duration) -> Self {
         Self {
             init_delay: Some(init_delay),
@@ -47,12 +50,12 @@ impl Factory {
 }
 
 #[async_trait]
-impl crate::LogletProviderFactory for Factory {
+impl LogletProviderFactory for Factory {
     fn kind(&self) -> ProviderKind {
         ProviderKind::InMemory
     }
 
-    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, ProviderError> {
+    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
         Ok(Arc::new(MemoryLogletProvider {
             store: Default::default(),
             init_delay: self.init_delay.unwrap_or_default(),
@@ -92,7 +95,7 @@ impl LogletProvider for MemoryLogletProvider {
         Ok(loglet as Arc<dyn Loglet>)
     }
 
-    async fn shutdown(&self) -> Result<(), ProviderError> {
+    async fn shutdown(&self) -> Result<(), OperationError> {
         info!("Shutting down in-memory loglet provider");
         Ok(())
     }
@@ -124,18 +127,18 @@ impl MemoryLoglet {
     }
 
     fn index_to_offset(&self, index: usize) -> LogletOffset {
-        let offset = self.trim_point_offset.load(Ordering::Acquire);
+        let offset = self.trim_point_offset.load(Ordering::Relaxed);
         LogletOffset::from(offset + 1 + index as u64)
     }
 
     fn saturating_offset_to_index(&self, offset: LogletOffset) -> usize {
-        let trim_point = self.trim_point_offset.load(Ordering::Acquire);
+        let trim_point = self.trim_point_offset.load(Ordering::Relaxed);
         (offset.0.saturating_sub(trim_point) - 1) as usize
     }
 
-    pub fn advance_commit_offset(&self, offset: LogletOffset) {
+    fn advance_commit_offset(&self, offset: LogletOffset) {
         self.last_committed_offset
-            .fetch_max(offset.into(), Ordering::Release);
+            .fetch_max(offset.into(), Ordering::Relaxed);
         self.notify_readers();
     }
@@ -147,9 +150,9 @@ impl MemoryLoglet {
     fn read_from(
         &self,
         from_offset: LogletOffset,
-    ) -> Result<Option<LogRecord<LogletOffset>>> {
+    ) -> Result<Option<LogRecord<LogletOffset>>, OperationError> {
         let guard = self.log.lock().unwrap();
-        let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire));
+        let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
         let head_offset = trim_point.next();
         // Are we reading behind the loglet head?
         if from_offset < head_offset {
@@ -157,7 +160,7 @@ impl MemoryLoglet {
         }
 
         // are we reading after commit offset?
-        let commit_offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire));
+        let commit_offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed));
         if from_offset > commit_offset {
             Ok(None)
         } else {
@@ -177,14 +180,20 @@ struct MemoryReadStream {
     read_pointer: LogletOffset,
     #[pin]
     release_watch: WatchStream<LogletOffset>,
-    // how far we are allowed to read in the loglet
+    /// how far we are allowed to read in the loglet
     release_pointer: LogletOffset,
+    /// Last offset to read before terminating the stream. None means "tailing" reader.
+    read_to: Option<LogletOffset>,
     #[pin]
     terminated: bool,
 }
 
 impl MemoryReadStream {
-    async fn create(loglet: Arc<MemoryLoglet>, from_offset: LogletOffset) -> Self {
+    async fn create(
+        loglet: Arc<MemoryLoglet>,
+        from_offset: LogletOffset,
+        to: Option<LogletOffset>,
+    ) -> Self {
         let mut release_watch = loglet.release_watch.to_stream();
         let release_pointer = release_watch
             .next()
@@ -196,6 +205,7 @@ impl MemoryReadStream {
             read_pointer: from_offset,
             release_watch,
             release_pointer,
+            read_to: to,
             terminated: false,
         }
     }
@@ -213,7 +223,7 @@
 }
 
 impl Stream for MemoryReadStream {
-    type Item = Result<LogRecord<LogletOffset>>;
+    type Item = Result<LogRecord<LogletOffset>, OperationError>;
 
     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
@@ -224,9 +234,16 @@ impl Stream for MemoryReadStream {
         }
 
         let next_offset = self.read_pointer;
+
         loop {
             let mut this = self.as_mut().project();
 
+            // We have reached the limit we are allowed to read
+            if this.read_to.is_some_and(|read_to| next_offset > read_to) {
+                this.terminated.set(true);
+                return Poll::Ready(None);
+            }
+
             // Are we reading after commit offset?
             // We are at tail. We need to wait until new records have been released.
             if next_offset > *this.release_pointer {
@@ -245,7 +262,7 @@ impl Stream for MemoryReadStream {
                     None => {
                         // system shutdown. Or that the loglet has been unexpectedly shutdown.
                         this.terminated.set(true);
-                        return Poll::Ready(Some(Err(Error::Shutdown(ShutdownError))));
+                        return Poll::Ready(Some(Err(OperationError::Shutdown(ShutdownError))));
                    }
                }
            }
@@ -291,12 +308,16 @@ impl LogletBase for MemoryLoglet {
     async fn create_read_stream(
         self: Arc<Self>,
         from: Self::Offset,
+        to: Option<Self::Offset>,
     ) -> Result<SendableLogletReadStream<Self::Offset>> {
-        Ok(Box::pin(MemoryReadStream::create(self, from).await))
+        Ok(Box::pin(MemoryReadStream::create(self, from, to).await))
     }
 
-    async fn append(&self, payload: Bytes) -> Result<LogletOffset> {
+    async fn append(&self, payload: Bytes) -> Result<LogletOffset, AppendError> {
         let mut log = self.log.lock().unwrap();
+        if self.sealed.load(Ordering::Relaxed) {
+            return Err(AppendError::Sealed);
+        }
         let offset = self.index_to_offset(log.len());
         debug!(
             "Appending record to in-memory loglet {:?} at offset {}",
             self.params, offset,
         );
         log.push(payload);
         // mark as committed immediately.
-        let offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next();
+        let offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next();
         self.advance_commit_offset(offset);
         Ok(offset)
     }
 
-    async fn append_batch(&self, payloads: &[Bytes]) -> Result<LogletOffset> {
+    async fn append_batch(&self, payloads: &[Bytes]) -> Result<LogletOffset, AppendError> {
         let mut log = self.log.lock().unwrap();
-        let offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next();
+        if self.sealed.load(Ordering::Relaxed) {
+            return Err(AppendError::Sealed);
+        }
+        let offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next();
         let first_offset = offset;
         let num_payloads = payloads.len();
         for payload in payloads {
@@ -326,9 +350,10 @@ impl LogletBase for MemoryLoglet {
         Ok(first_offset)
     }
 
-    async fn find_tail(&self) -> Result<TailState<LogletOffset>> {
-        let committed = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next();
-        let sealed = self.sealed.load(Ordering::Acquire);
+    async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
+        let _guard = self.log.lock().unwrap();
+        let sealed = self.sealed.load(Ordering::Relaxed);
+        let committed = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next();
         Ok(if sealed {
             TailState::Sealed(committed)
         } else {
@@ -337,7 +362,8 @@ impl LogletBase for MemoryLoglet {
     }
 
     /// Find the head (oldest) record in the loglet.
-    async fn get_trim_point(&self) -> Result<Option<LogletOffset>> {
+    async fn get_trim_point(&self) -> Result<Option<LogletOffset>, OperationError> {
+        let _guard = self.log.lock().unwrap();
         let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
 
         if current_trim_point == LogletOffset::INVALID {
@@ -347,13 +373,12 @@ impl LogletBase for MemoryLoglet {
         }
     }
 
-    async fn trim(&self, new_trim_point: Self::Offset) -> Result<()> {
+    async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> {
+        let mut log = self.log.lock().unwrap();
         let actual_trim_point = new_trim_point.min(LogletOffset(
             self.last_committed_offset.load(Ordering::Relaxed),
         ));
 
-        let mut log = self.log.lock().unwrap();
-
         let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
 
         if current_trim_point >= actual_trim_point {
@@ -368,7 +393,17 @@ impl LogletBase for MemoryLoglet {
         Ok(())
     }
 
-    async fn read_next_single(&self, from: LogletOffset) -> Result<LogRecord<LogletOffset>> {
+    async fn seal(&self) -> Result<(), OperationError> {
+        // Ensures no in-flight operations are taking place.
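+        // (Appenders acquire this same mutex before checking `sealed`, so once the
+        // guard is held here no append can slip past the seal check.)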
+        let _guard = self.log.lock().unwrap();
+        self.sealed.store(true, Ordering::Relaxed);
+        Ok(())
+    }
+
+    async fn read_next_single(
+        &self,
+        from: LogletOffset,
+    ) -> Result<LogRecord<LogletOffset>, OperationError> {
         loop {
             let next_record = self.read_from(from)?;
             if let Some(next_record) = next_record {
@@ -382,14 +417,14 @@ impl LogletBase for MemoryLoglet {
     async fn read_next_single_opt(
         &self,
         after: Self::Offset,
-    ) -> Result<Option<LogRecord<LogletOffset>>> {
+    ) -> Result<Option<LogRecord<LogletOffset>>, OperationError> {
         self.read_from(after)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::loglet_tests::*;
+    use crate::loglet::loglet_tests::*;
 
     use super::*;
 
@@ -410,4 +445,17 @@ mod tests {
         let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
         single_loglet_readstream_test_with_trims(loglet).await
     }
+
+    #[tokio::test(start_paused = true)]
+    async fn memory_loglet_test_append_after_seal() -> googletest::Result<()> {
+        let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
+        loglet_test_append_after_seal(loglet).await
+    }
+
+    // multi-threaded to check correctness under parallel conditions
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn memory_loglet_test_append_after_seal_concurrent() -> googletest::Result<()> {
+        let loglet = MemoryLoglet::new(LogletParams::from("112".to_string()));
+        loglet_test_append_after_seal_concurrent(loglet).await
+    }
 }
diff --git a/crates/bifrost/src/loglets/mod.rs b/crates/bifrost/src/providers/mod.rs
similarity index 92%
rename from crates/bifrost/src/loglets/mod.rs
rename to crates/bifrost/src/providers/mod.rs
index 0ba4f51f4b..2f091b1841 100644
--- a/crates/bifrost/src/loglets/mod.rs
+++ b/crates/bifrost/src/providers/mod.rs
@@ -10,5 +10,5 @@
 
 pub mod local_loglet;
 pub mod memory_loglet;
+#[cfg(feature = "replicated-loglet")]
 pub mod replicated_loglet;
-pub(crate) mod util;
diff --git a/crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs
similarity index 100%
rename from crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs
rename to crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs
diff --git a/crates/bifrost/src/loglets/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs
similarity index 100%
rename from crates/bifrost/src/loglets/replicated_loglet/mod.rs
rename to crates/bifrost/src/providers/replicated_loglet/mod.rs
diff --git a/crates/bifrost/src/loglets/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs
similarity index 89%
rename from crates/bifrost/src/loglets/replicated_loglet/provider.rs
rename to crates/bifrost/src/providers/replicated_loglet/provider.rs
index db1c9e33f3..2459de2708 100644
--- a/crates/bifrost/src/loglets/replicated_loglet/provider.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs
@@ -24,9 +24,8 @@ use restate_types::live::BoxedLiveLoad;
 use restate_types::logs::metadata::{LogletParams, ProviderKind};
 
 use super::metric_definitions;
-use crate::loglet::{Loglet, LogletOffset};
-use crate::ProviderError;
-use crate::{Error, LogletProvider};
+use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError};
+use crate::Error;
 
 pub struct Factory {
     opts: BoxedLiveLoad<ReplicatedLogletOptions>,
@@ -53,12 +52,12 @@ impl Factory {
 }
 
 #[async_trait]
-impl crate::LogletProviderFactory for Factory {
+impl LogletProviderFactory for Factory {
     fn kind(&self) -> ProviderKind {
         ProviderKind::Replicated
     }
 
-    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, ProviderError> {
+    async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
         metric_definitions::describe_metrics();
         Ok(Arc::new(ReplicatedLogletProvider))
     }
@@ -76,7 +75,7 @@ impl LogletProvider for ReplicatedLogletProvider {
         todo!("Not implemented yet")
     }
 
-    async fn shutdown(&self) -> Result<(), ProviderError> {
+    async fn shutdown(&self) -> Result<(), OperationError> {
         Ok(())
     }
 }
diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs
index 606735b5d5..ef799f4ea6 100644
--- a/crates/bifrost/src/read_stream.rs
+++ b/crates/bifrost/src/read_stream.rs
@@ -21,8 +21,8 @@ use restate_types::logs::SequenceNumber;
 use restate_types::logs::{LogId, Lsn};
 
 use crate::bifrost::BifrostInner;
-use crate::loglet::LogletReadStreamWrapper;
-use crate::loglet::LogletWrapper;
+use crate::loglet_wrapper::LogletReadStreamWrapper;
+use crate::loglet_wrapper::LogletWrapper;
 use crate::FindTailAttributes;
 use crate::LogRecord;
 use crate::Result;
@@ -143,7 +143,7 @@ impl Stream for LogReadStream {
                     self.read_pointer = new_pointer;
                     Poll::Ready(Some(Ok(record)))
                 }
-                Some(Err(e)) => Poll::Ready(Some(Err(e))),
+                Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
                 None => {
                     // todo: check if we should switch the loglet.
                     self.as_mut().terminated = true;
diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs
index e8d7884831..ac68d8058b 100644
--- a/crates/bifrost/src/service.rs
+++ b/crates/bifrost/src/service.rs
@@ -21,9 +21,9 @@ use restate_core::{cancellation_watcher, Metadata, TaskCenter, TaskKind};
 use restate_types::logs::metadata::ProviderKind;
 
 use crate::bifrost::BifrostInner;
-use crate::loglets::{local_loglet, memory_loglet};
+use crate::providers::{local_loglet, memory_loglet};
 use crate::watchdog::{Watchdog, WatchdogCommand};
-use crate::{Bifrost, LogletProviderFactory};
+use crate::{loglet::LogletProviderFactory, Bifrost};
 
 pub struct BifrostService {
     task_center: TaskCenter,
diff --git a/crates/bifrost/src/watchdog.rs b/crates/bifrost/src/watchdog.rs
index c125debed7..749bcf37ff 100644
--- a/crates/bifrost/src/watchdog.rs
+++ b/crates/bifrost/src/watchdog.rs
@@ -19,7 +19,7 @@ use tokio::task::JoinSet;
 use tracing::{debug, trace, warn};
 
 use crate::bifrost::BifrostInner;
-use crate::provider::LogletProvider;
+use crate::loglet::LogletProvider;
 
 pub type WatchdogSender = tokio::sync::mpsc::UnboundedSender<WatchdogCommand>;
 type WatchdogReceiver = tokio::sync::mpsc::UnboundedReceiver<WatchdogCommand>;
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index cc5914469b..55dcd0dee1 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -160,7 +160,7 @@ impl Node {
         // Setup bifrost
         // replicated-loglet
         #[cfg(feature = "replicated-loglet")]
-        let replicated_loglet_factory = restate_bifrost::loglets::replicated_loglet::Factory::new(
+        let replicated_loglet_factory = restate_bifrost::providers::replicated_loglet::Factory::new(
             updateable_config
                 .clone()
                 .map(|c| &c.bifrost.replicated_loglet)
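
Editor's note — usage sketch, not part of the patch above: the new error taxonomy splits loglet failures into a terminal `AppendError::Sealed` and type-erased `Other` errors whose `LogletError::retryable()` hint tells upper layers whether another attempt makes sense. The retry loop below illustrates the intended consumption under that assumption; `append_with_retry` and `MAX_ATTEMPTS` are hypothetical names, not APIs introduced by this PR.

    use std::sync::Arc;

    use bytes::Bytes;
    use restate_bifrost::loglet::{AppendError, Loglet, LogletBase, LogletError, LogletOffset};

    /// Hypothetical helper: retry an append a bounded number of times, consulting
    /// the provider's retryability hint. MAX_ATTEMPTS is an arbitrary illustrative bound.
    async fn append_with_retry(
        loglet: &Arc<dyn Loglet>,
        payload: Bytes,
    ) -> Result<LogletOffset, AppendError> {
        const MAX_ATTEMPTS: usize = 3;
        let mut attempt = 0;
        loop {
            match loglet.append(payload.clone()).await {
                Ok(offset) => return Ok(offset),
                // Sealed is terminal for this segment; retrying the same loglet cannot help.
                Err(AppendError::Sealed) => return Err(AppendError::Sealed),
                // The provider classifies transient failures via LogletError::retryable().
                Err(AppendError::Other(err)) if err.retryable() && attempt < MAX_ATTEMPTS => {
                    attempt += 1;
                }
                Err(other) => return Err(other),
            }
        }
    }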