From 8cb05a571311c0dae463af7dbcd6359a485a9631 Mon Sep 17 00:00:00 2001 From: jiangzhx Date: Mon, 12 Jun 2023 12:13:58 +0800 Subject: [PATCH 1/3] port left_join_using_2,timestamp_join to sqllogictest --- datafusion/core/tests/sql/joins.rs | 78 ---------- .../core/tests/sqllogictests/src/main.rs | 7 + .../core/tests/sqllogictests/src/setup.rs | 136 +++++++++++++++++- .../tests/sqllogictests/test_files/joins.slt | 42 ++++++ 4 files changed, 184 insertions(+), 79 deletions(-) diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 054db2c3f72f..a6ad46aabb1a 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -37,84 +37,6 @@ async fn nestedjoin_with_alias() -> Result<()> { Ok(()) } -#[tokio::test] -async fn join_timestamp() -> Result<()> { - let ctx = SessionContext::new(); - ctx.register_table("t", table_with_timestamps()).unwrap(); - - let expected = vec![ - "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", - "| nanos | micros | millis | secs | name | nanos | micros | millis | secs | name |", - "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", - "| 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123 | 2011-12-13T11:13:10 | Row 1 | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123450 | 2011-12-13T11:13:10.123 | 2011-12-13T11:13:10 | Row 1 |", - "| 2018-11-13T17:11:10.011375885 | 2018-11-13T17:11:10.011375 | 2018-11-13T17:11:10.011 | 2018-11-13T17:11:10 | Row 0 | 2018-11-13T17:11:10.011375885 | 2018-11-13T17:11:10.011375 | 2018-11-13T17:11:10.011 | 2018-11-13T17:11:10 | Row 0 |", - "| 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10 | Row 3 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10.432 | 2021-01-01T05:11:10 | Row 3 |", - "+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+", - ]; - - let results = execute_to_batches( - &ctx, - "SELECT * FROM t as t1 \ - JOIN (SELECT * FROM t) as t2 \ - ON t1.nanos = t2.nanos", - ) - .await; - - assert_batches_sorted_eq!(expected, &results); - - let results = execute_to_batches( - &ctx, - "SELECT * FROM t as t1 \ - JOIN (SELECT * FROM t) as t2 \ - ON t1.micros = t2.micros", - ) - .await; - - assert_batches_sorted_eq!(expected, &results); - - let results = execute_to_batches( - &ctx, - "SELECT * FROM t as t1 \ - JOIN (SELECT * FROM t) as t2 \ - ON t1.millis = t2.millis", - ) - .await; - - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn left_join_using_2() -> Result<()> { - let results = execute_with_partition( - "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2", - 1, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+", - "| c1 | c2 |", - "+----+----+", - "| 0 | 1 |", - "| 0 | 2 |", - "| 0 | 3 |", - "| 0 | 4 |", - "| 0 | 5 |", - "| 0 | 6 |", - "| 0 | 7 |", - "| 0 | 8 |", - "| 0 | 9 |", - "| 0 | 10 |", - "+----+----+", - ]; - - 
assert_batches_eq!(expected, &results); - Ok(()) -} - #[tokio::test] async fn left_join_using_join_key_projection() -> Result<()> { let results = execute_with_partition( diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs index d93d59fb3e1a..ee5a2017ca58 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -285,6 +285,13 @@ async fn context_for_test_file(relative_path: &Path) -> Option { return None; } } + "joins.slt" => { + let mut test_ctx = test_ctx; + info!("Registering timestamps tables"); + setup::register_timestamps_table(test_ctx.session_ctx()).await; + setup::register_partition_table(&mut test_ctx).await; + return Some(test_ctx); + } _ => { info!("Using default SessionContext"); } diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs index 8072a0f74f5f..c64dcc45130b 100644 --- a/datafusion/core/tests/sqllogictests/src/setup.rs +++ b/datafusion/core/tests/sqllogictests/src/setup.rs @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +use arrow_array::{ + Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, +}; use datafusion::{ arrow::{ array::{ @@ -28,9 +32,11 @@ use datafusion::{ prelude::{CsvReadOptions, SessionContext}, test_util, }; +use std::fs::File; +use std::io::Write; use std::sync::Arc; -use crate::utils; +use crate::{utils, TestContext}; #[cfg(feature = "avro")] pub async fn register_avro_tables(ctx: &mut crate::TestContext) { @@ -212,3 +218,131 @@ fn register_nan_table(ctx: &SessionContext) { .unwrap(); ctx.register_batch("test_float", data).unwrap(); } + +pub async fn register_timestamps_table(ctx: &SessionContext) { + let batch = make_timestamps(); + let schema = batch.schema(); + let partitions = vec![vec![batch]]; + + ctx.register_table( + "test_timestamps_table", + Arc::new(MemTable::try_new(schema, partitions).unwrap()), + ) + .unwrap(); +} + +/// Return record batch with all of the supported timestamp types +/// values +/// +/// Columns are named: +/// "nanos" --> TimestampNanosecondArray +/// "micros" --> TimestampMicrosecondArray +/// "millis" --> TimestampMillisecondArray +/// "secs" --> TimestampSecondArray +/// "names" --> StringArray +pub fn make_timestamps() -> RecordBatch { + let ts_strings = vec![ + Some("2018-11-13T17:11:10.011375885995"), + Some("2011-12-13T11:13:10.12345"), + None, + Some("2021-1-1T05:11:10.432"), + ]; + + let ts_nanos = ts_strings + .into_iter() + .map(|t| { + t.map(|t| { + t.parse::() + .unwrap() + .timestamp_nanos() + }) + }) + .collect::>(); + + let ts_micros = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000)) + .collect::>(); + + let ts_millis = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000)) + .collect::>(); + + let ts_secs = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000)) + .collect::>(); + + let names = ts_nanos + .iter() + .enumerate() + .map(|(i, _)| format!("Row {i}")) + .collect::>(); + + let arr_nanos = TimestampNanosecondArray::from(ts_nanos); + let arr_micros = TimestampMicrosecondArray::from(ts_micros); + let arr_millis = TimestampMillisecondArray::from(ts_millis); + let arr_secs = TimestampSecondArray::from(ts_secs); + + let names = names.iter().map(|s| s.as_str()).collect::>(); + let arr_names = StringArray::from(names); + + let schema 
= Schema::new(vec![ + Field::new("nanos", arr_nanos.data_type().clone(), true), + Field::new("micros", arr_micros.data_type().clone(), true), + Field::new("millis", arr_millis.data_type().clone(), true), + Field::new("secs", arr_secs.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), + ]); + let schema = Arc::new(schema); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(arr_nanos), + Arc::new(arr_micros), + Arc::new(arr_millis), + Arc::new(arr_secs), + Arc::new(arr_names), + ], + ) + .unwrap() +} + +/// Generate a partitioned CSV file and register it with an execution context +pub async fn register_partition_table(test_ctx: &mut TestContext) { + test_ctx.enable_testdir(); + let partition_count = 1; + let file_extension = "csv"; + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::UInt32, false), + Field::new("c2", DataType::UInt64, false), + Field::new("c3", DataType::Boolean, false), + ])); + // generate a partitioned file + for partition in 0..partition_count { + let filename = format!("partition-{partition}.{file_extension}"); + let file_path = test_ctx.testdir_path().join(filename); + println!("{}", file_path.clone().display()); + + let mut file = File::create(file_path).unwrap(); + + // generate some data + for i in 0..=10 { + let data = format!("{},{},{}\n", partition, i, i % 2 == 0); + file.write_all(data.as_bytes()).unwrap() + } + } + + // register csv file with the execution context + test_ctx + .ctx + .register_csv( + "test_partition_table", + test_ctx.testdir_path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await + .unwrap(); +} diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt index 8e8527fc3d5a..21f8e12e66ea 100644 --- a/datafusion/core/tests/sqllogictests/test_files/joins.slt +++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt @@ -2337,3 +2337,45 @@ WHERE NOT EXISTS( statement ok set datafusion.explain.logical_plan_only = false; + + +# test timestamp join on nanos datatype +query PPPPTPPPPT rowsort +SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table ) as t2 ON t1.nanos = t2.nanos; +---- +2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 +2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 +2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 + +# test timestamp join on micros datatype +query PPPPTPPPPT rowsort +SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table ) as t2 ON t1.micros = t2.micros +---- +2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 +2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 +2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 
2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+# test timestamp join on millis datatype
+query PPPPTPPPPT rowsort
+SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_table ) as t2 ON t1.millis = t2.millis
+----
+2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123450 2011-12-13T11:13:10.123 2011-12-13T11:13:10 Row 1
+2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0
+2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3
+
+
+
+# left_join_using_2
+query II
+SELECT t1.c1, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2;
+----
+0 1
+0 2
+0 3
+0 4
+0 5
+0 6
+0 7
+0 8
+0 9
+0 10
\ No newline at end of file

From 3a3e6b56f6eda84a6e3ad9ac2e53cc00d396ce44 Mon Sep 17 00:00:00 2001
From: jiangzhx
Date: Mon, 12 Jun 2023 14:39:23 +0800
Subject: [PATCH 2/3] move remaining joins test cases to sqllogictest

---
 datafusion/core/tests/sql/joins.rs           | 548 +----------------
 datafusion/core/tests/sql/mod.rs             | 288 ---------
 .../core/tests/sqllogictests/src/main.rs     |   6 +-
 .../core/tests/sqllogictests/src/setup.rs    | 190 +++++-
 .../tests/sqllogictests/test_files/joins.slt | 564 +++++++++++++++++-
 5 files changed, 753 insertions(+), 843 deletions(-)

diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index a6ad46aabb1a..57a7ad99084a 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -16,6 +16,7 @@
 // under the License.
use super::*; +use arrow::util::pretty::print_batches; #[tokio::test] #[ignore] @@ -37,66 +38,6 @@ async fn nestedjoin_with_alias() -> Result<()> { Ok(()) } -#[tokio::test] -async fn left_join_using_join_key_projection() -> Result<()> { - let results = execute_with_partition( - "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2", - 1, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+----+", - "| c1 | c2 | c2 |", - "+----+----+----+", - "| 0 | 1 | 1 |", - "| 0 | 2 | 2 |", - "| 0 | 3 | 3 |", - "| 0 | 4 | 4 |", - "| 0 | 5 | 5 |", - "| 0 | 6 | 6 |", - "| 0 | 7 | 7 |", - "| 0 | 8 | 8 |", - "| 0 | 9 | 9 |", - "| 0 | 10 | 10 |", - "+----+----+----+", - ]; - - assert_batches_eq!(expected, &results); - Ok(()) -} - -#[tokio::test] -async fn left_join_2() -> Result<()> { - let results = execute_with_partition( - "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2", - 1, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = vec![ - "+----+----+----+", - "| c1 | c2 | c2 |", - "+----+----+----+", - "| 0 | 1 | 1 |", - "| 0 | 2 | 2 |", - "| 0 | 3 | 3 |", - "| 0 | 4 | 4 |", - "| 0 | 5 | 5 |", - "| 0 | 6 | 6 |", - "| 0 | 7 | 7 |", - "| 0 | 8 | 8 |", - "| 0 | 9 | 9 |", - "| 0 | 10 | 10 |", - "+----+----+----+", - ]; - - assert_batches_eq!(expected, &results); - Ok(()) -} - #[tokio::test] async fn join_partitioned() -> Result<()> { // self join on partition id (workaround for duplicate column name) @@ -114,388 +55,6 @@ async fn join_partitioned() -> Result<()> { Ok(()) } -#[tokio::test] -async fn hash_join_with_date32() -> Result<()> { - let ctx = create_hashjoin_datatype_context()?; - - // inner join on hash supported data type (Date32) - let sql = "select * from t1 join t2 on t1.c1 = t2.c1"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); - let plan = dataframe.into_optimized_plan()?; - let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Inner Join: t1.c1 = t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - ]; - let formatted = plan.display_indent_schema().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = vec![ - "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+", - "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |", - "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+", - "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |", - "| 1970-01-04 | | -123.12 | jkl | 1970-01-04 | | 789.00 | |", - "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+", - ]; - - let results = execute_to_batches(&ctx, sql).await; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn hash_join_with_date64() -> Result<()> { - let ctx = create_hashjoin_datatype_context()?; - - 
// left join on hash supported data type (Date64) - let sql = "select * from t1 left join t2 on t1.c2 = t2.c2"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); - let plan = dataframe.into_optimized_plan()?; - let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Left Join: t1.c2 = t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - ]; - let formatted = plan.display_indent_schema().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = vec![ - "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+", - "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |", - "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+", - "| | 1970-01-04T00:00:00 | 789.00 | ghi | | 1970-01-04T00:00:00 | 0.00 | qwerty |", - "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |", - "| 1970-01-03 | 1970-01-03T00:00:00 | 456.00 | def | | | | |", - "| 1970-01-04 | | -123.12 | jkl | | | | |", - "+------------+---------------------+---------+-----+------------+---------------------+---------+--------+", - ]; - - let results = execute_to_batches(&ctx, sql).await; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn hash_join_with_decimal() -> Result<()> { - let ctx = create_hashjoin_datatype_context()?; - - // right join on hash supported data type (Decimal) - let sql = "select * from t1 right join t2 on t1.c3 = t2.c3"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); - let plan = dataframe.into_optimized_plan()?; - let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Right Join: CAST(t1.c3 AS Decimal128(10, 2)) = t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - ]; - let formatted = plan.display_indent_schema().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = vec![ - "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+", - "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |", - "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+", - "| | | | | | | 100000.00 | abcdefg |", - "| | | | | | 1970-01-04T00:00:00 | 0.00 | qwerty |", - "| | 1970-01-04T00:00:00 | 789.00 | ghi | 1970-01-04 | | 789.00 | |", - "| 1970-01-04 | | 
-123.12 | jkl | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |", - "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+", - ]; - - let results = execute_to_batches(&ctx, sql).await; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn hash_join_with_dictionary() -> Result<()> { - let ctx = create_hashjoin_datatype_context()?; - - // inner join on hash supported data type (Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))) - let sql = "select * from t1 join t2 on t1.c4 = t2.c4"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg); - let plan = dataframe.into_optimized_plan()?; - let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Inner Join: t1.c4 = t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - ]; - let formatted = plan.display_indent_schema().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = vec![ - "+------------+---------------------+------+-----+------------+---------------------+---------+-----+", - "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |", - "+------------+---------------------+------+-----+------------+---------------------+---------+-----+", - "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |", - "+------------+---------------------+------+-----+------------+---------------------+---------+-----+", - ]; - - let results = execute_to_batches(&ctx, sql).await; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn sort_merge_join_on_date32() -> Result<()> { - let ctx = create_sort_merge_join_datatype_context()?; - - // inner sort merge join on data type (Date32) - let sql = "select * from t1 join t2 on t1.c1 = t2.c1"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - let expected = vec![ - "SortMergeJoin: join_type=Inner, on=[(Column { name: \"c1\", index: 0 }, Column { name: \"c1\", index: 0 })]", - " SortExec: expr=[c1@0 ASC]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " SortExec: expr=[c1@0 ASC]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - 
let expected = vec![ - "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+", - "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |", - "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+", - "| 1970-01-02 | 1970-01-02T00:00:00 | 1.23 | abc | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |", - "| 1970-01-04 | | -123.12 | jkl | 1970-01-04 | | 789.00 | |", - "+------------+---------------------+---------+-----+------------+---------------------+---------+-----+", - ]; - - let results = execute_to_batches(&ctx, sql).await; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn sort_merge_join_on_decimal() -> Result<()> { - let ctx = create_sort_merge_join_datatype_context()?; - - // right join on data type (Decimal) - let sql = "select * from t1 right join t2 on t1.c3 = t2.c3"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - let expected = vec![ - "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]", - " SortMergeJoin: join_type=Right, on=[(Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }, Column { name: \"c3\", index: 2 })]", - " SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"CAST(t1.c3 AS Decimal128(10, 2))\", index: 4 }], 2), input_partitions=2", - " ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " SortExec: expr=[c3@2 ASC]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"c3\", index: 2 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - ]; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let expected = vec![ - "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+", - "| c1 | c2 | c3 | c4 | c1 | c2 | c3 | c4 |", - "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+", - "| | | | | | | 100000.00 | abcdefg |", - "| | | | | | 1970-01-04T00:00:00 | 0.00 | qwerty |", - "| | 1970-01-04T00:00:00 | 789.00 | ghi | 1970-01-04 | | 789.00 | |", - "| 1970-01-04 | | -123.12 | jkl | 1970-01-02 | 1970-01-02T00:00:00 | -123.12 | abc |", - "+------------+---------------------+---------+-----+------------+---------------------+-----------+---------+", - ]; - - let results = execute_to_batches(&ctx, sql).await; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn left_semi_join() -> Result<()> { - let test_repartition_joins = vec![true, false]; - for repartition_joins in test_repartition_joins { - let ctx = create_left_semi_anti_join_context_with_null_ids( - "t1_id", - "t2_id", - repartition_joins, - ) - .unwrap(); - - let sql = 
"SELECT t1_id, t1_name FROM t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - let expected = if repartition_joins { - vec![ - "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", - " SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - } else { - vec![ - "SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", - " MemoryExec: partitions=1, partition_sizes=[1]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - }; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+---------+", - "| t1_id | t1_name |", - "+-------+---------+", - "| 11 | a |", - "| 11 | a |", - "| 22 | b |", - "| 44 | d |", - "+-------+---------+", - ]; - assert_batches_eq!(expected, &actual); - - let sql = "SELECT t1_id, t1_name FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id) ORDER BY t1_id"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+---------+", - "| t1_id | t1_name |", - "+-------+---------+", - "| 11 | a |", - "| 11 | a |", - "| 22 | b |", - "| 44 | d |", - "+-------+---------+", - ]; - assert_batches_eq!(expected, &actual); - - let sql = "SELECT t1_id FROM t1 INTERSECT SELECT t2_id FROM t2 ORDER BY t1_id"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+", - "| t1_id |", - "+-------+", - "| 11 |", - "| 22 |", - "| 44 |", - "| |", - "+-------+", - ]; - assert_batches_eq!(expected, &actual); - - let sql = "SELECT t1_id, t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1_id = t2_id) ORDER BY t1_id"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - let expected = if repartition_joins { - vec![ - "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", - " SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: 
partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - } else { - vec![ - "SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2_id\", index: 0 })]", - " MemoryExec: partitions=1, partition_sizes=[1]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - }; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+---------+", - "| t1_id | t1_name |", - "+-------+---------+", - "| 11 | a |", - "| 11 | a |", - "| 22 | b |", - "| 44 | d |", - "+-------+---------+", - ]; - assert_batches_eq!(expected, &actual); - } - - Ok(()) -} - #[tokio::test] #[ignore = "Test ignored, will be enabled after fixing the NAAJ bug"] // https://github.com/apache/arrow-datafusion/issues/4211 @@ -517,108 +76,3 @@ async fn null_aware_left_anti_join() -> Result<()> { Ok(()) } - -#[tokio::test] -async fn right_semi_join() -> Result<()> { - let test_repartition_joins = vec![true, false]; - for repartition_joins in test_repartition_joins { - let ctx = create_right_semi_anti_join_context_with_null_ids( - "t1_id", - "t2_id", - repartition_joins, - ) - .unwrap(); - - let sql = "SELECT t1_id, t1_name, t1_int FROM t1 WHERE EXISTS (SELECT * FROM t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - let expected = if repartition_joins { - vec![ - "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", - " SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@1 != t1_name@0", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - } else { - vec![ - "SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@1 != t1_name@0", - " MemoryExec: partitions=1, partition_sizes=[1]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - }; - let formatted = 
displayable(physical_plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+---------+--------+", - "| t1_id | t1_name | t1_int |", - "+-------+---------+--------+", - "| 11 | a | 1 |", - "+-------+---------+--------+", - ]; - assert_batches_eq!(expected, &actual); - - let sql = "SELECT t1_id, t1_name, t1_int FROM t2 RIGHT SEMI JOIN t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id"; - let msg = format!("Creating logical plan for '{sql}'"); - let dataframe = ctx.sql(sql).await.expect(&msg); - let physical_plan = dataframe.create_physical_plan().await?; - let expected = if repartition_joins { - vec![ - "SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]", - " SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@0 != t1_name@1", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t2_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2), input_partitions=2", - " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - } else { - vec![ - "SortExec: expr=[t1_id@0 ASC NULLS LAST]", - " CoalesceBatchesExec: target_batch_size=4096", - " HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: \"t2_id\", index: 0 }, Column { name: \"t1_id\", index: 0 })], filter=t2_name@0 != t1_name@1", - " MemoryExec: partitions=1, partition_sizes=[1]", - " MemoryExec: partitions=1, partition_sizes=[1]", - ] - }; - let formatted = displayable(physical_plan.as_ref()).indent().to_string(); - let actual: Vec<&str> = formatted.trim().lines().collect(); - assert_eq!( - expected, actual, - "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" - ); - - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------+---------+--------+", - "| t1_id | t1_name | t1_int |", - "+-------+---------+--------+", - "| 11 | a | 1 |", - "+-------+---------+--------+", - ]; - assert_batches_eq!(expected, &actual); - } - - Ok(()) -} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 42de0d23a3dd..9b5598a51bb2 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -351,207 +351,6 @@ fn create_left_semi_anti_join_context_with_null_ids( Ok(ctx) } -fn create_right_semi_anti_join_context_with_null_ids( - column_left: &str, - column_right: &str, - repartition_joins: bool, -) -> Result { - let ctx = SessionContext::with_config( - SessionConfig::new() - .with_repartition_joins(repartition_joins) - .with_target_partitions(2) - .with_batch_size(4096), - ); - - let t1_schema = Arc::new(Schema::new(vec![ - Field::new(column_left, DataType::UInt32, true), - Field::new("t1_name", DataType::Utf8, true), - Field::new("t1_int", DataType::UInt32, true), - ])); - let t1_data = RecordBatch::try_new( - t1_schema, - vec![ - 
Arc::new(UInt32Array::from(vec![ - Some(11), - Some(22), - Some(33), - Some(44), - None, - ])), - Arc::new(StringArray::from(vec![ - Some("a"), - Some("b"), - Some("c"), - Some("d"), - Some("e"), - ])), - Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 0])), - ], - )?; - ctx.register_batch("t1", t1_data)?; - - let t2_schema = Arc::new(Schema::new(vec![ - Field::new(column_right, DataType::UInt32, true), - Field::new("t2_name", DataType::Utf8, true), - ])); - // t2 data size is smaller than t1 - let t2_data = RecordBatch::try_new( - t2_schema, - vec![ - Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])), - Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])), - ], - )?; - ctx.register_batch("t2", t2_data)?; - - Ok(ctx) -} - -fn create_hashjoin_datatype_context() -> Result { - let ctx = SessionContext::new(); - - let t1_schema = Schema::new(vec![ - Field::new("c1", DataType::Date32, true), - Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal128(5, 2), true), - Field::new( - "c4", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - true, - ), - ]); - let dict1: DictionaryArray = - vec!["abc", "def", "ghi", "jkl"].into_iter().collect(); - let t1_data = RecordBatch::try_new( - Arc::new(t1_schema), - vec![ - Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])), - Arc::new(Date64Array::from(vec![ - Some(86400000), - Some(172800000), - Some(259200000), - None, - ])), - Arc::new( - Decimal128Array::from_iter_values([123, 45600, 78900, -12312]) - .with_precision_and_scale(5, 2) - .unwrap(), - ), - Arc::new(dict1), - ], - )?; - ctx.register_batch("t1", t1_data)?; - - let t2_schema = Schema::new(vec![ - Field::new("c1", DataType::Date32, true), - Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal128(10, 2), true), - Field::new( - "c4", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - true, - ), - ]); - let dict2: DictionaryArray = - vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect(); - let t2_data = RecordBatch::try_new( - Arc::new(t2_schema), - vec![ - Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])), - Arc::new(Date64Array::from(vec![ - Some(86400000), - None, - Some(259200000), - None, - ])), - Arc::new( - Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900]) - .with_precision_and_scale(10, 2) - .unwrap(), - ), - Arc::new(dict2), - ], - )?; - ctx.register_batch("t2", t2_data)?; - - Ok(ctx) -} - -fn create_sort_merge_join_datatype_context() -> Result { - let mut config = ConfigOptions::new(); - config.optimizer.prefer_hash_join = false; - config.execution.target_partitions = 2; - config.execution.batch_size = 4096; - - let ctx = SessionContext::with_config(config.into()); - - let t1_schema = Schema::new(vec![ - Field::new("c1", DataType::Date32, true), - Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal128(5, 2), true), - Field::new( - "c4", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - true, - ), - ]); - let dict1: DictionaryArray = - vec!["abc", "def", "ghi", "jkl"].into_iter().collect(); - let t1_data = RecordBatch::try_new( - Arc::new(t1_schema), - vec![ - Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])), - Arc::new(Date64Array::from(vec![ - Some(86400000), - Some(172800000), - Some(259200000), - None, - ])), - Arc::new( - Decimal128Array::from_iter_values([123, 45600, 78900, -12312]) - .with_precision_and_scale(5, 2) - .unwrap(), - ), - 
Arc::new(dict1), - ], - )?; - ctx.register_batch("t1", t1_data)?; - - let t2_schema = Schema::new(vec![ - Field::new("c1", DataType::Date32, true), - Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal128(10, 2), true), - Field::new( - "c4", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - true, - ), - ]); - let dict2: DictionaryArray = - vec!["abc", "abcdefg", "qwerty", ""].into_iter().collect(); - let t2_data = RecordBatch::try_new( - Arc::new(t2_schema), - vec![ - Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])), - Arc::new(Date64Array::from(vec![ - Some(86400000), - None, - Some(259200000), - None, - ])), - Arc::new( - Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900]) - .with_precision_and_scale(10, 2) - .unwrap(), - ), - Arc::new(dict2), - ], - )?; - ctx.register_batch("t2", t2_data)?; - - Ok(ctx) -} - fn get_tpch_table_schema(table: &str) -> Schema { match table { "customer" => Schema::new(vec![ @@ -1106,93 +905,6 @@ fn normalize_vec_for_explain(v: Vec>) -> Vec> { .collect::>() } -/// Return a new table provider containing all of the supported timestamp types -pub fn table_with_timestamps() -> Arc { - let batch = make_timestamps(); - let schema = batch.schema(); - let partitions = vec![vec![batch]]; - Arc::new(MemTable::try_new(schema, partitions).unwrap()) -} - -/// Return record batch with all of the supported timestamp types -/// values -/// -/// Columns are named: -/// "nanos" --> TimestampNanosecondArray -/// "micros" --> TimestampMicrosecondArray -/// "millis" --> TimestampMillisecondArray -/// "secs" --> TimestampSecondArray -/// "names" --> StringArray -pub fn make_timestamps() -> RecordBatch { - let ts_strings = vec![ - Some("2018-11-13T17:11:10.011375885995"), - Some("2011-12-13T11:13:10.12345"), - None, - Some("2021-1-1T05:11:10.432"), - ]; - - let ts_nanos = ts_strings - .into_iter() - .map(|t| { - t.map(|t| { - t.parse::() - .unwrap() - .timestamp_nanos() - }) - }) - .collect::>(); - - let ts_micros = ts_nanos - .iter() - .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000)) - .collect::>(); - - let ts_millis = ts_nanos - .iter() - .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000)) - .collect::>(); - - let ts_secs = ts_nanos - .iter() - .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000)) - .collect::>(); - - let names = ts_nanos - .iter() - .enumerate() - .map(|(i, _)| format!("Row {i}")) - .collect::>(); - - let arr_nanos = TimestampNanosecondArray::from(ts_nanos); - let arr_micros = TimestampMicrosecondArray::from(ts_micros); - let arr_millis = TimestampMillisecondArray::from(ts_millis); - let arr_secs = TimestampSecondArray::from(ts_secs); - - let names = names.iter().map(|s| s.as_str()).collect::>(); - let arr_names = StringArray::from(names); - - let schema = Schema::new(vec![ - Field::new("nanos", arr_nanos.data_type().clone(), true), - Field::new("micros", arr_micros.data_type().clone(), true), - Field::new("millis", arr_millis.data_type().clone(), true), - Field::new("secs", arr_secs.data_type().clone(), true), - Field::new("name", arr_names.data_type().clone(), true), - ]); - let schema = Arc::new(schema); - - RecordBatch::try_new( - schema, - vec![ - Arc::new(arr_nanos), - Arc::new(arr_micros), - Arc::new(arr_millis), - Arc::new(arr_secs), - Arc::new(arr_names), - ], - ) - .unwrap() -} - #[tokio::test] async fn nyc() -> Result<()> { // schema for nyxtaxi csv files diff --git a/datafusion/core/tests/sqllogictests/src/main.rs 
b/datafusion/core/tests/sqllogictests/src/main.rs index ee5a2017ca58..6f17010a39cf 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -286,9 +286,13 @@ async fn context_for_test_file(relative_path: &Path) -> Option { } } "joins.slt" => { - let mut test_ctx = test_ctx; info!("Registering timestamps tables"); setup::register_timestamps_table(test_ctx.session_ctx()).await; + setup::register_hashjoin_datatype_table(test_ctx.session_ctx()).await; + setup::register_left_semi_anti_join_table(test_ctx.session_ctx()).await; + setup::register_right_semi_anti_join_table(test_ctx.session_ctx()).await; + + let mut test_ctx = test_ctx; setup::register_partition_table(&mut test_ctx).await; return Some(test_ctx); } diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs index c64dcc45130b..96090d676a4d 100644 --- a/datafusion/core/tests/sqllogictests/src/setup.rs +++ b/datafusion/core/tests/sqllogictests/src/setup.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use arrow_array::types::Int32Type; use arrow_array::{ - Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, + Array, Date32Array, Date64Array, Decimal128Array, DictionaryArray, StringArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, }; use datafusion::{ arrow::{ @@ -324,8 +326,6 @@ pub async fn register_partition_table(test_ctx: &mut TestContext) { for partition in 0..partition_count { let filename = format!("partition-{partition}.{file_extension}"); let file_path = test_ctx.testdir_path().join(filename); - println!("{}", file_path.clone().display()); - let mut file = File::create(file_path).unwrap(); // generate some data @@ -346,3 +346,185 @@ pub async fn register_partition_table(test_ctx: &mut TestContext) { .await .unwrap(); } + +pub async fn register_hashjoin_datatype_table(ctx: &SessionContext) { + let t1_schema = Schema::new(vec![ + Field::new("c1", DataType::Date32, true), + Field::new("c2", DataType::Date64, true), + Field::new("c3", DataType::Decimal128(5, 2), true), + Field::new( + "c4", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ), + ]); + let dict1: DictionaryArray = + vec!["abc", "def", "ghi", "jkl"].into_iter().collect(); + let t1_data = RecordBatch::try_new( + Arc::new(t1_schema), + vec![ + Arc::new(Date32Array::from(vec![Some(1), Some(2), None, Some(3)])), + Arc::new(Date64Array::from(vec![ + Some(86400000), + Some(172800000), + Some(259200000), + None, + ])), + Arc::new( + Decimal128Array::from_iter_values([123, 45600, 78900, -12312]) + .with_precision_and_scale(5, 2) + .unwrap(), + ), + Arc::new(dict1), + ], + ) + .unwrap(); + ctx.register_batch("hashjoin_datatype_table_t1", t1_data) + .unwrap(); + + let t2_schema = Schema::new(vec![ + Field::new("c1", DataType::Date32, true), + Field::new("c2", DataType::Date64, true), + Field::new("c3", DataType::Decimal128(10, 2), true), + Field::new( + "c4", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ), + ]); + let dict2: DictionaryArray = vec!["abc", "abcdefg", "qwerty", "qwe"] + .into_iter() + .collect(); + let t2_data = RecordBatch::try_new( + Arc::new(t2_schema), + vec![ + Arc::new(Date32Array::from(vec![Some(1), None, None, Some(3)])), + Arc::new(Date64Array::from(vec![ + Some(86400000), + 
None, + Some(259200000), + None, + ])), + Arc::new( + Decimal128Array::from_iter_values([-12312, 10000000, 0, 78900]) + .with_precision_and_scale(10, 2) + .unwrap(), + ), + Arc::new(dict2), + ], + ) + .unwrap(); + ctx.register_batch("hashjoin_datatype_table_t2", t2_data) + .unwrap(); +} + +pub async fn register_left_semi_anti_join_table(ctx: &SessionContext) { + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("t1_id", DataType::UInt32, true), + Field::new("t1_name", DataType::Utf8, true), + Field::new("t1_int", DataType::UInt32, true), + ])); + let t1_data = RecordBatch::try_new( + t1_schema, + vec![ + Arc::new(UInt32Array::from(vec![ + Some(11), + Some(11), + Some(22), + Some(33), + Some(44), + None, + ])), + Arc::new(StringArray::from(vec![ + Some("a"), + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])), + Arc::new(UInt32Array::from(vec![1, 1, 2, 3, 4, 0])), + ], + ) + .unwrap(); + ctx.register_batch("left_semi_anti_join_table_t1", t1_data) + .unwrap(); + + let t2_schema = Arc::new(Schema::new(vec![ + Field::new("t2_id", DataType::UInt32, true), + Field::new("t2_name", DataType::Utf8, true), + Field::new("t2_int", DataType::UInt32, true), + ])); + let t2_data = RecordBatch::try_new( + t2_schema, + vec![ + Arc::new(UInt32Array::from(vec![ + Some(11), + Some(11), + Some(22), + Some(44), + Some(55), + None, + ])), + Arc::new(StringArray::from(vec![ + Some("z"), + Some("z"), + Some("y"), + Some("x"), + Some("w"), + Some("v"), + ])), + Arc::new(UInt32Array::from(vec![3, 3, 1, 3, 3, 0])), + ], + ) + .unwrap(); + ctx.register_batch("left_semi_anti_join_table_t2", t2_data) + .unwrap(); +} + +pub async fn register_right_semi_anti_join_table(ctx: &SessionContext) { + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("t1_id", DataType::UInt32, true), + Field::new("t1_name", DataType::Utf8, true), + Field::new("t1_int", DataType::UInt32, true), + ])); + let t1_data = RecordBatch::try_new( + t1_schema, + vec![ + Arc::new(UInt32Array::from(vec![ + Some(11), + Some(22), + Some(33), + Some(44), + None, + ])), + Arc::new(StringArray::from(vec![ + Some("a"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + ])), + Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 0])), + ], + ) + .unwrap(); + ctx.register_batch("right_semi_anti_join_table_t1", t1_data) + .unwrap(); + + let t2_schema = Arc::new(Schema::new(vec![ + Field::new("t2_id", DataType::UInt32, true), + Field::new("t2_name", DataType::Utf8, true), + ])); + // t2 data size is smaller than t1 + let t2_data = RecordBatch::try_new( + t2_schema, + vec![ + Arc::new(UInt32Array::from(vec![Some(11), Some(11), None])), + Arc::new(StringArray::from(vec![Some("a"), Some("x"), None])), + ], + ) + .unwrap(); + ctx.register_batch("right_semi_anti_join_table_t2", t2_data) + .unwrap(); +} diff --git a/datafusion/core/tests/sqllogictests/test_files/joins.slt b/datafusion/core/tests/sqllogictests/test_files/joins.slt index 21f8e12e66ea..4486d7c47b49 100644 --- a/datafusion/core/tests/sqllogictests/test_files/joins.slt +++ b/datafusion/core/tests/sqllogictests/test_files/joins.slt @@ -2363,8 +2363,6 @@ SELECT * FROM test_timestamps_table as t1 JOIN (SELECT * FROM test_timestamps_ta 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2018-11-13T17:11:10.011375885 2018-11-13T17:11:10.011375 2018-11-13T17:11:10.011 2018-11-13T17:11:10 Row 0 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 2021-01-01T05:11:10.432 2021-01-01T05:11:10.432 
2021-01-01T05:11:10.432 2021-01-01T05:11:10 Row 3 - - # left_join_using_2 query II SELECT t1.c1, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2; @@ -2378,4 +2376,564 @@ SELECT t1.c1, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 US 0 7 0 8 0 9 -0 10 \ No newline at end of file +0 10 + +# left_join_using_join_key_projection +query III +SELECT t1.c1, t1.c2, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 USING (c2) ORDER BY t2.c2 +---- +0 1 1 +0 2 2 +0 3 3 +0 4 4 +0 5 5 +0 6 6 +0 7 7 +0 8 8 +0 9 9 +0 10 10 + +# left_join_2 +query III +SELECT t1.c1, t1.c2, t2.c2 FROM test_partition_table t1 JOIN test_partition_table t2 ON t1.c2 = t2.c2 ORDER BY t1.c2 +---- +0 1 1 +0 2 2 +0 3 3 +0 4 4 +0 5 5 +0 6 6 +0 7 7 +0 8 8 +0 9 9 +0 10 10 + +#### +# Config setup +#### + +statement ok +set datafusion.explain.logical_plan_only = true + +# explain hash_join_with_date32 +query TT +explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1 +---- +logical_plan +Inner Join: t1.c1 = t2.c1 +--SubqueryAlias: t1 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] +--SubqueryAlias: t2 +----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4] + +# hash_join_with_date32 +query DDR?DDR? rowsort +select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1 +---- +1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc +1970-01-04 NULL -123.12 jkl 1970-01-04 NULL 789 qwe + + +# explain hash_join_with_date64 +query TT +explain select * from hashjoin_datatype_table_t1 t1 left join hashjoin_datatype_table_t2 t2 on t1.c2 = t2.c2 +---- +logical_plan +Left Join: t1.c2 = t2.c2 +--SubqueryAlias: t1 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] +--SubqueryAlias: t2 +----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4] + +# hash_join_with_date64 +query DDR?DDR? rowsort +select * from hashjoin_datatype_table_t1 t1 left join hashjoin_datatype_table_t2 t2 on t1.c2 = t2.c2 +---- +1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc +1970-01-03 1970-01-03T00:00:00 456 def NULL NULL NULL NULL +1970-01-04 NULL -123.12 jkl NULL NULL NULL NULL +NULL 1970-01-04T00:00:00 789 ghi NULL 1970-01-04T00:00:00 0 qwerty + + +# explain hash_join_with_decimal +query TT +explain select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t1 t2 on t1.c3 = t2.c3 +---- +logical_plan +Right Join: t1.c3 = t2.c3 +--SubqueryAlias: t1 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] +--SubqueryAlias: t2 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] + +# hash_join_with_decimal +query DDR?DDR? 
rowsort +select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t1 t2 on t1.c3 = t2.c3 +---- +1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 1.23 abc +1970-01-03 1970-01-03T00:00:00 456 def 1970-01-03 1970-01-03T00:00:00 456 def +1970-01-04 NULL -123.12 jkl 1970-01-04 NULL -123.12 jkl +NULL 1970-01-04T00:00:00 789 ghi NULL 1970-01-04T00:00:00 789 ghi + +# explain hash_join_with_dictionary +query TT +explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t1 t2 on t1.c4 = t2.c4 +---- +logical_plan +Inner Join: t1.c4 = t2.c4 +--SubqueryAlias: t1 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] +--SubqueryAlias: t2 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] + +# hash_join_with_dictionary +query DDR?DDR? rowsort +select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c4 = t2.c4 +---- +1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc + +#### +# Config teardown +#### +statement ok +set datafusion.explain.logical_plan_only = false + + +#### +# Config setup +#### +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + +# explain sort_merge_join_on_date32 inner sort merge join on data type (Date32) +query TT +explain select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1 +---- +logical_plan +Inner Join: t1.c1 = t2.c1 +--SubqueryAlias: t1 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] +--SubqueryAlias: t2 +----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4] +physical_plan +SortMergeJoin: join_type=Inner, on=[(Column { name: "c1", index: 0 }, Column { name: "c1", index: 0 })] +--SortExec: expr=[c1@0 ASC] +----CoalesceBatchesExec: target_batch_size=4096 +------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2 +--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] +--SortExec: expr=[c1@0 ASC] +----CoalesceBatchesExec: target_batch_size=4096 +------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 2), input_partitions=2 +--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] + +# sort_merge_join_on_date32 inner sort merge join on data type (Date32) +query DDR?DDR? 
rowsort +select * from hashjoin_datatype_table_t1 t1 join hashjoin_datatype_table_t2 t2 on t1.c1 = t2.c1 +---- +1970-01-02 1970-01-02T00:00:00 1.23 abc 1970-01-02 1970-01-02T00:00:00 -123.12 abc +1970-01-04 NULL -123.12 jkl 1970-01-04 NULL 789 qwe + +# explain sort_merge_join_on_decimal right join on data type (Decimal) +query TT +explain select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t2 t2 on t1.c3 = t2.c3 +---- +logical_plan +Right Join: CAST(t1.c3 AS Decimal128(10, 2)) = t2.c3 +--SubqueryAlias: t1 +----TableScan: hashjoin_datatype_table_t1 projection=[c1, c2, c3, c4] +--SubqueryAlias: t2 +----TableScan: hashjoin_datatype_table_t2 projection=[c1, c2, c3, c4] +physical_plan +ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4] +--SortMergeJoin: join_type=Right, on=[(Column { name: "CAST(t1.c3 AS Decimal128(10, 2))", index: 4 }, Column { name: "c3", index: 2 })] +----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC] +------CoalesceBatchesExec: target_batch_size=4096 +--------RepartitionExec: partitioning=Hash([Column { name: "CAST(t1.c3 AS Decimal128(10, 2))", index: 4 }], 2), input_partitions=2 +----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))] +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +----SortExec: expr=[c3@2 ASC] +------CoalesceBatchesExec: target_batch_size=4096 +--------RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 2), input_partitions=2 +----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +------------MemoryExec: partitions=1, partition_sizes=[1] + +# sort_merge_join_on_decimal right join on data type (Decimal) +query DDR?DDR? rowsort +select * from hashjoin_datatype_table_t1 t1 right join hashjoin_datatype_table_t2 t2 on t1.c3 = t2.c3 +---- +1970-01-04 NULL -123.12 jkl 1970-01-02 1970-01-02T00:00:00 -123.12 abc +NULL 1970-01-04T00:00:00 789 ghi 1970-01-04 NULL 789 qwe +NULL NULL NULL NULL NULL 1970-01-04T00:00:00 0 qwerty +NULL NULL NULL NULL NULL NULL 100000 abcdefg + +#### +# Config teardown +#### +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + + + +#Test the left_semi_join scenarios where the current repartition_joins parameter is set to true . 
+#### +# Config setup +#### +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + +query TT +explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id +---- +physical_plan +SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +--SortExec: expr=[t1_id@0 ASC NULLS LAST] +----CoalesceBatchesExec: target_batch_size=4096 +------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })] +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] + +query IT rowsort +SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id +---- +11 a +11 a +22 b +44 d + +query IT rowsort +SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id +---- +11 a +11 a +22 b +44 d + +query I rowsort +SELECT t1_id FROM left_semi_anti_join_table_t1 t1 INTERSECT SELECT t2_id FROM left_semi_anti_join_table_t2 t2 ORDER BY t1_id +---- +11 +22 +44 +NULL + +query TT +explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id +---- +physical_plan +SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +--SortExec: expr=[t1_id@0 ASC NULLS LAST] +----CoalesceBatchesExec: target_batch_size=4096 +------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })] +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] + +query IT +SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id +---- +11 a +11 a +22 b +44 d + +#### +# Config teardown +#### +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.explain.physical_plan_only = false; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok 
+set datafusion.execution.batch_size = 4096; + +#Test the left_semi_join scenarios where the current repartition_joins parameter is set to false . +#### +# Config setup +#### +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.optimizer.repartition_joins = false; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + +query TT +explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id +---- +physical_plan +SortExec: expr=[t1_id@0 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=4096 +----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })] +------MemoryExec: partitions=1, partition_sizes=[1] +------MemoryExec: partitions=1, partition_sizes=[1] + +query IT rowsort +SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id +---- +11 a +11 a +22 b +44 d + +query IT rowsort +SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT 1 FROM left_semi_anti_join_table_t2 t2 WHERE t1_id = t2_id) ORDER BY t1_id +---- +11 a +11 a +22 b +44 d + +query I rowsort +SELECT t1_id FROM left_semi_anti_join_table_t1 t1 INTERSECT SELECT t2_id FROM left_semi_anti_join_table_t2 t2 ORDER BY t1_id +---- +11 +22 +44 +NULL + +query TT +explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id +---- +physical_plan +SortExec: expr=[t1_id@0 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=4096 +----HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })] +------MemoryExec: partitions=1, partition_sizes=[1] +------MemoryExec: partitions=1, partition_sizes=[1] + +query IT +SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id +---- +11 a +11 a +22 b +44 d + +#### +# Config teardown +#### +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.explain.physical_plan_only = false; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + + +#Test the right_semi_join scenarios where the current repartition_joins parameter is set to true . 
parameter is set to true .
+#### +# Config setup +#### +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + +query TT +explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +physical_plan +SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +--SortExec: expr=[t1_id@0 ASC NULLS LAST] +----CoalesceBatchesExec: target_batch_size=4096 +------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@1 != t1_name@0 +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] + +query ITI rowsort +SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +11 a 1 + +query TT +explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +physical_plan +SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] +--SortExec: expr=[t1_id@0 ASC NULLS LAST] +----CoalesceBatchesExec: target_batch_size=4096 +------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@0 != t1_name@1 +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] +--------CoalesceBatchesExec: target_batch_size=4096 +----------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2 +------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +--------------MemoryExec: partitions=1, partition_sizes=[1] + +query ITI rowsort +SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +11 a 1 + +#### +# Config teardown +#### +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.explain.physical_plan_only = false; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + + +#Test the right_semi_join scenarios where the current repartition_joins 
parameter is set to false . +#### +# Config setup +#### +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.explain.physical_plan_only = true; + +statement ok +set datafusion.optimizer.repartition_joins = false; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + +query TT +explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +physical_plan +SortExec: expr=[t1_id@0 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=4096 +----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@1 != t1_name@0 +------MemoryExec: partitions=1, partition_sizes=[1] +------MemoryExec: partitions=1, partition_sizes=[1] + +query ITI rowsort +SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +11 a 1 + +query TT +explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +physical_plan +SortExec: expr=[t1_id@0 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=4096 +----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(Column { name: "t2_id", index: 0 }, Column { name: "t1_id", index: 0 })], filter=t2_name@0 != t1_name@1 +------MemoryExec: partitions=1, partition_sizes=[1] +------MemoryExec: partitions=1, partition_sizes=[1] + +query ITI rowsort +SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id +---- +11 a 1 + +#### +# Config teardown +#### +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.explain.physical_plan_only = false; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.execution.batch_size = 4096; + From 1307306e1a1f2f31f8bd9755318cde8e5c1ba360 Mon Sep 17 00:00:00 2001 From: jiangzhx Date: Mon, 12 Jun 2023 14:55:57 +0800 Subject: [PATCH 3/3] clippy fix --- datafusion/core/tests/sql/joins.rs | 1 - datafusion/core/tests/sql/mod.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 57a7ad99084a..116a3c1a7978 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -16,7 +16,6 @@ // under the License. use super::*; -use arrow::util::pretty::print_batches; #[tokio::test] #[ignore] diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 9b5598a51bb2..db2e7cfcff2d 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -25,7 +25,6 @@ use arrow::{ use chrono::prelude::*; use chrono::Duration; -use datafusion::config::ConfigOptions; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Aggregate, LogicalPlan, TableScan}; use datafusion::physical_plan::metrics::MetricValue;