From 464a69986cff975a5e3895090a4377543dc50059 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 9 Sep 2024 11:29:01 +0100 Subject: [PATCH 1/2] [Bifrost] Moves LogletOffset to restate-types --- Cargo.toml | 1 + crates/bifrost/src/loglet/mod.rs | 59 +----- crates/bifrost/src/loglet/util.rs | 25 ++- crates/bifrost/src/loglet_wrapper.rs | 6 +- .../src/providers/local_loglet/keys.rs | 7 +- .../src/providers/local_loglet/log_state.rs | 2 +- .../local_loglet/log_store_writer.rs | 4 +- .../bifrost/src/providers/local_loglet/mod.rs | 14 +- .../src/providers/local_loglet/read_stream.rs | 9 +- crates/bifrost/src/providers/memory_loglet.rs | 30 +-- .../providers/replicated_loglet/provider.rs | 2 +- crates/bifrost/src/record.rs | 4 +- crates/bifrost/src/types.rs | 7 +- crates/log-server/src/metadata.rs | 3 +- .../log-server/src/rocksdb_logstore/keys.rs | 3 +- crates/types/Cargo.toml | 2 +- crates/types/src/logs/mod.rs | 80 +++++++- crates/types/src/net/log_server.rs | 174 ++++++++++++++++++ crates/types/src/node_id.rs | 22 +++ 19 files changed, 342 insertions(+), 112 deletions(-) create mode 100644 crates/types/src/net/log_server.rs diff --git a/Cargo.toml b/Cargo.toml index 58bf1e8214..7e016572eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -193,6 +193,7 @@ tracing-test = { version = "0.2.5" } ulid = { version = "1.1.0" } url = { version = "2.5" } uuid = { version = "1.3.0", features = ["v7", "serde"] } +xxhash-rust = { version = "0.8", features = ["xxh3"] } [profile.release] opt-level = 3 diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs index c9262bc024..7bb2319b63 100644 --- a/crates/bifrost/src/loglet/mod.rs +++ b/crates/bifrost/src/loglet/mod.rs @@ -12,7 +12,7 @@ mod error; #[cfg(test)] pub mod loglet_tests; mod provider; -pub(crate) mod util; +pub mod util; // exports pub use error::*; @@ -21,7 +21,6 @@ pub use provider::{LogletProvider, LogletProviderFactory}; use restate_core::ShutdownError; use tokio::sync::oneshot; -use 
std::ops::Add; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Poll}; @@ -29,66 +28,12 @@ use std::task::{ready, Poll}; use async_trait::async_trait; use futures::{FutureExt, Stream}; -use restate_types::logs::{KeyFilter, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogletOffset}; use crate::record::Record; use crate::LogEntry; use crate::{Result, TailState}; -// Inner loglet offset -#[derive( - Debug, - Copy, - Clone, - PartialEq, - Eq, - Ord, - PartialOrd, - derive_more::From, - derive_more::Into, - derive_more::Display, -)] -pub struct LogletOffset(pub(crate) u32); - -impl LogletOffset { - pub const fn new(offset: u32) -> Self { - Self(offset) - } -} - -impl From for u64 { - fn from(value: LogletOffset) -> Self { - u64::from(value.0) - } -} - -impl Add for LogletOffset { - type Output = Self; - fn add(self, rhs: u32) -> Self { - Self( - self.0 - .checked_add(rhs) - .expect("loglet offset must not overflow over u32"), - ) - } -} - -impl SequenceNumber for LogletOffset { - const MAX: Self = LogletOffset(u32::MAX); - const INVALID: Self = LogletOffset(0); - const OLDEST: Self = LogletOffset(1); - - /// Saturates to Self::MAX - fn next(self) -> Self { - Self(self.0.saturating_add(1)) - } - - /// Saturates to Self::OLDEST. - fn prev(self) -> Self { - Self(std::cmp::max(Self::OLDEST.0, self.0.saturating_sub(1))) - } -} - /// A loglet represents a logical log stream provided by a provider implementation. 
/// /// Loglets are required to follow these rules: diff --git a/crates/bifrost/src/loglet/util.rs b/crates/bifrost/src/loglet/util.rs index ac26829610..d16039b5e0 100644 --- a/crates/bifrost/src/loglet/util.rs +++ b/crates/bifrost/src/loglet/util.rs @@ -14,16 +14,15 @@ use tokio_stream::wrappers::WatchStream; use super::LogletOffset; use crate::TailState; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TailOffsetWatch { sender: watch::Sender>, - receiver: watch::Receiver>, } impl TailOffsetWatch { pub fn new(tail: TailState) -> Self { - let (sender, receiver) = watch::channel(tail); - Self { sender, receiver } + let sender = watch::Sender::new(tail); + Self { sender } } /// Inform the watch that the tail might have changed. @@ -31,11 +30,27 @@ impl TailOffsetWatch { self.sender.send_if_modified(|v| v.combine(sealed, offset)); } + /// Update that the offset might have changed without updating the seal + pub fn notify_offset_update(&self, offset: LogletOffset) { + self.sender.send_if_modified(|v| v.combine(false, offset)); + } + pub fn notify_seal(&self) { self.sender.send_if_modified(|v| v.seal()); } + pub fn latest_offset(&self) -> LogletOffset { + self.sender.borrow().offset() + } + + pub fn is_sealed(&self) -> bool { + self.sender.borrow().is_sealed() + } + + /// The first yielded value is the latest known tail pub fn to_stream(&self) -> WatchStream> { - WatchStream::new(self.receiver.clone()) + let mut receiver = self.sender.subscribe(); + receiver.mark_changed(); + WatchStream::new(receiver) } } diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index 11f4e5d02f..c4ec82bf1d 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -14,13 +14,13 @@ use std::pin::Pin; use std::sync::Arc; use futures::{Stream, StreamExt}; +use tracing::instrument; use restate_core::ShutdownError; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::{KeyFilter, Lsn, 
SequenceNumber}; -use tracing::instrument; +use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber}; -use crate::loglet::{AppendError, Loglet, LogletOffset, OperationError, SendableLogletReadStream}; +use crate::loglet::{AppendError, Loglet, OperationError, SendableLogletReadStream}; use crate::record::Record; use crate::{Commit, LogEntry, LsnExt}; use crate::{Result, TailState}; diff --git a/crates/bifrost/src/providers/local_loglet/keys.rs b/crates/bifrost/src/providers/local_loglet/keys.rs index 74a08b7271..bb5f4412ef 100644 --- a/crates/bifrost/src/providers/local_loglet/keys.rs +++ b/crates/bifrost/src/providers/local_loglet/keys.rs @@ -12,9 +12,7 @@ use std::mem::size_of; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use restate_types::logs::SequenceNumber; - -use crate::loglet::LogletOffset; +use restate_types::logs::{LogletOffset, SequenceNumber}; pub(crate) const DATA_KEY_PREFIX_LENGTH: usize = size_of::() + size_of::(); @@ -124,11 +122,10 @@ impl MetadataKey { mod tests { // test RecordKey use super::*; - use crate::loglet::LogletOffset; #[test] fn test_record_key() { - let key = RecordKey::new(1, LogletOffset(2)); + let key = RecordKey::new(1, LogletOffset::new(2)); let mut buf = BytesMut::new(); let bytes = key.encode_and_split(&mut buf); let key2 = RecordKey::from_slice(&bytes); diff --git a/crates/bifrost/src/providers/local_loglet/log_state.rs b/crates/bifrost/src/providers/local_loglet/log_state.rs index ed73838da3..f123ec9401 100644 --- a/crates/bifrost/src/providers/local_loglet/log_state.rs +++ b/crates/bifrost/src/providers/local_loglet/log_state.rs @@ -15,10 +15,10 @@ use smallvec::SmallVec; use tracing::{error, trace, warn}; use restate_types::flexbuffers_storage_encode_decode; +use restate_types::logs::LogletOffset; use restate_types::storage::StorageCodec; use super::keys::{MetadataKey, MetadataKind}; -use crate::loglet::LogletOffset; use super::LogStoreError; diff --git 
a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs index 840d38b28d..545760f635 100644 --- a/crates/bifrost/src/providers/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/providers/local_loglet/log_store_writer.rs @@ -24,7 +24,7 @@ use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind}; use restate_rocksdb::{IoMode, Priority, RocksDb}; use restate_types::config::LocalLogletOptions; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::SequenceNumber; +use restate_types::logs::{LogletOffset, SequenceNumber}; use super::keys::{MetadataKey, MetadataKind, RecordKey}; use super::log_state::LogStateUpdates; @@ -33,7 +33,7 @@ use super::metric_definitions::{ BIFROST_LOCAL_WRITE_BATCH_COUNT, BIFROST_LOCAL_WRITE_BATCH_SIZE_BYTES, }; use super::record_format::{encode_record_and_split, FORMAT_FOR_NEW_APPENDS}; -use crate::loglet::{LogletOffset, OperationError}; +use crate::loglet::OperationError; use crate::record::Record; type Ack = oneshot::Sender>; diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index 2c2e760c49..bdae72e9b0 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -29,7 +29,7 @@ use tokio::sync::Mutex; use tracing::{debug, warn}; use restate_core::ShutdownError; -use restate_types::logs::{KeyFilter, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber}; use self::log_store::LogStoreError; use self::log_store::RocksDbLogStore; @@ -37,7 +37,7 @@ use self::log_store_writer::RocksDbLogWriterHandle; use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATION}; use self::read_stream::LocalLogletReadStream; use crate::loglet::util::TailOffsetWatch; -use crate::loglet::{Loglet, LogletCommit, LogletOffset, OperationError, SendableLogletReadStream}; +use 
crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream}; use crate::providers::local_loglet::metric_definitions::{ BIFROST_LOCAL_TRIM, BIFROST_LOCAL_TRIM_LENGTH, }; @@ -208,7 +208,7 @@ impl Loglet for LocalLoglet { } async fn get_trim_point(&self) -> Result, OperationError> { - let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); + let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { Ok(None) @@ -220,7 +220,7 @@ impl Loglet for LocalLoglet { /// Trim the log to the minimum of new_trim_point and last_committed_offset /// new_trim_point is inclusive (will be trimmed) async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> { - let effective_trim_point = new_trim_point.min(LogletOffset( + let effective_trim_point = new_trim_point.min(LogletOffset::new( self.last_committed_offset.load(Ordering::Relaxed), )); @@ -230,7 +230,7 @@ impl Loglet for LocalLoglet { // parts in case two trim operations get reordered and we crash before applying the second. 
let _trim_point_lock_guard = self.trim_point_lock.lock().await; - let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); + let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point >= effective_trim_point { // nothing to do since we have already trimmed beyond new_trim_point @@ -241,13 +241,13 @@ impl Loglet for LocalLoglet { // no compare & swap operation is needed because we are operating under the trim point lock self.trim_point_offset - .store(effective_trim_point.0, Ordering::Relaxed); + .store(*effective_trim_point, Ordering::Relaxed); self.log_writer .enqueue_trim(self.loglet_id, current_trim_point, effective_trim_point) .await?; - histogram!(BIFROST_LOCAL_TRIM_LENGTH).record(effective_trim_point.0 - current_trim_point.0); + histogram!(BIFROST_LOCAL_TRIM_LENGTH).record(*effective_trim_point - *current_trim_point); Ok(()) } diff --git a/crates/bifrost/src/providers/local_loglet/read_stream.rs b/crates/bifrost/src/providers/local_loglet/read_stream.rs index ca10533923..cb9a079ced 100644 --- a/crates/bifrost/src/providers/local_loglet/read_stream.rs +++ b/crates/bifrost/src/providers/local_loglet/read_stream.rs @@ -20,9 +20,9 @@ use tracing::{debug, error, warn}; use restate_core::ShutdownError; use restate_rocksdb::RocksDbPerfGuard; -use restate_types::logs::{KeyFilter, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogletOffset, SequenceNumber}; -use crate::loglet::{Loglet, LogletOffset, LogletReadStream, OperationError}; +use crate::loglet::{Loglet, LogletReadStream, OperationError}; use crate::providers::local_loglet::record_format::decode_and_filter_record; use crate::providers::local_loglet::LogStoreError; use crate::{LogEntry, Result, TailState}; @@ -191,7 +191,8 @@ impl Stream for LocalLogletReadStream { // Trim point is the slot **before** the first readable record (if it exists) // trim point might have been updated since last time. 
- let trim_point = LogletOffset(self.loglet.trim_point_offset.load(Ordering::Relaxed)); + let trim_point = + LogletOffset::new(self.loglet.trim_point_offset.load(Ordering::Relaxed)); let head_offset = trim_point.next(); // Are we reading behind the loglet head? -> TrimGap assert!(self.read_pointer > LogletOffset::from(0)); @@ -224,7 +225,7 @@ impl Stream for LocalLogletReadStream { if !self.iterator.valid() || self.iterator.key().is_none() { // trim point might have been updated. let potentially_different_trim_point = - LogletOffset(self.loglet.trim_point_offset.load(Ordering::Relaxed)); + LogletOffset::new(self.loglet.trim_point_offset.load(Ordering::Relaxed)); if potentially_different_trim_point != trim_point { debug!("Trim point has been updated, fast-forwarding the stream"); continue; diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index 1c9838f4f9..d41dc2cd8b 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -23,12 +23,12 @@ use tracing::{debug, info}; use restate_core::ShutdownError; use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::{KeyFilter, LogId, MatchKeyQuery, SequenceNumber}; +use restate_types::logs::{KeyFilter, LogId, LogletOffset, MatchKeyQuery, SequenceNumber}; use crate::loglet::util::TailOffsetWatch; use crate::loglet::{ - Loglet, LogletCommit, LogletOffset, LogletProvider, LogletProviderFactory, LogletReadStream, - OperationError, SendableLogletReadStream, + Loglet, LogletCommit, LogletProvider, LogletProviderFactory, LogletReadStream, OperationError, + SendableLogletReadStream, }; use crate::Record; use crate::Result; @@ -135,7 +135,7 @@ impl MemoryLoglet { fn saturating_offset_to_index(&self, offset: LogletOffset) -> usize { let trim_point = self.trim_point_offset.load(Ordering::Relaxed); - (offset.0.saturating_sub(trim_point) - 1) as usize + 
(offset.saturating_sub(trim_point) - 1) as usize } fn advance_commit_offset(&self, offset: LogletOffset) { @@ -145,7 +145,7 @@ impl MemoryLoglet { } fn notify_readers(&self) { - let release_pointer = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)); + let release_pointer = LogletOffset::new(self.last_committed_offset.load(Ordering::Relaxed)); // Note: We always notify with false here and the watcher will ignore it if it has observed // a previous seal. self.tail_watch.notify(false, release_pointer.next()); @@ -156,7 +156,7 @@ impl MemoryLoglet { from_offset: LogletOffset, ) -> Result>, OperationError> { let guard = self.log.lock().unwrap(); - let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); + let trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed)); let head_offset = trim_point.next(); // Are we reading behind the loglet head? if from_offset < head_offset { @@ -164,7 +164,7 @@ impl MemoryLoglet { } // are we reading after commit offset? - let commit_offset = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)); + let commit_offset = LogletOffset::new(self.last_committed_offset.load(Ordering::Relaxed)); if from_offset > commit_offset { Ok(None) } else { @@ -270,7 +270,8 @@ impl Stream for MemoryReadStream { // Trim point is the the slot **before** the first readable record (if it exists) // trim point might have been updated since last time. - let trim_point = LogletOffset(self.loglet.trim_point_offset.load(Ordering::Relaxed)); + let trim_point = + LogletOffset::new(self.loglet.trim_point_offset.load(Ordering::Relaxed)); let head_offset = trim_point.next(); // Are we reading behind the loglet head? 
-> TrimGap @@ -329,7 +330,7 @@ impl Loglet for MemoryLoglet { return Ok(LogletCommit::sealed()); } let mut last_committed_offset = - LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)); + LogletOffset::new(self.last_committed_offset.load(Ordering::Relaxed)); log.reserve(payloads.len()); for payload in payloads.iter() { last_committed_offset = last_committed_offset.next(); @@ -346,7 +347,8 @@ impl Loglet for MemoryLoglet { async fn find_tail(&self) -> Result, OperationError> { let _guard = self.log.lock().unwrap(); - let committed = LogletOffset(self.last_committed_offset.load(Ordering::Relaxed)).next(); + let committed = + LogletOffset::new(self.last_committed_offset.load(Ordering::Relaxed)).next(); let sealed = self.sealed.load(Ordering::Relaxed); Ok(if sealed { TailState::Sealed(committed) @@ -358,7 +360,7 @@ impl Loglet for MemoryLoglet { /// Find the head (oldest) record in the loglet. async fn get_trim_point(&self) -> Result, OperationError> { let _guard = self.log.lock().unwrap(); - let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); + let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point == LogletOffset::INVALID { Ok(None) @@ -369,11 +371,11 @@ impl Loglet for MemoryLoglet { async fn trim(&self, new_trim_point: LogletOffset) -> Result<(), OperationError> { let mut log = self.log.lock().unwrap(); - let actual_trim_point = new_trim_point.min(LogletOffset( + let actual_trim_point = new_trim_point.min(LogletOffset::new( self.last_committed_offset.load(Ordering::Relaxed), )); - let current_trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed)); + let current_trim_point = LogletOffset::new(self.trim_point_offset.load(Ordering::Relaxed)); if current_trim_point >= actual_trim_point { return Ok(()); @@ -381,7 +383,7 @@ impl Loglet for MemoryLoglet { let trim_point_index = self.saturating_offset_to_index(actual_trim_point); 
self.trim_point_offset - .store(actual_trim_point.0, Ordering::Relaxed); + .store(*actual_trim_point, Ordering::Relaxed); log.drain(0..=trim_point_index); Ok(()) diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index 788dce69d9..ece0c984ba 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -25,7 +25,7 @@ use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; use restate_types::logs::LogId; use super::metric_definitions; -use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError}; +use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; use crate::Error; pub struct Factory { diff --git a/crates/bifrost/src/record.rs b/crates/bifrost/src/record.rs index 0882ff8f20..559ff0b313 100644 --- a/crates/bifrost/src/record.rs +++ b/crates/bifrost/src/record.rs @@ -14,15 +14,13 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use restate_types::logs::{KeyFilter, MatchKeyQuery}; - use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn}; +use restate_types::logs::{KeyFilter, LogletOffset, MatchKeyQuery}; use restate_types::storage::{ PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, }; use restate_types::time::NanosSinceEpoch; -use crate::loglet::LogletOffset; use crate::LsnExt; /// An entry in the log. 
diff --git a/crates/bifrost/src/types.rs b/crates/bifrost/src/types.rs index 121801943a..3fea9ec970 100644 --- a/crates/bifrost/src/types.rs +++ b/crates/bifrost/src/types.rs @@ -12,9 +12,9 @@ use std::task::{ready, Poll}; use futures::FutureExt; -use restate_types::logs::{Lsn, SequenceNumber}; +use restate_types::logs::{LogletOffset, Lsn, SequenceNumber}; -use crate::loglet::{AppendError, LogletOffset}; +use crate::loglet::AppendError; // Only implemented for LSNs pub(crate) trait LsnExt @@ -204,9 +204,8 @@ impl std::future::Future for Commit { #[cfg(test)] mod tests { - use crate::loglet::LogletOffset; use crate::types::LsnExt; - use restate_types::logs::{Lsn, SequenceNumber}; + use restate_types::logs::{LogletOffset, Lsn, SequenceNumber}; #[test] fn lsn_to_offset() { diff --git a/crates/log-server/src/metadata.rs b/crates/log-server/src/metadata.rs index 1693afc383..9e69171b22 100644 --- a/crates/log-server/src/metadata.rs +++ b/crates/log-server/src/metadata.rs @@ -11,8 +11,7 @@ // todo: remove after scaffolding is complete #![allow(unused)] -use restate_bifrost::loglet::LogletOffset; -use restate_types::logs::SequenceNumber; +use restate_types::logs::{LogletOffset, SequenceNumber}; use restate_types::time::MillisSinceEpoch; use restate_types::PlainNodeId; diff --git a/crates/log-server/src/rocksdb_logstore/keys.rs b/crates/log-server/src/rocksdb_logstore/keys.rs index ef3636886d..73fca40e2a 100644 --- a/crates/log-server/src/rocksdb_logstore/keys.rs +++ b/crates/log-server/src/rocksdb_logstore/keys.rs @@ -15,8 +15,7 @@ use std::mem::size_of; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use restate_bifrost::loglet::LogletOffset; -use restate_types::logs::SequenceNumber; +use restate_types::logs::{LogletOffset, SequenceNumber}; use restate_types::replicated_loglet::ReplicatedLogletId; const DATA_KEY_PREFIX: u8 = b'd'; diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 16b59814f3..b692f2406c 100644 --- a/crates/types/Cargo.toml +++ 
b/crates/types/Cargo.toml @@ -63,7 +63,7 @@ toml = { version = "0.8.12" } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } -xxhash-rust = { version = "0.8", features = ["xxh3"] } +xxhash-rust = { workspace = true, features = ["xxh3"] } [dev-dependencies] restate-test-util = { workspace = true } diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index a1ae80fd52..41e2400dec 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -8,8 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::ops::RangeInclusive; +use std::ops::{Add, RangeInclusive}; +use bytes::{Buf, BufMut, BytesMut}; use serde::{Deserialize, Serialize}; use crate::identifiers::PartitionId; @@ -282,6 +283,83 @@ where } } +// Inner loglet offset +#[derive( + Debug, + Copy, + Clone, + PartialEq, + Eq, + Ord, + PartialOrd, + derive_more::From, + derive_more::Deref, + derive_more::Into, + derive_more::Display, + Serialize, + Deserialize, +)] +#[repr(transparent)] +#[serde(transparent)] +pub struct LogletOffset(u32); + +impl LogletOffset { + pub const fn new(offset: u32) -> Self { + Self(offset) + } + + pub fn decode(mut data: B) -> Self { + Self(data.get_u32()) + } + + pub const fn estimated_encode_size() -> usize { + size_of::() + } + + pub fn encode(&self, buf: &mut BytesMut) { + buf.reserve(Self::estimated_encode_size()); + buf.put_u32(self.0); + } + + pub fn encode_and_split(&self, buf: &mut BytesMut) -> BytesMut { + self.encode(buf); + buf.split() + } +} + +impl From for u64 { + fn from(value: LogletOffset) -> Self { + u64::from(value.0) + } +} + +impl Add for LogletOffset { + type Output = Self; + fn add(self, rhs: u32) -> Self { + Self( + self.0 + .checked_add(rhs) + .expect("loglet offset must not overflow over u32"), + ) + } +} + +impl SequenceNumber for LogletOffset { + const MAX: Self = LogletOffset(u32::MAX); + const 
INVALID: Self = LogletOffset(0); + const OLDEST: Self = LogletOffset(1); + + /// Saturates to Self::MAX + fn next(self) -> Self { + Self(self.0.saturating_add(1)) + } + + /// Saturates to Self::OLDEST. + fn prev(self) -> Self { + Self(std::cmp::max(Self::OLDEST.0, self.0.saturating_sub(1))) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs new file mode 100644 index 0000000000..2ca5ca03e7 --- /dev/null +++ b/crates/types/src/net/log_server.rs @@ -0,0 +1,174 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bitflags::bitflags; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; + +use super::TargetName; +use crate::logs::{Keys, LogletOffset}; +use crate::net::define_rpc; +use crate::replicated_loglet::ReplicatedLogletId; +use crate::time::{MillisSinceEpoch, NanosSinceEpoch}; +use crate::GenerationalNodeId; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[repr(u8)] +pub enum Status { + /// Operation was successful + Ok = 0, + /// The node's storage system is disabled and cannot accept operations at the moment. + Disabled, + /// If the operation expired or not completed due to load shedding. The operation can be + /// retried by the client. It's guaranteed that this store has not been persisted by the node. + Dropped, + /// Operation rejected due to an ongoing or completed seal + Sealed, + /// Operation has been rejected. Operation requires that the sender is the authoritative + /// sequencer. + SequencerMismatch, + /// This indicates that the operation cannot be accepted due to the offset being out of bounds. 
+ /// For instance, if a store is sent to a log-server with a lagging local commit offset. + OutOfBounds, + /// The record is malformed; this could be because it has too many records or any other reason + /// that leads the server to reject processing it. + Malformed, +} + +// Store +define_rpc! { + @request = Store, + @response = Stored, + @request_target = TargetName::LogServerStore, + @response_target = TargetName::LogServerStored, +} + +// Release +define_rpc! { + @request = Release, + @response = Released, + @request_target = TargetName::LogServerRelease, + @response_target = TargetName::LogServerReleased, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RecordHeader { + pub created_at: NanosSinceEpoch, +} + +#[derive(derive_more::Debug, Clone, Serialize, Deserialize)] +pub struct RecordPayload { + pub created_at: NanosSinceEpoch, + #[debug("Bytes({} bytes)", body.len())] + pub body: Bytes, + pub keys: Keys, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AppendFlags(u32); + +// ------- Node to Bifrost ------ // +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Append { + pub loglet_id: ReplicatedLogletId, + pub flags: AppendFlags, + /// The receiver should skip handling this message if it hasn't started to act on it + /// before timeout expires. 0 means no timeout + pub timeout_at: MillisSinceEpoch, + pub payloads: Vec, +} + +impl Append { + /// The message's timeout has passed, we should discard if possible. + pub fn expired(&self) -> bool { + MillisSinceEpoch::now() >= self.timeout_at + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Appended { + known_global_tail: LogletOffset, + status: Status, + // INVALID if Status indicates that the append failed + first_offset: LogletOffset, +} + +// ------- Bifrost to LogServer ------ // +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoreFlags(u32); +bitflags! 
{ + impl StoreFlags: u32 { + const IgnoreSeal = 0b000_00001; + } +} + +/// Store one or more records on a log-server +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Store { + // The receiver should skip handling this message if it hasn't started to act on it + // before timeout expires. 0 means no timeout + pub timeout_at: MillisSinceEpoch, + pub known_global_tail: LogletOffset, + pub flags: StoreFlags, + pub loglet_id: ReplicatedLogletId, + /// Offset of the first record in the batch of payloads. Payloads in the batch get a gap-less + /// range of offsets that starts with (includes) the value of `first_offset`. + pub first_offset: LogletOffset, + /// This is the sequencer identifier for this log. This should be set even for repair store messages. + pub sequencer: GenerationalNodeId, + /// Denotes the last record that has been safely uploaded to an archiving data store. + pub known_archived: LogletOffset, + pub payloads: Vec, +} + +impl Store { + /// The message's timeout has passed, we should discard if possible. 
+ pub fn expired(&self) -> bool { + MillisSinceEpoch::now() >= self.timeout_at + } + + // returns None on overflow + pub fn last_offset(&self) -> Option { + let len: u32 = self.payloads.len().try_into().ok()?; + self.first_offset.checked_add(len - 1).map(Into::into) + } +} + +/// Response to a `Store` request +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Stored { + pub local_tail: LogletOffset, + pub status: Status, +} + +impl Stored { + pub fn new(local_tail: LogletOffset) -> Self { + Self { + local_tail, + status: Status::Ok, + } + } + + pub fn status(mut self, status: Status) -> Self { + self.status = status; + self + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Release { + pub loglet_id: ReplicatedLogletId, + pub known_global_tail: LogletOffset, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Released { + pub known_global_tail: LogletOffset, +} diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 608d61fe64..01d5bd4172 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -10,6 +10,8 @@ use std::str::FromStr; +use bytes::{Buf, BufMut, BytesMut}; + /// A generational node identifier. Nodes with the same ID but different generations /// represent the same node across different instances (restarts) of its lifetime. /// @@ -82,6 +84,26 @@ impl FromStr for GenerationalNodeId { } } +impl GenerationalNodeId { + pub fn decode(mut data: B) -> Self { + // generational node id is stored as two u32s next to each other, each in big-endian. 
+ let plain_id = data.get_u32(); + let generation = data.get_u32(); + Self(PlainNodeId(plain_id), generation) + } + + pub fn encode(&self, buf: &mut BytesMut) { + buf.reserve(2 * size_of::()); + buf.put_u32(self.0 .0); + buf.put_u32(self.1); + } + + pub fn encode_and_split(&self, buf: &mut BytesMut) -> BytesMut { + self.encode(buf); + buf.split() + } +} + #[derive( Debug, Default, From f632f65628fb792ce18c594098f29849711aa4f3 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Mon, 9 Sep 2024 11:29:01 +0100 Subject: [PATCH 2/2] [Networking] Allow sending messages through Outgoing Allows Outgoing values to be sent directly if they were reciprocal to Incoming. --- crates/core/src/network/types.rs | 69 +++++++++++++++++++------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/crates/core/src/network/types.rs b/crates/core/src/network/types.rs index 990ad11177..3386200715 100644 --- a/crates/core/src/network/types.rs +++ b/crates/core/src/network/types.rs @@ -127,8 +127,7 @@ impl Incoming { &self, response: O, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.try_send(response, versions) + self.prepare_response(response).try_send() } /// Sends a response on the same connection where we received the request. 
This will @@ -139,27 +138,7 @@ impl Incoming { &self, response: O, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.send(response, versions).await - } - - fn respond_prep_inner( - &self, - response: O, - ) -> Result<(Arc, HeaderMetadataVersions, Outgoing), NetworkSendError> { - let response = self.prepare_response(response); - - let connection = match self.connection.upgrade() { - Some(connection) => connection, - None => { - return Err(NetworkSendError::new( - response, - NetworkError::ConnectionClosed, - )); - } - }; - let versions = with_metadata(HeaderMetadataVersions::from_metadata); - Ok((connection, versions, response)) + self.prepare_response(response).send().await } } @@ -179,8 +158,7 @@ impl Incoming { &self, response: M::ResponseMessage, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.try_send(response, versions) + self.prepare_response(response).try_send() } /// Sends a response on the same connection where we received the request. This will @@ -191,8 +169,7 @@ impl Incoming { &self, response: M::ResponseMessage, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.send(response, versions).await + self.prepare_response(response).send().await } } @@ -255,7 +232,9 @@ impl Outgoing { self.connection.upgrade() } - pub(crate) fn reset_connection(&mut self) { + /// Detaches this message from the associated connection (if set). This allows this message to + /// be sent on any connection if NetworkSender is used to send this message. + pub fn reset_connection(&mut self) { self.connection = Weak::new(); } @@ -267,6 +246,7 @@ impl Outgoing { self.body } + /// If this is a response to a request, what is the message id of the original request? 
pub fn in_response_to(&self) -> Option { self.in_response_to } @@ -291,3 +271,36 @@ impl Outgoing { } } } + +impl Outgoing { + /// Sends a response on the same connection where we received the request. This will + /// fail with [`NetworkError::ConnectionClosed`] if the connection is terminated. + /// + /// This blocks until there is capacity on the connection stream. + pub async fn send(self) -> Result<(), NetworkSendError> { + let (connection, versions, outgoing) = self.prepare_send()?; + connection.send(outgoing, versions).await + } + + /// Sends a response on the same connection where we received the request. This will + /// fail with [`NetworkError::ConnectionClosed`] if the connection is terminated. + /// + /// This fails immediately with [`NetworkError::Full`] if connection stream is out of capacity. + pub fn try_send(self) -> Result<(), NetworkSendError> { + let (connection, versions, outgoing) = self.prepare_send()?; + connection.try_send(outgoing, versions) + } + + fn prepare_send( + self, + ) -> Result<(Arc, HeaderMetadataVersions, Self), NetworkSendError> { + let connection = match self.connection.upgrade() { + Some(connection) => connection, + None => { + return Err(NetworkSendError::new(self, NetworkError::ConnectionClosed)); + } + }; + let versions = with_metadata(HeaderMetadataVersions::from_metadata); + Ok((connection, versions, self)) + } +}