PyO3 bridge for pyarrow interoperability / fix arrow integration test #691

Merged: 8 commits, Sep 1, 2021
21 changes: 9 additions & 12 deletions .github/workflows/integration.yml
@@ -78,22 +78,19 @@ jobs:
with:
python-version: '3.7'
- name: Upgrade pip and setuptools
run: pip install --upgrade pip setuptools wheel
- name: Install python dependencies
run: pip install maturin==0.8.2 toml==0.10.1 pytest pytz
- name: Install nightly pyarrow wheel
# this points to a nightly pyarrow build containing necessary
# API for integration testing (https://github.com/apache/arrow/pull/10529)
# the hardcoded version is wrong and should be removed either
# after https://issues.apache.org/jira/browse/ARROW-13083
# gets fixed or pyarrow 5.0 gets released
run: pip install --index-url https://pypi.fury.io/arrow-nightlies/ pyarrow==3.1.0.dev1030
run: pip install --upgrade pip setuptools wheel virtualenv
- name: Create virtualenv and install dependencies
run: |
virtualenv venv
source venv/bin/activate
pip install maturin toml pytest pytz pyarrow>=5.0
- name: Run tests
env:
CARGO_HOME: "/home/runner/.cargo"
CARGO_TARGET_DIR: "/home/runner/target"
working-directory: arrow-pyarrow-integration-testing
run: |
source venv/bin/activate
pushd arrow-pyarrow-integration-testing
maturin develop
pytest -v .
popd
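
The new steps build the extension in a virtualenv with maturin develop and then run pytest inside arrow-pyarrow-integration-testing. A minimal sketch of the kind of test this exercises, assuming the arrow_pyarrow_integration_testing module and the double function defined in the lib.rs diff further down (an illustrative test, not the repository's actual test file):

# Hypothetical test sketch: pass a pyarrow array through the Rust kernel and back.
import pyarrow as pa
import arrow_pyarrow_integration_testing as rust

def test_double():
    arr = pa.array([1, 2, 3], type=pa.int64())
    doubled = rust.double(arr)  # crosses the Arrow C data interface into Rust and back
    assert doubled.equals(pa.array([2, 4, 6], type=pa.int64()))
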
4 changes: 2 additions & 2 deletions arrow-pyarrow-integration-testing/Cargo.toml
@@ -31,8 +31,8 @@ name = "arrow_pyarrow_integration_testing"
crate-type = ["cdylib"]

[dependencies]
arrow = { path = "../arrow", version = "6.0.0-SNAPSHOT" }
pyo3 = { version = "0.12.1", features = ["extension-module"] }
arrow = { path = "../arrow", version = "6.0.0-SNAPSHOT", features = ["pyarrow"] }
pyo3 = { version = "0.14", features = ["extension-module"] }

[package.metadata.maturin]
requires-dist = ["pyarrow>=1"]
260 changes: 50 additions & 210 deletions arrow-pyarrow-integration-testing/src/lib.rs
@@ -18,269 +18,109 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.

use std::convert::TryFrom;
use std::error;
use std::fmt;
use std::sync::Arc;

use pyo3::exceptions::PyOSError;
use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
use pyo3::{libc::uintptr_t, prelude::*};

use arrow::array::{make_array_from_raw, ArrayRef, Int64Array};
use arrow::array::{ArrayData, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi;
use arrow::ffi::FFI_ArrowSchema;

/// an error that bridges ArrowError with a Python error
#[derive(Debug)]
enum PyO3ArrowError {
ArrowError(ArrowError),
}

impl fmt::Display for PyO3ArrowError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PyO3ArrowError::ArrowError(ref e) => e.fmt(f),
}
}
}

impl error::Error for PyO3ArrowError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match *self {
// The cause is the underlying implementation error type. Is implicitly
// cast to the trait object `&error::Error`. This works because the
// underlying type already implements the `Error` trait.
PyO3ArrowError::ArrowError(ref e) => Some(e),
}
}
}

impl From<ArrowError> for PyO3ArrowError {
fn from(err: ArrowError) -> PyO3ArrowError {
PyO3ArrowError::ArrowError(err)
}
}

impl From<PyO3ArrowError> for PyErr {
fn from(err: PyO3ArrowError) -> PyErr {
PyOSError::new_err(err.to_string())
}
}

#[pyclass]
struct PyDataType {
inner: DataType,
}

#[pyclass]
struct PyField {
inner: Field,
}

#[pyclass]
struct PySchema {
inner: Schema,
}

#[pymethods]
impl PyDataType {
#[staticmethod]
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
let dtype = DataType::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
Ok(Self { inner: dtype })
}

fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema =
FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("DataType")?;
let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
Ok(dtype.into())
}
}

#[pymethods]
impl PyField {
#[staticmethod]
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
let field = Field::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
Ok(Self { inner: field })
}

fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema =
FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Field")?;
let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
Ok(dtype.into())
}
}

#[pymethods]
impl PySchema {
#[staticmethod]
fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
let schema = Schema::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
Ok(Self { inner: schema })
}

fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
let c_schema =
FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Schema")?;
let schema =
class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
Ok(schema.into())
}
}

impl<'source> FromPyObject<'source> for PyDataType {
fn extract(value: &'source PyAny) -> PyResult<Self> {
PyDataType::from_pyarrow(value)
}
}

impl<'source> FromPyObject<'source> for PyField {
fn extract(value: &'source PyAny) -> PyResult<Self> {
PyField::from_pyarrow(value)
}
}

impl<'source> FromPyObject<'source> for PySchema {
fn extract(value: &'source PyAny) -> PyResult<Self> {
PySchema::from_pyarrow(value)
}
}

fn array_to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
let (array_pointer, schema_pointer) =
ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(
py,
"_export_to_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;

let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
.map_err(PyO3ArrowError::from)?;
Ok(array)
}

fn array_to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) = array.to_raw().map_err(PyO3ArrowError::from)?;

let pa = py.import("pyarrow")?;

let array = pa.getattr("Array")?.call_method1(
"_import_from_c",
(array_pointer as uintptr_t, schema_pointer as uintptr_t),
)?;
Ok(array.to_object(py))
}
use arrow::pyarrow::PyArrowConvert;
use arrow::record_batch::RecordBatch;

/// Returns `array + array` of an int64 array.
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
fn double(array: &PyAny, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
let array = ArrayRef::from_pyarrow(array)?;

// perform some operation
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::ParseError("Expects an int64".to_string()))
})?;
let array = kernels::arithmetic::add(&array, &array).map_err(PyO3ArrowError::from)?;
let array = Arc::new(array);
let array = array
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(ArrowError::ParseError("Expects an int64".to_string()))?;
let array = kernels::arithmetic::add(array, array)?;

// export
array_to_py(array, py)
array.to_pyarrow(py)
}

/// calls a lambda function that receives and returns an array
/// whose result must be the array multiplied by two
#[pyfunction]
fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
fn double_py(lambda: &PyAny, py: Python) -> PyResult<bool> {
// create
let array = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
let expected = Arc::new(Int64Array::from(vec![Some(2), None, Some(6)])) as ArrayRef;

// to py
let pyarray = array_to_py(array, py)?;
let pyarray = lambda.call1(py, (pyarray,))?;
let array = array_to_rust(pyarray, py)?;
let pyarray = array.to_pyarrow(py)?;
let pyarray = lambda.call1((pyarray,))?;
let array = ArrayRef::from_pyarrow(pyarray)?;

Ok(array == expected)
}

/// Returns the substring
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
fn substring(array: ArrayData, start: i64) -> PyResult<ArrayData> {
// import
let array = array_to_rust(array, py)?;
let array = ArrayRef::from(array);

// substring
let array = kernels::substring::substring(array.as_ref(), start, &None)
.map_err(PyO3ArrowError::from)?;
let array = kernels::substring::substring(array.as_ref(), start, &None)?;

// export
array_to_py(array, py)
Ok(array.data().to_owned())
}

/// Returns the array concatenated with itself
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(array, py)?;
fn concatenate(array: ArrayData, py: Python) -> PyResult<PyObject> {
let array = ArrayRef::from(array);

// concat
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])
.map_err(PyO3ArrowError::from)?;
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])?;

// export
array_to_py(array, py)
array.to_pyarrow(py)
}

/// Converts to rust and back to python
#[pyfunction]
fn round_trip(pyarray: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = array_to_rust(pyarray, py)?;
fn round_trip_type(obj: DataType) -> PyResult<DataType> {
Ok(obj)
}

// export
array_to_py(array, py)
#[pyfunction]
fn round_trip_field(obj: Field) -> PyResult<Field> {
Ok(obj)
}

#[pyfunction]
fn round_trip_schema(obj: Schema) -> PyResult<Schema> {
Ok(obj)
}

#[pyfunction]
fn round_trip_array(obj: ArrayData) -> PyResult<ArrayData> {
Ok(obj)
}

#[pyfunction]
fn round_trip_record_batch(obj: RecordBatch) -> PyResult<RecordBatch> {
Ok(obj)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyDataType>()?;
m.add_class::<PyField>()?;
m.add_class::<PySchema>()?;
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
m.add_wrapped(wrap_pyfunction!(concatenate))?;
m.add_wrapped(wrap_pyfunction!(round_trip))?;
m.add_wrapped(wrap_pyfunction!(round_trip_type))?;
m.add_wrapped(wrap_pyfunction!(round_trip_field))?;
m.add_wrapped(wrap_pyfunction!(round_trip_schema))?;
m.add_wrapped(wrap_pyfunction!(round_trip_array))?;
m.add_wrapped(wrap_pyfunction!(round_trip_record_batch))?;
Ok(())
}
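
With the PyArrowConvert-based conversions from the new arrow::pyarrow module, each round_trip_* function accepts a pyarrow object, converts it to the corresponding Rust type via the C data interface, and hands it straight back. A sketch of how these exports might be exercised from Python (illustrative usage under the assumption that pyarrow >= 5.0 is installed, as pinned in the workflow; not taken from the actual test suite):

# Hypothetical usage sketch for the functions exported above.
import pyarrow as pa
import arrow_pyarrow_integration_testing as rust

# Field and record batch round-trip through Rust unchanged.
field = pa.field("col", pa.string(), nullable=True)
assert rust.round_trip_field(field).equals(field)

batch = pa.record_batch([pa.array(["a", "bc", None])], names=["col"])
assert rust.round_trip_record_batch(batch).equals(batch)

# Kernels operate on the data that crossed the boundary.
assert rust.substring(pa.array(["hello", "world"]), 2).equals(pa.array(["llo", "rld"]))
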