diff --git a/Cargo.lock b/Cargo.lock
index 24bc8a76fc5a..10e8741c7006 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1572,6 +1572,12 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"

+[[package]]
+name = "dtoa"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653"
+
 [[package]]
 name = "either"
 version = "1.8.1"
@@ -3580,6 +3586,7 @@ dependencies = [
  "pin-project",
  "pretty_assertions",
  "prometheus",
+ "prometheus-client",
  "prost",
  "quick-xml",
  "rand 0.8.5",
@@ -4493,6 +4500,29 @@ dependencies = [
  "thiserror",
 ]

+[[package]]
+name = "prometheus-client"
+version = "0.21.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2"
+dependencies = [
+ "dtoa",
+ "itoa",
+ "parking_lot 0.12.1",
+ "prometheus-client-derive-encode",
+]
+
+[[package]]
+name = "prometheus-client-derive-encode"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.23",
+]
+
 [[package]]
 name = "prost"
 version = "0.11.9"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 33232299ddf6..e09d94c5116f 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -83,8 +83,10 @@ layers-all = [
 layers-chaos = ["dep:rand"]
 # Enable layers metrics support
 layers-metrics = ["dep:metrics"]
-# Enable layers prometheus support
+# Enable layers prometheus support, with tikv/prometheus-rs crate
 layers-prometheus = ["dep:prometheus"]
+# Enable layers prometheus support, with prometheus-client crate
+layers-prometheus-client = ["dep:prometheus-client"]
 # Enable layers madsim support
 layers-madsim = ["dep:madsim"]
 # Enable layers minitrace support.
@@ -243,6 +245,7 @@ percent-encoding = "2"
 persy = { version = "1.4.4", optional = true }
 pin-project = "1"
 prometheus = { version = "0.13", features = ["process"], optional = true }
+prometheus-client = { version = "0.21.2", optional = true }
 prost = { version = "0.11", optional = true }
 quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] }
 rand = { version = "0.8", optional = true }
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index cd9414ba64ed..7c064cdcf967 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -56,6 +56,11 @@ mod prometheus;
 #[cfg(feature = "layers-prometheus")]
 pub use self::prometheus::PrometheusLayer;

+#[cfg(feature = "layers-prometheus-client")]
+mod prometheus_client;
+#[cfg(feature = "layers-prometheus-client")]
+pub use self::prometheus_client::PrometheusClientLayer;
+
 mod retry;
 pub use self::retry::RetryInterceptor;
 pub use self::retry::RetryLayer;
diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs
new file mode 100644
index 000000000000..b08b50a28af6
--- /dev/null
+++ b/core/src/layers/prometheus_client.rs
@@ -0,0 +1,675 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::TryFutureExt;
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::histogram;
+use prometheus_client::metrics::histogram::Histogram;
+use prometheus_client::registry::Registry;
+
+use crate::raw::Accessor;
+use crate::raw::*;
+use crate::*;
+
+/// Add [prometheus-client](https://docs.rs/prometheus-client) metrics for every operation.
+///
+/// # Examples
+///
+/// ```
+/// use log::debug;
+/// use log::info;
+/// use opendal::layers::PrometheusClientLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+/// use opendal::Result;
+///
+/// /// Visit [`opendal::services`] for more service related config.
+/// /// Visit [`opendal::Operator`] for more operator level APIs.
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // Pick a builder and configure it.
+///     let builder = services::Memory::default();
+///     let mut registry = prometheus_client::registry::Registry::default();
+///
+///     let op = Operator::new(builder)
+///         .expect("must init")
+///         .layer(PrometheusClientLayer::new(&mut registry))
+///         .finish();
+///     debug!("operator: {op:?}");
+///
+///     // Write data into object test.
+///     op.write("test", "Hello, World!").await?;
+///     // Read data from object.
+///     let bs = op.read("test").await?;
+///     info!("content: {}", String::from_utf8_lossy(&bs));
+///
+///     // Get object metadata.
+///     let meta = op.stat("test").await?;
+///     info!("meta: {:?}", meta);
+///
+///     // Export prometheus metrics.
+///     let mut buf = String::new();
+///     prometheus_client::encoding::text::encode(&mut buf, &registry).unwrap();
+///     println!("## Prometheus Metrics");
+///     println!("{}", buf);
+///     Ok(())
+/// }
+/// ```
+#[derive(Debug)]
+pub struct PrometheusClientLayer {
+    metrics: PrometheusClientMetrics,
+}
+
+impl PrometheusClientLayer {
+    /// Create a new [`PrometheusClientLayer`] and register its metrics into the given registry.
+    pub fn new(registry: &mut Registry) -> Self {
+        let metrics = PrometheusClientMetrics::register(registry);
+        Self { metrics }
+    }
+}
+
+impl<A: Accessor> Layer<A> for PrometheusClientLayer {
+    type LayeredAccessor = PrometheusAccessor<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        let meta = inner.info();
+        let scheme = meta.scheme();
+
+        let metrics = Arc::new(self.metrics.clone());
+        PrometheusAccessor {
+            inner,
+            metrics,
+            scheme,
+        }
+    }
+}
+
+type OperationLabels = [(&'static str, &'static str); 2];
+type ErrorLabels = [(&'static str, &'static str); 3];
+
+/// [`PrometheusClientMetrics`] provides the performance and IO metrics via the `prometheus-client` crate.
+#[derive(Debug, Clone)]
+struct PrometheusClientMetrics {
+    /// Total number of times each operation has been called.
+    requests_total: Family<OperationLabels, Counter>,
+    /// Total number of errors, by operation and error kind.
+    errors_total: Family<ErrorLabels, Counter>,
+    /// Latency of each operation.
+    request_duration_seconds: Family<OperationLabels, Histogram>,
+    /// Histogram of the bytes transferred per operation.
+    bytes_histogram: Family<OperationLabels, Histogram>,
+}
+
+impl PrometheusClientMetrics {
+    pub fn register(registry: &mut Registry) -> Self {
+        let requests_total = Family::default();
+        let errors_total = Family::default();
+        let request_duration_seconds =
+            Family::<OperationLabels, Histogram>::new_with_constructor(|| {
+                let buckets = histogram::exponential_buckets(0.01, 2.0, 16);
+                Histogram::new(buckets)
+            });
+        let bytes_histogram = Family::<OperationLabels, Histogram>::new_with_constructor(|| {
+            let buckets = histogram::exponential_buckets(1.0, 2.0, 16);
+            Histogram::new(buckets)
+        });
+
+        registry.register("opendal_requests_total", "", requests_total.clone());
+        registry.register("opendal_errors_total", "", errors_total.clone());
+        registry.register(
+            "opendal_request_duration_seconds",
+            "",
+            request_duration_seconds.clone(),
+        );
+        registry.register("opendal_bytes_histogram", "", bytes_histogram.clone());
+        Self {
+            requests_total,
+            errors_total,
+            request_duration_seconds,
+            bytes_histogram,
+        }
+    }
+
+    fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) {
+        let labels = [
+            ("scheme", scheme.into_static()),
+            ("op", op.into_static()),
+            ("err", err.into_static()),
+        ];
+        self.errors_total.get_or_create(&labels).inc();
+    }
+
+    fn increment_request_total(&self, scheme: Scheme, op: Operation) {
+        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
+        self.requests_total.get_or_create(&labels).inc();
+    }
+
+    fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) {
+        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
+        self.bytes_histogram
+            .get_or_create(&labels)
+            .observe(bytes as f64);
+    }
+
+    fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) {
+        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
+        self.request_duration_seconds
+            .get_or_create(&labels)
+            .observe(duration.as_secs_f64());
+    }
+}
+
+#[derive(Clone)]
+pub struct PrometheusAccessor<A: Accessor> {
+    inner: A,
+    metrics: Arc<PrometheusClientMetrics>,
+    scheme: Scheme,
+}
+
+impl<A: Accessor> Debug for PrometheusAccessor<A> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PrometheusAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Pager = A::Pager;
+    type BlockingPager = A::BlockingPager;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::CreateDir);
+
+        let start_time = Instant::now();
+        let create_res = self.inner.create_dir(path, args).await;
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::CreateDir,
+            start_time.elapsed(),
+        );
+        create_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::CreateDir, e.kind());
+            e
+        })
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Read);
+
+        let read_res = self
+            .inner
+            .read(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Read,
+                            self.metrics.clone(),
+                            self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+        read_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Read, e.kind());
+            e
+        })
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Write);
+
+        let write_res = self
+            .inner
+            .write(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Write,
+                            self.metrics.clone(),
+                            self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+
+        write_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Write, e.kind());
+            e
+        })
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Stat);
+        let start_time = Instant::now();
+
+        // Errors are already counted in `inspect_err`, so the result is returned
+        // as is to avoid counting the same error twice.
+        let stat_res = self
+            .inner
+            .stat(path, args)
+            .inspect_err(|e| {
+                self.metrics
+                    .increment_errors_total(self.scheme, Operation::Stat, e.kind());
+            })
+            .await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed());
+        stat_res
+    }
+
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Delete);
+        let start_time = Instant::now();
+
+        let delete_res = self.inner.delete(path, args).await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed());
+        delete_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Delete, e.kind());
+            e
+        })
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::List);
+        let start_time = Instant::now();
+
+        let list_res = self.inner.list(path, args).await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::List, start_time.elapsed());
+        list_res.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::List, e.kind());
+            e
+        })
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Batch);
+        let start_time = Instant::now();
+
+        let result = self.inner.batch(args).await;
+
+        self.metrics
+            .observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed());
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Batch, e.kind());
+            e
+        })
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Presign);
+        let start_time = Instant::now();
+
+        let result = self.inner.presign(path, args).await;
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Presign,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::Presign, e.kind());
+            e
+        })
+    }
+
+    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingCreateDir);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_create_dir(path, args);
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingCreateDir,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingCreateDir,
+                e.kind(),
+            );
+            e
+        })
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingRead);
+
+        let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingRead,
+                    self.metrics.clone(),
+                    self.scheme,
+                ),
+            )
+        });
+
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind());
+            e
+        })
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingWrite);
+
+        let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingWrite,
+                    self.metrics.clone(),
+                    self.scheme,
+                ),
+            )
+        });
+
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind());
+            e
+        })
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingStat);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_stat(path, args);
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingStat,
+            start_time.elapsed(),
+        );
+
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingStat, e.kind());
+            e
+        })
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingDelete);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_delete(path, args);
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingDelete,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingDelete, e.kind());
+            e
+        })
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
+        self.metrics
+            .increment_request_total(self.scheme, Operation::BlockingList);
+        let start_time = Instant::now();
+
+        let result = self.inner.blocking_list(path, args);
+
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::BlockingList,
+            start_time.elapsed(),
+        );
+        result.map_err(|e| {
+            self.metrics
+                .increment_errors_total(self.scheme, Operation::BlockingList, e.kind());
+            e
+        })
+    }
+}
+
+pub struct PrometheusMetricWrapper<R> {
+    inner: R,
+
+    op: Operation,
+    metrics: Arc<PrometheusClientMetrics>,
+    scheme: Scheme,
+    bytes_total: usize,
+    start_time: Instant,
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    fn new(inner: R, op: Operation, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme) -> Self {
+        Self {
+            inner,
+            op,
+            metrics,
+            scheme,
+            bytes_total: 0,
+            start_time: Instant::now(),
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        self.inner.poll_read(cx, buf).map(|res| match res {
+            Ok(bytes) => {
+                self.bytes_total += bytes;
+                Ok(bytes)
+            }
+            Err(e) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
+        self.inner.poll_seek(cx, pos).map(|res| match res {
+            Ok(n) => Ok(n),
+            Err(e) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        self.inner.poll_next(cx).map(|res| match res {
+            Some(Ok(bytes)) => {
+                self.bytes_total += bytes.len();
+                Some(Ok(bytes))
+            }
+            Some(Err(e)) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Some(Err(e))
+            }
+            None => None,
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.inner
+            .read(buf)
+            .map(|n| {
+                self.bytes_total += n;
+                n
+            })
+            .map_err(|e| {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                e
+            })
+    }
+
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+        self.inner.seek(pos).map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        self.inner.next().map(|res| match res {
+            Ok(bytes) => {
+                self.bytes_total += bytes.len();
+                Ok(bytes)
+            }
+            Err(e) => {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
+        self.inner
+            .poll_write(cx, bs)
+            .map_ok(|n| {
+                self.bytes_total += n;
+                n
+            })
+            .map_err(|err| {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, err.kind());
+                err
+            })
+    }
+
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_abort(cx).map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_close(cx).map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
+        self.inner
+            .write(bs)
+            .map(|n| {
+                self.bytes_total += n;
+                n
+            })
+            .map_err(|err| {
+                self.metrics
+                    .increment_errors_total(self.scheme, self.op, err.kind());
+                err
+            })
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.inner.close().map_err(|err| {
+            self.metrics
+                .increment_errors_total(self.scheme, self.op, err.kind());
+            err
+        })
+    }
+}
+
+impl<R> Drop for PrometheusMetricWrapper<R> {
+    fn drop(&mut self) {
+        self.metrics
+            .observe_bytes_total(self.scheme, self.op, self.bytes_total);
+        self.metrics
+            .observe_request_duration(self.scheme, self.op, self.start_time.elapsed());
+    }
+}
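For reference, a minimal sketch of how a downstream crate might exercise this layer end to end; it is illustrative rather than part of the change. It assumes the consumer enables the `layers-prometheus-client` feature introduced above and depends on `prometheus-client` and `tokio`; apart from that, only calls that already appear in this diff or its doc example are used, and the asserted metric name is the one registered by `PrometheusClientMetrics::register`.

```rust
// Minimal consumer sketch (assumed downstream Cargo.toml, versions elided):
//   opendal = { version = "...", features = ["layers-prometheus-client"] }
//   prometheus-client = "0.21"
//   tokio = { version = "1", features = ["full"] }
use opendal::layers::PrometheusClientLayer;
use opendal::services;
use opendal::Operator;
use prometheus_client::registry::Registry;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let mut registry = Registry::default();

    // Wrap an in-memory operator with the layer; its metrics register into `registry`.
    let op = Operator::new(services::Memory::default())
        .expect("must init")
        .layer(PrometheusClientLayer::new(&mut registry))
        .finish();

    // Any operation now updates the `opendal_*` families with `scheme`/`op` labels.
    op.write("test", "Hello, World!").await?;
    let _ = op.read("test").await?;

    // Encode the registry in the Prometheus text exposition format,
    // e.g. to serve from a `/metrics` endpoint.
    let mut buf = String::new();
    prometheus_client::encoding::text::encode(&mut buf, &registry).expect("encode metrics");
    assert!(buf.contains("opendal_requests_total"));
    println!("{buf}");
    Ok(())
}
```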