Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/cpp): expose reader #3004

Merged
merged 7 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions bindings/cpp/include/opendal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ struct Entry {
Entry(ffi::Entry &&);
};

class ReaderStream;

/**
* @class Operator
* @brief Operator is the entry for all public APIs.
Expand Down Expand Up @@ -117,6 +119,14 @@ class Operator {
*/
void write(std::string_view path, const std::vector<uint8_t> &data);

/**
* @brief Open a streaming reader for the data stored at a path
*
* @param path The path of the data
* @return A ReaderStream (a std::istream) that reads the data
* @note ReaderStream is only forward-declared before this class; its
*       definition follows later in this header.
*/
ReaderStream reader(std::string_view path);

/**
* @brief Check if the path exists
*
Expand Down Expand Up @@ -176,4 +186,37 @@ class Operator {
std::optional<rust::Box<opendal::ffi::Operator>> operator_;
};

// Handle to the Rust-side ffi::Reader, held through cxx's rust::Box.
using Reader = rust::Box<opendal::ffi::Reader>;

/**
* @class ReaderStreamBuf
* @brief The stream buffer for ReaderStream
*
* A std::streambuf that wraps a Reader. Only the input-side hooks
* (underflow, seekoff, seekpos) are overridden here.
*/
class ReaderStreamBuf : public std::streambuf {
public:
// Takes over the given reader by moving it into this buffer.
ReaderStreamBuf(Reader &&reader) : reader_(std::move(reader)) {}

protected:
// Hook invoked by std::streambuf when the get area has no more characters.
int_type underflow() override;
// Seek by an offset measured from the origin selected by `dir`.
pos_type seekoff(off_type off, std::ios_base::seekdir dir,
std::ios_base::openmode which) override;
// Seek to an absolute position.
pos_type seekpos(pos_type pos, std::ios_base::openmode which) override;

private:
// The wrapped reader that all data is pulled from.
Reader reader_;
};

/**
* @class ReaderStream
* @brief The stream for Reader
*
* A std::istream that owns the ReaderStreamBuf it reads through.
*/
class ReaderStream : public std::istream {
public:
  /**
  * @brief Build a stream that reads through the given reader
  *
  * @param reader The reader to take over; it is moved into the stream buffer
  */
  ReaderStream(Reader &&reader)
      : std::istream(nullptr), buf_(std::move(reader)) {
    // The std::istream base is constructed before buf_, so handing it &buf_
    // in the initializer list would convert a pointer to a member whose
    // construction has not started yet. Attach the buffer once it exists;
    // rdbuf() also clears the error state left by the null initialisation.
    rdbuf(&buf_);
  }

private:
  // Declared as a member so the buffer lives exactly as long as the stream.
  ReaderStreamBuf buf_;
};

} // namespace opendal
44 changes: 44 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use anyhow::Result;
use opendal as od;
use std::io::{BufRead, BufReader, Seek};
use std::str::FromStr;

#[cxx::bridge(namespace = "opendal::ffi")]
Expand All @@ -26,6 +27,12 @@ mod ffi {
value: String,
}

/// Origin that the offset passed to `Reader::seek` is measured from.
///
/// The discriminants are spelled out because the enum is shared with C++.
enum SeekDir {
    /// Measure the offset from the start of the data.
    Start = 0,
    /// Measure the offset from the current position.
    Current = 1,
    /// Measure the offset from the end of the data.
    End = 2,
}

enum EntryMode {
File = 1,
Dir = 2,
Expand Down Expand Up @@ -54,6 +61,7 @@ mod ffi {

extern "Rust" {
type Operator;
type Reader;

fn new_operator(scheme: &str, configs: Vec<HashMapValue>) -> Result<Box<Operator>>;
fn read(self: &Operator, path: &str) -> Result<Vec<u8>>;
Expand All @@ -65,10 +73,17 @@ mod ffi {
fn remove(self: &Operator, path: &str) -> Result<()>;
fn stat(self: &Operator, path: &str) -> Result<Metadata>;
fn list(self: &Operator, path: &str) -> Result<Vec<Entry>>;
fn reader(self: &Operator, path: &str) -> Result<Box<Reader>>;

fn fill_buf(self: &mut Reader) -> Result<&[u8]>;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
fn buffer(self: &Reader) -> &[u8];
fn consume(self: &mut Reader, amt: usize);
fn seek(self: &mut Reader, offset: u64, dir: SeekDir) -> Result<u64>;
}
}

/// Newtype over `od::BlockingOperator`; backs the `Operator` type declared in the bridge.
struct Operator(od::BlockingOperator);
/// Newtype over a `BufReader`-wrapped `od::BlockingReader`; backs the `Reader` type declared in the bridge.
struct Reader(BufReader<od::BlockingReader>);

fn new_operator(scheme: &str, configs: Vec<ffi::HashMapValue>) -> Result<Box<Operator>> {
let scheme = od::Scheme::from_str(scheme)?;
Expand Down Expand Up @@ -124,6 +139,35 @@ impl Operator {
/// Lists the entries under `path`, converting each one into its FFI form.
fn list(&self, path: &str) -> Result<Vec<ffi::Entry>> {
    let entries = self.0.list(path)?;
    let converted: Vec<ffi::Entry> = entries.into_iter().map(Into::into).collect();
    Ok(converted)
}

/// Opens `path` for reading and wraps the resulting reader in a `BufReader`.
fn reader(&self, path: &str) -> Result<Box<Reader>> {
    let inner = self.0.reader(path)?;
    let buffered = BufReader::new(inner);
    Ok(Box::new(Reader(buffered)))
}
}

impl Reader {
fn fill_buf(&mut self) -> Result<&[u8]> {
Ok(self.0.fill_buf()?)
}

fn buffer(&self) -> &[u8] {
self.0.buffer()
}

fn consume(&mut self, amt: usize) {
self.0.consume(amt)
}

fn seek(&mut self, offset: u64, dir: ffi::SeekDir) -> Result<u64> {
let pos = match dir {
ffi::SeekDir::Start => std::io::SeekFrom::Start(offset),
ffi::SeekDir::Current => std::io::SeekFrom::Current(offset as i64),
ffi::SeekDir::End => std::io::SeekFrom::End(offset as i64),
_ => return Err(anyhow::anyhow!("invalid seek dir")),
};

Ok(self.0.seek(pos)?)
}
}

impl From<od::Metadata> for ffi::Metadata {
Expand Down
81 changes: 81 additions & 0 deletions bindings/cpp/src/opendal.cpp
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ using namespace opendal;
#define RUST_STR(s) rust::Str(s.data(), s.size())
#define RUST_STRING(s) rust::String(s.data(), s.size())

// Operator

Operator::Operator(std::string_view scheme,
const std::unordered_map<std::string, std::string> &config) {
auto rust_map = rust::Vec<ffi::HashMapValue>();
Expand Down Expand Up @@ -85,6 +87,70 @@ std::vector<Entry> Operator::list(std::string_view path) {
return entries;
}

// Opens a reader on the Rust side and wraps it in a ReaderStream.
// The braced return constructs the stream directly in the caller's storage.
ReaderStream Operator::reader(std::string_view path) {
  auto &op = operator_.value();
  return {op->reader(RUST_STR(path))};
}

// Reader

// Development Note:
// The Rust side cannot see the C++ get-area pointers, so the `consume`
// operation is deferred until the next `fill_buf`. Always call `consume` and
// update the C++ pointers before every `seek` and `fill_buf` operation.

ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir);

/**
 * Seek the underlying reader by `off`, measured from the origin `dir`.
 *
 * Returns the new position reported by the reader, or -1 when `which` does
 * not include the input sequence. Errors raised by the reader or by
 * to_rust_seek_dir propagate as exceptions.
 */
ReaderStreamBuf::pos_type
ReaderStreamBuf::seekoff(ReaderStreamBuf::off_type off,
                         std::ios_base::seekdir dir,
                         std::ios_base::openmode which) {
  // Only the input sequence can be repositioned.
  if (!(which & std::ios_base::in)) {
    return pos_type(off_type(-1));
  }

  // Report the bytes C++ has read so far to the Rust reader, then collapse
  // the get area so the same bytes are never reported twice.
  if (gptr() != nullptr) {
    reader_->consume(static_cast<size_t>(gptr() - eback()));
    setg(gptr(), gptr(), egptr());
  }

  // No manual adjustment of `off` is needed for std::ios_base::cur: after
  // the consume above the reader's position already matches the stream's.
  // The offset crosses the bridge as an unsigned value; the Rust side turns
  // it back into a signed one for relative seeks.
  auto res = reader_->seek(static_cast<uint64_t>(off), to_rust_seek_dir(dir));

  // Point the get area at whatever the reader has buffered after the seek.
  auto buffer = reader_->buffer();
  auto gbeg = const_cast<char *>(reinterpret_cast<const char *>(buffer.data()));
  auto gend = gbeg + buffer.size();
  setg(gbeg, gbeg, gend);

  return pos_type(static_cast<off_type>(res));
}

// An absolute seek is handled as a relative seek from the beginning.
ReaderStreamBuf::pos_type
ReaderStreamBuf::seekpos(ReaderStreamBuf::pos_type pos,
                         std::ios_base::openmode which) {
  const auto origin = std::ios_base::beg;
  return seekoff(pos, origin, which);
}

/**
 * Refill the get area from the reader.
 *
 * Returns the first available character without consuming it, or
 * traits_type::eof() when the reader has no more data.
 */
ReaderStreamBuf::int_type ReaderStreamBuf::underflow() {
  // Report the bytes C++ has read since the last refill before asking the
  // reader for more, then collapse the get area so they are not reported
  // twice.
  if (gptr() != nullptr) {
    reader_->consume(gptr() - eback());
    setg(gptr(), gptr(), egptr());
  }

  // Expose the reader's buffer directly as the get area; no copy is made.
  auto chunk = reader_->fill_buf();
  auto first = const_cast<char *>(reinterpret_cast<const char *>(chunk.data()));
  auto last = first + chunk.size();
  setg(first, first, last);

  if (first == last) {
    return traits_type::eof();
  }
  return traits_type::to_int_type(*first);
}

// Metadata

std::optional<std::string> parse_optional_string(ffi::OptionalString &&s);

Metadata::Metadata(ffi::Metadata &&other) {
Expand All @@ -104,6 +170,8 @@ Metadata::Metadata(ffi::Metadata &&other) {
}
}

// Entry

// Builds an Entry from the FFI entry by moving its path.
Entry::Entry(ffi::Entry &&other)
    : path(std::move(other.path)) {}

// helper functions
Expand All @@ -114,4 +182,17 @@ std::optional<std::string> parse_optional_string(ffi::OptionalString &&s) {
} else {
return std::nullopt;
}
}

// Maps a standard seek direction onto the bridge's SeekDir.
// Throws std::runtime_error for any value other than beg, cur or end.
ffi::SeekDir to_rust_seek_dir(std::ios_base::seekdir dir) {
  if (dir == std::ios_base::beg) {
    return ffi::SeekDir::Start;
  }
  if (dir == std::ios_base::cur) {
    return ffi::SeekDir::Current;
  }
  if (dir == std::ios_base::end) {
    return ffi::SeekDir::End;
  }
  throw std::runtime_error("invalid seekdir");
}
44 changes: 43 additions & 1 deletion bindings/cpp/tests/basic_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#include "opendal.hpp"
#include "gtest/gtest.h"
#include <ctime>
#include <optional>
#include <random>
#include <string>
#include <unordered_map>
#include <vector>

class OpendalTest : public ::testing::Test {
protected:
Expand All @@ -30,10 +33,14 @@ class OpendalTest : public ::testing::Test {
std::string scheme;
std::unordered_map<std::string, std::string> config;

// random number generator
std::mt19937 rng;

// Runs before every test: selects the in-memory backend, seeds the random
// number generator and builds the operator under test.
void SetUp() override {
  scheme = "memory";
  // Seeded from the wall clock, so the generated data differs between runs.
  rng.seed(time(nullptr));

  // Construct the operator once; a second identical construction is redundant.
  op = opendal::Operator(scheme, config);
  EXPECT_TRUE(op.available());
}
};
Expand Down Expand Up @@ -86,6 +93,41 @@ TEST_F(OpendalTest, BasicTest) {
EXPECT_FALSE(op.is_exist(file_path_renamed));
}

// Writes random bytes, then checks that the reader stream tracks its
// position across reads and returns the full content after seeking back to
// the start.
TEST_F(OpendalTest, ReaderTest) {
  std::string file_path = "test";
  constexpr int size = 2000;
  std::vector<uint8_t> data(size);

  for (auto &d : data) {
    d = rng() % 256;
  }

  // write
  op.write(file_path, data);

  // read
  auto reader = op.reader(file_path);

  // Reads `to_read` bytes and checks the stream is still good and sits at
  // `expected_tellg` afterwards.
  auto read = [&](std::size_t to_read, std::streampos expected_tellg) {
    std::vector<char> v(to_read);
    reader.read(v.data(), v.size());
    EXPECT_TRUE(!!reader);
    EXPECT_EQ(reader.tellg(), expected_tellg);
  };

  EXPECT_EQ(reader.tellg(), 0);
  read(10, 10);
  read(15, 25);
  read(15, 40);
  // A single-character read advances the position by one.
  reader.get();
  EXPECT_EQ(reader.tellg(), 41);
  read(1000, 1041);

  // Rewind and compare the whole content against what was written.
  reader.seekg(0, std::ios::beg);
  std::vector<uint8_t> reader_data(std::istreambuf_iterator<char>{reader}, {});
  EXPECT_EQ(reader_data, data);
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down