Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Networking] Allow sending messages through Outgoing<M> #1928

Merged
merged 2 commits
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ tracing-test = { version = "0.2.5" }
ulid = { version = "1.1.0" }
url = { version = "2.5" }
uuid = { version = "1.3.0", features = ["v7", "serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }

[profile.release]
opt-level = 3
Expand Down
59 changes: 2 additions & 57 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod error;
#[cfg(test)]
pub mod loglet_tests;
mod provider;
pub(crate) mod util;
pub mod util;

// exports
pub use error::*;
Expand All @@ -21,74 +21,19 @@ pub use provider::{LogletProvider, LogletProviderFactory};
use restate_core::ShutdownError;
use tokio::sync::oneshot;

use std::ops::Add;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};

use async_trait::async_trait;
use futures::{FutureExt, Stream};

use restate_types::logs::{KeyFilter, SequenceNumber};
use restate_types::logs::{KeyFilter, LogletOffset};

use crate::record::Record;
use crate::LogEntry;
use crate::{Result, TailState};

/// Offset of a record within a single loglet (as opposed to the
/// global log sequence number). Wraps a `u32` and derives the usual
/// value-type traits plus `derive_more` conversions/formatting.
#[derive(
    Debug,
    Copy,
    Clone,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    derive_more::From,
    derive_more::Into,
    derive_more::Display,
)]
pub struct LogletOffset(pub(crate) u32);

impl LogletOffset {
    /// Builds an offset from its raw `u32` representation.
    pub const fn new(offset: u32) -> Self {
        Self(offset)
    }
}

impl From<LogletOffset> for u64 {
    /// Widens the inner `u32` losslessly into a `u64`.
    fn from(value: LogletOffset) -> Self {
        value.0 as u64
    }
}

impl Add<u32> for LogletOffset {
    type Output = Self;

    /// Adds `rhs` to the offset, panicking on `u32` overflow since an
    /// overflowed offset would be a logic error rather than a valid position.
    fn add(self, rhs: u32) -> Self {
        let advanced = self
            .0
            .checked_add(rhs)
            .expect("loglet offset must not overflow over u32");
        Self(advanced)
    }
}

impl SequenceNumber for LogletOffset {
    const MAX: Self = LogletOffset(u32::MAX);
    const INVALID: Self = LogletOffset(0);
    const OLDEST: Self = LogletOffset(1);

    /// Saturates to Self::MAX
    fn next(self) -> Self {
        Self(self.0.saturating_add(1))
    }

    /// Saturates to Self::OLDEST.
    fn prev(self) -> Self {
        Self(self.0.saturating_sub(1).max(Self::OLDEST.0))
    }
}

/// A loglet represents a logical log stream provided by a provider implementation.
///
/// Loglets are required to follow these rules:
Expand Down
25 changes: 20 additions & 5 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,43 @@ use tokio_stream::wrappers::WatchStream;
use super::LogletOffset;
use crate::TailState;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TailOffsetWatch {
sender: watch::Sender<TailState<LogletOffset>>,
receiver: watch::Receiver<TailState<LogletOffset>>,
}

impl TailOffsetWatch {
pub fn new(tail: TailState<LogletOffset>) -> Self {
let (sender, receiver) = watch::channel(tail);
Self { sender, receiver }
let sender = watch::Sender::new(tail);
Self { sender }
}

/// Inform the watch that the tail might have changed.
pub fn notify(&self, sealed: bool, offset: LogletOffset) {
self.sender.send_if_modified(|v| v.combine(sealed, offset));
}

/// Update that the offset might have changed without updating the seal
pub fn notify_offset_update(&self, offset: LogletOffset) {
self.sender.send_if_modified(|v| v.combine(false, offset));
}

pub fn notify_seal(&self) {
self.sender.send_if_modified(|v| v.seal());
}

pub fn latest_offset(&self) -> LogletOffset {
self.sender.borrow().offset()
}

pub fn is_sealed(&self) -> bool {
self.sender.borrow().is_sealed()
}

/// The first yielded value is the latest known tail
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
WatchStream::new(self.receiver.clone())
let mut receiver = self.sender.subscribe();
receiver.mark_changed();
WatchStream::new(receiver)
}
}
6 changes: 3 additions & 3 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use std::pin::Pin;
use std::sync::Arc;

use futures::{Stream, StreamExt};
use tracing::instrument;

use restate_core::ShutdownError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, Lsn, SequenceNumber};
use tracing::instrument;
use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber};

use crate::loglet::{AppendError, Loglet, LogletOffset, OperationError, SendableLogletReadStream};
use crate::loglet::{AppendError, Loglet, OperationError, SendableLogletReadStream};
use crate::record::Record;
use crate::{Commit, LogEntry, LsnExt};
use crate::{Result, TailState};
Expand Down
7 changes: 2 additions & 5 deletions crates/bifrost/src/providers/local_loglet/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use std::mem::size_of;

use bytes::{Buf, BufMut, Bytes, BytesMut};

use restate_types::logs::SequenceNumber;

use crate::loglet::LogletOffset;
use restate_types::logs::{LogletOffset, SequenceNumber};

pub(crate) const DATA_KEY_PREFIX_LENGTH: usize = size_of::<u8>() + size_of::<u64>();

Expand Down Expand Up @@ -124,11 +122,10 @@ impl MetadataKey {
mod tests {
// test RecordKey
use super::*;
use crate::loglet::LogletOffset;

#[test]
fn test_record_key() {
let key = RecordKey::new(1, LogletOffset(2));
let key = RecordKey::new(1, LogletOffset::new(2));
let mut buf = BytesMut::new();
let bytes = key.encode_and_split(&mut buf);
let key2 = RecordKey::from_slice(&bytes);
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/local_loglet/log_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use smallvec::SmallVec;
use tracing::{error, trace, warn};

use restate_types::flexbuffers_storage_encode_decode;
use restate_types::logs::LogletOffset;
use restate_types::storage::StorageCodec;

use super::keys::{MetadataKey, MetadataKind};
use crate::loglet::LogletOffset;

use super::LogStoreError;

Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
use restate_rocksdb::{IoMode, Priority, RocksDb};
use restate_types::config::LocalLogletOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::SequenceNumber;
use restate_types::logs::{LogletOffset, SequenceNumber};

use super::keys::{MetadataKey, MetadataKind, RecordKey};
use super::log_state::LogStateUpdates;
Expand All @@ -33,7 +33,7 @@ use super::metric_definitions::{
BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES,
};
use super::record_format::{encode_record_and_split, FORMAT_FOR_NEW_APPENDS};
use crate::loglet::{LogletOffset, OperationError};
use crate::loglet::OperationError;
use crate::record::Record;

type Ack = oneshot::Sender<Result<(), OperationError>>;
Expand Down
14 changes: 7 additions & 7 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ use tokio::sync::Mutex;
use tracing::{debug, warn};

use restate_core::ShutdownError;
use restate_types::logs::{KeyFilter, SequenceNumber};
use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber};

use self::log_store::LogStoreError;
use self::log_store::RocksDbLogStore;
use self::log_store_writer::RocksDbLogWriterHandle;
use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATION};
use self::read_stream::LocalLogletReadStream;
use crate::loglet::util::TailOffsetWatch;
use crate::loglet::{Loglet, LogletCommit, LogletOffset, OperationError, SendableLogletReadStream};
use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream};
use crate::providers::local_loglet::metric_definitions::{
BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH,
};
Expand Down Expand Up @@ -208,7 +208,7 @@ impl Loglet for LocalLoglet {
}

async fn get_trim_point(&self) -> Result<Option<LogletOffset>, OperationError> {
let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed));

if current_trim_point == LogletOffset::INVALID {
Ok(None)
Expand All @@ -220,7 +220,7 @@ impl Loglet for LocalLoglet {
/// Trim the log to the minimum of new_trim_point and last_committed_offset
/// new_trim_point is inclusive (will be trimmed)
async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> {
let effective_trim_point = new_trim_point.min(LogletOffset(
let effective_trim_point = new_trim_point.min(LogletOffset::new(
self.last_committed_offset.load(Ordering::Relaxed),
));

Expand All @@ -230,7 +230,7 @@ impl Loglet for LocalLoglet {
// parts in case two trim operations get reordered and we crash before applying the second.
let _trim_point_lock_guard = self.trim_point_lock.lock().await;

let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed));

if current_trim_point >= effective_trim_point {
// nothing to do since we have already trimmed beyond new_trim_point
Expand All @@ -241,13 +241,13 @@ impl Loglet for LocalLoglet {

// no compare & swap operation is needed because we are operating under the trim point lock
self.trim_point_offset
.store(effective_trim_point.0, Ordering::Relaxed);
.store(*effective_trim_point, Ordering::Relaxed);

self.log_writer
.enqueue_trim(self.loglet_id, current_trim_point, effective_trim_point)
.await?;

histogram!(BIFROST_LOCAL_TRIM_LENGTH).record(effective_trim_point.0 - current_trim_point.0);
histogram!(BIFROST_LOCAL_TRIM_LENGTH).record(*effective_trim_point - *current_trim_point);

Ok(())
}
Expand Down
9 changes: 5 additions & 4 deletions crates/bifrost/src/providers/local_loglet/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use tracing::{debug, error, warn};

use restate_core::ShutdownError;
use restate_rocksdb::RocksDbPerfGuard;
use restate_types::logs::{KeyFilter, SequenceNumber};
use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber};

use crate::loglet::{Loglet, LogletOffset, LogletReadStream, OperationError};
use crate::loglet::{Loglet, LogletReadStream, OperationError};
use crate::providers::local_loglet::record_format::decode_and_filter_record;
use crate::providers::local_loglet::LogStoreError;
use crate::{LogEntry, Result, TailState};
Expand Down Expand Up @@ -191,7 +191,8 @@ impl Stream for LocalLogletReadStream {

// Trim point is the slot **before** the first readable record (if it exists)
// trim point might have been updated since last time.
let trim_point = LogletOffset(self.loglet.trim_point_offset.load(Ordering::Relaxed));
let trim_point =
LogletOffset::new(self.loglet.trim_point_offset.load(Ordering::Relaxed));
let head_offset = trim_point.next();
// Are we reading behind the loglet head? -> TrimGap
assert!(self.read_pointer > LogletOffset::from(0));
Expand Down Expand Up @@ -224,7 +225,7 @@ impl Stream for LocalLogletReadStream {
if !self.iterator.valid() || self.iterator.key().is_none() {
// trim point might have been updated.
let potentially_different_trim_point =
LogletOffset(self.loglet.trim_point_offset.load(Ordering::Relaxed));
LogletOffset::new(self.loglet.trim_point_offset.load(Ordering::Relaxed));
if potentially_different_trim_point != trim_point {
debug!("Trim point has been updated, fast-forwarding the stream");
continue;
Expand Down
Loading
Loading