Skip to content

Commit

Permalink
feat(bindings/python): implement blocking file-like reader API
Browse files Browse the repository at this point in the history
  • Loading branch information
messense committed Mar 10, 2023
1 parent 8e0b3db commit 73606c9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
5 changes: 5 additions & 0 deletions bindings/python/opendal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Error(Exception): ...
class Operator:
def __init__(self, scheme: str, **kwargs): ...
def read(self, path: str) -> bytes: ...
def reader(self, path: str) -> BlockingReader: ...
def write(self, path: str, bs: bytes): ...
def stat(self, path: str) -> Metadata: ...
def create_dir(self, path: str): ...
Expand All @@ -33,6 +34,10 @@ class AsyncOperator:
async def create_dir(self, path: str): ...
async def delete(self, path: str): ...

class BlockingReader:
def read(self, size: Optional[int] = None) -> bytes: ...
def seek(self, offset: int, whence: int = 0) -> int: ...

class Entry:
@property
def path(self) -> str: ...
Expand Down
65 changes: 65 additions & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
// limitations under the License.

use std::collections::HashMap;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::str::FromStr;

use ::opendal as od;
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyFileNotFoundError;
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyNotImplementedError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::types::PyDict;
Expand Down Expand Up @@ -177,6 +182,13 @@ impl Operator {
.map(|res| PyBytes::new(py, &res).into())
}

pub fn reader(&self, path: &str) -> PyResult<BlockingReader> {
self.0
.reader(path)
.map(BlockingReader)
.map_err(format_pyerr)
}

pub fn write(&self, path: &str, bs: Vec<u8>) -> PyResult<()> {
self.0.write(path, bs).map_err(format_pyerr)
}
Expand All @@ -202,6 +214,58 @@ impl Operator {
}
}

#[pyclass(module = "opendal")]
struct BlockingReader(od::BlockingReader);

#[pymethods]
impl BlockingReader {
#[pyo3(signature = (size=None,))]
pub fn read<'p>(&'p mut self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
let buffer = match size {
Some(size) => {
let mut buffer = vec![0; size];
self.0
.read_exact(&mut buffer)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
}
None => {
let mut buffer = Vec::new();
self.0
.read_to_end(&mut buffer)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
}
};
Ok(PyBytes::new(py, &buffer).into())
}

pub fn write(&mut self, _bs: &[u8]) -> PyResult<()> {
Err(PyNotImplementedError::new_err(
"BlockingReader does not support write",
))
}

#[pyo3(signature = (pos, whence = 0))]
pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
let whence = match whence {
0 => SeekFrom::Start(pos as u64),
1 => SeekFrom::Current(pos),
2 => SeekFrom::End(pos),
_ => return Err(PyValueError::new_err("invalid whence")),
};
self.0
.seek(whence)
.map_err(|err| PyIOError::new_err(err.to_string()))
}

pub fn tell(&mut self) -> PyResult<u64> {
self.0
.seek(SeekFrom::Current(0))
.map_err(|err| PyIOError::new_err(err.to_string()))
}
}

#[pyclass(unsendable, module = "opendal")]
struct BlockingLister(od::BlockingLister);

Expand Down Expand Up @@ -310,6 +374,7 @@ fn format_pyerr(err: od::Error) -> PyErr {
#[pymodule]
fn opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Operator>()?;
m.add_class::<BlockingReader>()?;
m.add_class::<AsyncOperator>()?;
m.add_class::<Entry>()?;
m.add_class::<EntryMode>()?;
Expand Down
11 changes: 11 additions & 0 deletions bindings/python/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.


import os
import opendal
import pytest

Expand All @@ -27,6 +28,16 @@ def test_blocking():
assert meta.mode.is_file()
assert [str(entry) for entry in op.list("/")] == ["test"]
assert [str(entry) for entry in op.scan("/")] == ["test"]

reader = op.reader("test")
bs = reader.read(5)
assert bs == b"Hello", bs
bs = reader.read()
assert bs == b", World!", bs
reader.seek(0, os.SEEK_SET)
bs = reader.read()
assert bs == b"Hello, World!", bs

op.delete("test")

op.create_dir("test/")
Expand Down

0 comments on commit 73606c9

Please sign in to comment.