Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support array_append #1072

Merged
merged 8 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use datafusion::{
},
prelude::SessionContext,
};
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

use datafusion_comet_proto::{
Expand All @@ -107,7 +108,8 @@ use datafusion_common::{
};
use datafusion_expr::expr::find_df_window_func;
use datafusion_expr::{
AggregateUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition,
};
use datafusion_physical_expr::expressions::{Literal, StatsType};
use datafusion_physical_expr::window::WindowExpr;
Expand Down Expand Up @@ -691,6 +693,33 @@ impl PhysicalPlanner {
expr.ordinal as usize,
)))
}
ExprStruct::ArrayAppend(expr) => {
let left =
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
let right =
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
let return_type = left.data_type(&input_schema)?;
let args = vec![Arc::clone(&left), right];
let datafusion_array_append =
Arc::new(ScalarUDF::new_from_impl(ArrayAppend::new()));
let array_append_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
"array_append",
datafusion_array_append,
args,
return_type,
));

let is_null_expr: Arc<dyn PhysicalExpr> = Arc::new(IsNullExpr::new(left));
let null_literal_expr: Arc<dyn PhysicalExpr> =
Arc::new(Literal::new(ScalarValue::Null));

let case_expr = CaseExpr::try_new(
None,
vec![(is_null_expr, null_literal_expr)],
Some(array_append_expr),
)?;
Ok(Arc::new(case_expr))
}
expr => Err(ExecutionError::GeneralError(format!(
"Not implemented: {:?}",
expr
Expand Down
1 change: 1 addition & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ message Expr {
ToJson to_json = 55;
ListExtract list_extract = 56;
GetArrayStructFields get_array_struct_fields = 57;
BinaryExpr array_append = 58;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2237,7 +2237,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
None
}

case _ if expr.prettyName == "array_append" =>
createBinaryExpr(
expr.children(0),
expr.children(1),
inputs,
(builder, binaryExpr) => builder.setArrayAppend(binaryExpr))
case _ =>
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
None
Expand Down
24 changes: 24 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2313,4 +2313,28 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}
}

test("array_append") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to also have a test where the first argument to array_append is null, but where the argument is not a literal null but an expression that evaluates to null. I am not sure how easy it is to add that test until we have support for reading arrays from Parquet (which is coming soon) so I am fine if we want to handle this as a separate issue. It does look like DataFusion's array_append does not support null as the first argument, but Spark does. Maybe we could improve DataFusion's implementation to return null if the first argument is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I think I managed to create a test which creates this scenario:
checkSparkAnswerAndOperator( df.select(array_append(expr("CASE WHEN _2=_3 THEN array(_19) END"), col("_19"))))
Which fails with the following error message:

  !== Correct Answer - 10 ==                                                             == Spark Answer - 10 ==
   struct<array_append(CASE WHEN (_2 = _3) THEN array(_19) END, _19):array<timestamp>>   struct<array_append(CASE WHEN (_2 = _3) THEN array(_19) END, _19):array<timestamp>>
   [ArrayBuffer(1969-12-31 16:00:00.0, 1969-12-31 16:00:00.0)]                           [ArrayBuffer(1969-12-31 16:00:00.0, 1969-12-31 16:00:00.0)]
   [ArrayBuffer(1969-12-31 16:00:00.0, 1969-12-31 16:00:00.0)]                           [ArrayBuffer(1969-12-31 16:00:00.0, 1969-12-31 16:00:00.0)]
   [ArrayBuffer(1969-12-31 16:00:00.000001, 1969-12-31 16:00:00.000001)]                 [ArrayBuffer(1969-12-31 16:00:00.000001, 1969-12-31 16:00:00.000001)]
   [ArrayBuffer(1969-12-31 16:00:00.000002, 1969-12-31 16:00:00.000002)]                 [ArrayBuffer(1969-12-31 16:00:00.000002, 1969-12-31 16:00:00.000002)]
   [ArrayBuffer(1969-12-31 16:00:00.000003, 1969-12-31 16:00:00.000003)]                 [ArrayBuffer(1969-12-31 16:00:00.000003, 1969-12-31 16:00:00.000003)]
   [ArrayBuffer(1969-12-31 16:00:00.000003, 1969-12-31 16:00:00.000003)]                 [ArrayBuffer(1969-12-31 16:00:00.000003, 1969-12-31 16:00:00.000003)]
  ![null]                                                                                [ArrayBuffer(null)]
  ![null]                                                                                [ArrayBuffer(null)]

Should I create an issue on the DataFusion repo? Or is it unlikely that they will support it because their implementation seems to match the one of Postgres?

Datafusion / Postgres

SELECT array_append( CASE WHEN 1=2 THEN Array[1,2,3] END,  4);
returns: [4]

Spark

spark.sql("SELECT array_prepend(     CASE WHEN 1=2 THEN ARRAY(1,2,3) END,  4);").show
returns:  NULL

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Let's handle this in Comet rather than modify DataFusion. One option is to fork the array_append code and modify it. Another option would be to translate array_append(a, b) into something like CASE WHEN a IS NULL THEN null ELSE array_append(a, b) END.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have now updated the branch with a version which adds a case statement to check for NULL. In my opinion it is less effort that way than fork that part of the DataFusion code.

// array append has been added in Spark 3.4 and in Spark 4.0 it gets written to ArrayInsert
assume(isSpark34Plus && !isSpark40Plus)
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
spark.read.parquet(path.toString).createOrReplaceTempView("t1");
checkSparkAnswerAndOperator(spark.sql("Select array_append(array(_1),false) from t1"))
checkSparkAnswerAndOperator(
spark.sql("SELECT array_append(array(_2, _3, _4), 4) FROM t1"))
checkSparkAnswerAndOperator(
spark.sql("SELECT array_append(array(_2, _3, _4), null) FROM t1"));
checkSparkAnswerAndOperator(
spark.sql("SELECT array_append(array(_6, _7), CAST(6.5 AS DOUBLE)) FROM t1"));
checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_8), 'test') FROM t1"));
checkSparkAnswerAndOperator(spark.sql("SELECT array_append(array(_19), _19) FROM t1"));
checkSparkAnswerAndOperator(
spark.sql("SELECT array_append((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
}

}
}
}
Loading