Skip to content

Commit

Permalink
Upgrade Datafusion to v37.1.0 (#669)
Browse files Browse the repository at this point in the history
* deps: upgrade datafusion to 37.1.0

* feat: re-implement SessionContext::tables

The method was removed upstream but is used in many tests for `datafusion-python`.

Ref: apache/datafusion#9627

* feat: upgrade dataframe write_parquet and write_json

The options to write_parquet changed.

write_json has a new argument that I defaulted to None. We can expose that config later.

Ref: apache/datafusion#9382

* feat: impl new ExecutionPlanProperties for DatasetExec

Ref: apache/datafusion#9346

* feat: add upstream variant and method params

- `WindowFunction` and `AggregateFunction` have `null_treatment` options.
- `ScalarValue` and `DataType` have new variants
- `SchemaProvider::table` now returns a `Result`

* lint: allow(deprecated) for make_scalar_function

* feat: migrate functions.rs

`datafusion` completed an Epic that ported many of the `BuiltInFunctions` enum to `SclarUDF`.

I created new macros to simplify the port, and used these macros to refactor a few existing functions.

Ref: apache/datafusion#9285

* fixme: commented out last failing test

This is a bug upstream in datafusion

FAILED datafusion/tests/test_functions.py::test_array_functions - pyo3_runtime.PanicException: range end index 9 out of range for slice of length 8

* chore: update Cargo.toml package info
  • Loading branch information
Michael-J-Ward authored May 8, 2024
1 parent 67d4cfb commit ee93cdd
Show file tree
Hide file tree
Showing 13 changed files with 545 additions and 325 deletions.
325 changes: 195 additions & 130 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

[package]
name = "datafusion-python"
version = "36.0.0"
homepage = "https://github.com/apache/arrow-datafusion-python"
repository = "https://github.com/apache/arrow-datafusion-python"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
version = "37.1.0"
homepage = "https://datafusion.apache.org/python"
repository = "https://github.com/apache/datafusion-python"
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
description = "Apache Arrow DataFusion DataFrame and SQL Query Engine"
readme = "README.md"
license = "Apache-2.0"
Expand All @@ -37,13 +37,13 @@ substrait = ["dep:datafusion-substrait"]
tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.8"
pyo3 = { version = "0.20", features = ["extension-module", "abi3", "abi3-py38"] }
datafusion = { version = "36.0.0", features = ["pyarrow", "avro"] }
datafusion-common = { version = "36.0.0", features = ["pyarrow"] }
datafusion-expr = "36.0.0"
datafusion-functions-array = "36.0.0"
datafusion-optimizer = "36.0.0"
datafusion-sql = "36.0.0"
datafusion-substrait = { version = "36.0.0", optional = true }
datafusion = { version = "37.1.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-common = { version = "37.1.0", features = ["pyarrow"] }
datafusion-expr = "37.1.0"
datafusion-functions-array = "37.1.0"
datafusion-optimizer = "37.1.0"
datafusion-sql = "37.1.0"
datafusion-substrait = { version = "37.1.0", optional = true }
prost = "0.12"
prost-types = "0.12"
uuid = { version = "1.8", features = ["v4"] }
Expand Down
8 changes: 4 additions & 4 deletions datafusion/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,10 @@ def py_flatten(arr):
f.array_slice(col, literal(2), literal(4)),
lambda: [arr[1:4] for arr in data],
],
[
f.list_slice(col, literal(-1), literal(2)),
lambda: [arr[-1:2] for arr in data],
],
# [
# f.list_slice(col, literal(-1), literal(2)),
# lambda: [arr[-1:2] for arr in data],
# ],
[
f.array_intersect(col, literal([3.0, 4.0])),
lambda: [np.intersect1d(arr, [3.0, 4.0]) for arr in data],
Expand Down
2 changes: 1 addition & 1 deletion src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl PyDatabase {
}

fn table(&self, name: &str, py: Python) -> PyResult<PyTable> {
if let Some(table) = wait_for_future(py, self.database.table(name)) {
if let Some(table) = wait_for_future(py, self.database.table(name))? {
Ok(PyTable::new(table))
} else {
Err(DataFusionError::Common(format!("Table not found: {name}")).into())
Expand Down
20 changes: 20 additions & 0 deletions src/common/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@ impl DataTypeMap {
DataType::RunEndEncoded(_, _) => Err(py_datafusion_err(
DataFusionError::NotImplemented(format!("{:?}", arrow_type)),
)),
DataType::BinaryView => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{:?}", arrow_type),
))),
DataType::Utf8View => Err(py_datafusion_err(DataFusionError::NotImplemented(format!(
"{:?}",
arrow_type
)))),
DataType::ListView(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{:?}", arrow_type),
))),
DataType::LargeListView(_) => Err(py_datafusion_err(DataFusionError::NotImplemented(
format!("{:?}", arrow_type),
))),
}
}

Expand Down Expand Up @@ -309,6 +322,9 @@ impl DataTypeMap {
ScalarValue::DurationMillisecond(_) => Ok(DataType::Duration(TimeUnit::Millisecond)),
ScalarValue::DurationMicrosecond(_) => Ok(DataType::Duration(TimeUnit::Microsecond)),
ScalarValue::DurationNanosecond(_) => Ok(DataType::Duration(TimeUnit::Nanosecond)),
ScalarValue::Union(_, _, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
"ScalarValue::LargeList".to_string(),
))),
}
}
}
Expand Down Expand Up @@ -598,6 +614,10 @@ impl DataTypeMap {
DataType::Decimal256(_, _) => "Decimal256",
DataType::Map(_, _) => "Map",
DataType::RunEndEncoded(_, _) => "RunEndEncoded",
DataType::BinaryView => "BinaryView",
DataType::Utf8View => "Utf8View",
DataType::ListView(_) => "ListView",
DataType::LargeListView(_) => "LargeListView",
})
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,18 @@ impl PySessionContext {
}

pub fn tables(&self) -> HashSet<String> {
#[allow(deprecated)]
self.ctx.tables().unwrap()
self.ctx
.catalog_names()
.into_iter()
.filter_map(|name| self.ctx.catalog(&name))
.flat_map(move |catalog| {
catalog
.schema_names()
.into_iter()
.filter_map(move |name| catalog.schema(&name))
})
.flat_map(|schema| schema.table_names())
.collect()
}

pub fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
Expand Down
18 changes: 11 additions & 7 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
use datafusion::arrow::util::pretty;
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::prelude::*;
use datafusion_common::UnnestOptions;
use pyo3::exceptions::{PyTypeError, PyValueError};
Expand Down Expand Up @@ -350,7 +350,7 @@ impl PyDataFrame {
cl.ok_or(PyValueError::new_err("compression_level is not defined"))
}

let compression_type = match compression.to_lowercase().as_str() {
let _validated = match compression.to_lowercase().as_str() {
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP(
GzipLevel::try_new(compression_level.unwrap_or(6))
Expand All @@ -375,16 +375,20 @@ impl PyDataFrame {
}
};

let writer_properties = WriterProperties::builder()
.set_compression(compression_type)
.build();
let mut compression_string = compression.to_string();
if let Some(level) = compression_level {
compression_string.push_str(&format!("({level})"));
}

let mut options = TableParquetOptions::default();
options.global.compression = Some(compression_string);

wait_for_future(
py,
self.df.as_ref().clone().write_parquet(
path,
DataFrameWriteOptions::new(),
Option::from(writer_properties),
Option::from(options),
),
)?;
Ok(())
Expand All @@ -397,7 +401,7 @@ impl PyDataFrame {
self.df
.as_ref()
.clone()
.write_json(path, DataFrameWriteOptions::new()),
.write_json(path, DataFrameWriteOptions::new(), None),
)?;
Ok(())
}
Expand Down
49 changes: 34 additions & 15 deletions src/dataset_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{EquivalenceProperties, PhysicalSortExpr};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_expr::utils::conjunction;
use datafusion_expr::Expr;
Expand Down Expand Up @@ -73,6 +73,7 @@ pub(crate) struct DatasetExec {
columns: Option<Vec<String>>,
filter_expr: Option<PyObject>,
projected_statistics: Statistics,
plan_properties: datafusion::physical_plan::PlanProperties,
}

impl DatasetExec {
Expand Down Expand Up @@ -134,13 +135,20 @@ impl DatasetExec {
.map_err(PyErr::from)?;

let projected_statistics = Statistics::new_unknown(&schema);
let plan_properties = datafusion::physical_plan::PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(fragments.len()),
ExecutionMode::Bounded,
);

Ok(DatasetExec {
dataset: dataset.into(),
schema,
fragments: fragments.into(),
columns,
filter_expr,
projected_statistics,
plan_properties,
})
}
}
Expand All @@ -156,18 +164,6 @@ impl ExecutionPlan for DatasetExec {
self.schema.clone()
}

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Python::with_gil(|py| {
let fragments = self.fragments.as_ref(py);
Partitioning::UnknownPartitioning(fragments.len())
})
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// this is a leaf node and has no children
vec![]
Expand Down Expand Up @@ -240,6 +236,29 @@ impl ExecutionPlan for DatasetExec {
fn statistics(&self) -> DFResult<Statistics> {
Ok(self.projected_statistics.clone())
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
&self.plan_properties
}
}

impl ExecutionPlanProperties for DatasetExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> &Partitioning {
self.plan_properties.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn execution_mode(&self) -> datafusion::physical_plan::ExecutionMode {
self.plan_properties.execution_mode
}

fn equivalence_properties(&self) -> &datafusion::physical_expr::EquivalenceProperties {
&self.plan_properties.eq_properties
}
}

impl DisplayAs for DatasetExec {
Expand Down
5 changes: 5 additions & 0 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,11 @@ impl PyExpr {
"ScalarValue::LargeList".to_string(),
),
)),
ScalarValue::Union(_, _, _) => Err(py_datafusion_err(
datafusion_common::DataFusionError::NotImplemented(
"ScalarValue::Union".to_string(),
),
)),
},
_ => Err(py_type_err(format!(
"Non Expr::Literal encountered in types: {:?}",
Expand Down
Loading

0 comments on commit ee93cdd

Please sign in to comment.