Skip to content

Commit

Permalink
feat(bindings/python): implement async context mananger protocol for …
Browse files Browse the repository at this point in the history
…AsyncReader
  • Loading branch information
messense committed Mar 13, 2023
1 parent 9937b2b commit ae88342
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 14 deletions.
4 changes: 3 additions & 1 deletion bindings/python/opendal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Operator:
class AsyncOperator:
def __init__(self, scheme: str, **kwargs): ...
async def read(self, path: str) -> bytes: ...
async def open_reader(self, path: str) -> AsyncReader: ...
def open_reader(self, path: str) -> AsyncReader: ...
async def write(self, path: str, bs: bytes): ...
async def stat(self, path: str) -> Metadata: ...
async def create_dir(self, path: str): ...
Expand All @@ -46,6 +46,8 @@ class AsyncReader:
async def read(self, size: Optional[int] = None) -> bytes: ...
async def seek(self, offset: int, whence: int = 0) -> int: ...
async def tell(self) -> int: ...
def __aenter__(self) -> AsyncReader: ...
def __aexit__(self, exc_type, exc_value, traceback) -> None: ...

class Entry:
@property
Expand Down
79 changes: 67 additions & 12 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ impl AsyncOperator {
})
}

pub fn open_reader<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let reader = this.reader(&path).await.map_err(format_pyerr)?;
let pyreader: PyObject = Python::with_gil(|py| AsyncReader::new(reader).into_py(py));
Ok(pyreader)
})
pub fn open_reader(&self, path: String) -> PyResult<AsyncReader> {
Ok(AsyncReader::new(ReaderState::Init {
operator: self.0.clone(),
path,
}))
}

pub fn write<'p>(&'p self, py: Python<'p>, path: String, bs: Vec<u8>) -> PyResult<&'p PyAny> {
Expand Down Expand Up @@ -104,11 +102,45 @@ impl AsyncOperator {
}
}

enum ReaderState {
Init {
operator: od::Operator,
path: String,
},
Open(od::Reader),
Closed,
}

impl ReaderState {
async fn reader(&mut self) -> PyResult<&mut od::Reader> {
let reader = match self {
ReaderState::Init { operator, path } => {
let reader = operator.reader(&path).await.map_err(format_pyerr)?;
*self = ReaderState::Open(reader);
if let ReaderState::Open(ref mut reader) = self {
reader
} else {
unreachable!()
}
}
ReaderState::Open(ref mut reader) => reader,
ReaderState::Closed => {
return Err(PyValueError::new_err("I/O operation on closed file."));
}
};
Ok(reader)
}

fn close(&mut self) {
*self = ReaderState::Closed;
}
}

#[pyclass(module = "opendal")]
pub struct AsyncReader(Arc<Mutex<od::Reader>>);
pub struct AsyncReader(Arc<Mutex<ReaderState>>);

impl AsyncReader {
fn new(reader: od::Reader) -> Self {
fn new(reader: ReaderState) -> Self {
Self(Arc::new(Mutex::new(reader)))
}
}
Expand All @@ -118,7 +150,8 @@ impl AsyncReader {
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<&'p PyAny> {
let reader = self.0.clone();
future_into_py(py, async move {
let mut reader = reader.lock().await;
let mut state = reader.lock().await;
let reader = state.reader().await?;
let buffer = match size {
Some(size) => {
let mut buffer = vec![0; size];
Expand Down Expand Up @@ -160,7 +193,8 @@ impl AsyncReader {
};
let reader = self.0.clone();
future_into_py(py, async move {
let mut reader = reader.lock().await;
let mut state = reader.lock().await;
let reader = state.reader().await?;
let ret = reader
.seek(whence)
.await
Expand All @@ -172,12 +206,33 @@ impl AsyncReader {
pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> {
let reader = self.0.clone();
future_into_py(py, async move {
let mut reader = reader.lock().await;
let mut state = reader.lock().await;
let reader = state.reader().await?;
let pos = reader
.stream_position()
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
Ok(Python::with_gil(|py| pos.into_py(py)))
})
}

fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> {
let slf = slf.into_py(py);
future_into_py(py, async move { Ok(slf) })
}

fn __aexit__<'a>(
&self,
py: Python<'a>,
_exc_type: &'a PyAny,
_exc_value: &'a PyAny,
_traceback: &'a PyAny,
) -> PyResult<&'a PyAny> {
let reader = self.0.clone();
future_into_py(py, async move {
let mut state = reader.lock().await;
state.close();
Ok(())
})
}
}
1 change: 1 addition & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ fn opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Operator>()?;
m.add_class::<Reader>()?;
m.add_class::<AsyncOperator>()?;
m.add_class::<AsyncReader>()?;
m.add_class::<Entry>()?;
m.add_class::<EntryMode>()?;
m.add_class::<Metadata>()?;
Expand Down
5 changes: 4 additions & 1 deletion bindings/python/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ async def test_async():
assert meta.content_length == 13, meta.content_length
assert meta.mode.is_file()

reader = await op.open_reader("test")
reader = op.open_reader("test")
bs = await reader.read(5)
assert bs == b"Hello", bs
bs = await reader.read()
assert bs == b", World!", bs
await reader.seek(0, os.SEEK_SET)
bs = await reader.read()
assert bs == b"Hello, World!", bs
async with op.open_reader("test") as f:
bs = await f.read()
assert bs == b"Hello, World!", bs

await op.delete("test")

Expand Down

0 comments on commit ae88342

Please sign in to comment.