Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python FFI bridge for Schema, Field and DataType #439

Merged
merged 12 commits into from
Jul 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ on:

jobs:

docker:
integration:
name: Integration Test
runs-on: ubuntu-latest
steps:
Expand All @@ -46,3 +46,54 @@ jobs:
run: pip install -e dev/archery[docker]
- name: Execute Docker Build
run: archery docker run -e ARCHERY_INTEGRATION_WITH_RUST=1 conda-integration

# test FFI against the C-Data interface exposed by pyarrow
pyarrow-integration-test:
name: Test Pyarrow C Data Interface
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Rust toolchain
run: |
rustup toolchain install ${{ matrix.rust }}
rustup default ${{ matrix.rust }}
rustup component add rustfmt clippy
- name: Cache Cargo
uses: actions/cache@v2
with:
path: /home/runner/.cargo
key: cargo-maturin-cache-
- name: Cache Rust dependencies
uses: actions/cache@v2
with:
path: /home/runner/target
# this key is not equal because maturin uses different compilation flags.
key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}-
- uses: actions/setup-python@v2
with:
python-version: '3.7'
- name: Upgrade pip and setuptools
run: pip install --upgrade pip setuptools wheel
- name: Install python dependencies
run: pip install maturin==0.8.2 toml==0.10.1 pytest pytz
- name: Install nightly pyarrow wheel
# this points to a nightly pyarrow build containing the necessary
# API for integration testing (https://github.com/apache/arrow/pull/10529)
# the hardcoded version is wrong and should be removed either
# after https://issues.apache.org/jira/browse/ARROW-13083
# gets fixed or pyarrow 5.0 gets released
hardcoded version is wrong, bot contains
run: pip install --index-url https://pypi.fury.io/arrow-nightlies/ pyarrow==3.1.0.dev1030
- name: Run tests
env:
CARGO_HOME: "/home/runner/.cargo"
CARGO_TARGET_DIR: "/home/runner/target"
working-directory: arrow-pyarrow-integration-testing
run: |
maturin develop
pytest -v .
46 changes: 0 additions & 46 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -283,52 +283,6 @@ jobs:
continue-on-error: true
run: bash <(curl -s https://codecov.io/bash)

# test FFI against the C-Data interface exposed by pyarrow
pyarrow-integration-test:
name: Test Pyarrow C Data Interface
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Rust toolchain
run: |
rustup toolchain install ${{ matrix.rust }}
rustup default ${{ matrix.rust }}
rustup component add rustfmt clippy
- name: Cache Cargo
uses: actions/cache@v2
with:
path: /home/runner/.cargo
key: cargo-maturin-cache-
- name: Cache Rust dependencies
uses: actions/cache@v2
with:
path: /home/runner/target
# this key is not equal because maturin uses different compilation flags.
key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}-
- uses: actions/setup-python@v2
with:
python-version: '3.7'
- name: Install Python dependencies
run: python -m pip install --upgrade pip setuptools wheel
- name: Run tests
run: |
export CARGO_HOME="/home/runner/.cargo"
export CARGO_TARGET_DIR="/home/runner/target"

cd arrow-pyarrow-integration-testing

python -m venv venv
source venv/bin/activate

pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0 pytz
maturin develop
python -m unittest discover tests

# test the arrow crate builds against wasm32 in stable rust
wasm32-build:
name: Build wasm32 on AMD64 Rust ${{ matrix.rust }}
Expand Down
158 changes: 128 additions & 30 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.

use std::convert::TryFrom;
use std::error;
use std::fmt;
use std::sync::Arc;
Expand All @@ -28,8 +29,10 @@ use pyo3::{libc::uintptr_t, prelude::*};

use arrow::array::{make_array_from_raw, ArrayRef, Int64Array};
use arrow::compute::kernels;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::ffi;
use arrow::ffi::FFI_ArrowSchema;

/// an error that bridges ArrowError with a Python error
#[derive(Debug)]
Expand Down Expand Up @@ -68,7 +71,107 @@ impl From<PyO3ArrowError> for PyErr {
}
}

fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
/// Wrapper around an arrow [`DataType`], exposed to Python as a class via `#[pyclass]`.
#[pyclass]
struct PyDataType {
    // The wrapped Rust-side data type.
    inner: DataType,
}

/// Wrapper around an arrow [`Field`], exposed to Python as a class via `#[pyclass]`.
#[pyclass]
struct PyField {
    // The wrapped Rust-side field.
    inner: Field,
}

/// Wrapper around an arrow [`Schema`], exposed to Python as a class via `#[pyclass]`.
#[pyclass]
struct PySchema {
    // The wrapped Rust-side schema.
    inner: Schema,
}

#[pymethods]
impl PyDataType {
    /// Builds a [`PyDataType`] from a Python object that implements `_export_to_c`.
    ///
    /// An empty [`FFI_ArrowSchema`] is created on the Rust side, its address is passed to
    /// `value._export_to_c` so that Python can fill it in, and the filled structure is then
    /// converted into a [`DataType`].
    ///
    /// # Errors
    ///
    /// Returns the Python error raised by `_export_to_c`, or the arrow conversion error
    /// (bridged through [`PyO3ArrowError`]) when the exported schema cannot be converted.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // Python writes the exported schema through this address, so the pointer has to be
        // derived from a mutable borrow: writing through a pointer obtained from a shared
        // reference (`&c_schema as *const _`) is undefined behaviour.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let dtype = DataType::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: dtype })
    }

    /// Converts the wrapped [`DataType`] into a `pyarrow.DataType` object.
    ///
    /// The data type is exported into an [`FFI_ArrowSchema`] whose address is passed to
    /// `pyarrow.DataType._import_from_c`.
    ///
    /// # Errors
    ///
    /// Returns the arrow conversion error (bridged through [`PyO3ArrowError`]) when the
    /// data type cannot be exported, or any Python error raised while importing `pyarrow`
    /// or calling `_import_from_c`.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // The Arrow C data interface allows the importer to mark the source structure as
        // released, i.e. to write to it, so hand out a pointer with write provenance.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("DataType")?;
        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(dtype.into())
    }
}

#[pymethods]
impl PyField {
    /// Builds a [`PyField`] from a Python object that implements `_export_to_c`.
    ///
    /// An empty [`FFI_ArrowSchema`] is created on the Rust side, its address is passed to
    /// `value._export_to_c` so that Python can fill it in, and the filled structure is then
    /// converted into a [`Field`].
    ///
    /// # Errors
    ///
    /// Returns the Python error raised by `_export_to_c`, or the arrow conversion error
    /// (bridged through [`PyO3ArrowError`]) when the exported schema cannot be converted.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // Python writes the exported schema through this address, so the pointer has to be
        // derived from a mutable borrow: writing through a pointer obtained from a shared
        // reference (`&c_schema as *const _`) is undefined behaviour.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let field = Field::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: field })
    }

    /// Converts the wrapped [`Field`] into a `pyarrow.Field` object.
    ///
    /// The field is exported into an [`FFI_ArrowSchema`] whose address is passed to
    /// `pyarrow.Field._import_from_c`.
    ///
    /// # Errors
    ///
    /// Returns the arrow conversion error (bridged through [`PyO3ArrowError`]) when the
    /// field cannot be exported, or any Python error raised while importing `pyarrow` or
    /// calling `_import_from_c`.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // The Arrow C data interface allows the importer to mark the source structure as
        // released, i.e. to write to it, so hand out a pointer with write provenance.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("Field")?;
        let field = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(field.into())
    }
}

#[pymethods]
impl PySchema {
    /// Builds a [`PySchema`] from a Python object that implements `_export_to_c`.
    ///
    /// An empty [`FFI_ArrowSchema`] is created on the Rust side, its address is passed to
    /// `value._export_to_c` so that Python can fill it in, and the filled structure is then
    /// converted into a [`Schema`].
    ///
    /// # Errors
    ///
    /// Returns the Python error raised by `_export_to_c`, or the arrow conversion error
    /// (bridged through [`PyO3ArrowError`]) when the exported schema cannot be converted.
    #[staticmethod]
    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
        // Python writes the exported schema through this address, so the pointer has to be
        // derived from a mutable borrow: writing through a pointer obtained from a shared
        // reference (`&c_schema as *const _`) is undefined behaviour.
        let mut c_schema = FFI_ArrowSchema::empty();
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
        let schema = Schema::try_from(&c_schema).map_err(PyO3ArrowError::from)?;
        Ok(Self { inner: schema })
    }

    /// Converts the wrapped [`Schema`] into a `pyarrow.Schema` object.
    ///
    /// The schema is exported into an [`FFI_ArrowSchema`] whose address is passed to
    /// `pyarrow.Schema._import_from_c`.
    ///
    /// # Errors
    ///
    /// Returns the arrow conversion error (bridged through [`PyO3ArrowError`]) when the
    /// schema cannot be exported, or any Python error raised while importing `pyarrow` or
    /// calling `_import_from_c`.
    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
        // The Arrow C data interface allows the importer to mark the source structure as
        // released, i.e. to write to it, so hand out a pointer with write provenance.
        let mut c_schema =
            FFI_ArrowSchema::try_from(&self.inner).map_err(PyO3ArrowError::from)?;
        let c_schema_ptr = &mut c_schema as *mut FFI_ArrowSchema;
        let module = py.import("pyarrow")?;
        let class = module.getattr("Schema")?;
        let schema =
            class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
        Ok(schema.into())
    }
}
jorgecarleitao marked this conversation as resolved.
Show resolved Hide resolved

/// Implements pyo3's `FromPyObject` for [`PyDataType`] by delegating to
/// [`PyDataType::from_pyarrow`].
impl<'py> FromPyObject<'py> for PyDataType {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// Implements pyo3's `FromPyObject` for [`PyField`] by delegating to
/// [`PyField::from_pyarrow`].
impl<'py> FromPyObject<'py> for PyField {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

/// Implements pyo3's `FromPyObject` for [`PySchema`] by delegating to
/// [`PySchema::from_pyarrow`].
impl<'py> FromPyObject<'py> for PySchema {
    fn extract(ob: &'py PyAny) -> PyResult<Self> {
        Self::from_pyarrow(ob)
    }
}

fn array_to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
// prepare a pointer to receive the Array struct
let (array_pointer, schema_pointer) =
ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });
Expand All @@ -82,13 +185,12 @@ fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> {
)?;

let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
.map_err(|e| PyO3ArrowError::from(e))?;
.map_err(PyO3ArrowError::from)?;
Ok(array)
}

fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) =
array.to_raw().map_err(|e| PyO3ArrowError::from(e))?;
fn array_to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
let (array_pointer, schema_pointer) = array.to_raw().map_err(PyO3ArrowError::from)?;

let pa = py.import("pyarrow")?;

Expand All @@ -103,22 +205,17 @@ fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> {
#[pyfunction]
fn double(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(array, py)?;

// perform some operation
let array =
array
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(PyO3ArrowError::ArrowError(ArrowError::ParseError(
"Expects an int64".to_string(),
)))?;
let array =
kernels::arithmetic::add(&array, &array).map_err(|e| PyO3ArrowError::from(e))?;
let array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
PyO3ArrowError::ArrowError(ArrowError::ParseError("Expects an int64".to_string()))
})?;
let array = kernels::arithmetic::add(&array, &array).map_err(PyO3ArrowError::from)?;
let array = Arc::new(array);

// export
to_py(array, py)
array_to_py(array, py)
}

/// calls a lambda function that receives and returns an array
Expand All @@ -130,11 +227,9 @@ fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
let expected = Arc::new(Int64Array::from(vec![Some(2), None, Some(6)])) as ArrayRef;

// to py
let array = to_py(array, py)?;

let array = lambda.call1(py, (array,))?;

let array = to_rust(array, py)?;
let pyarray = array_to_py(array, py)?;
let pyarray = lambda.call1(py, (pyarray,))?;
let array = array_to_rust(pyarray, py)?;

Ok(array == expected)
}
Expand All @@ -143,42 +238,45 @@ fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> {
#[pyfunction]
fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(array, py)?;

// substring
let array = kernels::substring::substring(array.as_ref(), start, &None)
.map_err(|e| PyO3ArrowError::from(e))?;
.map_err(PyO3ArrowError::from)?;

// export
to_py(array, py)
array_to_py(array, py)
}

/// Returns the concatenation of the array with itself
#[pyfunction]
fn concatenate(array: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(array, py)?;

// concat
let array = kernels::concat::concat(&[array.as_ref(), array.as_ref()])
.map_err(|e| PyO3ArrowError::from(e))?;
.map_err(PyO3ArrowError::from)?;

// export
to_py(array, py)
array_to_py(array, py)
}

/// Converts to rust and back to python
#[pyfunction]
fn round_trip(array: PyObject, py: Python) -> PyResult<PyObject> {
fn round_trip(pyarray: PyObject, py: Python) -> PyResult<PyObject> {
// import
let array = to_rust(array, py)?;
let array = array_to_rust(pyarray, py)?;

// export
to_py(array, py)
array_to_py(array, py)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyDataType>()?;
m.add_class::<PyField>()?;
m.add_class::<PySchema>()?;
m.add_wrapped(wrap_pyfunction!(double))?;
m.add_wrapped(wrap_pyfunction!(double_py))?;
m.add_wrapped(wrap_pyfunction!(substring))?;
Expand Down
Loading