From 0c39d5d497c39bf1280dfdf15307af8eba2a6bd0 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Mon, 22 Jul 2024 22:00:15 -0400 Subject: [PATCH 1/5] Read from Arrow C Data interface --- vegafusion-common/src/data/ffi.rs | 62 +++++++++++++++++++++++++++++ vegafusion-common/src/data/mod.rs | 3 ++ vegafusion-common/src/data/table.rs | 10 ++++- vegafusion-python-embed/src/lib.rs | 4 ++ 4 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 vegafusion-common/src/data/ffi.rs diff --git a/vegafusion-common/src/data/ffi.rs b/vegafusion-common/src/data/ffi.rs new file mode 100644 index 000000000..0ef3a5e48 --- /dev/null +++ b/vegafusion-common/src/data/ffi.rs @@ -0,0 +1,62 @@ +use arrow::array::{RecordBatch, RecordBatchReader}; +use arrow::datatypes::SchemaRef; +use arrow::ffi_stream::ArrowArrayStreamReader; +use arrow::ffi_stream::FFI_ArrowArrayStream; +use pyo3::exceptions::{PyTypeError, PyValueError}; +use pyo3::prelude::*; +use pyo3::types::PyCapsule; + +/// Validate PyCapsule has provided name +fn validate_pycapsule_name(capsule: &PyCapsule, expected_name: &str) -> PyResult<()> { + let capsule_name = capsule.name()?; + if let Some(capsule_name) = capsule_name { + let capsule_name = capsule_name.to_str()?; + if capsule_name != expected_name { + return Err(PyValueError::new_err(format!( + "Expected name '{}' in PyCapsule, instead got '{}'", + expected_name, capsule_name + ))); + } + } else { + return Err(PyValueError::new_err( + "Expected schema PyCapsule to have name set.", + )); + } + + Ok(()) +} + +/// Import `__arrow_c_stream__` across Python boundary. +fn call_arrow_c_stream(ob: &'_ PyAny) -> PyResult<&'_ PyCapsule> { + if !ob.hasattr("__arrow_c_stream__")? { + return Err(PyValueError::new_err( + "Expected an object with dunder __arrow_c_stream__", + )); + } + + let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast()?; + Ok(capsule) +} + +fn import_stream_pycapsule(capsule: &PyCapsule) -> PyResult { + validate_pycapsule_name(capsule, "arrow_array_stream")?; + + let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) }; + Ok(stream) +} + +pub(crate) fn import_arrow_c_stream(ob: &'_ PyAny) -> PyResult<(Vec, SchemaRef)> { + let capsule = call_arrow_c_stream(ob)?; + let stream = import_stream_pycapsule(capsule)?; + let stream_reader = ArrowArrayStreamReader::try_new(stream) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + let schema = stream_reader.schema(); + + let mut batches = vec![]; + for batch in stream_reader { + let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?; + batches.push(batch); + } + + Ok((batches, schema)) +} diff --git a/vegafusion-common/src/data/mod.rs b/vegafusion-common/src/data/mod.rs index cbbc7c93e..71c6d3b21 100644 --- a/vegafusion-common/src/data/mod.rs +++ b/vegafusion-common/src/data/mod.rs @@ -1,5 +1,8 @@ pub mod table; +#[cfg(feature = "pyarrow")] +mod ffi; + #[cfg(feature = "json")] pub mod json_writer; pub mod scalar; diff --git a/vegafusion-common/src/data/table.rs b/vegafusion-common/src/data/table.rs index 50568364b..6d48f876f 100644 --- a/vegafusion-common/src/data/table.rs +++ b/vegafusion-common/src/data/table.rs @@ -8,6 +8,8 @@ use arrow::{ record_batch::RecordBatch, }; +#[cfg(feature = "pyarrow")] +use crate::data::ffi::import_arrow_c_stream; use crate::{ data::{ORDER_COL, ORDER_COL_DTYPE}, error::{Result, ResultWithContext, VegaFusionError}, @@ -36,7 +38,7 @@ use { pyo3::{ prelude::PyModule, types::{PyList, PyTuple}, - PyAny, PyErr, PyObject, Python, + PyAny, PyErr, PyObject, PyResult, Python, }, }; @@ -267,6 +269,12 @@ impl VegaFusionTable { } } + #[cfg(feature = "pyarrow")] + pub fn from_arrow_c_stream(ob: &PyAny) -> PyResult { + let (batches, schema) = import_arrow_c_stream(ob)?; + Ok(VegaFusionTable::try_new(schema, batches)?) + } + #[cfg(feature = "pyarrow")] pub fn from_pyarrow(py: Python, pyarrow_table: &PyAny) -> std::result::Result { // Extract table.schema as a Rust Schema diff --git a/vegafusion-python-embed/src/lib.rs b/vegafusion-python-embed/src/lib.rs index f64ff5700..1b21e5e97 100644 --- a/vegafusion-python-embed/src/lib.rs +++ b/vegafusion-python-embed/src/lib.rs @@ -186,6 +186,10 @@ impl PyVegaFusionRuntime { .scan_py_datasource(inline_dataset.to_object(py)), )?; VegaFusionDataset::DataFrame(df) + } else if inline_dataset.hasattr("__arrow_c_stream__")? { + // Import via Arrow PyCapsule Interface + let table = VegaFusionTable::from_arrow_c_stream(inline_dataset)?; + VegaFusionDataset::from_table_ipc_bytes(&table.to_ipc_bytes()?)? } else { // Assume PyArrow Table // We convert to ipc bytes for two reasons: From 3db45e071407ad4b72557b7a62eacbea686deb04 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Tue, 23 Jul 2024 12:26:30 -0400 Subject: [PATCH 2/5] add comment --- vegafusion-common/src/data/table.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vegafusion-common/src/data/table.rs b/vegafusion-common/src/data/table.rs index 6d48f876f..9b5ab82a0 100644 --- a/vegafusion-common/src/data/table.rs +++ b/vegafusion-common/src/data/table.rs @@ -269,6 +269,11 @@ impl VegaFusionTable { } } + // TODO: when updated to latest arrow version, the below function can change to + // pub fn from_arrow_c_stream(table: pyo3_arrow::PyTable) -> PyResult { + // let (batches, schema) = table.into_inner(); + // Ok(VegaFusionTable::try_new(schema, batches)?) + // } #[cfg(feature = "pyarrow")] pub fn from_arrow_c_stream(ob: &PyAny) -> PyResult { let (batches, schema) = import_arrow_c_stream(ob)?; From 98aae9c641a4437105bc79fcfa9cf0f7221ac16b Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Wed, 14 Aug 2024 20:16:50 -0400 Subject: [PATCH 3/5] Finish merge --- python/vegafusion/vegafusion/runtime.py | 2 + vegafusion-common/Cargo.toml | 6 ++- vegafusion-common/src/data/ffi.rs | 62 ------------------------- vegafusion-common/src/data/mod.rs | 3 -- vegafusion-common/src/data/table.rs | 13 ++---- 5 files changed, 11 insertions(+), 75 deletions(-) delete mode 100644 vegafusion-common/src/data/ffi.rs diff --git a/python/vegafusion/vegafusion/runtime.py b/python/vegafusion/vegafusion/runtime.py index e04d694bf..67d59f957 100644 --- a/python/vegafusion/vegafusion/runtime.py +++ b/python/vegafusion/vegafusion/runtime.py @@ -215,6 +215,8 @@ def _import_or_register_inline_datasets(self, inline_datasets=None): pass imported_inline_datasets[name] = PandasDatasource(value) + elif hasattr(value, "__arrow_c_stream__"): + imported_inline_datasets[name] = value elif hasattr(value, "__dataframe__"): # Let polars convert to pyarrow since it has broader support than the raw dataframe interchange # protocol, and "This operation is mostly zero copy." diff --git a/vegafusion-common/Cargo.toml b/vegafusion-common/Cargo.toml index 56645cfa2..87492e972 100644 --- a/vegafusion-common/Cargo.toml +++ b/vegafusion-common/Cargo.toml @@ -6,7 +6,7 @@ description = "Common components required by multiple VegaFusion crates" license = "BSD-3-Clause" [features] -pyarrow = [ "pyo3", "arrow/pyarrow",] +pyarrow = [ "pyo3", "arrow/pyarrow", "pyo3-arrow"] json = [ "serde_json/preserve_order", "arrow/json", "chrono",] prettyprint = [ "arrow/prettyprint",] proto = ["datafusion-proto", "datafusion-proto-common"] @@ -52,6 +52,10 @@ optional = true workspace = true optional = true +[dependencies.pyo3-arrow] +workspace = true +optional = true + [dependencies.jni] version = "0.21.1" optional = true diff --git a/vegafusion-common/src/data/ffi.rs b/vegafusion-common/src/data/ffi.rs deleted file mode 100644 index 0ef3a5e48..000000000 --- a/vegafusion-common/src/data/ffi.rs +++ /dev/null @@ -1,62 +0,0 @@ -use arrow::array::{RecordBatch, RecordBatchReader}; -use arrow::datatypes::SchemaRef; -use arrow::ffi_stream::ArrowArrayStreamReader; -use arrow::ffi_stream::FFI_ArrowArrayStream; -use pyo3::exceptions::{PyTypeError, PyValueError}; -use pyo3::prelude::*; -use pyo3::types::PyCapsule; - -/// Validate PyCapsule has provided name -fn validate_pycapsule_name(capsule: &PyCapsule, expected_name: &str) -> PyResult<()> { - let capsule_name = capsule.name()?; - if let Some(capsule_name) = capsule_name { - let capsule_name = capsule_name.to_str()?; - if capsule_name != expected_name { - return Err(PyValueError::new_err(format!( - "Expected name '{}' in PyCapsule, instead got '{}'", - expected_name, capsule_name - ))); - } - } else { - return Err(PyValueError::new_err( - "Expected schema PyCapsule to have name set.", - )); - } - - Ok(()) -} - -/// Import `__arrow_c_stream__` across Python boundary. -fn call_arrow_c_stream(ob: &'_ PyAny) -> PyResult<&'_ PyCapsule> { - if !ob.hasattr("__arrow_c_stream__")? { - return Err(PyValueError::new_err( - "Expected an object with dunder __arrow_c_stream__", - )); - } - - let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast()?; - Ok(capsule) -} - -fn import_stream_pycapsule(capsule: &PyCapsule) -> PyResult { - validate_pycapsule_name(capsule, "arrow_array_stream")?; - - let stream = unsafe { FFI_ArrowArrayStream::from_raw(capsule.pointer() as _) }; - Ok(stream) -} - -pub(crate) fn import_arrow_c_stream(ob: &'_ PyAny) -> PyResult<(Vec, SchemaRef)> { - let capsule = call_arrow_c_stream(ob)?; - let stream = import_stream_pycapsule(capsule)?; - let stream_reader = ArrowArrayStreamReader::try_new(stream) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - let schema = stream_reader.schema(); - - let mut batches = vec![]; - for batch in stream_reader { - let batch = batch.map_err(|err| PyTypeError::new_err(err.to_string()))?; - batches.push(batch); - } - - Ok((batches, schema)) -} diff --git a/vegafusion-common/src/data/mod.rs b/vegafusion-common/src/data/mod.rs index 71c6d3b21..cbbc7c93e 100644 --- a/vegafusion-common/src/data/mod.rs +++ b/vegafusion-common/src/data/mod.rs @@ -1,8 +1,5 @@ pub mod table; -#[cfg(feature = "pyarrow")] -mod ffi; - #[cfg(feature = "json")] pub mod json_writer; pub mod scalar; diff --git a/vegafusion-common/src/data/table.rs b/vegafusion-common/src/data/table.rs index 173389a8d..4aee9d4b6 100644 --- a/vegafusion-common/src/data/table.rs +++ b/vegafusion-common/src/data/table.rs @@ -8,8 +8,6 @@ use arrow::{ record_batch::RecordBatch, }; -#[cfg(feature = "pyarrow")] -use crate::data::ffi::import_arrow_c_stream; use crate::{ data::{ORDER_COL, ORDER_COL_DTYPE}, error::{Result, ResultWithContext, VegaFusionError}, @@ -45,6 +43,7 @@ use { #[cfg(feature = "base64")] use base64::{engine::general_purpose, Engine as _}; use datafusion_common::utils::array_into_list_array; +use pyo3::conversion::FromPyObjectBound; #[derive(Clone, Debug)] pub struct VegaFusionTable { @@ -273,14 +272,10 @@ impl VegaFusionTable { } } - // TODO: when updated to latest arrow version, the below function can change to - // pub fn from_arrow_c_stream(table: pyo3_arrow::PyTable) -> PyResult { - // let (batches, schema) = table.into_inner(); - // Ok(VegaFusionTable::try_new(schema, batches)?) - // } #[cfg(feature = "pyarrow")] - pub fn from_arrow_c_stream(ob: &PyAny) -> PyResult { - let (batches, schema) = import_arrow_c_stream(ob)?; + pub fn from_arrow_c_stream(table: &Bound) -> PyResult { + let pytable = pyo3_arrow::PyTable::from_py_object_bound(table.as_borrowed())?; + let (batches, schema) = pytable.into_inner(); Ok(VegaFusionTable::try_new(schema, batches)?) } From d690caab2bedd52021e3a500c8114506e74ae18f Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Wed, 14 Aug 2024 20:21:21 -0400 Subject: [PATCH 4/5] update Cargo.toml --- Cargo.lock | 73 ++++++++++++++++++++++++++++- Cargo.toml | 1 + vegafusion-common/src/data/table.rs | 2 +- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c41afa48..29cd1bfd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2673,6 +2673,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matrixmultiply" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9380b911e3e96d10c1f415da0876389aaf1b56759054eeb0de7df940c456ba1a" +dependencies = [ + "autocfg", + "rawpointer", +] + [[package]] name = "md-5" version = "0.10.6" @@ -2759,6 +2769,19 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "ndarray" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb12d4e967ec485a5f71c6311fe28158e9d6f4bc4a447b474184d0f91a8fa32" +dependencies = [ + "matrixmultiply", + "num-complex", + "num-integer", + "num-traits", + "rawpointer", +] + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -2862,6 +2885,22 @@ dependencies = [ "libc", ] +[[package]] +name = "numpy" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec170733ca37175f5d75a5bea5911d6ff45d2cd52849ce98b685394e4f2f37f4" +dependencies = [ + "half", + "libc", + "ndarray", + "num-complex", + "num-integer", + "num-traits", + "pyo3", + "rustc-hash 1.1.0", +] + [[package]] name = "object" version = "0.32.2" @@ -3362,6 +3401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5e00b96a521718e08e03b1a622f01c8a8deb50719335de3f60b3b3950f069d8" dependencies = [ "cfg-if", + "indexmap 2.2.3", "indoc", "libc", "memoffset", @@ -3373,6 +3413,22 @@ dependencies = [ "unindent", ] +[[package]] +name = "pyo3-arrow" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bc9742f9022bfbeb9c82f4d3d6437dea55aff2d885950c813084ac712569d3a" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "indexmap 2.2.3", + "numpy", + "pyo3", + "thiserror", +] + [[package]] name = "pyo3-build-config" version = "0.21.2" @@ -3448,7 +3504,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.0.0", "rustls 0.23.12", "socket2 0.5.6", "thiserror", @@ -3465,7 +3521,7 @@ dependencies = [ "bytes", "rand", "ring", - "rustc-hash", + "rustc-hash 2.0.0", "rustls 0.23.12", "slab", "thiserror", @@ -3525,6 +3581,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "rayon" version = "1.8.1" @@ -3805,6 +3867,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.0.0" @@ -4859,6 +4927,7 @@ dependencies = [ "jni", "object_store", "pyo3", + "pyo3-arrow", "serde_json", "sqlparser", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 8b9ba4068..75e4bc74b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ chrono-tz = {version = "0.9.0", features=["case-insensitive", "filter-by-regex"] reqwest = { version = "0.11.22", default-features = false } tokio = { version = "1.36.0" } pyo3 = { version = "0.21.1" } +pyo3-arrow = { version = "0.2.0" } pythonize = { version = "0.21.1" } prost = { version = "0.12.3" } prost-types = { version = "0.12.3" } diff --git a/vegafusion-common/src/data/table.rs b/vegafusion-common/src/data/table.rs index 4aee9d4b6..4087414db 100644 --- a/vegafusion-common/src/data/table.rs +++ b/vegafusion-common/src/data/table.rs @@ -36,6 +36,7 @@ use { pyo3::{ prelude::*, types::{PyList, PyTuple}, + conversion::FromPyObjectBound, Bound, PyAny, PyErr, PyObject, PyResult, Python, }, }; @@ -43,7 +44,6 @@ use { #[cfg(feature = "base64")] use base64::{engine::general_purpose, Engine as _}; use datafusion_common::utils::array_into_list_array; -use pyo3::conversion::FromPyObjectBound; #[derive(Clone, Debug)] pub struct VegaFusionTable { From 26340fa25469d886f8810bd1fd72e1a1ec111ba9 Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Wed, 14 Aug 2024 20:23:43 -0400 Subject: [PATCH 5/5] fmt --- vegafusion-common/src/data/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vegafusion-common/src/data/table.rs b/vegafusion-common/src/data/table.rs index 4087414db..aa0cc68f5 100644 --- a/vegafusion-common/src/data/table.rs +++ b/vegafusion-common/src/data/table.rs @@ -34,9 +34,9 @@ use { use { arrow::pyarrow::{FromPyArrow, ToPyArrow}, pyo3::{ + conversion::FromPyObjectBound, prelude::*, types::{PyList, PyTuple}, - conversion::FromPyObjectBound, Bound, PyAny, PyErr, PyObject, PyResult, Python, }, };