diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index cf770233d..257d77cd2 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -24,7 +24,7 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber}; use restate_types::storage::StorageCodec; use restate_types::Version; -use crate::loglet::{LogletBase, LogletProvider}; +use crate::loglet::{AppendError, LogletBase, LogletProvider}; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; use crate::{ @@ -257,7 +257,14 @@ impl BifrostInner { let loglet = self.writeable_loglet(log_id).await?; let mut buf = BytesMut::default(); StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible"); - loglet.append(buf.freeze()).await + + let res = loglet.append(buf.freeze()).await; + // todo: Handle retries, segment seals and other recoverable errors. + res.map_err(|e| match e { + AppendError::Sealed => todo!(), + AppendError::Shutdown(e) => Error::Shutdown(e), + AppendError::Other(e) => Error::LogletError(e), + }) } pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result { @@ -271,7 +278,13 @@ impl BifrostInner { buf.freeze() }) .collect(); - loglet.append_batch(&raw_payloads).await + let res = loglet.append_batch(&raw_payloads).await; + // todo: Handle retries, segment seals and other recoverable errors. + res.map_err(|e| match e { + AppendError::Sealed => todo!(), + AppendError::Shutdown(e) => Error::Shutdown(e), + AppendError::Other(e) => Error::LogletError(e), + }) } pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result { @@ -377,10 +390,7 @@ impl BifrostInner { /// Immediately fetch new metadata from metadata store. pub async fn sync_metadata(&self) -> Result<()> { self.fail_if_shutting_down()?; - self.metadata - .sync(MetadataKind::Logs) - .await - .map_err(Arc::new)?; + self.metadata.sync(MetadataKind::Logs).await?; Ok(()) } diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 997ed338d..3c3377e60 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -13,13 +13,13 @@ use std::sync::Arc; use restate_types::logs::{LogId, Lsn}; -use crate::providers::local_loglet::LogStoreError; +use crate::loglet::{LogletError, OperationError}; use crate::types::SealReason; /// Result type for bifrost operations. pub type Result = std::result::Result; -#[derive(thiserror::Error, Debug, Clone)] +#[derive(thiserror::Error, Debug)] pub enum Error { #[error("log '{0}' is sealed")] LogSealed(LogId, SealReason), @@ -30,11 +30,19 @@ pub enum Error { #[error("operation failed due to an ongoing shutdown")] Shutdown(#[from] ShutdownError), #[error(transparent)] - LogStoreError(#[from] LogStoreError), + LogletError(#[from] Arc), #[error("failed syncing logs metadata: {0}")] - // unfortunately, we have to use Arc here, because the SyncError is not Clone. - MetadataSync(#[from] Arc), + MetadataSync(#[from] SyncError), /// Provider is unknown or disabled #[error("bifrost provider '{0}' is disabled or unrecognized")] Disabled(String), } + +impl From for Error { + fn from(value: OperationError) -> Self { + match value { + OperationError::Shutdown(e) => Error::Shutdown(e), + OperationError::Other(e) => Error::LogletError(e), + } + } +} diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs index 1204586a8..69abeed3b 100644 --- a/crates/bifrost/src/loglet/error.rs +++ b/crates/bifrost/src/loglet/error.rs @@ -8,11 +8,105 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::fmt::{Debug, Display}; +use std::sync::Arc; + use restate_core::ShutdownError; +#[derive(Debug, Clone, thiserror::Error)] +pub enum AppendError { + #[error("Loglet is sealed")] + Sealed, + #[error(transparent)] + Shutdown(#[from] ShutdownError), + #[error(transparent)] + Other(Arc), +} + +impl AppendError { + pub fn retryable(error: E) -> Self { + Self::Other(Arc::new(RetryableError(error))) + } + + pub fn terminal(error: E) -> Self { + Self::Other(Arc::new(TerminalError(error))) + } + + pub fn other(error: E) -> Self { + Self::Other(Arc::new(error)) + } +} + #[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum ProviderError { +pub enum OperationError { + #[error(transparent)] Shutdown(#[from] ShutdownError), - Other(#[from] anyhow::Error), + #[error(transparent)] + Other(Arc), +} + +impl OperationError { + pub fn retryable(error: E) -> Self { + Self::Other(Arc::new(RetryableError(error))) + } + + pub fn terminal(error: E) -> Self { + Self::Other(Arc::new(TerminalError(error))) + } + + pub fn other(error: E) -> Self { + Self::Other(Arc::new(error)) + } +} + +// -- Helper Types -- + +/// Represents a type-erased error from the loglet provider. +pub trait LogletError: std::error::Error + Send + Sync + Debug + Display + 'static { + /// Signal upper layers whether this error should be retried or not. + fn retryable(&self) -> bool { + false + } +} + +#[derive(Debug, thiserror::Error)] +struct RetryableError(#[source] T); + +impl Display for RetryableError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[retryable] {}", self.0) + } +} + +impl LogletError for RetryableError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + true + } +} + +#[derive(Debug, thiserror::Error)] +struct TerminalError(#[source] T); + +impl LogletError for TerminalError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + false + } +} + +impl Display for TerminalError +where + T: Debug + Display + Send + Sync + std::error::Error + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[terminal] {}", self.0) + } } diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index 38afecddf..477344763 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -116,11 +116,11 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { ) -> Result>; /// Append a record to the loglet. - async fn append(&self, data: Bytes) -> Result; + async fn append(&self, data: Bytes) -> Result; /// Append a batch of records to the loglet. The returned offset (on success) if the offset of /// the first record in the batch) - async fn append_batch(&self, payloads: &[Bytes]) -> Result; + async fn append_batch(&self, payloads: &[Bytes]) -> Result; /// The tail is *the first unwritten position* in the loglet. /// @@ -129,14 +129,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// after the next `append()` call. /// /// If the loglet is empty, the loglet should return TailState::Open(Offset::OLDEST). - /// This should never return Err(Error::LogSealed). Sealed state is represented as - /// TailState::Sealed(..) - async fn find_tail(&self) -> Result>; + async fn find_tail(&self) -> Result, OperationError>; /// The offset of the slot **before** the first readable record (if it exists), or the offset /// before the next slot that will be written to. Must not return Self::INVALID. If the loglet /// is never trimmed, this must return `None`. - async fn get_trim_point(&self) -> Result>; + async fn get_trim_point(&self) -> Result, OperationError>; /// Trim the loglet prefix up to and including the `trim_point`. /// If trim_point equal or higher than the loglet tail, the loglet trims its data until the tail. @@ -146,21 +144,26 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug { /// /// Passing `Offset::INVALID` is a no-op. (success) /// Passing `Offset::OLDEST` trims the first record in the loglet (if exists). - async fn trim(&self, trim_point: Self::Offset) -> Result<()>; + async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError>; /// Read or wait for the record at `from` offset, or the next available record if `from` isn't /// defined for the loglet. - async fn read_next_single(&self, from: Self::Offset) -> Result>; + async fn read_next_single( + &self, + from: Self::Offset, + ) -> Result, OperationError>; /// Read the next record if it's been committed, otherwise, return None without waiting. async fn read_next_single_opt( &self, from: Self::Offset, - ) -> Result>>; + ) -> Result>, OperationError>; } /// A stream of log records from a single loglet. Loglet streams are _always_ tailing streams. -pub trait LogletReadStream: Stream>> { +pub trait LogletReadStream: + Stream, OperationError>> +{ /// Current read pointer. This points to the next offset to be read. fn read_pointer(&self) -> S; /// Returns true if the stream is terminated. diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index 65e75965a..acfac51eb 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use restate_types::logs::metadata::{LogletParams, ProviderKind}; -use super::{Loglet, ProviderError}; +use super::{Loglet, OperationError}; use crate::Result; #[async_trait] @@ -23,7 +23,7 @@ pub trait LogletProviderFactory: Send + 'static { /// Factory creates providers of `kind`. fn kind(&self) -> ProviderKind; /// Initialize provider. - async fn create(self: Box) -> Result, ProviderError>; + async fn create(self: Box) -> Result, OperationError>; } #[async_trait] @@ -35,7 +35,7 @@ pub trait LogletProvider: Send + Sync { async fn post_start(&self) {} /// Hook for handling graceful shutdown - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } } diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index affb4c127..b73685cdf 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -19,7 +19,9 @@ use futures::Stream; use restate_types::logs::{Lsn, SequenceNumber}; -use crate::loglet::{Loglet, LogletBase, LogletOffset, SendableLogletReadStream}; +use crate::loglet::{ + AppendError, Loglet, LogletBase, LogletOffset, OperationError, SendableLogletReadStream, +}; use crate::{LogRecord, LsnExt}; use crate::{Result, TailState}; @@ -71,18 +73,18 @@ impl LogletBase for LogletWrapper { unreachable!("create_read_stream on LogletWrapper should never be used directly") } - async fn append(&self, data: Bytes) -> Result { + async fn append(&self, data: Bytes) -> Result { let offset = self.loglet.append(data).await?; // Return the LSN given the loglet offset. Ok(self.base_lsn.offset_by(offset)) } - async fn append_batch(&self, payloads: &[Bytes]) -> Result { + async fn append_batch(&self, payloads: &[Bytes]) -> Result { let offset = self.loglet.append_batch(payloads).await?; Ok(self.base_lsn.offset_by(offset)) } - async fn find_tail(&self) -> Result> { + async fn find_tail(&self) -> Result, OperationError> { Ok(self .loglet .find_tail() @@ -90,13 +92,13 @@ impl LogletBase for LogletWrapper { .map(|o| self.base_lsn.offset_by(o))) } - async fn get_trim_point(&self) -> Result> { + async fn get_trim_point(&self) -> Result, OperationError> { let offset = self.loglet.get_trim_point().await?; Ok(offset.map(|o| self.base_lsn.offset_by(o))) } // trim_point is inclusive. - async fn trim(&self, trim_point: Self::Offset) -> Result<()> { + async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError> { // trimming to INVALID is no-op if trim_point == Self::Offset::INVALID { return Ok(()); @@ -105,7 +107,7 @@ impl LogletBase for LogletWrapper { self.loglet.trim(trim_point).await } - async fn read_next_single(&self, from: Lsn) -> Result> { + async fn read_next_single(&self, from: Lsn) -> Result, OperationError> { // convert LSN to loglet offset let offset = from.into_offset(self.base_lsn); self.loglet @@ -117,7 +119,7 @@ impl LogletBase for LogletWrapper { async fn read_next_single_opt( &self, from: Self::Offset, - ) -> Result>> { + ) -> Result>, OperationError> { let offset = from.into_offset(self.base_lsn); self.loglet .read_next_single_opt(offset) @@ -139,7 +141,7 @@ impl LogletReadStreamWrapper { } impl Stream for LogletReadStreamWrapper { - type Item = Result>; + type Item = Result, OperationError>; fn poll_next( mut self: Pin<&mut Self>, diff --git a/crates/bifrost/src/providers/local_loglet/log_state.rs b/crates/bifrost/src/providers/local_loglet/log_state.rs index 83b4ade55..a07ef6de6 100644 --- a/crates/bifrost/src/providers/local_loglet/log_state.rs +++ b/crates/bifrost/src/providers/local_loglet/log_state.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use bytes::{BufMut, Bytes, BytesMut}; use restate_types::flexbuffers_storage_encode_decode; use restate_types::storage::StorageCodec; @@ -75,11 +73,11 @@ impl LogStateUpdates { } pub fn encode(&self, buf: &mut B) -> Result<(), LogStoreError> { - StorageCodec::encode(self, buf).map_err(|err| LogStoreError::Encode(Arc::new(err))) + Ok(StorageCodec::encode(self, buf)?) } pub fn from_slice(mut data: &[u8]) -> Result { - StorageCodec::decode(&mut data).map_err(|err| LogStoreError::Decode(Arc::new(err))) + Ok(StorageCodec::decode(&mut data)?) } } @@ -95,12 +93,12 @@ pub struct LogState { impl LogState { pub fn to_bytes(&self) -> Result { let mut buf = BytesMut::default(); - StorageCodec::encode(self, &mut buf).map_err(Arc::new)?; + StorageCodec::encode(self, &mut buf)?; Ok(buf.freeze()) } pub fn from_slice(mut data: &[u8]) -> Result { - StorageCodec::decode(&mut data).map_err(|err| LogStoreError::Decode(Arc::new(err))) + Ok(StorageCodec::decode(&mut data)?) } } diff --git a/crates/bifrost/src/providers/local_loglet/log_store.rs b/crates/bifrost/src/providers/local_loglet/log_store.rs index 7176f763c..765235065 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store.rs @@ -19,6 +19,8 @@ use restate_types::storage::{StorageDecodeError, StorageEncodeError}; use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB}; use static_assertions::const_assert; +use crate::loglet::LogletError; + use super::keys::{MetadataKey, MetadataKind, DATA_KEY_PREFIX_LENGTH}; use super::log_state::{log_state_full_merge, log_state_partial_merge, LogState}; use super::log_store_writer::LogStoreWriter; @@ -33,20 +35,29 @@ const DATA_CF_BUDGET_RATIO: f64 = 0.85; const_assert!(DATA_CF_BUDGET_RATIO < 1.0); -#[derive(Debug, Clone, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum LogStoreError { #[error(transparent)] - // unfortunately, we have to use Arc here, because the storage encode error is not Clone. - Encode(#[from] Arc), + Encode(#[from] StorageEncodeError), #[error(transparent)] - // unfortunately, we have to use Arc here, because the storage decode error is not Clone. - Decode(#[from] Arc), + Decode(#[from] StorageDecodeError), #[error(transparent)] Rocksdb(#[from] rocksdb::Error), #[error(transparent)] RocksDbManager(#[from] RocksError), } +impl LogletError for LogStoreError { + fn retryable(&self) -> bool { + match self { + LogStoreError::Encode(_) => false, + LogStoreError::Decode(_) => false, + LogStoreError::Rocksdb(_) => true, + LogStoreError::RocksDbManager(_) => false, + } + } +} + #[derive(Debug, Clone)] pub struct RocksDbLogStore { rocksdb: Arc, diff --git a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs index 0857f308d..04c290d27 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs @@ -33,11 +33,11 @@ use super::log_store::{DATA_CF, METADATA_CF}; use super::metric_definitions::{ BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES, }; -use crate::loglet::LogletOffset; -use crate::{Error, SMALL_BATCH_THRESHOLD_COUNT}; +use crate::loglet::{AppendError, LogletOffset}; +use crate::SMALL_BATCH_THRESHOLD_COUNT; -type Ack = oneshot::Sender>; -type AckRecv = oneshot::Receiver>; +type Ack = oneshot::Sender>; +type AckRecv = oneshot::Receiver>; pub struct LogStoreWriteCommand { log_id: u64, @@ -266,14 +266,14 @@ impl LogStoreWriter { if let Err(e) = result { error!("Failed to commit local loglet write batch: {}", e); - self.send_acks(Err(Error::LogStoreError(e.into()))); + self.send_acks(Err(AppendError::terminal(e))); return; } self.send_acks(Ok(())); } - fn send_acks(&mut self, result: Result<(), Error>) { + fn send_acks(&mut self, result: Result<(), AppendError>) { self.batch_acks_buf.drain(..).for_each(|a| { let _ = a.send(result.clone()); }); diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 8f29d6a33..42bd2504e 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -29,11 +29,13 @@ use tracing::{debug, warn}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use crate::loglet::{LogletBase, LogletOffset, SendableLogletReadStream}; +use crate::loglet::{ + AppendError, LogletBase, LogletOffset, OperationError, SendableLogletReadStream, +}; use crate::providers::local_loglet::metric_definitions::{ BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH, }; -use crate::{Error, LogRecord, Result, SealReason, TailState}; +use crate::{LogRecord, Result, SealReason, TailState}; use self::keys::RecordKey; use self::log_store::RocksDbLogStore; @@ -76,9 +78,11 @@ impl LocalLoglet { log_id: u64, log_store: RocksDbLogStore, log_writer: RocksDbLogWriterHandle, - ) -> Result { + ) -> Result { // Fetch the log metadata from the store - let log_state = log_store.get_log_state(log_id)?; + let log_state = log_store + .get_log_state(log_id) + .map_err(OperationError::other)?; let log_state = log_state.unwrap_or_default(); let trim_point_offset = AtomicU64::new(log_state.trim_point); @@ -120,7 +124,7 @@ impl LocalLoglet { fn read_from( &self, from_offset: LogletOffset, - ) -> Result>> { + ) -> Result>, OperationError> { debug_assert_ne!(LogletOffset::INVALID, from_offset); let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); let head_offset = trim_point.next(); @@ -144,7 +148,10 @@ impl LocalLoglet { read_opts, rocksdb::IteratorMode::From(&key.to_bytes(), rocksdb::Direction::Forward), ); - let record = iter.next().transpose().map_err(LogStoreError::Rocksdb)?; + let record = iter + .next() + .transpose() + .map_err(|e| OperationError::other(LogStoreError::Rocksdb(e)))?; let Some(record) = record else { let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); // we might not have been able to read the next record because of a concurrent trim operation @@ -192,7 +199,7 @@ impl LogletBase for LocalLoglet { Ok(Box::pin(LocalLogletReadStream::create(self, from).await?)) } - async fn append(&self, payload: Bytes) -> Result { + async fn append(&self, payload: Bytes) -> Result { counter!(BIFROST_LOCAL_APPEND).increment(1); let start_time = std::time::Instant::now(); // We hold the lock to ensure that offsets are enqueued in the order of @@ -217,7 +224,7 @@ impl LogletBase for LocalLoglet { let _ = receiver.await.unwrap_or_else(|_| { warn!("Unsure if the local loglet record was written, the ack channel was dropped"); - Err(Error::Shutdown(ShutdownError)) + Err(ShutdownError.into()) })?; self.last_committed_offset @@ -227,7 +234,7 @@ impl LogletBase for LocalLoglet { Ok(offset) } - async fn append_batch(&self, payloads: &[Bytes]) -> Result { + async fn append_batch(&self, payloads: &[Bytes]) -> Result { let num_payloads = payloads.len(); counter!(BIFROST_LOCAL_APPEND).increment(num_payloads as u64); let start_time = std::time::Instant::now(); @@ -251,7 +258,7 @@ impl LogletBase for LocalLoglet { let _ = receiver.await.unwrap_or_else(|_| { warn!("Unsure if the local loglet record was written, the ack channel was dropped"); - Err(Error::Shutdown(ShutdownError)) + Err(ShutdownError.into()) })?; self.last_committed_offset @@ -261,7 +268,7 @@ impl LogletBase for LocalLoglet { Ok(offset) } - async fn find_tail(&self) -> Result> { + async fn find_tail(&self) -> Result, OperationError> { let last_committed = LogletOffset::from(self.last_committed_offset.load(Ordering::Relaxed)).next(); Ok(if self.seal.is_some() { @@ -271,7 +278,7 @@ impl LogletBase for LocalLoglet { }) } - async fn get_trim_point(&self) -> Result> { + async fn get_trim_point(&self) -> Result, OperationError> { let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { @@ -283,7 +290,7 @@ impl LogletBase for LocalLoglet { /// Trim the log to the minimum of new_trim_point and last_committed_offset /// new_trim_point is inclusive (will be trimmed) - async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), Error> { + async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> { let effective_trim_point = new_trim_point.min(LogletOffset( self.last_committed_offset.load(Ordering::Relaxed), )); @@ -318,7 +325,10 @@ impl LogletBase for LocalLoglet { Ok(()) } - async fn read_next_single(&self, from: Self::Offset) -> Result> { + async fn read_next_single( + &self, + from: Self::Offset, + ) -> Result, OperationError> { loop { let next_record = self.read_from(from)?; if let Some(next_record) = next_record { @@ -332,7 +342,7 @@ impl LogletBase for LocalLoglet { async fn read_next_single_opt( &self, from: Self::Offset, - ) -> Result>> { + ) -> Result>, OperationError> { self.read_from(from) } } diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index 6dec3be78..ede27e6d5 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -11,7 +11,6 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; -use anyhow::Context; use async_trait::async_trait; use tokio::sync::Mutex as AsyncMutex; use tracing::debug; @@ -23,7 +22,7 @@ use restate_types::logs::metadata::{LogletParams, ProviderKind}; use super::log_store::RocksDbLogStore; use super::log_store_writer::RocksDbLogWriterHandle; use super::{metric_definitions, LocalLoglet}; -use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, ProviderError}; +use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError}; use crate::Error; pub struct Factory { @@ -49,7 +48,7 @@ impl LogletProviderFactory for Factory { ProviderKind::Local } - async fn create(self: Box) -> Result, ProviderError> { + async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); let Factory { mut options, @@ -59,7 +58,7 @@ impl LogletProviderFactory for Factory { let opts = options.live_load(); let log_store = RocksDbLogStore::create(opts, rocksdb_opts) .await - .context("RocksDb LogStore")?; + .map_err(OperationError::other)?; let log_writer = log_store.create_writer().start(options)?; debug!("Started a bifrost local loglet provider"); Ok(Arc::new(LocalLogletProvider { @@ -110,7 +109,7 @@ impl LogletProvider for LocalLogletProvider { Ok(loglet as Arc) } - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } } diff --git a/crates/bifrost/src/providers/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs index c4e6e59a4..a444899b6 100644 --- a/crates/bifrost/src/providers/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -24,9 +24,9 @@ use restate_core::ShutdownError; use restate_rocksdb::RocksDbPerfGuard; use restate_types::logs::SequenceNumber; -use crate::loglet::{LogletOffset, LogletReadStream}; +use crate::loglet::{LogletOffset, LogletReadStream, OperationError}; use crate::providers::local_loglet::LogStoreError; -use crate::{Error, LogRecord, Result}; +use crate::{LogRecord, Result}; use super::keys::RecordKey; use super::LocalLoglet; @@ -127,7 +127,7 @@ impl LogletReadStream for LocalLogletReadStream { } impl Stream for LocalLogletReadStream { - type Item = Result>; + type Item = Result, OperationError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -162,7 +162,7 @@ impl Stream for LocalLogletReadStream { None => { // system shutdown. Or that the loglet has been unexpectedly shutdown. this.terminated.set(true); - return Poll::Ready(Some(Err(Error::Shutdown(ShutdownError)))); + return Poll::Ready(Some(Err(OperationError::Shutdown(ShutdownError)))); } } } @@ -199,7 +199,7 @@ impl Stream for LocalLogletReadStream { // todo: If status is not ok(), we should retry if let Err(e) = this.iterator.status() { this.terminated.set(true); - return Poll::Ready(Some(Err(Error::LogStoreError(LogStoreError::Rocksdb(e))))); + return Poll::Ready(Some(Err(OperationError::other(LogStoreError::Rocksdb(e))))); } if !this.iterator.valid() || this.iterator.key().is_none() { diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index 2457c64b6..e1ecae473 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -29,11 +29,11 @@ use restate_types::logs::SequenceNumber; use crate::loglet::util::OffsetWatch; use crate::loglet::{ - Loglet, LogletBase, LogletOffset, LogletProvider, LogletProviderFactory, LogletReadStream, - ProviderError, SendableLogletReadStream, + AppendError, Loglet, LogletBase, LogletOffset, LogletProvider, LogletProviderFactory, + LogletReadStream, OperationError, SendableLogletReadStream, }; use crate::Result; -use crate::{Error, LogRecord, TailState}; +use crate::{LogRecord, TailState}; #[derive(Default)] pub struct Factory { @@ -55,7 +55,7 @@ impl LogletProviderFactory for Factory { ProviderKind::InMemory } - async fn create(self: Box) -> Result, ProviderError> { + async fn create(self: Box) -> Result, OperationError> { Ok(Arc::new(MemoryLogletProvider { store: Default::default(), init_delay: self.init_delay.unwrap_or_default(), @@ -95,7 +95,7 @@ impl LogletProvider for MemoryLogletProvider { Ok(loglet as Arc) } - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { info!("Shutting down in-memory loglet provider"); Ok(()) } @@ -150,7 +150,7 @@ impl MemoryLoglet { fn read_from( &self, from_offset: LogletOffset, - ) -> Result>> { + ) -> Result>, OperationError> { let guard = self.log.lock().unwrap(); let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Acquire)); let head_offset = trim_point.next(); @@ -216,7 +216,7 @@ impl LogletReadStream for MemoryReadStream { } impl Stream for MemoryReadStream { - type Item = Result>; + type Item = Result, OperationError>; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -248,7 +248,7 @@ impl Stream for MemoryReadStream { None => { // system shutdown. Or that the loglet has been unexpectedly shutdown. this.terminated.set(true); - return Poll::Ready(Some(Err(Error::Shutdown(ShutdownError)))); + return Poll::Ready(Some(Err(OperationError::Shutdown(ShutdownError)))); } } } @@ -298,7 +298,7 @@ impl LogletBase for MemoryLoglet { Ok(Box::pin(MemoryReadStream::create(self, from).await)) } - async fn append(&self, payload: Bytes) -> Result { + async fn append(&self, payload: Bytes) -> Result { let mut log = self.log.lock().unwrap(); let offset = self.index_to_offset(log.len()); debug!( @@ -312,7 +312,7 @@ impl LogletBase for MemoryLoglet { Ok(offset) } - async fn append_batch(&self, payloads: &[Bytes]) -> Result { + async fn append_batch(&self, payloads: &[Bytes]) -> Result { let mut log = self.log.lock().unwrap(); let offset = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); let first_offset = offset; @@ -329,7 +329,7 @@ impl LogletBase for MemoryLoglet { Ok(first_offset) } - async fn find_tail(&self) -> Result> { + async fn find_tail(&self) -> Result, OperationError> { let committed = LogletOffset(self.last_committed_offset.load(Ordering::Acquire)).next(); let sealed = self.sealed.load(Ordering::Acquire); Ok(if sealed { @@ -340,7 +340,7 @@ impl LogletBase for MemoryLoglet { } /// Find the head (oldest) record in the loglet. - async fn get_trim_point(&self) -> Result> { + async fn get_trim_point(&self) -> Result, OperationError> { let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { @@ -350,7 +350,7 @@ impl LogletBase for MemoryLoglet { } } - async fn trim(&self, new_trim_point: Self::Offset) -> Result<()> { + async fn trim(&self, new_trim_point: Self::Offset) -> Result<(), OperationError> { let actual_trim_point = new_trim_point.min(LogletOffset( self.last_committed_offset.load(Ordering::Relaxed), )); @@ -371,7 +371,10 @@ impl LogletBase for MemoryLoglet { Ok(()) } - async fn read_next_single(&self, from: LogletOffset) -> Result> { + async fn read_next_single( + &self, + from: LogletOffset, + ) -> Result, OperationError> { loop { let next_record = self.read_from(from)?; if let Some(next_record) = next_record { @@ -385,7 +388,7 @@ impl LogletBase for MemoryLoglet { async fn read_next_single_opt( &self, after: Self::Offset, - ) -> Result>> { + ) -> Result>, OperationError> { self.read_from(after) } } diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index df5b69398..2459de270 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -24,7 +24,7 @@ use restate_types::live::BoxedLiveLoad; use restate_types::logs::metadata::{LogletParams, ProviderKind}; use super::metric_definitions; -use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, ProviderError}; +use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError}; use crate::Error; pub struct Factory { @@ -57,7 +57,7 @@ impl LogletProviderFactory for Factory { ProviderKind::Replicated } - async fn create(self: Box) -> Result, ProviderError> { + async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); Ok(Arc::new(ReplicatedLogletProvider)) } @@ -75,7 +75,7 @@ impl LogletProvider for ReplicatedLogletProvider { todo!("Not implemented yet") } - async fn shutdown(&self) -> Result<(), ProviderError> { + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } } diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 6bc2f19b1..ef799f4ea 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -143,7 +143,7 @@ impl Stream for LogReadStream { self.read_pointer = new_pointer; Poll::Ready(Some(Ok(record))) } - Some(Err(e)) => Poll::Ready(Some(Err(e))), + Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), None => { // todo: check if we should switch the loglet. self.as_mut().terminated = true;