Skip to content

Commit

Permalink
[LogServer] Support for trimming loglets
Browse files Browse the repository at this point in the history
Introduces Trim/Trimmed messages to trim loglets.
  • Loading branch information
AhmedSoliman committed Sep 11, 2024
1 parent 074e088 commit aad71b8
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 52 deletions.
8 changes: 7 additions & 1 deletion crates/core/src/network/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Weak};

Expand Down Expand Up @@ -46,6 +46,12 @@ impl<M> Deref for Incoming<M> {
}
}

/// Mutable counterpart of the `Deref` impl above: allows callers to mutate the
/// wrapped message body directly through an `Incoming<M>`.
impl<M> DerefMut for Incoming<M> {
// `Self::Target` is `M` (fixed by the `Deref` impl); we simply hand out the body.
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.body
}
}

impl<M> Incoming<M> {
pub(crate) fn from_parts(
peer: GenerationalNodeId,
Expand Down
326 changes: 316 additions & 10 deletions crates/log-server/src/loglet_worker.rs

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions crates/log-server/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use tokio::sync::oneshot;

use restate_bifrost::loglet::OperationError;
use restate_core::ShutdownError;
use restate_types::logs::LogletOffset;
use restate_types::net::log_server::{GetRecords, Records, Seal, Store};
use restate_types::net::log_server::{GetRecords, Records, Seal, Store, Trim};
use restate_types::replicated_loglet::ReplicatedLogletId;

use crate::metadata::{LogStoreMarker, LogletState};
Expand Down Expand Up @@ -47,12 +46,9 @@ pub trait LogStore: Clone + Send + 'static {
seal_message: Seal,
) -> impl Future<Output = Result<AsyncToken, OperationError>> + Send;

// todo: remove when trim is fully tested
#[allow(dead_code)]
fn enqueue_trim(
&mut self,
loglet_id: ReplicatedLogletId,
trim_point: LogletOffset,
trim_message: Trim,
) -> impl Future<Output = Result<AsyncToken, OperationError>> + Send;

fn read_records(
Expand Down
16 changes: 13 additions & 3 deletions crates/log-server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,24 @@ impl LogletState {
self.local_tail.is_sealed()
}

pub fn local_tail(&self) -> watch::Ref<'_, TailState<LogletOffset>> {
self.local_tail.get()
/// Returns a copy of the loglet's current local tail state.
///
/// `TailState` is `Copy`, so this snapshots the value out of the underlying
/// watch instead of holding a borrow across the caller's scope.
pub fn local_tail(&self) -> TailState<LogletOffset> {
*self.local_tail.get()
}

// Current trim point of the loglet; offsets at or below this value have been
// trimmed (readers start from `trim_point().next()`).
#[allow(unused)]
pub fn trim_point(&self) -> LogletOffset {
*self.trim_point.borrow()
}

/// Advances the trim point, notifying watchers only when it actually moves.
///
/// Returns `true` if `new_trim_point` was strictly greater than the stored
/// value and the watch was updated; returns `false` (sending no notification)
/// otherwise, so stale or duplicate trim requests are cheap no-ops.
pub fn update_trim_point(&mut self, new_trim_point: LogletOffset) -> bool {
    self.trim_point.send_if_modified(|current| {
        let advanced = new_trim_point > *current;
        if advanced {
            *current = new_trim_point;
        }
        advanced
    })
}
}

#[derive(Clone)]
Expand Down
30 changes: 27 additions & 3 deletions crates/log-server/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use restate_core::network::{Incoming, MessageRouterBuilder, NetworkSender};
use restate_core::{cancellation_watcher, Metadata, TaskCenter};
use restate_types::config::Configuration;
use restate_types::live::Live;
use restate_types::net::log_server::{
GetRecords, GetTailInfo, Records, Release, Released, Seal, Sealed, Store, Stored, TailInfo,
};
use restate_types::net::log_server::*;
use restate_types::nodes_config::StorageState;
use restate_types::replicated_loglet::ReplicatedLogletId;

Expand All @@ -51,6 +49,7 @@ pub struct RequestPump {
seal_stream: MessageStream<Seal>,
get_tail_info_stream: MessageStream<GetTailInfo>,
get_records_stream: MessageStream<GetRecords>,
trim_stream: MessageStream<Trim>,
}

impl RequestPump {
Expand All @@ -71,6 +70,7 @@ impl RequestPump {
let seal_stream = router_builder.subscribe_to_stream(queue_length);
let get_tail_info_stream = router_builder.subscribe_to_stream(queue_length);
let get_records_stream = router_builder.subscribe_to_stream(queue_length);
let trim_stream = router_builder.subscribe_to_stream(queue_length);
Self {
task_center,
_metadata: metadata,
Expand All @@ -80,6 +80,7 @@ impl RequestPump {
seal_stream,
get_tail_info_stream,
get_records_stream,
trim_stream,
}
}

Expand All @@ -100,6 +101,7 @@ impl RequestPump {
mut seal_stream,
mut get_tail_info_stream,
mut get_records_stream,
mut trim_stream,
..
} = self;

Expand Down Expand Up @@ -186,6 +188,19 @@ impl RequestPump {
).await?;
Self::on_get_records(worker, get_records);
}
Some(trim) = trim_stream.next() => {
// find the worker or create one.
// enqueue.
let worker = Self::find_or_create_worker(
trim.loglet_id,
&log_store,
&task_center,
&global_tail_tracker,
&mut state_map,
&mut loglet_workers,
).await?;
Self::on_trim(worker, trim);
}
Some(store) = store_stream.next() => {
// find the worker or create one.
// enqueue.
Expand Down Expand Up @@ -260,6 +275,15 @@ impl RequestPump {
}
}

// Hands a Trim request to the loglet's worker. On enqueue failure the message
// is returned to us (note the shadowing `Err(msg)`), and we answer the peer
// with an empty `Trimmed` response instead of silently dropping the request.
fn on_trim(worker: &LogletWorkerHandle, msg: Incoming<Trim>) {
if let Err(msg) = worker.enqueue_trim(msg) {
// worker has crashed or shutdown in progress. Notify the sender and drop the message.
if let Err(e) = msg.try_respond_rpc(Trimmed::empty()) {
debug!(?e.source, peer = %msg.peer(), "Failed to respond to Trim message with status Disabled due to peer channel capacity being full");
}
}
}

async fn find_or_create_worker<'a, S: LogStore>(
loglet_id: ReplicatedLogletId,
log_store: &S,
Expand Down
27 changes: 14 additions & 13 deletions crates/log-server/src/rocksdb_logstore/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use restate_types::config::LogServerOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::net::log_server::{
Gap, GetRecords, LogServerResponseHeader, MaybeRecord, Records, Seal, Store,
Gap, GetRecords, LogServerResponseHeader, MaybeRecord, Records, Seal, Store, Trim,
};
use restate_types::replicated_loglet::ReplicatedLogletId;
use restate_types::GenerationalNodeId;
Expand Down Expand Up @@ -139,7 +139,6 @@ impl LogStore for RocksDbLogStore {
let max_legal_record = DataRecordKey::new(loglet_id, LogletOffset::MAX);
let upper_bound = DataRecordKey::exclusive_upper_bound(loglet_id);
readopts.fill_cache(true);
readopts.set_ignore_range_deletions(true);
readopts.set_total_order_seek(false);
readopts.set_prefix_same_as_start(true);
readopts.set_iterate_lower_bound(oldest_key.to_bytes());
Expand All @@ -152,7 +151,7 @@ impl LogStore for RocksDbLogStore {
.raw_iterator_cf_opt(&data_cf, readopts);
// seek to the max key that exists
iterator.seek_for_prev(max_legal_record.to_bytes());
let local_tail = if iterator.valid() {
let mut local_tail = if iterator.valid() {
let decoded_key = DataRecordKey::from_slice(iterator.key().unwrap());
trace!(
"Found last record of loglet {} is {}",
Expand All @@ -164,6 +163,14 @@ impl LogStore for RocksDbLogStore {
trace!("No data records for loglet {}", loglet_id);
LogletOffset::OLDEST
};
// If the loglet is trimmed (all records were removed) and we know the trim_point, then we
// use the trim_point.next() as the local_tail.
//
// Another way to describe this is `if trim_point >= local_tail` but at this stage, I
// prefer to be conservative and explicit to catch unintended corner cases.
if local_tail == LogletOffset::OLDEST && trim_point > LogletOffset::INVALID {
local_tail = trim_point.next();
}

Ok(LogletState::new(
sequencer, local_tail, is_sealed, trim_point,
Expand All @@ -188,14 +195,8 @@ impl LogStore for RocksDbLogStore {
self.writer_handle.enqueue_seal(seal_message).await
}

// todo: remove when trim is fully implemented
#[allow(dead_code)]
async fn enqueue_trim(
&mut self,
loglet_id: ReplicatedLogletId,
trim_point: LogletOffset,
) -> Result<AsyncToken, OperationError> {
self.writer_handle.enqueue_trim(loglet_id, trim_point).await
/// Enqueues a trim operation on the background RocksDB writer.
///
/// Delegation only; the returned `AsyncToken` completes once the writer has
/// acknowledged the trim (presumably after the write batch is applied —
/// NOTE(review): confirm durability semantics against the writer).
async fn enqueue_trim(&mut self, trim_message: Trim) -> Result<AsyncToken, OperationError> {
self.writer_handle.enqueue_trim(trim_message).await
}

async fn read_records(
Expand All @@ -213,7 +214,7 @@ impl LogStore for RocksDbLogStore {
//
// If we are reading beyond the tail, the first thing we do is to clip to the
// local_tail.
let local_tail = loglet_state.local_tail().clone();
let local_tail = loglet_state.local_tail();
let trim_point = loglet_state.trim_point();

let read_from = msg.from_offset.max(trim_point.next());
Expand Down Expand Up @@ -331,7 +332,7 @@ impl LogStore for RocksDbLogStore {
}

Ok(Records {
header: LogServerResponseHeader::new(&local_tail),
header: LogServerResponseHeader::new(local_tail),
next_offset: read_pointer,
records,
})
Expand Down
16 changes: 7 additions & 9 deletions crates/log-server/src/rocksdb_logstore/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use bytes::BytesMut;
use futures::StreamExt as FutureStreamExt;
use metrics::histogram;
use restate_types::net::log_server::{Seal, Store};
use restate_types::net::log_server::{Seal, Store, Trim};
use restate_types::time::NanosSinceEpoch;
use restate_types::GenerationalNodeId;
use rocksdb::{BoundColumnFamily, WriteBatch};
Expand Down Expand Up @@ -336,20 +336,18 @@ impl RocksDbLogWriterHandle {
Ok(AsyncToken::new(receiver))
}

pub async fn enqueue_trim(
&self,
loglet_id: ReplicatedLogletId,
trim_point: LogletOffset,
) -> Result<AsyncToken, OperationError> {
pub async fn enqueue_trim(&self, trim_message: Trim) -> Result<AsyncToken, OperationError> {
let (ack, receiver) = oneshot::channel();

let data_update = DataUpdate::TrimLogRecords { trim_point };
let data_update = DataUpdate::TrimLogRecords {
trim_point: trim_message.trim_point,
};
let metadata_update = Some(MetadataUpdate::UpdateTrimPoint {
new_trim_point: trim_point,
new_trim_point: trim_message.trim_point,
});

self.send_command(LogStoreWriteCommand {
loglet_id,
loglet_id: trim_message.loglet_id,
data_update: Some(data_update),
metadata_update,
ack: Some(ack),
Expand Down
2 changes: 2 additions & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ enum TargetName {
LOG_SERVER_TAIL_INFO = 17;
LOG_SERVER_GET_RECORDS = 18;
LOG_SERVER_RECORDS = 19;
LOG_SERVER_TRIM = 20;
LOG_SERVER_TRIMMED = 21;
}

enum NodeStatus {
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/logs/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/// Represents the state of the tail of the loglet.
use super::{Lsn, SequenceNumber};

#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub enum TailState<Offset = Lsn> {
/// Loglet is open for appends
Open(Offset),
Expand Down
Loading

0 comments on commit aad71b8

Please sign in to comment.