Skip to content

Commit

Permalink
Merge commit 'a7041feff32c2af09854c144a760d945e30fb38a' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-july-week-2
  • Loading branch information
appletreeisyellow committed Jul 22, 2024
2 parents bd57304 + a7041fe commit d207f59
Show file tree
Hide file tree
Showing 57 changed files with 4,472 additions and 410 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dev_pr/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

development-process:
- changed-files:
- any-glob-to-any-file: ['dev/**.*', '.github/**.*', 'ci/**.*', '.asf.yaml']
- any-glob-to-any-file: ['dev/**/*', '.github/**/*', 'ci/**/*', '.asf.yaml']

documentation:
- changed-files:
- any-glob-to-any-file: ['docs/**.*', 'README.md', './**/README.md', 'DEVELOPERS.md', 'datafusion/docs/**.*']
- any-glob-to-any-file: ['docs/**/*', 'README.md', './**/README.md', 'DEVELOPERS.md', 'datafusion/docs/**/*']

sql:
- changed-files:
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ cargo run --example dataframe
- [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into Datafusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from Datafusion `Expr` and `LogicalPlan`
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn main() -> Result<()> {
Ok(())
}

/// Datafusion's `expr_fn` API makes it easy to create [`Expr`]s for the
/// DataFusion's `expr_fn` API makes it easy to create [`Expr`]s for the
/// full range of expression types such as aggregates and window functions.
fn expr_fn_demo() -> Result<()> {
// Let's say you want to call the "first_value" aggregate function
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ config_namespace! {
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false

/// Should Datafusion keep the columns used for partition_by in the output RecordBatches
/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
}
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
if let Some(v) = &value.timestamp_format {
builder = builder.with_timestamp_format(v.into())
}
if let Some(v) = &value.timestamp_tz_format {
builder = builder.with_timestamp_tz_format(v.into())
}
if let Some(v) = &value.time_format {
builder = builder.with_time_format(v.into())
}
Expand Down
33 changes: 29 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,9 +896,8 @@ impl DataFrame {
join_type: JoinType,
on_exprs: impl IntoIterator<Item = Expr>,
) -> Result<DataFrame> {
let expr = on_exprs.into_iter().reduce(Expr::and);
let plan = LogicalPlanBuilder::from(self.plan)
.join_on(right.plan, join_type, expr)?
.join_on(right.plan, join_type, on_exprs)?
.build()?;
Ok(DataFrame {
session_state: self.session_state,
Expand Down Expand Up @@ -1472,7 +1471,7 @@ impl DataFrame {
///
/// The method supports case sensitive rename with wrapping column name into one of following symbols ( " or ' or ` )
///
/// Alternatively setting Datafusion param `datafusion.sql_parser.enable_ident_normalization` to `false` will enable
/// Alternatively setting DataFusion param `datafusion.sql_parser.enable_ident_normalization` to `false` will enable
/// case sensitive rename without need to wrap column name into special symbols
///
/// # Example
Expand Down Expand Up @@ -1694,7 +1693,7 @@ mod tests {
use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name};

use arrow::array::{self, Int32Array};
use datafusion_common::{Constraint, Constraints};
use datafusion_common::{Constraint, Constraints, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
array_agg, cast, create_udf, expr, lit, BuiltInWindowFunction,
Expand Down Expand Up @@ -2555,6 +2554,32 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn join_on_filter_datatype() -> Result<()> {
let left = test_table_with_name("a").await?.select_columns(&["c1"])?;
let right = test_table_with_name("b").await?.select_columns(&["c1"])?;

// JOIN ON untyped NULL
let join = left.clone().join_on(
right.clone(),
JoinType::Inner,
Some(Expr::Literal(ScalarValue::Null)),
)?;
let expected_plan = "CrossJoin:\
\n TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
\n TableScan: b projection=[c1]";
assert_eq!(expected_plan, format!("{:?}", join.into_optimized_plan()?));

// JOIN ON expression must be boolean type
let join = left.join_on(right, JoinType::Inner, Some(lit("TRUE")))?;
let expected = join.into_optimized_plan().unwrap_err();
assert_eq!(
expected.strip_backtrace(),
"type_coercion\ncaused by\nError during planning: Join condition must be boolean type, but got Utf8"
);
Ok(())
}

#[tokio::test]
async fn join_ambiguous_filter() -> Result<()> {
let left = test_table_with_name("a")
Expand Down
27 changes: 16 additions & 11 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ use tokio::task::JoinSet;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

/// Serializes a single data stream in parallel and writes to an ObjectStore
/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
/// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
/// Data order is preserved.
///
/// In the event of a non-IO error which does not involve the ObjectStore writer,
/// the writer returned to the caller in addition to the error,
/// so that failed writes may be aborted.
///
/// In the event of an IO error involving the ObjectStore writer,
/// the writer is dropped to avoid calling further methods on it which might panic.
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
Expand Down Expand Up @@ -82,7 +87,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
Ok(_) => (),
Err(e) => {
return Err((
writer,
None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
Expand All @@ -93,12 +98,12 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
}
Ok(Err(e)) => {
// Return the writer along with the error
return Err((writer, e));
return Err((Some(writer), e));
}
Err(e) => {
// Handle task panic or cancellation
return Err((
writer,
Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
Expand All @@ -109,10 +114,10 @@ pub(crate) async fn serialize_rb_stream_to_object_store(

match serialize_task.join().await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err((writer, e)),
Ok(Err(e)) => return Err((Some(writer), e)),
Err(_) => {
return Err((
writer,
Some(writer),
internal_datafusion_err!("Unknown error writing to object store"),
))
}
Expand Down Expand Up @@ -153,7 +158,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
row_count += cnt;
}
Err((writer, e)) => {
finished_writers.push(writer);
finished_writers.extend(writer);
any_errors = true;
triggering_error = Some(e);
}
Expand Down
136 changes: 85 additions & 51 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,31 +140,29 @@ fn take_optimizable_column_and_table_count(
stats: &Statistics,
) -> Option<(ScalarValue, String)> {
let col_stats = &stats.column_statistics;
if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {
if let Precision::Exact(num_rows) = stats.num_rows {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) =
exprs[0].as_any().downcast_ref::<expressions::Column>()
{
let current_val = &col_stats[col_expr.index()].null_count;
if let &Precision::Exact(val) = current_val {
return Some((
ScalarValue::Int64(Some((num_rows - val) as i64)),
agg_expr.name().to_string(),
));
}
} else if let Some(lit_expr) =
exprs[0].as_any().downcast_ref::<expressions::Literal>()
{
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
agg_expr.name().to_string(),
));
}
if is_non_distinct_count(agg_expr) {
if let Precision::Exact(num_rows) = stats.num_rows {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) =
exprs[0].as_any().downcast_ref::<expressions::Column>()
{
let current_val = &col_stats[col_expr.index()].null_count;
if let &Precision::Exact(val) = current_val {
return Some((
ScalarValue::Int64(Some((num_rows - val) as i64)),
agg_expr.name().to_string(),
));
}
} else if let Some(lit_expr) =
exprs[0].as_any().downcast_ref::<expressions::Literal>()
{
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
agg_expr.name().to_string(),
));
}
}
}
Expand All @@ -182,34 +180,30 @@ fn take_optimizable_min(
match *num_rows {
0 => {
// MIN/MAX with 0 rows is always null
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Min>()
{
if is_min(agg_expr) {
if let Ok(min_data_type) =
ScalarValue::try_from(casted_expr.field().unwrap().data_type())
ScalarValue::try_from(agg_expr.field().unwrap().data_type())
{
return Some((min_data_type, casted_expr.name().to_string()));
return Some((min_data_type, agg_expr.name().to_string()));
}
}
}
value if value > 0 => {
let col_stats = &stats.column_statistics;
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Min>()
{
if casted_expr.expressions().len() == 1 {
if is_min(agg_expr) {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
if let Some(col_expr) =
exprs[0].as_any().downcast_ref::<expressions::Column>()
{
if let Precision::Exact(val) =
&col_stats[col_expr.index()].min_value
{
if !val.is_null() {
return Some((
val.clone(),
casted_expr.name().to_string(),
agg_expr.name().to_string(),
));
}
}
Expand All @@ -232,34 +226,30 @@ fn take_optimizable_max(
match *num_rows {
0 => {
// MIN/MAX with 0 rows is always null
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Max>()
{
if is_max(agg_expr) {
if let Ok(max_data_type) =
ScalarValue::try_from(casted_expr.field().unwrap().data_type())
ScalarValue::try_from(agg_expr.field().unwrap().data_type())
{
return Some((max_data_type, casted_expr.name().to_string()));
return Some((max_data_type, agg_expr.name().to_string()));
}
}
}
value if value > 0 => {
let col_stats = &stats.column_statistics;
if let Some(casted_expr) =
agg_expr.as_any().downcast_ref::<expressions::Max>()
{
if casted_expr.expressions().len() == 1 {
if is_max(agg_expr) {
let exprs = agg_expr.expressions();
if exprs.len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
if let Some(col_expr) =
exprs[0].as_any().downcast_ref::<expressions::Column>()
{
if let Precision::Exact(val) =
&col_stats[col_expr.index()].max_value
{
if !val.is_null() {
return Some((
val.clone(),
casted_expr.name().to_string(),
agg_expr.name().to_string(),
));
}
}
Expand All @@ -273,6 +263,50 @@ fn take_optimizable_max(
None
}

// TODO: Move this check into AggregateUDFImpl
// https://github.com/apache/datafusion/issues/11153
fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool {
if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() {
return true;
}
}

false
}

// TODO: Move this check into AggregateUDFImpl
// https://github.com/apache/datafusion/issues/11153
fn is_min(agg_expr: &dyn AggregateExpr) -> bool {
if agg_expr.as_any().is::<expressions::Min>() {
return true;
}

if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
if agg_expr.fun().name() == "min" {
return true;
}
}

false
}

// TODO: Move this check into AggregateUDFImpl
// https://github.com/apache/datafusion/issues/11153
fn is_max(agg_expr: &dyn AggregateExpr) -> bool {
if agg_expr.as_any().is::<expressions::Max>() {
return true;
}

if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
if agg_expr.fun().name() == "max" {
return true;
}
}

false
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
Expand Down
Loading

0 comments on commit d207f59

Please sign in to comment.