Skip to content

Commit

Permalink
[Bifrost] Move TailState to restate-types
Browse files Browse the repository at this point in the history
This allows TailState to be used in log-server as well
  • Loading branch information
AhmedSoliman committed Sep 9, 2024
1 parent 02d825c commit 9190929
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 99 deletions.
4 changes: 2 additions & 2 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use enum_map::EnumMap;

use restate_core::{Metadata, MetadataKind, TargetVersion};
use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment};
use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber};
use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState};
use restate_types::storage::StorageEncode;
use restate_types::Version;

Expand All @@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender;
use crate::loglet::LogletProvider;
use crate::loglet_wrapper::LogletWrapper;
use crate::watchdog::WatchdogSender;
use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result, TailState};
use crate::{Error, FindTailAttributes, InputRecord, LogReadStream, Result};

/// Bifrost is Restate's durable interconnect system
///
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ use restate_metadata_store::MetadataStoreClient;
use restate_types::config::Configuration;
use restate_types::logs::builder::BuilderError;
use restate_types::logs::metadata::{LogletParams, Logs, ProviderKind, SegmentIndex};
use restate_types::logs::{LogId, Lsn};
use restate_types::logs::{LogId, Lsn, TailState};
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::Version;

use crate::error::AdminError;
use crate::{Bifrost, Error, Result, TailState};
use crate::{Bifrost, Error, Result};

/// Bifrost's Admin API
#[derive(Clone, Copy)]
Expand Down
10 changes: 5 additions & 5 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ use std::sync::Arc;
use std::time::Duration;

use googletest::prelude::*;
use restate_types::logs::metadata::SegmentIndex;
use tokio::sync::Barrier;
use tokio::task::{JoinHandle, JoinSet};

use restate_test_util::let_assert;
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber};
use tokio_stream::StreamExt;
use tracing::info;

use restate_test_util::let_assert;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber, TailState};

use super::{Loglet, LogletOffset};
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::{setup_panic_handler, TailState};
use crate::setup_panic_handler;

async fn wait_for_trim(loglet: &LogletWrapper, required_trim_point: Lsn) -> anyhow::Result<()> {
for _ in 0..3 {
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub mod util;
pub use error::*;
use futures::stream::BoxStream;
pub use provider::{LogletProvider, LogletProviderFactory};
use restate_core::ShutdownError;
use tokio::sync::oneshot;

use std::pin::Pin;
Expand All @@ -28,10 +27,11 @@ use std::task::{ready, Poll};
use async_trait::async_trait;
use futures::{FutureExt, Stream};

use restate_types::logs::{KeyFilter, LogletOffset, Record};
use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, LogletOffset, Record, TailState};

use crate::LogEntry;
use crate::{Result, TailState};
use crate::Result;

/// A loglet represents a logical log stream provided by a provider implementation.
///
Expand Down
7 changes: 6 additions & 1 deletion crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;

use restate_types::logs::TailState;

use super::LogletOffset;
use crate::TailState;

#[derive(Debug, Clone)]
pub struct TailOffsetWatch {
Expand Down Expand Up @@ -43,6 +44,10 @@ impl TailOffsetWatch {
self.sender.borrow().offset()
}

pub fn get(&self) -> watch::Ref<'_, TailState<LogletOffset>> {
self.sender.borrow()
}

pub fn is_sealed(&self) -> bool {
self.sender.borrow().is_sealed()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use tracing::instrument;

use restate_core::ShutdownError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::Record;
use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber};
use restate_types::logs::{Record, TailState};

use crate::loglet::{AppendError, Loglet, OperationError, SendableLogletReadStream};
use crate::Result;
use crate::{Commit, LogEntry, LsnExt};
use crate::{Result, TailState};

#[cfg(any(test, feature = "test-util"))]
#[derive(Debug, Clone, thiserror::Error)]
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::Mutex;
use tracing::{debug, warn};

use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber};
use restate_types::logs::{KeyFilter, LogletOffset, Record, SequenceNumber, TailState};

use self::log_store::LogStoreError;
use self::log_store::RocksDbLogStore;
Expand All @@ -41,7 +41,7 @@ use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStre
use crate::providers::local_loglet::metric_definitions::{
BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH,
};
use crate::{Result, TailState};
use crate::Result;

#[derive(derive_more::Debug)]
struct LocalLoglet {
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/local_loglet/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use tracing::{debug, error, warn};

use restate_core::ShutdownError;
use restate_rocksdb::RocksDbPerfGuard;
use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber};
use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber, TailState};

use crate::loglet::{Loglet, LogletReadStream, OperationError};
use crate::providers::local_loglet::record_format::decode_and_filter_record;
use crate::providers::local_loglet::LogStoreError;
use crate::{LogEntry, Result, TailState};
use crate::{LogEntry, Result};

use super::keys::RecordKey;
use super::LocalLoglet;
Expand Down
6 changes: 4 additions & 2 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ use tracing::{debug, info};

use restate_core::ShutdownError;
use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex};
use restate_types::logs::{KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber};
use restate_types::logs::{
KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState,
};

use crate::loglet::util::TailOffsetWatch;
use crate::loglet::{
Loglet, LogletCommit, LogletProvider, LogletProviderFactory, LogletReadStream, OperationError,
SendableLogletReadStream,
};
use crate::LogEntry;
use crate::Result;
use crate::{LogEntry, TailState};

#[derive(Default)]
pub struct Factory {
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use restate_types::logs::metadata::MaybeSegment;
use restate_types::logs::KeyFilter;
use restate_types::logs::MatchKeyQuery;
use restate_types::logs::SequenceNumber;
use restate_types::logs::TailState;
use restate_types::logs::{LogId, Lsn};
use restate_types::Version;
use restate_types::Versioned;
Expand All @@ -38,7 +39,6 @@ use crate::loglet_wrapper::LogletReadStreamWrapper;
use crate::Error;
use crate::LogEntry;
use crate::Result;
use crate::TailState;

/// A read stream reads from the virtual log. The stream provides a unified view over
/// the virtual log addressing space in the face of seals, reconfiguration, and trims.
Expand Down
75 changes: 0 additions & 75 deletions crates/bifrost/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,81 +78,6 @@ pub struct FindTailAttributes {
// TODO: consistent_read: bool,
}

/// Represents the state of the tail of the loglet.
#[derive(Clone, Debug)]
pub enum TailState<Offset = Lsn> {
/// Loglet is open for appends
Open(Offset),
/// Loglet is sealed. This offset if the durable tail.
Sealed(Offset),
}

impl<Offset: SequenceNumber> TailState<Offset> {
pub fn new(sealed: bool, offset: Offset) -> Self {
if sealed {
TailState::Sealed(offset)
} else {
TailState::Open(offset)
}
}

/// Combines two TailStates together
///
/// Only applies updates to the value according to the following rules:
/// - Offsets can only move forward.
/// - Tail cannot be unsealed once sealed.
///
/// Returns true if the state was updated
pub fn combine(&mut self, sealed: bool, offset: Offset) -> bool {
let old_offset = self.offset();
let is_already_sealed = self.is_sealed();

let new_offset = std::cmp::max(self.offset(), offset);
let new_sealed = self.is_sealed() || sealed;
if new_sealed != is_already_sealed || new_offset > old_offset {
*self = TailState::new(new_sealed, new_offset);
true
} else {
false
}
}

/// Applies a seal on the tail state without changing the tail offset
/// Returns true if the state was updated
pub fn seal(&mut self) -> bool {
if self.is_sealed() {
false
} else {
*self = TailState::new(true, self.offset());
true
}
}
}

impl<Offset: SequenceNumber> TailState<Offset> {
pub fn map<F, T>(self, f: F) -> TailState<T>
where
F: FnOnce(Offset) -> T,
{
match self {
TailState::Open(offset) => TailState::Open(f(offset)),
TailState::Sealed(offset) => TailState::Sealed(f(offset)),
}
}

#[inline(always)]
pub fn is_sealed(&self) -> bool {
matches!(self, TailState::Sealed(_))
}

#[inline(always)]
pub fn offset(&self) -> Offset {
match self {
TailState::Open(offset) | TailState::Sealed(offset) => *offset,
}
}
}

/// A future that resolves to the Lsn of the last Lsn in a committed batch.
///
/// Note: dropping this future doesn't cancel or stop the underlying enqueued append.
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/network_server/handler/cluster_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
Ok(Response::new(DescribeLogResponse {
chain: serialize_value(chain),
tail_state: match tail_state {
restate_bifrost::TailState::Open(_) => 1,
restate_bifrost::TailState::Sealed(_) => 2,
restate_types::logs::TailState::Open(_) => 1,
restate_types::logs::TailState::Sealed(_) => 2,
},
tail_offset: tail_state.offset().as_u64(),
}))
Expand Down
2 changes: 2 additions & 0 deletions crates/types/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use crate::storage::StorageEncode;
pub mod builder;
pub mod metadata;
mod record;
mod tail;
pub use record::Record;
pub use tail::*;

#[derive(
Debug,
Expand Down
85 changes: 85 additions & 0 deletions crates/types/src/logs/tail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
/// Represents the state of the tail of the loglet.
use super::{Lsn, SequenceNumber};

#[derive(Clone, Debug)]
pub enum TailState<Offset = Lsn> {
/// Loglet is open for appends
Open(Offset),
/// Loglet is sealed. This offset if the durable tail.
Sealed(Offset),
}

impl<Offset: SequenceNumber> TailState<Offset> {
pub fn new(sealed: bool, offset: Offset) -> Self {
if sealed {
TailState::Sealed(offset)
} else {
TailState::Open(offset)
}
}

/// Combines two TailStates together
///
/// Only applies updates to the value according to the following rules:
/// - Offsets can only move forward.
/// - Tail cannot be unsealed once sealed.
///
/// Returns true if the state was updated
pub fn combine(&mut self, sealed: bool, offset: Offset) -> bool {
let old_offset = self.offset();
let is_already_sealed = self.is_sealed();

let new_offset = std::cmp::max(self.offset(), offset);
let new_sealed = self.is_sealed() || sealed;
if new_sealed != is_already_sealed || new_offset > old_offset {
*self = TailState::new(new_sealed, new_offset);
true
} else {
false
}
}

/// Applies a seal on the tail state without changing the tail offset
/// Returns true if the state was updated
pub fn seal(&mut self) -> bool {
if self.is_sealed() {
false
} else {
*self = TailState::new(true, self.offset());
true
}
}
}

impl<Offset: SequenceNumber> TailState<Offset> {
pub fn map<F, T>(self, f: F) -> TailState<T>
where
F: FnOnce(Offset) -> T,
{
match self {
TailState::Open(offset) => TailState::Open(f(offset)),
TailState::Sealed(offset) => TailState::Sealed(f(offset)),
}
}

#[inline(always)]
pub fn is_sealed(&self) -> bool {
matches!(self, TailState::Sealed(_))
}

#[inline(always)]
pub fn offset(&self) -> Offset {
match self {
TailState::Open(offset) | TailState::Sealed(offset) => *offset,
}
}
}

0 comments on commit 9190929

Please sign in to comment.