Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/cpp): expose reader #3004

Merged
merged 7 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ add_custom_command(
COMMENT "Running cargo..."
)

find_package(Boost REQUIRED COMPONENTS date_time)
find_package(Boost REQUIRED COMPONENTS date_time iostreams)

add_library(opendal_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
target_include_directories(opendal_cpp PUBLIC ${CPP_INCLUDE_DIR} Boost::date_time)
Expand Down
53 changes: 53 additions & 0 deletions bindings/cpp/include/opendal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "lib.rs.h"

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/iostreams/concepts.hpp>
#include <boost/iostreams/stream.hpp>
#include <memory>
#include <optional>
#include <string>
Expand Down Expand Up @@ -66,6 +68,8 @@ struct Entry {
Entry(ffi::Entry &&);
};

class Reader;

/**
* @class Operator
* @brief Operator is the entry for all public APIs.
Expand Down Expand Up @@ -117,6 +121,14 @@ class Operator {
*/
void write(std::string_view path, const std::vector<uint8_t> &data);

/**
* @brief Read data from the operator
*
* @param path The path of the data
* @return The reader of the data
*/
Reader reader(std::string_view path);

/**
* @brief Check if the path exists
*
Expand Down Expand Up @@ -176,4 +188,45 @@ class Operator {
std::optional<rust::Box<opendal::ffi::Operator>> operator_;
};

/**
* @class Reader
* @brief Reader is designed to read data from the operator.
* @details It provides basic read and seek operations. If you want to use it
* like a stream, you can use `ReaderStream` instead.
* @code{.cpp}
* auto reader = operator.reader("path");
* opendal::ReaderStream stream(reader);
* @endcode
*/
class Reader
: public boost::iostreams::device<boost::iostreams::input_seekable> {
public:
// Users should not use this type directly.
using InternalReader = rust::Box<opendal::ffi::Reader>;

Reader(InternalReader &&reader) : reader_(std::move(reader)) {}

std::streamsize read(void *s, std::streamsize n);
std::streampos seek(std::streamoff off, std::ios_base::seekdir way);

private:
InternalReader reader_;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
};

// Boost IOStreams requires it to be copyable. So we need to use
// `reference_wrapper` in ReaderStream. More details can be seen at
// https://lists.boost.org/Archives/boost/2005/10/95939.php

/**
* @class ReaderStream
* @brief ReaderStream is a stream wrapper of Reader which can provide
* `iostream` interface.
*/
class ReaderStream
: public boost::iostreams::stream<boost::reference_wrapper<Reader>> {
public:
ReaderStream(Reader &reader)
: boost::iostreams::stream<boost::reference_wrapper<Reader>>(
boost::ref(reader)) {}
};
} // namespace opendal
36 changes: 36 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use anyhow::Result;
use od::raw::oio::BlockingRead;
use opendal as od;
use std::str::FromStr;

Expand All @@ -26,6 +27,13 @@ mod ffi {
value: String,
}

#[cxx_name = "SeekDir"]
enum SeekFrom {
Start = 0,
Current = 1,
End = 2,
}

enum EntryMode {
File = 1,
Dir = 2,
Expand Down Expand Up @@ -54,6 +62,7 @@ mod ffi {

extern "Rust" {
type Operator;
type Reader;

fn new_operator(scheme: &str, configs: Vec<HashMapValue>) -> Result<Box<Operator>>;
fn read(self: &Operator, path: &str) -> Result<Vec<u8>>;
Expand All @@ -65,10 +74,16 @@ mod ffi {
fn remove(self: &Operator, path: &str) -> Result<()>;
fn stat(self: &Operator, path: &str) -> Result<Metadata>;
fn list(self: &Operator, path: &str) -> Result<Vec<Entry>>;
fn reader(self: &Operator, path: &str) -> Result<Box<Reader>>;

#[cxx_name = "read"]
fn reader_read(self: &mut Reader, buf: &mut [u8]) -> Result<usize>;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
fn seek(self: &mut Reader, offset: u64, dir: SeekFrom) -> Result<u64>;
}
}

struct Operator(od::BlockingOperator);
struct Reader(od::BlockingReader);

fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> Result<Box<Operator>> {
let scheme = od::Scheme::from_str(scheme)?;
Expand Down Expand Up @@ -124,6 +139,27 @@ impl Operator {
fn list(&self, path: &str) -> Result<Vec<ffi::Entry>> {
Ok(self.0.list(path)?.into_iter().map(Into::into).collect())
}

fn reader(&self, path: &str) -> Result<Box<Reader>> {
Ok(Box::new(Reader(self.0.reader(path)?)))
}
}

impl Reader {
fn reader_read(&mut self, buf: &mut [u8]) -> Result<usize> {
Ok(self.0.read(buf)?)
}

fn seek(&mut self, offset: u64, dir: ffi::SeekFrom) -> Result<u64> {
let pos = match dir {
ffi::SeekFrom::Start => std::io::SeekFrom::Start(offset),
ffi::SeekFrom::Current => std::io::SeekFrom::Current(offset as i64),
ffi::SeekFrom::End => std::io::SeekFrom::End(offset as i64),
_ => return Err(anyhow::anyhow!("invalid seek dir")),
};

Ok(self.0.seek(pos)?)
}
}

impl From<od::Metadata> for ffi::Metadata {
Expand Down
37 changes: 37 additions & 0 deletions bindings/cpp/src/opendal.cpp
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ using namespace opendal;
#define RUST_STR(s) rust::Str(s.data(), s.size())
#define RUST_STRING(s) rust::String(s.data(), s.size())

// Operator

Operator::Operator(std::string_view scheme,
const std::unordered_map<std::string, std::string> &config) {
auto rust_map = rust::Vec<ffi::HashMapValue>();
Expand Down Expand Up @@ -85,6 +87,26 @@ std::vector<Entry> Operator::list(std::string_view path) {
return entries;
}

Reader Operator::reader(std::string_view path) {
return operator_.value()->reader(RUST_STR(path));
}

// Reader

std::streamsize Reader::read(void *s, std::streamsize n) {
auto rust_slice = rust::Slice<uint8_t>(reinterpret_cast<uint8_t *>(s), n);
auto read_size = reader_->read(rust_slice);
return read_size;
}

ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir);

std::streampos Reader::seek(std::streamoff off, std::ios_base::seekdir dir) {
return reader_->seek(off, to_rust_seek_dir(dir));
}

// Metadata

std::optional<std::string> parse_optional_string(ffi::OptionalString &&s);

Metadata::Metadata(ffi::Metadata &&other) {
Expand All @@ -104,6 +126,8 @@ Metadata::Metadata(ffi::Metadata &&other) {
}
}

// Entry

Entry::Entry(ffi::Entry &&other) : path(std::move(other.path)) {}

// helper functions
Expand All @@ -114,4 +138,17 @@ std::optional<std::string> parse_optional_string(ffi::OptionalString &&s) {
} else {
return std::nullopt;
}
}

ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir) {
switch (dir) {
case std::ios_base::beg:
return ffi::SeekDir::Start;
case std::ios_base::cur:
return ffi::SeekDir::Current;
case std::ios_base::end:
return ffi::SeekDir::End;
default:
throw std::runtime_error("invalid seekdir");
}
}
55 changes: 54 additions & 1 deletion bindings/cpp/tests/basic_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#include "opendal.hpp"
#include "gtest/gtest.h"
#include <ctime>
#include <optional>
#include <random>
#include <string>
#include <unordered_map>
#include <vector>

class OpendalTest : public ::testing::Test {
protected:
Expand All @@ -30,10 +33,14 @@ class OpendalTest : public ::testing::Test {
std::string scheme;
std::unordered_map<std::string, std::string> config;

// random number generator
std::mt19937 rng;

void SetUp() override {
scheme = "memory";
op = opendal::Operator(scheme, config);
rng.seed(time(nullptr));

op = opendal::Operator(scheme, config);
EXPECT_TRUE(op.available());
}
};
Expand Down Expand Up @@ -86,6 +93,52 @@ TEST_F(OpendalTest, BasicTest) {
EXPECT_FALSE(op.is_exist(file_path_renamed));
}

TEST_F(OpendalTest, ReaderTest) {
std::string file_path = "test";
constexpr int size = 2000;
std::vector<uint8_t> data(size);

for (auto &d : data) {
d = rng() % 256;
}

// write
op.write(file_path, data);

// reader
auto reader = op.reader(file_path);
uint8_t part_data[100];
reader.seek(200, std::ios::cur);
reader.read(part_data, 100);
EXPECT_EQ(reader.seek(0, std::ios::cur), 300);
for (int i = 0; i < 100; ++i) {
EXPECT_EQ(part_data[i], data[200 + i]);
}
reader.seek(0, std::ios::beg);

// stream
opendal::ReaderStream stream(reader);

auto read_fn = [&](std::size_t to_read, std::streampos expected_tellg) {
std::vector<char> v(to_read);
stream.read(v.data(), v.size());
EXPECT_TRUE(!!stream);
EXPECT_EQ(stream.tellg(), expected_tellg);
};

EXPECT_EQ(stream.tellg(), 0);
read_fn(10, 10);
read_fn(15, 25);
read_fn(15, 40);
stream.get();
EXPECT_EQ(stream.tellg(), 41);
read_fn(1000, 1041);

stream.seekg(0, std::ios::beg);
std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{stream}, {});
EXPECT_EQ(reader_data, data);
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down