Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Decouple loglet errors from bifrost errors #1705

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
publish = false

[features]
default = ["replicated-loglet"]
default = []
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
test-util = []
Expand Down
31 changes: 22 additions & 9 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;

use crate::loglet::{LogletBase, LogletWrapper};
use crate::loglet::{AppendError, LogletBase, LogletProvider};
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{
Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result, TailState,
Error, FindTailAttributes, LogReadStream, LogRecord, Result, TailState,
SMALL_BATCH_THRESHOLD_COUNT,
};

Expand All @@ -48,7 +49,7 @@ impl Bifrost {

#[cfg(any(test, feature = "test-util"))]
pub async fn init_in_memory(metadata: Metadata) -> Self {
use crate::loglets::memory_loglet;
use crate::providers::memory_loglet;

Self::init_with_factory(metadata, memory_loglet::Factory::default()).await
}
Expand All @@ -75,7 +76,7 @@ impl Bifrost {
#[cfg(any(test, feature = "test-util"))]
pub async fn init_with_factory(
metadata: Metadata,
factory: impl crate::LogletProviderFactory,
factory: impl crate::loglet::LogletProviderFactory,
) -> Self {
use crate::BifrostService;

Expand Down Expand Up @@ -256,7 +257,14 @@ impl BifrostInner {
let loglet = self.writeable_loglet(log_id).await?;
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await

let res = loglet.append(buf.freeze()).await;
// todo: Handle retries, segment seals and other recoverable errors.
res.map_err(|e| match e {
AppendError::Sealed => todo!(),
AppendError::Shutdown(e) => Error::Shutdown(e),
AppendError::Other(e) => Error::LogletError(e),
})
}

pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
Expand All @@ -270,7 +278,13 @@ impl BifrostInner {
buf.freeze()
})
.collect();
loglet.append_batch(&raw_payloads).await
let res = loglet.append_batch(&raw_payloads).await;
// todo: Handle retries, segment seals and other recoverable errors.
res.map_err(|e| match e {
AppendError::Sealed => todo!(),
AppendError::Shutdown(e) => Error::Shutdown(e),
AppendError::Other(e) => Error::LogletError(e),
})
}

pub async fn read_next_single(&self, log_id: LogId, from: Lsn) -> Result<LogRecord> {
Expand Down Expand Up @@ -378,8 +392,7 @@ impl BifrostInner {
self.fail_if_shutting_down()?;
self.metadata
.sync(MetadataKind::Logs, TargetVersion::Latest)
.await
.map_err(Arc::new)?;
.await?;
Ok(())
}

Expand Down Expand Up @@ -436,7 +449,7 @@ mod tests {

use super::*;

use crate::loglets::memory_loglet::{self};
use crate::providers::memory_loglet::{self};
use googletest::prelude::*;

use crate::{Record, TrimGap};
Expand Down
21 changes: 11 additions & 10 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use std::sync::Arc;

use restate_types::logs::{LogId, Lsn};

use crate::loglets::local_loglet::LogStoreError;
use crate::loglet::{LogletError, OperationError};
use crate::types::SealReason;

/// Result type for bifrost operations.
pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug, Clone)]
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("log '{0}' is sealed")]
LogSealed(LogId, SealReason),
Expand All @@ -30,18 +30,19 @@ pub enum Error {
#[error("operation failed due to an ongoing shutdown")]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
LogStoreError(#[from] LogStoreError),
LogletError(#[from] Arc<dyn LogletError + Send + Sync>),
#[error("failed syncing logs metadata: {0}")]
// unfortunately, we have to use Arc here, because the SyncError is not Clone.
MetadataSync(#[from] Arc<SyncError>),
MetadataSync(#[from] SyncError),
/// Provider is unknown or disabled
#[error("bifrost provider '{0}' is disabled or unrecognized")]
Disabled(String),
}

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum ProviderError {
Shutdown(#[from] ShutdownError),
Other(#[from] anyhow::Error),
impl From<OperationError> for Error {
fn from(value: OperationError) -> Self {
match value {
OperationError::Shutdown(e) => Error::Shutdown(e),
OperationError::Other(e) => Error::LogletError(e),
}
}
}
11 changes: 4 additions & 7 deletions crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@

mod bifrost;
mod error;
mod loglet;
#[cfg(test)]
mod loglet_tests;
pub mod loglets;
mod provider;
pub mod loglet;
mod loglet_wrapper;
pub mod providers;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError, Result};
pub use provider::*;
pub use error::{Error, Result};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
Expand Down
112 changes: 112 additions & 0 deletions crates/bifrost/src/loglet/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::{Debug, Display};
use std::sync::Arc;

use restate_core::ShutdownError;

#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
#[error("Loglet is sealed")]
Sealed,
#[error(transparent)]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
Other(Arc<dyn LogletError>),
}

impl AppendError {
pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(RetryableError(error)))
}

pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(TerminalError(error)))
}

pub fn other<E: LogletError + Send + Sync>(error: E) -> Self {
Self::Other(Arc::new(error))
}
}

#[derive(Debug, thiserror::Error)]
pub enum OperationError {
#[error(transparent)]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
Other(Arc<dyn LogletError>),
}

impl OperationError {
pub fn retryable<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(RetryableError(error)))
}

pub fn terminal<E: std::error::Error + Send + Sync + 'static>(error: E) -> Self {
Self::Other(Arc::new(TerminalError(error)))
}

pub fn other<E: LogletError + Send + Sync>(error: E) -> Self {
Self::Other(Arc::new(error))
}
}

// -- Helper Types --

/// Represents a type-erased error from the loglet provider.
pub trait LogletError: std::error::Error + Send + Sync + Debug + Display + 'static {
/// Signal upper layers whether this error should be retried or not.
fn retryable(&self) -> bool {
false
}
}

#[derive(Debug, thiserror::Error)]
struct RetryableError<T>(#[source] T);

impl<T> Display for RetryableError<T>
where
T: Debug + Display + Send + Sync + std::error::Error + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[retryable] {}", self.0)
}
}

impl<T> LogletError for RetryableError<T>
where
T: Debug + Display + Send + Sync + std::error::Error + 'static,
{
fn retryable(&self) -> bool {
true
}
}

#[derive(Debug, thiserror::Error)]
struct TerminalError<T>(#[source] T);

impl<T> LogletError for TerminalError<T>
where
T: Debug + Display + Send + Sync + std::error::Error + 'static,
{
fn retryable(&self) -> bool {
false
}
}

impl<T> Display for TerminalError<T>
where
T: Debug + Display + Send + Sync + std::error::Error + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[terminal] {}", self.0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_test_util::let_assert;
use restate_types::logs::SequenceNumber;
use tracing::info;

use crate::loglet::{Loglet, LogletOffset};
use super::{Loglet, LogletOffset};
use crate::{LogRecord, Record, TrimGap};

fn setup() {
Expand Down
Loading
Loading