From 440bcc3eee9564e98495e201f37dc359eeb064b4 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 7 Jan 2025 10:14:35 -0500 Subject: [PATCH 01/12] Redue memory usage of scan_csv of a BytesIO. --- crates/polars-python/src/file.rs | 9 ++++++++- py-polars/tests/unit/io/test_scan.py | 27 +++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index 741c5c695152..9e37ddce4d9d 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -45,6 +45,11 @@ impl PyFileLikeObject { pub fn to_memslice(&self) -> MemSlice { Python::with_gil(|py| { + let py_f = self.inner.bind(py); + if let Ok(bytes) = read_if_bytesio(py_f.clone()).downcast::() { + return MemSlice::from_arc(bytes.as_bytes(), Arc::new(py_f.clone().unbind())); + } + let bytes = self .inner .call_method(py, "read", (), None) @@ -373,9 +378,11 @@ pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult> Ok(get_either_file(f, truncate)?.into_dyn()) } -/// If the give file-like is a BytesIO, read its contents. +/// If the give file-like is a BytesIO, read its contents efficiently. fn read_if_bytesio(py_f: Bound) -> Bound { if py_f.getattr("read").is_ok() { + // Note that BytesIO has some memory optimizations so much of the time + // getvalue() doesn't need to copy of the underlying data: let Ok(bytes) = py_f.call_method0("getvalue") else { return py_f; }; diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 569c27260513..6e3cf08de2ee 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -16,6 +16,7 @@ if TYPE_CHECKING: from polars._typing import SchemaDict + from tests.unit.conftest import MemoryUsage @dataclass @@ -929,3 +930,29 @@ def test_predicate_stats_eval_nested_binary() -> None: ), pl.DataFrame({"x": [2]}), ) + + +@pytest.mark.parametrize("streaming", [True, False]) +def test_scan_csv_bytesio_memory_usage( + streaming: bool, + memory_usage_without_pyarrow: MemoryUsage, +) -> None: + memory_usage = memory_usage_without_pyarrow + + # Create CSV that is ~70-85 MB in size: + f = io.BytesIO() + df = pl.DataFrame({"mydata": pl.int_range(0, 10_000_000, eager=True)}) + df.write_csv(f) + assert 70_000_000 < f.tell() < 85_000_000 + f.seek(0, 0) + + # A lazy scan shouldn't make a full copy of the data: + starting_memory = memory_usage.get_current() + assert ( + pl.scan_csv(f) + .filter(pl.col("mydata") == 9_999_999) + .collect(new_streaming=streaming) + .item() + == 9_999_999 + ) + assert memory_usage.get_peak() - starting_memory < 10_000_000 From 266cae35b4a225bba59cc703aeb5fe0482cc79eb Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 7 Jan 2025 14:28:21 -0500 Subject: [PATCH 02/12] More accurate tracking --- py-polars/src/memory.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/py-polars/src/memory.rs b/py-polars/src/memory.rs index e7abf2b7d51c..70b0ceb8ac8c 100644 --- a/py-polars/src/memory.rs +++ b/py-polars/src/memory.rs @@ -69,6 +69,7 @@ unsafe impl GlobalAlloc for TracemallocAllocator { } unsafe fn realloc(&self, ptr: *mut u8, layout: std::alloc::Layout, new_size: usize) -> *mut u8 { + PyTraceMalloc_Untrack(TRACEMALLOC_DOMAIN, ptr as uintptr_t); let result = self.wrapped_alloc.realloc(ptr, layout, new_size); PyTraceMalloc_Track(TRACEMALLOC_DOMAIN, result as uintptr_t, new_size); result From 046eb93a4698109e394a14c51a5f68466c88e484 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 7 Jan 2025 14:28:31 -0500 Subject: [PATCH 03/12] Hold on to the direct object. --- crates/polars-python/src/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index 9e37ddce4d9d..8b71170525af 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -47,7 +47,7 @@ impl PyFileLikeObject { Python::with_gil(|py| { let py_f = self.inner.bind(py); if let Ok(bytes) = read_if_bytesio(py_f.clone()).downcast::() { - return MemSlice::from_arc(bytes.as_bytes(), Arc::new(py_f.clone().unbind())); + return MemSlice::from_arc(bytes.as_bytes(), Arc::new(bytes.clone().unbind().clone_ref(py))); } let bytes = self From 225be092786a1ce62ab12d73df854751c0006f31 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 10:43:36 -0500 Subject: [PATCH 04/12] Workaround for segfault likely caused by CPython --- py-polars/tests/unit/conftest.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/py-polars/tests/unit/conftest.py b/py-polars/tests/unit/conftest.py index 8197872831b5..c7f05a4e281f 100644 --- a/py-polars/tests/unit/conftest.py +++ b/py-polars/tests/unit/conftest.py @@ -5,6 +5,7 @@ import random import string import sys +import time import tracemalloc from typing import TYPE_CHECKING, Any, cast @@ -205,7 +206,11 @@ def get_peak(self) -> int: return tracemalloc.get_traced_memory()[1] -@pytest.fixture +# The bizarre syntax is from +# https://github.com/pytest-dev/pytest/issues/1368#issuecomment-2344450259 - we +# need to mark any test using this fixture as slow because we have a sleep +# added to work around a CPython bug, see the end of the function. +@pytest.fixture(params=[pytest.param(0, marks=pytest.mark.slow)]) def memory_usage_without_pyarrow() -> Generator[MemoryUsage, Any, Any]: """ Provide an API for measuring peak memory usage. @@ -233,6 +238,9 @@ def memory_usage_without_pyarrow() -> Generator[MemoryUsage, Any, Any]: finally: tracemalloc.stop() + # Workaround for https://github.com/python/cpython/issues/128679 + time.sleep(1) + @pytest.fixture(params=[True, False]) def test_global_and_local( From a9cdbad55d755db85ee76a2a75f63e7436738ffc Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 10:45:18 -0500 Subject: [PATCH 05/12] Document motivation. --- crates/polars-python/src/file.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index 8b71170525af..e902fdc00192 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -45,6 +45,10 @@ impl PyFileLikeObject { pub fn to_memslice(&self) -> MemSlice { Python::with_gil(|py| { + // CPython has some internal tricks that means much of the time + // BytesIO.getvalue() involves no memory copying, unlike + // BytesIO.read(). So we want to handle BytesIO specially in order + // to save memory. let py_f = self.inner.bind(py); if let Ok(bytes) = read_if_bytesio(py_f.clone()).downcast::() { return MemSlice::from_arc(bytes.as_bytes(), Arc::new(bytes.clone().unbind().clone_ref(py))); @@ -378,7 +382,8 @@ pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult> Ok(get_either_file(f, truncate)?.into_dyn()) } -/// If the give file-like is a BytesIO, read its contents efficiently. +/// If the give file-like is a BytesIO, read its contents in a memory-efficient +/// way. fn read_if_bytesio(py_f: Bound) -> Bound { if py_f.getattr("read").is_ok() { // Note that BytesIO has some memory optimizations so much of the time From 86aaac2faac1bf73640b7df00d8312c1e68b74cd Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 10:46:47 -0500 Subject: [PATCH 06/12] It's always slow --- py-polars/tests/unit/io/test_scan.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 6e3cf08de2ee..2fbfc632d1e3 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -925,13 +925,15 @@ def test_predicate_stats_eval_nested_binary() -> None: pl.scan_parquet(bufs) # The literal eval depth limit is 4 - # * crates/polars-expr/src/expressions/mod.rs::PhysicalExpr::evaluate_inline - .filter(pl.col("x") == pl.lit("222").str.slice(0, 1).cast(pl.Int64)) - .collect() + .filter( + pl.col("x") == pl.lit("222").str.slice(0, 1).cast(pl.Int64) + ).collect() ), pl.DataFrame({"x": [2]}), ) +@pytest.mark.slow @pytest.mark.parametrize("streaming", [True, False]) def test_scan_csv_bytesio_memory_usage( streaming: bool, From f9a91160da1393fb0b6b6149400d4bc83750d851 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 10:47:53 -0500 Subject: [PATCH 07/12] Clarify --- crates/polars-python/src/file.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index e902fdc00192..caf8ca5b0007 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -386,8 +386,8 @@ pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult> /// way. fn read_if_bytesio(py_f: Bound) -> Bound { if py_f.getattr("read").is_ok() { - // Note that BytesIO has some memory optimizations so much of the time - // getvalue() doesn't need to copy of the underlying data: + // Note that BytesIO has some memory optimizations ensuring that much of + // the time getvalue() doesn't need to copy the underlying data: let Ok(bytes) = py_f.call_method0("getvalue") else { return py_f; }; From ec4dfbac112cbbff28069d490ac976245d081e86 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 10:49:36 -0500 Subject: [PATCH 08/12] Lints --- crates/polars-python/src/file.rs | 5 ++++- py-polars/tests/unit/io/test_scan.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index caf8ca5b0007..f454467602a6 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -51,7 +51,10 @@ impl PyFileLikeObject { // to save memory. let py_f = self.inner.bind(py); if let Ok(bytes) = read_if_bytesio(py_f.clone()).downcast::() { - return MemSlice::from_arc(bytes.as_bytes(), Arc::new(bytes.clone().unbind().clone_ref(py))); + return MemSlice::from_arc( + bytes.as_bytes(), + Arc::new(bytes.clone().unbind().clone_ref(py)), + ); } let bytes = self diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 2fbfc632d1e3..df21d799b9a5 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -953,7 +953,7 @@ def test_scan_csv_bytesio_memory_usage( assert ( pl.scan_csv(f) .filter(pl.col("mydata") == 9_999_999) - .collect(new_streaming=streaming) + .collect(new_streaming=streaming) # type: ignore[call-overload] .item() == 9_999_999 ) From cfc5b6954f5944104306ce0d103170090dcd960e Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 10:51:07 -0500 Subject: [PATCH 09/12] Put sleep in correct location --- py-polars/tests/unit/conftest.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/py-polars/tests/unit/conftest.py b/py-polars/tests/unit/conftest.py index c7f05a4e281f..11af90824728 100644 --- a/py-polars/tests/unit/conftest.py +++ b/py-polars/tests/unit/conftest.py @@ -236,10 +236,11 @@ def memory_usage_without_pyarrow() -> Generator[MemoryUsage, Any, Any]: try: yield MemoryUsage() finally: - tracemalloc.stop() + # Workaround for https://github.com/python/cpython/issues/128679 + time.sleep(1) + gc.collect() - # Workaround for https://github.com/python/cpython/issues/128679 - time.sleep(1) + tracemalloc.stop() @pytest.fixture(params=[True, False]) From fa57f53e1e07438a0fd3a25994fcbb2f27582e57 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 9 Jan 2025 11:05:11 -0500 Subject: [PATCH 10/12] Reformat --- py-polars/tests/unit/io/test_scan.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index df21d799b9a5..4f19698488f2 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -925,9 +925,8 @@ def test_predicate_stats_eval_nested_binary() -> None: pl.scan_parquet(bufs) # The literal eval depth limit is 4 - # * crates/polars-expr/src/expressions/mod.rs::PhysicalExpr::evaluate_inline - .filter( - pl.col("x") == pl.lit("222").str.slice(0, 1).cast(pl.Int64) - ).collect() + .filter(pl.col("x") == pl.lit("222").str.slice(0, 1).cast(pl.Int64)) + .collect() ), pl.DataFrame({"x": [2]}), ) From 437e5028467ec1e7f0c91fd2b324c30125fff668 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 10 Jan 2025 10:39:03 -0500 Subject: [PATCH 11/12] Handle only BytesIO. Pretty sure previously, if StringIO had reached this point (which it wouldn't in practice) you'd end up trying to load a _path_ matching the StringIO's contents... --- crates/polars-python/src/file.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index f454467602a6..94339a6e279c 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -388,15 +388,14 @@ pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult> /// If the give file-like is a BytesIO, read its contents in a memory-efficient /// way. fn read_if_bytesio(py_f: Bound) -> Bound { - if py_f.getattr("read").is_ok() { + let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap(); + if py_f.is_instance(&bytes_io).unwrap() { // Note that BytesIO has some memory optimizations ensuring that much of // the time getvalue() doesn't need to copy the underlying data: let Ok(bytes) = py_f.call_method0("getvalue") else { return py_f; }; - if bytes.downcast::().is_ok() || bytes.downcast::().is_ok() { - return bytes.clone(); - } + return bytes; } py_f } From b422136a6ac148c6ecc386c5200ae4348351ed90 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 10 Jan 2025 10:39:43 -0500 Subject: [PATCH 12/12] Handle BytesIO in better location for scan_csv and friends. --- crates/polars-python/src/file.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index 94339a6e279c..e48111077619 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -45,18 +45,6 @@ impl PyFileLikeObject { pub fn to_memslice(&self) -> MemSlice { Python::with_gil(|py| { - // CPython has some internal tricks that means much of the time - // BytesIO.getvalue() involves no memory copying, unlike - // BytesIO.read(). So we want to handle BytesIO specially in order - // to save memory. - let py_f = self.inner.bind(py); - if let Ok(bytes) = read_if_bytesio(py_f.clone()).downcast::() { - return MemSlice::from_arc( - bytes.as_bytes(), - Arc::new(bytes.clone().unbind().clone_ref(py)), - ); - } - let bytes = self .inner .call_method(py, "read", (), None) @@ -334,14 +322,20 @@ pub fn get_python_scan_source_input( write: bool, ) -> PyResult { Python::with_gil(|py| { - let py_f_0 = py_f; - let py_f = py_f_0.clone_ref(py).into_bound(py); + let py_f = py_f.into_bound(py); + + // CPython has some internal tricks that means much of the time + // BytesIO.getvalue() involves no memory copying, unlike + // BytesIO.read(). So we want to handle BytesIO specially in order + // to save memory. + let py_f = read_if_bytesio(py_f); // If the pyobject is a `bytes` class if let Ok(b) = py_f.downcast::() { return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc( b.as_bytes(), - Arc::new(py_f_0), + // We want to specifically keep alive the PyBytes object. + Arc::new(b.clone().unbind()), ))); }