Export tokio runtime metrics via prometheus #1524

Merged 5 commits on May 21, 2024
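
For context on the feature named in the title: the exporter wiring itself is not part of the excerpts below (they cover dependency cleanup and bifrost payload changes), but a minimal sketch of publishing tokio runtime metrics through the `metrics` facade the codebase already uses (`counter!`, `histogram!`) could look like the following. The metric name, the sampling loop, and the `PrometheusBuilder` setup are illustrative assumptions rather than code from this PR, and most `RuntimeMetrics` getters beyond `num_workers` require the `tokio_unstable` cfg.

```rust
// Sketch only: periodically sample tokio's RuntimeMetrics and publish them as
// gauges through the `metrics` facade, scraped via the Prometheus exporter.
use std::time::Duration;

use metrics::gauge;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::runtime::Handle;

async fn sample_runtime_metrics() {
    let rt = Handle::current().metrics();
    loop {
        // `num_workers` is available on stable tokio; most other getters
        // (queue depths, park counts, ...) need the `tokio_unstable` cfg.
        gauge!("tokio_num_workers").set(rt.num_workers() as f64);
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

#[tokio::main]
async fn main() {
    // Assumed exporter setup; the PR presumably plugs into the project's
    // existing metrics recorder instead of installing its own.
    PrometheusBuilder::new()
        .install()
        .expect("failed to install Prometheus exporter");

    tokio::spawn(sample_runtime_metrics());
    // ... application ...
}
```
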
Changes from all commits
103 changes: 5 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions Cargo.toml
@@ -80,7 +80,6 @@ async-channel = "2.1.1"
async-trait = "0.1.73"
axum = { version = "0.6.18", default-features = false }
base64 = "0.21"
bincode = { version = "2.0.0-rc", default-features = false, features = ["std", "serde", ] }
bytes = { version = "1.3", features = ["serde"] }
bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
@@ -93,7 +92,6 @@ datafusion-expr = { version = "35.0.0" }
derive-getters = { version = "0.3.0" }
derive_builder = "0.12.0"
derive_more = { version = "0.99.17" }
drain = "0.1.1"
enum-map = { version = "2.7.3" }
enumset = { version = "1.1.3" }
flexbuffers = { version = "2.0.0" }
2 changes: 0 additions & 2 deletions cli/Cargo.toml
@@ -43,7 +43,6 @@ futures = { workspace = true }
http = { workspace = true }
indicatif = "0.17.7"
indoc = { version = "2.0.4" }
is-terminal = { version = "0.4.9" }
itertools = { workspace = true }
octocrab = { version = "0.32.0", features = ["stream"] }
once_cell = { workspace = true }
@@ -64,7 +63,6 @@ unicode-width = { version = "0.1.11" }
url = { version = "2.4.1" }
uuid = { workspace = true }
zip = "0.6"
zip-extensions = "0.6"

[build-dependencies]
vergen = { version = "8.0.0", default-features = false, features = [
4 changes: 0 additions & 4 deletions crates/admin/Cargo.toml
@@ -15,8 +15,6 @@ options_schema = ["restate-service-client/options_schema"]
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-fs-util = { workspace = true }
restate-futures-util = { workspace = true }
restate-meta-rest-model = { workspace = true, features = ["schema"] }
restate-node-services = { workspace = true, features = ["servers", "clients"] }
restate-schema = { workspace = true }
@@ -29,14 +27,12 @@ restate-wal-protocol = { workspace = true }
anyhow = { workspace = true }
arrow-flight = { workspace = true }
axum = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
codederror = { workspace = true }
datafusion = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
hyper = { workspace = true, features = ["full"] }
1 change: 0 additions & 1 deletion crates/benchmarks/Cargo.toml
@@ -20,7 +20,6 @@ restate-types = { workspace = true, features = ["clap"] }

anyhow = { workspace = true }
arc-swap = { workspace = true }
drain = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hyper = { workspace = true, features = ["client"] }
4 changes: 1 addition & 3 deletions crates/bifrost/Cargo.toml
@@ -13,7 +13,6 @@ test-util = []

[dependencies]
restate-core = { workspace = true }
restate-metadata-store = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }

@@ -24,7 +23,6 @@ bytestring = { workspace = true, features = ["serde"] }
codederror = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
drain = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
humantime = { workspace = true }
metrics = { workspace = true }
@@ -46,13 +44,13 @@ tracing = { workspace = true }

[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }

criterion = { workspace = true, features = ["async_tokio"] }
futures = { workspace = true }
googletest = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
20 changes: 16 additions & 4 deletions crates/bifrost/src/bifrost.rs
@@ -15,14 +15,16 @@ use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bytes::BytesMut;
use enum_map::EnumMap;
use once_cell::sync::OnceCell;
use tracing::{error, instrument};

use restate_core::{metadata, Metadata, MetadataKind};
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;
use tracing::{error, instrument};

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
@@ -158,14 +160,20 @@ impl BifrostInner {
pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn, Error> {
self.fail_if_shutting_down()?;
let loglet = self.writeable_loglet(log_id).await?;
loglet.append(payload).await
let mut buf = BytesMut::default();
StorageCodec::encode(payload, &mut buf).expect("serialization to bifrost is infallible");
loglet.append(buf.freeze()).await
}

pub async fn read_next_single(&self, log_id: LogId, after: Lsn) -> Result<LogRecord, Error> {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
loglet.read_next_single(after).await
Ok(loglet
.read_next_single(after)
.await?
.decode()
.expect("decoding a bifrost envelope succeeds"))
}

pub async fn read_next_single_opt(
@@ -176,7 +184,11 @@ impl BifrostInner {
self.fail_if_shutting_down()?;

let loglet = self.find_loglet_for_lsn(log_id, after.next()).await?;
loglet.read_next_single_opt(after).await
Ok(loglet.read_next_single_opt(after).await?.map(|record| {
record
.decode()
.expect("decoding a bifrost envelope succeeds")
}))
}

pub async fn find_tail(
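
The bifrost.rs hunks above move (de)serialization out of the loglet: `append` now encodes the `Payload` with `StorageCodec` into a `BytesMut` and hands the frozen `Bytes` down, while the read paths decode the raw record before returning it. A self-contained sketch of that round-trip pattern, with stand-in `encode_payload`/`decode_payload` helpers in place of `StorageCodec` and a stand-in `Payload` type, might look like:

```rust
// Pattern sketch: serialize above the loglet boundary, move raw Bytes below it.
// encode_payload/decode_payload stand in for StorageCodec; Payload is a stand-in type.
use bytes::{Bytes, BytesMut};

#[derive(Debug, PartialEq)]
struct Payload(String);

fn encode_payload(payload: &Payload, buf: &mut BytesMut) {
    buf.extend_from_slice(payload.0.as_bytes());
}

fn decode_payload(data: &Bytes) -> Payload {
    Payload(String::from_utf8_lossy(data).into_owned())
}

// "Loglet" side: only sees opaque Bytes, returns the offset it was stored at.
fn loglet_append(log: &mut Vec<Bytes>, data: Bytes) -> usize {
    log.push(data);
    log.len() - 1
}

fn main() {
    let mut log = Vec::new();

    // Append path: encode first, then hand frozen Bytes to the loglet.
    let mut buf = BytesMut::new();
    encode_payload(&Payload("hello".into()), &mut buf);
    let offset = loglet_append(&mut log, buf.freeze());

    // Read path: the loglet returns Bytes; decode back into a Payload.
    let read_back = decode_payload(&log[offset]);
    assert_eq!(read_back, Payload("hello".into()));
}
```
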
2 changes: 2 additions & 0 deletions crates/bifrost/src/lib.rs
@@ -13,12 +13,14 @@ mod error;
mod loglet;
mod loglets;
mod read_stream;
mod record;
mod service;
mod types;
mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError};
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
pub use types::*;
21 changes: 12 additions & 9 deletions crates/bifrost/src/loglet.rs
@@ -12,9 +12,10 @@ use std::sync::Arc;

use async_trait::async_trait;

use bytes::Bytes;
use restate_types::config::Configuration;
use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::{Lsn, Payload, SequenceNumber};
use restate_types::logs::{Lsn, SequenceNumber};

use crate::{Error, LogRecord, LsnExt, ProviderError};

@@ -104,7 +105,7 @@ pub trait LogletBase: Send + Sync {
type Offset: SequenceNumber;

/// Append a record to the loglet.
async fn append(&self, payload: Payload) -> Result<Self::Offset, Error>;
async fn append(&self, data: Bytes) -> Result<Self::Offset, Error>;

/// Find the tail of the loglet. If the loglet is empty or has been trimmed, the loglet should
/// return `None`.
@@ -116,22 +117,24 @@ pub trait LogletBase: Send + Sync {

/// Read or wait for the record at `from` offset, or the next available record if `from` isn't
/// defined for the loglet.
async fn read_next_single(&self, after: Self::Offset)
-> Result<LogRecord<Self::Offset>, Error>;
async fn read_next_single(
&self,
after: Self::Offset,
) -> Result<LogRecord<Self::Offset, Bytes>, Error>;

/// Read the next record if it's been committed, otherwise, return None without waiting.
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error>;
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error>;
}

#[async_trait]
impl LogletBase for LogletWrapper {
type Offset = Lsn;

async fn append(&self, payload: Payload) -> Result<Lsn, Error> {
let offset = self.loglet.append(payload).await?;
async fn append(&self, data: Bytes) -> Result<Lsn, Error> {
let offset = self.loglet.append(data).await?;
// Return the LSN given the loglet offset.
Ok(self.base_lsn.offset_by(offset))
}
@@ -146,7 +149,7 @@ impl LogletBase for LogletWrapper {
Ok(self.base_lsn.offset_by(offset))
}

async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn>, Error> {
async fn read_next_single(&self, after: Lsn) -> Result<LogRecord<Lsn, Bytes>, Error> {
// convert LSN to loglet offset
let offset = after.into_offset(self.base_lsn);
self.loglet
@@ -158,7 +161,7 @@ impl LogletBase for LogletWrapper {
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
let offset = after.into_offset(self.base_lsn);
self.loglet
.read_next_single_opt(offset)
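
The `LogletWrapper` impl above translates between global LSNs and loglet-local offsets via `base_lsn.offset_by(offset)` and `after.into_offset(self.base_lsn)`. A simplified illustration of that mapping, reducing LSNs and offsets to plain `u64` and assuming the translation is purely additive (the real newtypes carry more invariants), is sketched below.

```rust
// Simplified illustration of the LSN <-> loglet-offset translation done by
// LogletWrapper. Treating Lsn/LogletOffset as plain u64 and the translation
// as simple addition/subtraction is an assumption made for clarity.
struct LogletWrapper {
    /// Global LSN at which this loglet's first offset is anchored.
    base_lsn: u64,
}

impl LogletWrapper {
    // Loglet-local offset -> global LSN (mirrors base_lsn.offset_by(offset)).
    fn lsn_for(&self, offset: u64) -> u64 {
        self.base_lsn + offset
    }

    // Global LSN -> loglet-local offset (mirrors lsn.into_offset(base_lsn)).
    fn offset_for(&self, lsn: u64) -> u64 {
        lsn - self.base_lsn
    }
}

fn main() {
    let wrapper = LogletWrapper { base_lsn: 100 };
    assert_eq!(wrapper.lsn_for(5), 105);
    assert_eq!(wrapper.offset_for(105), 5);
}
```
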
17 changes: 10 additions & 7 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
@@ -22,7 +22,7 @@ pub use log_store::LogStoreError;
use metrics::{counter, histogram};
pub use provider::LocalLogletProvider;
use restate_core::ShutdownError;
use restate_types::logs::{Payload, SequenceNumber};
use restate_types::logs::SequenceNumber;
use tokio::sync::Mutex;
use tracing::{debug, warn};

@@ -95,7 +95,10 @@ impl LocalLoglet {
self.release_watch.notify(release_pointer);
}

fn read_after(&self, after: LogletOffset) -> Result<Option<LogRecord<LogletOffset>>, Error> {
fn read_after(
&self,
after: LogletOffset,
) -> Result<Option<LogRecord<LogletOffset, Bytes>>, Error> {
let trim_point = LogletOffset(self.trim_point_offset.load(Ordering::Relaxed));
// Is `after` before the trim point? Note that if `trim_point` == `after`
// then we don't return a trim gap, because the next record is potentially a data
@@ -138,15 +141,15 @@ impl LocalLoglet {
return Ok(None);
}
let data = Bytes::from(data);
Ok(Some(LogRecord::new_data(key.offset, Payload::from(data))))
Ok(Some(LogRecord::new_data(key.offset, data)))
}
}
}

#[async_trait]
impl LogletBase for LocalLoglet {
type Offset = LogletOffset;
async fn append(&self, payload: Payload) -> Result<LogletOffset, Error> {
async fn append(&self, payload: Bytes) -> Result<LogletOffset, Error> {
counter!(BIFROST_LOCAL_APPEND).increment(1);
let start_time = std::time::Instant::now();
// We hold the lock to ensure that offsets are enqueued in the order of
@@ -162,7 +165,7 @@ impl LogletBase for LocalLoglet {
.enqueue_put_record(
self.log_id,
offset,
payload.into(),
payload,
true, /* release_immediately */
)
.await?;
@@ -199,7 +202,7 @@ impl LogletBase for LocalLoglet {
async fn read_next_single(
&self,
after: Self::Offset,
) -> Result<LogRecord<Self::Offset>, Error> {
) -> Result<LogRecord<Self::Offset, Bytes>, Error> {
loop {
let next_record = self.read_after(after)?;
if let Some(next_record) = next_record {
@@ -213,7 +216,7 @@ impl LogletBase for LocalLoglet {
async fn read_next_single_opt(
&self,
after: Self::Offset,
) -> Result<Option<LogRecord<Self::Offset>>, Error> {
) -> Result<Option<LogRecord<Self::Offset, Bytes>>, Error> {
self.read_after(after)
}
}
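
`LocalLoglet::append` keeps its instrumentation through this change: it bumps a counter on entry and, per the `histogram` import, records latency from a captured `Instant`. The counter-plus-latency-histogram pattern with the `metrics` crate looks roughly like the sketch below; only the `BIFROST_LOCAL_APPEND` identifier appears in this diff, so the metric name strings and the histogram constant are assumptions.

```rust
// Sketch of the counter + latency-histogram pattern used in LocalLoglet::append.
// The name strings and BIFROST_LOCAL_APPEND_DURATION are illustrative; only the
// BIFROST_LOCAL_APPEND identifier is visible in this diff.
use std::time::Instant;

use metrics::{counter, histogram};

const BIFROST_LOCAL_APPEND: &str = "restate.bifrost.localloglet.append.total";
const BIFROST_LOCAL_APPEND_DURATION: &str = "restate.bifrost.localloglet.append.duration.seconds";

fn append_record(do_append: impl FnOnce()) {
    counter!(BIFROST_LOCAL_APPEND).increment(1);
    let start_time = Instant::now();

    do_append();

    histogram!(BIFROST_LOCAL_APPEND_DURATION).record(start_time.elapsed().as_secs_f64());
}

fn main() {
    append_record(|| {
        // ... write the record to the log store ...
    });
}
```
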