From 1961244f47aac983c2f4872c04e575e43f44d433 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 18 Nov 2023 23:51:25 +0800 Subject: [PATCH] feat: Improve the read_to_end perf and add benchmark vs_fs Signed-off-by: Xuanwo --- Cargo.lock | 11 +++ Cargo.toml | 1 + core/Cargo.toml | 2 + core/benches/oio/main.rs | 2 +- core/benches/vs_fs/Cargo.toml | 36 ++++++++++ core/benches/vs_fs/README.md | 35 +++++++++ core/benches/vs_fs/src/main.rs | 74 ++++++++++++++++++++ core/src/raw/oio/read/api.rs | 64 +++++++++-------- core/src/types/operator/blocking_operator.rs | 18 ++++- 9 files changed, 208 insertions(+), 35 deletions(-) create mode 100644 core/benches/vs_fs/Cargo.toml create mode 100644 core/benches/vs_fs/README.md create mode 100644 core/benches/vs_fs/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 02bf97e8191f..662052bd2ee7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4250,6 +4250,17 @@ dependencies = [ "wiremock", ] +[[package]] +name = "opendal-benchmark-vs-fs" +version = "0.0.0" +dependencies = [ + "criterion", + "opendal", + "rand 0.8.5", + "tokio", + "uuid", +] + [[package]] name = "opendal-c" version = "0.42.0" diff --git a/Cargo.toml b/Cargo.toml index 1fa2f169f8fb..6ec1b15ddb39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "core", "core/fuzz", "core/edge/*", + "core/benches/vs_fs", "bindings/c", "bindings/nodejs", diff --git a/core/Cargo.toml b/core/Cargo.toml index 7a77484e23c0..11d8cf96aa4a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -201,10 +201,12 @@ bench = false [[bench]] harness = false name = "ops" +required-features = ["tests"] [[bench]] harness = false name = "oio" +required-features = ["tests"] [[test]] harness = false diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs index 85ca2ebbe497..41ae52f5d1dc 100644 --- a/core/benches/oio/main.rs +++ b/core/benches/oio/main.rs @@ -21,5 +21,5 @@ mod write; use criterion::criterion_group; use criterion::criterion_main; -criterion_group!(benches, 
write::bench_exact_buf_write,); +criterion_group!(benches, write::bench_exact_buf_write); criterion_main!(benches); diff --git a/core/benches/vs_fs/Cargo.toml b/core/benches/vs_fs/Cargo.toml new file mode 100644 index 000000000000..7a98ea84678a --- /dev/null +++ b/core/benches/vs_fs/Cargo.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "opendal-benchmark-vs-fs" +description = "OpenDAL Benchmark vs fs" +version = "0.0.0" +publish = false + +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +opendal = { path = "../..", features = ["tests"] } +tokio = { version = "1", features = ["full"] } +uuid = { version = "1", features = ["v4"] } +criterion = { version = "0.4", features = ["async", "async_tokio"] } +rand = "0.8" diff --git a/core/benches/vs_fs/README.md b/core/benches/vs_fs/README.md new file mode 100644 index 000000000000..6ded9ac01164 --- /dev/null +++ b/core/benches/vs_fs/README.md @@ -0,0 +1,35 @@ +# OpenDAL Benchmark VS Fs + +This benchmark compares the performance of OpenDAL with the performance of the `std::fs`. 
+ +## Goal + +We expect OpenDAL to match `std::fs` in speed: the throughput of OpenDAL should be within a `5%` range of `std::fs`. + +## Usage + +For test: `cargo run` + +```shell +Testing read/std_fs +Success +Testing read/opendal_fs +Success +Testing read/opendal_fs_with_range +Success +``` + +For bench: `cargo run --release -- --bench` + +```shell +read/std_fs time: [749.57 µs 762.69 µs 777.07 µs] + thrpt: [20.108 GiB/s 20.487 GiB/s 20.845 GiB/s] + +read/opendal_fs time: [750.90 µs 755.39 µs 760.49 µs] + thrpt: [20.546 GiB/s 20.685 GiB/s 20.808 GiB/s] + +read/opendal_fs_with_range + time: [684.02 µs 690.77 µs 697.99 µs] + thrpt: [22.386 GiB/s 22.620 GiB/s 22.843 GiB/s] + +``` diff --git a/core/benches/vs_fs/src/main.rs b/core/benches/vs_fs/src/main.rs new file mode 100644 index 000000000000..7ec09fb02eb9 --- /dev/null +++ b/core/benches/vs_fs/src/main.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use criterion::Criterion; +use opendal::services; +use opendal::Operator; +use rand::prelude::*; + +fn main() { + let mut c = Criterion::default().configure_from_args(); + bench_vs_fs(&mut c); + + c.final_summary(); +} + +fn bench_vs_fs(c: &mut Criterion) { + let mut cfg = services::Fs::default(); + cfg.root("/tmp/opendal/"); + let op = Operator::new(cfg).unwrap().finish().blocking(); + + let mut group = c.benchmark_group("read"); + group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024)); + + group.bench_function("std_fs", |b| { + let path = format!("/tmp/opendal/{}", prepare()); + b.iter(|| { + let _ = std::fs::read(&path).unwrap(); + }); + }); + group.bench_function("opendal_fs", |b| { + let path = prepare(); + b.iter(|| { + let _ = op.read(&path).unwrap(); + }); + }); + group.bench_function("opendal_fs_with_range", |b| { + let path = prepare(); + b.iter(|| { + let _ = op + .read_with(&path) + .range(0..16 * 1024 * 1024) + .call() + .unwrap(); + }); + }); + + group.finish() +} + +fn prepare() -> String { + let mut rng = thread_rng(); + let mut content = vec![0; 16 * 1024 * 1024]; + rng.fill_bytes(&mut content); + + let name = uuid::Uuid::new_v4(); + let path = format!("/tmp/opendal/{}", name); + let _ = std::fs::write(path, content); + + name.to_string() +} diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 2c25f998104f..03a362c6d9d7 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -27,6 +27,7 @@ use std::task::Poll; use bytes::Bytes; use futures::Future; use pin_project::pin_project; +use tokio::io::ReadBuf; use crate::*; @@ -359,46 +360,47 @@ pub trait BlockingRead: Send + Sync { /// Read all data of current reader to the end of buf. 
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> { - let start = buf.len(); - let mut next = MAX_READ_TO_END_GROW_SIZE; - let mut length = start; + let start_len = buf.len(); + let start_cap = buf.capacity(); loop { - if buf.capacity() == length { - buf.reserve(next); - // # Safety - // - // We make sure that the length of buf is maintained correctly. - #[allow(clippy::uninit_vec)] - unsafe { - buf.set_len(buf.capacity()); - } + if buf.len() == buf.capacity() { + buf.reserve(32); // buf is full, need more space } - let bs = &mut buf[length..]; - match self.read(bs) { - Ok(0) => { + let spare = buf.spare_capacity_mut(); + let mut read_buf: ReadBuf = ReadBuf::uninit(spare); + + // SAFETY: These bytes were initialized but not filled in the previous loop + unsafe { + read_buf.assume_init(read_buf.capacity()); + } + + match self.read(read_buf.initialize_unfilled()) { + Ok(0) => return Ok(buf.len() - start_len), + Ok(n) => { + // SAFETY: Read API makes sure that returning `n` is correct. unsafe { - buf.set_len(length); + buf.set_len(buf.len() + n); } - return Ok(length - start); } - Ok(n) => { - next = if n >= next { - cmp::min(next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) - } else if n >= next / 2 { - next - } else { - cmp::max(next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE) - }; + Err(e) => return Err(e), + } - // We can't allow bogus values from read. If it is too large, the returned vec could have its length - // set past its capacity, or if it overflows the vec could be shortened which could create an invalid - // string if this is called via read_to_string. - assert!(n <= buf.len()); - length += n; + // The buffer might be an exact fit. Let's read into a probe buffer + // and see if it returns `Ok(0)`. If so, we've avoided an + // unnecessary doubling of the capacity. But if not, append the + // probe buffer to the primary buffer and let its capacity grow. 
+ if buf.len() == buf.capacity() && buf.capacity() == start_cap { + let mut probe = [0u8; 32]; + + match self.read(&mut probe) { + Ok(0) => return Ok(buf.len() - start_len), + Ok(n) => { + buf.extend_from_slice(&probe[..n]); + } + Err(e) => return Err(e), } - Err(e) => return Err(e), } } } diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 2b1f4a45671f..5caed7534fa3 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -18,8 +18,7 @@ use bytes::Bytes; use super::operator_functions::*; -use crate::raw::oio::BlockingRead; -use crate::raw::oio::WriteBuf; +use crate::raw::oio::{BlockingRead, WriteBuf}; use crate::raw::*; use crate::*; @@ -338,9 +337,22 @@ impl BlockingOperator { ); } + let range = args.range(); + let size_hint = match range.size() { + Some(v) => v, + None => { + let mut size = inner + .blocking_stat(&path, OpStat::default())? + .into_metadata() + .content_length(); + size -= range.offset().unwrap_or(0); + size + } + }; + let (_, mut s) = inner.blocking_read(&path, args)?; - let mut buf = Vec::new(); + let mut buf = Vec::with_capacity(size_hint as usize); s.read_to_end(&mut buf)?; Ok(buf)