From 3bee5aaef5ec225efc664dd776392946e0223258 Mon Sep 17 00:00:00 2001 From: messense Date: Mon, 13 Mar 2023 09:57:11 +0800 Subject: [PATCH] feat(bindings/python): implement async context mananger protocol for AsyncReader --- bindings/python/opendal.pyi | 4 +- bindings/python/src/asyncio.rs | 79 +++++++++++++++++++++++++----- bindings/python/src/lib.rs | 1 + bindings/python/tests/test_core.py | 5 +- 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/bindings/python/opendal.pyi b/bindings/python/opendal.pyi index f87f6e78150f..ab0f8bcd70f9 100644 --- a/bindings/python/opendal.pyi +++ b/bindings/python/opendal.pyi @@ -29,7 +29,7 @@ class Operator: class AsyncOperator: def __init__(self, scheme: str, **kwargs): ... async def read(self, path: str) -> bytes: ... - async def open_reader(self, path: str) -> AsyncReader: ... + def open_reader(self, path: str) -> AsyncReader: ... async def write(self, path: str, bs: bytes): ... async def stat(self, path: str) -> Metadata: ... async def create_dir(self, path: str): ... @@ -46,6 +46,8 @@ class AsyncReader: async def read(self, size: Optional[int] = None) -> bytes: ... async def seek(self, offset: int, whence: int = 0) -> int: ... async def tell(self) -> int: ... + def __aenter__(self) -> AsyncReader: ... + def __aexit__(self, exc_type, exc_value, traceback) -> None: ... class Entry: @property diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs index fb5ef0b1d205..2ace0f55ce9c 100644 --- a/bindings/python/src/asyncio.rs +++ b/bindings/python/src/asyncio.rs @@ -63,13 +63,11 @@ impl AsyncOperator { }) } - pub fn open_reader<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { - let this = self.0.clone(); - future_into_py(py, async move { - let reader = this.reader(&path).await.map_err(format_pyerr)?; - let pyreader: PyObject = Python::with_gil(|py| AsyncReader::new(reader).into_py(py)); - Ok(pyreader) - }) + pub fn open_reader(&self, path: String) -> PyResult { + Ok(AsyncReader::new(ReaderState::Init { + operator: self.0.clone(), + path, + })) } pub fn write<'p>(&'p self, py: Python<'p>, path: String, bs: Vec) -> PyResult<&'p PyAny> { @@ -104,11 +102,45 @@ impl AsyncOperator { } } +enum ReaderState { + Init { + operator: od::Operator, + path: String, + }, + Open(od::Reader), + Closed, +} + +impl ReaderState { + async fn reader(&mut self) -> PyResult<&mut od::Reader> { + let reader = match self { + ReaderState::Init { operator, path } => { + let reader = operator.reader(path).await.map_err(format_pyerr)?; + *self = ReaderState::Open(reader); + if let ReaderState::Open(ref mut reader) = self { + reader + } else { + unreachable!() + } + } + ReaderState::Open(ref mut reader) => reader, + ReaderState::Closed => { + return Err(PyValueError::new_err("I/O operation on closed file.")); + } + }; + Ok(reader) + } + + fn close(&mut self) { + *self = ReaderState::Closed; + } +} + #[pyclass(module = "opendal")] -pub struct AsyncReader(Arc>); +pub struct AsyncReader(Arc>); impl AsyncReader { - fn new(reader: od::Reader) -> Self { + fn new(reader: ReaderState) -> Self { Self(Arc::new(Mutex::new(reader))) } } @@ -118,7 +150,8 @@ impl AsyncReader { pub fn read<'p>(&'p self, py: Python<'p>, size: Option) -> PyResult<&'p PyAny> { let reader = self.0.clone(); future_into_py(py, async move { - let mut reader = reader.lock().await; + let mut state = reader.lock().await; + let reader = state.reader().await?; let buffer = match size { Some(size) => { let mut buffer = vec![0; size]; @@ -160,7 +193,8 @@ impl AsyncReader { }; let reader = self.0.clone(); future_into_py(py, async move { - let mut reader = reader.lock().await; + let mut state = reader.lock().await; + let reader = state.reader().await?; let ret = reader .seek(whence) .await @@ -172,7 +206,8 @@ impl AsyncReader { pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<&'p PyAny> { let reader = self.0.clone(); future_into_py(py, async move { - let mut reader = reader.lock().await; + let mut state = reader.lock().await; + let reader = state.reader().await?; let pos = reader .stream_position() .await @@ -180,4 +215,24 @@ impl AsyncReader { Ok(Python::with_gil(|py| pos.into_py(py))) }) } + + fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<&'a PyAny> { + let slf = slf.into_py(py); + future_into_py(py, async move { Ok(slf) }) + } + + fn __aexit__<'a>( + &self, + py: Python<'a>, + _exc_type: &'a PyAny, + _exc_value: &'a PyAny, + _traceback: &'a PyAny, + ) -> PyResult<&'a PyAny> { + let reader = self.0.clone(); + future_into_py(py, async move { + let mut state = reader.lock().await; + state.close(); + Ok(()) + }) + } } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 16fd53bf5270..57da7effd08c 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -331,6 +331,7 @@ fn opendal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/bindings/python/tests/test_core.py b/bindings/python/tests/test_core.py index da4627280d5d..b0f17bb03e79 100644 --- a/bindings/python/tests/test_core.py +++ b/bindings/python/tests/test_core.py @@ -56,7 +56,7 @@ async def test_async(): assert meta.content_length == 13, meta.content_length assert meta.mode.is_file() - reader = await op.open_reader("test") + reader = op.open_reader("test") bs = await reader.read(5) assert bs == b"Hello", bs bs = await reader.read() @@ -64,6 +64,9 @@ async def test_async(): await reader.seek(0, os.SEEK_SET) bs = await reader.read() assert bs == b"Hello, World!", bs + async with op.open_reader("test") as f: + bs = await f.read() + assert bs == b"Hello, World!", bs await op.delete("test")