
Temporal datatype support for interval arithmetic #5971

Merged (67 commits) on Apr 13, 2023

Commits
1869363
first implementation and tests of timestamp subtraction
berkaysynnada Mar 7, 2023
2f01278
improvement after review
berkaysynnada Mar 7, 2023
806b4d3
postgre interval format option
berkaysynnada Mar 8, 2023
708d717
random tests extended
berkaysynnada Mar 8, 2023
c5bacbe
corrections after review
berkaysynnada Mar 8, 2023
011933f
operator check
berkaysynnada Mar 8, 2023
e475f58
flag is removed
berkaysynnada Mar 9, 2023
423fb65
clippy fix
berkaysynnada Mar 9, 2023
1291758
toml conflict
berkaysynnada Mar 9, 2023
055ed81
Merge branch 'main' into feature/time-interval-support
berkaysynnada Mar 9, 2023
d7f3696
minor changes
berkaysynnada Mar 9, 2023
8d5c8e3
deterministic matches
berkaysynnada Mar 11, 2023
31577d9
simplifications (clippy error)
berkaysynnada Mar 12, 2023
c274aef
test format changed
berkaysynnada Mar 13, 2023
968a682
minor test fix
berkaysynnada Mar 13, 2023
49506ed
Merge branch 'main' into feature/time-interval-support
berkaysynnada Mar 13, 2023
ed63779
Update scalar.rs
berkaysynnada Mar 13, 2023
68ea647
Refactoring and simplifications
ozankabak Mar 13, 2023
ed04466
Make ScalarValue support interval comparison
ozankabak Mar 14, 2023
3bf8fd6
naming tests
berkaysynnada Mar 14, 2023
0f8a7a7
macro renaming
berkaysynnada Mar 14, 2023
cf892fe
renaming macro
berkaysynnada Mar 14, 2023
6b5484e
Merge branch 'apache:main' into feature/timestamp-interval-arith-query
berkaysynnada Mar 15, 2023
a078dbb
ok till arrow kernel ops
berkaysynnada Mar 20, 2023
1c8fd69
Merge branch 'main' into feature/timestamp-interval-arith-query
berkaysynnada Mar 20, 2023
f27bdb7
Merge branch 'apache:main' into feature/timestamp-interval-arith-query
berkaysynnada Mar 20, 2023
49727e1
Merge branch 'apache:main' into feature/timestamp-interval-arith-query
berkaysynnada Mar 22, 2023
bbfd9b1
macro will replace matches inside evaluate
berkaysynnada Mar 22, 2023
e14a16f
Code refactor
metesynnada Mar 24, 2023
9f82bbb
retract changes in scalar and datetime
mustafasrepo Mar 24, 2023
25d76f3
ts op interval with chrono functions
berkaysynnada Mar 24, 2023
9de7875
bug fix and refactor
berkaysynnada Mar 26, 2023
d637efe
test refactor
berkaysynnada Mar 27, 2023
e2ee0ed
Enhance commenting
metesynnada Mar 27, 2023
3e03a54
new binary operation logic, handling the inside errors
metesynnada Mar 28, 2023
03d3aed
slt and minor changes
berkaysynnada Mar 28, 2023
20b276a
tz parsing excluded
berkaysynnada Mar 28, 2023
ef1c194
replace try_binary and as_datetime, and keep timezone for ts+interval op
berkaysynnada Mar 30, 2023
f1e78f2
Merge branch 'main' into feature/timestamp-interval-arith-query
berkaysynnada Mar 30, 2023
21e1df8
fix after merge
berkaysynnada Mar 30, 2023
b20eb77
delete unused functions
berkaysynnada Mar 30, 2023
653d8f8
ready to review
berkaysynnada Apr 3, 2023
c57eea2
Merge branch 'main' into feature/sym-join-temporal-columns
berkaysynnada Apr 3, 2023
8bd2454
correction after merge
berkaysynnada Apr 3, 2023
6486083
change match order
mustafasrepo Apr 4, 2023
476ab85
minor changes
mustafasrepo Apr 4, 2023
5aa05db
simplifications
mustafasrepo Apr 4, 2023
a3e0f95
update lock file
mustafasrepo Apr 4, 2023
9ef7c19
Refactoring tests
metesynnada Apr 4, 2023
7fd0c10
bug detected
berkaysynnada Apr 4, 2023
6b0da54
bug fixed
berkaysynnada Apr 5, 2023
8bee5d9
update cargo
berkaysynnada Apr 5, 2023
aa0f2b5
tests added
berkaysynnada Apr 5, 2023
7e1c072
Merge branch 'apache:main' into feature/sym-join-temporal-columns
berkaysynnada Apr 6, 2023
6f9b70f
minor changes after merge
berkaysynnada Apr 6, 2023
ceb6d79
Merge branch 'main' into feature/sym-join-temporal-columns
berkaysynnada Apr 7, 2023
85bd570
fix after merge
berkaysynnada Apr 7, 2023
f355477
code simplification
berkaysynnada Apr 7, 2023
a44eb23
Some simplifications
metesynnada Apr 7, 2023
9b623cd
Update min_max.rs
metesynnada Apr 7, 2023
3f92e56
arithmetics moved into macros
berkaysynnada Apr 10, 2023
ab433d0
Merge branch 'main' into feature/sym-join-temporal-columns
berkaysynnada Apr 10, 2023
98f4326
fix cargo.lock
berkaysynnada Apr 10, 2023
9ec794d
remove unwraps from tests
berkaysynnada Apr 11, 2023
ff56e17
Remove run-time string comparison from the interval min/max macro
ozankabak Apr 11, 2023
a176147
Merge branch 'main' into feature/sym-join-temporal-columns
berkaysynnada Apr 12, 2023
7ca4f0b
adapt upstream changes of timezone signature
berkaysynnada Apr 12, 2023
68 changes: 39 additions & 29 deletions datafusion/common/src/scalar.rs
@@ -1192,9 +1192,9 @@ pub fn seconds_add_array<const INTERVAL_MODE: i8>(

#[inline]
pub fn milliseconds_add(ts_ms: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
let secs = ts_ms / 1000;
let nsecs = ((ts_ms % 1000) * 1_000_000) as u32;
do_date_time_math(secs, nsecs, scalar, sign).map(|dt| dt.timestamp_millis())
let secs = ts_ms.div_euclid(1000);
Review comment from a Contributor on this line: "TIL div_euclid"
let nsecs = ts_ms.rem_euclid(1000) * 1_000_000;
do_date_time_math(secs, nsecs as u32, scalar, sign).map(|dt| dt.timestamp_millis())
}
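The change above, from truncating `/` and `%` to `div_euclid` and `rem_euclid`, matters for pre-epoch (negative) timestamps: truncating division leaves a negative remainder, which the subsequent `as u32` cast and `NaiveDateTime::from_timestamp_opt` cannot represent. A minimal standalone sketch (not part of the diff) of the decomposition:

```rust
// Sketch: splitting a millisecond timestamp into whole seconds plus a
// non-negative nanosecond remainder, as the new code in this PR does.
fn split_ms(ts_ms: i64) -> (i64, u32) {
    let secs = ts_ms.div_euclid(1000);
    let nsecs = ts_ms.rem_euclid(1000) * 1_000_000;
    (secs, nsecs as u32)
}

fn main() {
    // 1500 ms before the epoch: truncating ops give (-1, -500 ms),
    // and a negative sub-second part is invalid for chrono.
    assert_eq!((-1500i64) / 1000, -1);
    assert_eq!((-1500i64) % 1000, -500);
    // Euclidean division keeps the remainder in 0..1000.
    assert_eq!(split_ms(-1500), (-2, 500_000_000));
    // Non-negative timestamps are unaffected.
    assert_eq!(split_ms(1500), (1, 500_000_000));
}
```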

#[inline]
@@ -1203,21 +1203,18 @@ pub fn milliseconds_add_array<const INTERVAL_MODE: i8>(
interval: i128,
sign: i32,
) -> Result<i64> {
let mut secs = ts_ms / 1000;
let mut nsecs = ((ts_ms % 1000) * 1_000_000) as i32;
if nsecs < 0 {
secs -= 1;
nsecs += 1_000_000_000;
}
let secs = ts_ms.div_euclid(1000);
let nsecs = ts_ms.rem_euclid(1000) * 1_000_000;
do_date_time_math_array::<INTERVAL_MODE>(secs, nsecs as u32, interval, sign)
.map(|dt| dt.timestamp_millis())
}

#[inline]
pub fn microseconds_add(ts_us: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
let secs = ts_us / 1_000_000;
let nsecs = ((ts_us % 1_000_000) * 1000) as u32;
do_date_time_math(secs, nsecs, scalar, sign).map(|dt| dt.timestamp_nanos() / 1000)
let secs = ts_us.div_euclid(1_000_000);
let nsecs = ts_us.rem_euclid(1_000_000) * 1_000;
do_date_time_math(secs, nsecs as u32, scalar, sign)
.map(|dt| dt.timestamp_nanos() / 1000)
}

#[inline]
@@ -1226,21 +1223,17 @@ pub fn microseconds_add_array<const INTERVAL_MODE: i8>(
interval: i128,
sign: i32,
) -> Result<i64> {
let mut secs = ts_us / 1_000_000;
let mut nsecs = ((ts_us % 1_000_000) * 1000) as i32;
if nsecs < 0 {
secs -= 1;
nsecs += 1_000_000_000;
}
let secs = ts_us.div_euclid(1_000_000);
let nsecs = ts_us.rem_euclid(1_000_000) * 1_000;
do_date_time_math_array::<INTERVAL_MODE>(secs, nsecs as u32, interval, sign)
.map(|dt| dt.timestamp_nanos() / 1000)
}

#[inline]
pub fn nanoseconds_add(ts_ns: i64, scalar: &ScalarValue, sign: i32) -> Result<i64> {
let secs = ts_ns / 1_000_000_000;
let nsecs = (ts_ns % 1_000_000_000) as u32;
do_date_time_math(secs, nsecs, scalar, sign).map(|dt| dt.timestamp_nanos())
let secs = ts_ns.div_euclid(1_000_000_000);
let nsecs = ts_ns.rem_euclid(1_000_000_000);
do_date_time_math(secs, nsecs as u32, scalar, sign).map(|dt| dt.timestamp_nanos())
}

#[inline]
@@ -1249,12 +1242,8 @@ pub fn nanoseconds_add_array<const INTERVAL_MODE: i8>(
interval: i128,
sign: i32,
) -> Result<i64> {
let mut secs = ts_ns / 1_000_000_000;
let mut nsecs = (ts_ns % 1_000_000_000) as i32;
if nsecs < 0 {
secs -= 1;
nsecs += 1_000_000_000;
}
let secs = ts_ns.div_euclid(1_000_000_000);
let nsecs = ts_ns.rem_euclid(1_000_000_000);
do_date_time_math_array::<INTERVAL_MODE>(secs, nsecs as u32, interval, sign)
.map(|dt| dt.timestamp_nanos())
}
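The deleted `if nsecs < 0` branch and the new Euclidean form are equivalent; the PR just lets the standard library do the normalization. A small sketch (mine, not from the diff) checking both against each other:

```rust
// Sketch: the removed manual adjustment and the new div_euclid /
// rem_euclid form produce the same (secs, nsecs) pair for any input.
fn old_split(ts_ns: i64) -> (i64, i32) {
    let mut secs = ts_ns / 1_000_000_000;
    let mut nsecs = (ts_ns % 1_000_000_000) as i32;
    if nsecs < 0 {
        secs -= 1;
        nsecs += 1_000_000_000;
    }
    (secs, nsecs)
}

fn new_split(ts_ns: i64) -> (i64, i32) {
    (
        ts_ns.div_euclid(1_000_000_000),
        ts_ns.rem_euclid(1_000_000_000) as i32,
    )
}

fn main() {
    for &ts in &[0i64, 1, -1, -1_500_000_000, 999_999_999, -999_999_999] {
        assert_eq!(old_split(ts), new_split(ts));
    }
    // e.g. one nanosecond before the epoch:
    assert_eq!(new_split(-1), (-1, 999_999_999));
}
```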
@@ -1297,7 +1286,7 @@ fn do_date_time_math(
) -> Result<NaiveDateTime> {
let prior = NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or_else(|| {
DataFusionError::Internal(format!(
"Could not conert to NaiveDateTime: secs {secs} nsecs {nsecs} scalar {scalar:?} sign {sign}"
"Could not convert to NaiveDateTime: secs {secs} nsecs {nsecs} scalar {scalar:?} sign {sign}"
))
})?;
do_date_math(prior, scalar, sign)
@@ -1312,7 +1301,7 @@ fn do_date_time_math_array<const INTERVAL_MODE: i8>(
) -> Result<NaiveDateTime> {
let prior = NaiveDateTime::from_timestamp_opt(secs, nsecs).ok_or_else(|| {
DataFusionError::Internal(format!(
"Could not conert to NaiveDateTime: secs {secs} nsecs {nsecs}"
"Could not convert to NaiveDateTime: secs {secs} nsecs {nsecs}"
))
})?;
do_date_math_array::<_, INTERVAL_MODE>(prior, interval, sign)
@@ -1768,6 +1757,27 @@ impl ScalarValue {
DataType::UInt64 => ScalarValue::UInt64(Some(0)),
DataType::Float32 => ScalarValue::Float32(Some(0.0)),
DataType::Float64 => ScalarValue::Float64(Some(0.0)),
DataType::Timestamp(TimeUnit::Second, tz) => {
ScalarValue::TimestampSecond(Some(0), tz.clone())
}
DataType::Timestamp(TimeUnit::Millisecond, tz) => {
ScalarValue::TimestampMillisecond(Some(0), tz.clone())
}
DataType::Timestamp(TimeUnit::Microsecond, tz) => {
ScalarValue::TimestampMicrosecond(Some(0), tz.clone())
}
DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
ScalarValue::TimestampNanosecond(Some(0), tz.clone())
}
DataType::Interval(IntervalUnit::YearMonth) => {
ScalarValue::IntervalYearMonth(Some(0))
}
DataType::Interval(IntervalUnit::DayTime) => {
ScalarValue::IntervalDayTime(Some(0))
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
ScalarValue::IntervalMonthDayNano(Some(0))
}
_ => {
return Err(DataFusionError::NotImplemented(format!(
"Can't create a zero scalar from data_type \"{datatype:?}\""
222 changes: 216 additions & 6 deletions datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -1496,17 +1496,19 @@ impl SymmetricHashJoinStream {
mod tests {
use std::fs::File;

use arrow::array::ArrayRef;
use arrow::array::{Int32Array, TimestampNanosecondArray};
use arrow::array::{ArrayRef, IntervalDayTimeArray};
use arrow::array::{Int32Array, TimestampMillisecondArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
use arrow::util::pretty::pretty_format_batches;
use rstest::*;
use tempfile::TempDir;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, col, Column};
use datafusion_physical_expr::intervals::test_utils::gen_conjunctive_numeric_expr;
use datafusion_physical_expr::intervals::test_utils::{
gen_conjunctive_numeric_expr, gen_conjunctive_temporal_expr,
};
use datafusion_physical_expr::PhysicalExpr;

use crate::physical_plan::joins::{
@@ -1731,6 +1733,44 @@ mod tests {
_ => unreachable!(),
}
}
fn join_expr_tests_fixture_temporal(
expr_id: usize,
left_col: Arc<dyn PhysicalExpr>,
right_col: Arc<dyn PhysicalExpr>,
schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
match expr_id {
// constructs ((left_col - INTERVAL '100ms') > (right_col - INTERVAL '200ms')) AND ((left_col - INTERVAL '450ms') < (right_col - INTERVAL '300ms'))
0 => gen_conjunctive_temporal_expr(
left_col,
right_col,
Operator::Minus,
Operator::Minus,
Operator::Minus,
Operator::Minus,
ScalarValue::new_interval_dt(0, 100), // 100 ms
ScalarValue::new_interval_dt(0, 200), // 200 ms
ScalarValue::new_interval_dt(0, 450), // 450 ms
ScalarValue::new_interval_dt(0, 300), // 300 ms
schema,
),
// constructs ((left_col - TIMESTAMP '2023-01-01:12.00.03') > (right_col - TIMESTAMP '2023-01-01:12.00.01')) AND ((left_col - TIMESTAMP '2023-01-01:12.00.00') < (right_col - TIMESTAMP '2023-01-01:12.00.02'))
1 => gen_conjunctive_temporal_expr(
left_col,
right_col,
Operator::Minus,
Operator::Minus,
Operator::Minus,
Operator::Minus,
ScalarValue::TimestampMillisecond(Some(1672574403000), None), // 2023-01-01:12.00.03
ScalarValue::TimestampMillisecond(Some(1672574401000), None), // 2023-01-01:12.00.01
ScalarValue::TimestampMillisecond(Some(1672574400000), None), // 2023-01-01:12.00.00
ScalarValue::TimestampMillisecond(Some(1672574402000), None), // 2023-01-01:12.00.02
schema,
),
_ => unreachable!(),
}
}
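The hard-coded `TimestampMillisecond` constants in case 1 line up with the 2023-01-01T00:00:00 epoch value (1672531200000 ms) used in `build_sides_record_batches` below. A quick sketch (my own helper, not in the PR) deriving each filter bound as midnight plus 12 hours plus N seconds:

```rust
// Sketch: deriving the epoch-millisecond test constants from the
// 2023-01-01 midnight epoch the fixtures are built around.
fn ts_ms(hours: i64, seconds: i64) -> i64 {
    const MIDNIGHT_2023_01_01_MS: i64 = 1_672_531_200_000;
    MIDNIGHT_2023_01_01_MS + hours * 3_600_000 + seconds * 1_000
}

fn main() {
    assert_eq!(ts_ms(12, 3), 1672574403000); // 2023-01-01 12:00:03
    assert_eq!(ts_ms(12, 1), 1672574401000); // 2023-01-01 12:00:01
    assert_eq!(ts_ms(12, 0), 1672574400000); // 2023-01-01 12:00:00
    assert_eq!(ts_ms(12, 2), 1672574402000); // 2023-01-01 12:00:02
}
```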
fn build_sides_record_batches(
table_size: i32,
key_cardinality: (i32, i32),
@@ -1775,9 +1815,15 @@ fn build_sides_record_batches(
.collect::<Vec<Option<i32>>>()
}));

let time = Arc::new(TimestampNanosecondArray::from(
let time = Arc::new(TimestampMillisecondArray::from(
initial_range
.clone()
.map(|x| x as i64 + 1672531200000) // x + 2023-01-01:00.00.00
.collect::<Vec<i64>>(),
));
let interval_time: ArrayRef = Arc::new(IntervalDayTimeArray::from(
initial_range
.map(|x| 1664264591000000000 + (5000000000 * (x as i64)))
.map(|x| x as i64 * 100) // x * 100ms
.collect::<Vec<i64>>(),
));

@@ -1791,6 +1837,7 @@
("l_asc_null_first", ordered_asc_null_first.clone()),
("l_asc_null_last", ordered_asc_null_last.clone()),
("l_desc_null_first", ordered_desc_null_first.clone()),
("li1", interval_time.clone()),
])?;
let right = RecordBatch::try_from_iter(vec![
("ra1", ordered.clone()),
@@ -1802,6 +1849,7 @@
("r_asc_null_first", ordered_asc_null_first),
("r_asc_null_last", ordered_asc_null_last),
("r_desc_null_first", ordered_desc_null_first),
("ri1", interval_time),
])?;
Ok((left, right))
}
@@ -2723,4 +2771,166 @@
assert_eq!(left_side_joiner.visited_rows.is_empty(), should_be_empty);
Ok(())
}
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn testing_with_temporal_columns(
#[values(
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::RightSemi,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightAnti,
JoinType::Full
)]
join_type: JoinType,
#[values(
(4, 5),
(99, 12),
)]
cardinality: (i32, i32),
#[values(0, 1)] case_expr: usize,
) -> Result<()> {
let config = SessionConfig::new().with_repartition_joins(false);
let session_ctx = SessionContext::with_config(config);
let task_ctx = session_ctx.task_ctx();
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
let right_schema = &right_batch.schema();
let on = vec![(
Column::new_with_schema("lc1", left_schema)?,
Column::new_with_schema("rc1", right_schema)?,
)];
let left_sorted = vec![PhysicalSortExpr {
expr: col("lt1", left_schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
}];
let right_sorted = vec![PhysicalSortExpr {
expr: col("rt1", right_schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
}];
let (left, right) = create_memory_table(
left_batch,
right_batch,
Some(left_sorted),
Some(right_sorted),
13,
)?;
let intermediate_schema = Schema::new(vec![
Field::new(
"left",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"right",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
]);
let filter_expr = join_expr_tests_fixture_temporal(
case_expr,
col("left", &intermediate_schema)?,
col("right", &intermediate_schema)?,
&intermediate_schema,
)?;
let column_indices = vec![
ColumnIndex {
index: 3,
side: JoinSide::Left,
},
ColumnIndex {
index: 3,
side: JoinSide::Right,
},
];
let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
Ok(())
}
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_with_interval_columns(
#[values(
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::RightSemi,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightAnti,
JoinType::Full
)]
join_type: JoinType,
#[values(
(4, 5),
(99, 12),
)]
cardinality: (i32, i32),
) -> Result<()> {
let config = SessionConfig::new().with_repartition_joins(false);
let session_ctx = SessionContext::with_config(config);
let task_ctx = session_ctx.task_ctx();
let (left_batch, right_batch) =
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
let right_schema = &right_batch.schema();
let on = vec![(
Column::new_with_schema("lc1", left_schema)?,
Column::new_with_schema("rc1", right_schema)?,
)];
let left_sorted = vec![PhysicalSortExpr {
expr: col("li1", left_schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
}];
let right_sorted = vec![PhysicalSortExpr {
expr: col("ri1", right_schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
}];
let (left, right) = create_memory_table(
left_batch,
right_batch,
Some(left_sorted),
Some(right_sorted),
13,
)?;
let intermediate_schema = Schema::new(vec![
Field::new("left", DataType::Interval(IntervalUnit::DayTime), false),
Field::new("right", DataType::Interval(IntervalUnit::DayTime), false),
]);
let filter_expr = join_expr_tests_fixture_temporal(
0,
col("left", &intermediate_schema)?,
col("right", &intermediate_schema)?,
&intermediate_schema,
)?;
let column_indices = vec![
ColumnIndex {
index: 9,
side: JoinSide::Left,
},
ColumnIndex {
index: 9,
side: JoinSide::Right,
},
];
let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
experiment(left, right, Some(filter), join_type, on, task_ctx).await?;

Ok(())
}
}