Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

preserve array type / timezone in date_bin and date_trunc functions #7729

Merged
merged 8 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,13 +618,20 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ConcatWithSeparator => Ok(Utf8),
BuiltinScalarFunction::DatePart => Ok(Float64),
BuiltinScalarFunction::DateBin | BuiltinScalarFunction::DateTrunc => {
match input_expr_types[1] {
Timestamp(Nanosecond, _) | Utf8 | Null => {
match &input_expr_types[1] {
Timestamp(Nanosecond, None) | Utf8 | Null => {
Ok(Timestamp(Nanosecond, None))
}
Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)),
Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)),
Timestamp(Second, _) => Ok(Timestamp(Second, None)),
Timestamp(Nanosecond, tz_opt) => {
Ok(Timestamp(Nanosecond, tz_opt.clone()))
}
Timestamp(Microsecond, tz_opt) => {
Ok(Timestamp(Microsecond, tz_opt.clone()))
}
Timestamp(Millisecond, tz_opt) => {
Ok(Timestamp(Millisecond, tz_opt.clone()))
}
Timestamp(Second, tz_opt) => Ok(Timestamp(Second, tz_opt.clone())),
_ => plan_err!(
"The {self} function can only accept timestamp as the second arg."
),
Expand Down
299 changes: 287 additions & 12 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
granularity.as_str(),
)
})
.collect::<Result<TimestampSecondArray>>()?;
.collect::<Result<TimestampSecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
Expand All @@ -449,7 +450,8 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
granularity.as_str(),
)
})
.collect::<Result<TimestampMillisecondArray>>()?;
.collect::<Result<TimestampMillisecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
Expand All @@ -465,7 +467,25 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
granularity.as_str(),
)
})
.collect::<Result<TimestampMicrosecondArray>>()?;
.collect::<Result<TimestampMicrosecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
let parsed_tz = parse_tz(tz_opt)?;
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Nanosecond,
&x,
parsed_tz,
granularity.as_str(),
)
})
.collect::<Result<TimestampNanosecondArray>>()?
.with_timezone_opt(tz_opt.clone());
ColumnarValue::Array(Arc::new(array))
}
_ => {
Expand Down Expand Up @@ -713,35 +733,39 @@ fn date_bin_impl(
))
}
ColumnarValue::Array(array) => match array.data_type() {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
let array = as_timestamp_nanosecond_array(array)?
.iter()
.map(f_nanos)
.collect::<TimestampNanosecondArray>();
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());

ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
let array = as_timestamp_microsecond_array(array)?
.iter()
.map(f_micros)
.collect::<TimestampMicrosecondArray>();
.collect::<TimestampMicrosecondArray>()
.with_timezone_opt(tz_opt.clone());

ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
let array = as_timestamp_millisecond_array(array)?
.iter()
.map(f_millis)
.collect::<TimestampMillisecondArray>();
.collect::<TimestampMillisecondArray>()
.with_timezone_opt(tz_opt.clone());

ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Second, _) => {
DataType::Timestamp(TimeUnit::Second, tz_opt) => {
let array = as_timestamp_second_array(array)?
.iter()
.map(f_secs)
.collect::<TimestampSecondArray>();
.collect::<TimestampSecondArray>()
.with_timezone_opt(tz_opt.clone());

ColumnarValue::Array(Arc::new(array))
}
Expand Down Expand Up @@ -925,7 +949,9 @@ where
mod tests {
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, IntervalDayTimeArray, StringBuilder};
use arrow::array::{
as_primitive_array, ArrayRef, Int64Array, IntervalDayTimeArray, StringBuilder,
};

use super::*;

Expand Down Expand Up @@ -1051,6 +1077,125 @@ mod tests {
});
}

#[test]
fn test_date_trunc_timezones() {
let cases = vec![
(
vec![
"2020-09-08T00:00:00Z",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the use case is preserving timezones, it may make sense to also check with timezones other than Z (like maybe +5 or +8) 🤔

Copy link
Contributor Author

@mhilton mhilton Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the test I had for the "-02" zone was sufficient. The way times are stored is always as a UTC offset; the zone information really just indicates how the offset needs to be interpreted. These strings are only used to derive the offset and therefore the specified zone is ignored when creating the array. However, more tests don't hurt so I've added the ones you suggested for clarity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks -- sorry for my confusion

"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
Some("+00".into()),
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
],
),
(
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
None,
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
],
),
(
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T01:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T03:00:00Z",
"2020-09-08T04:00:00Z",
],
Some("-02".into()),
vec![
"2020-09-07T02:00:00Z",
"2020-09-07T02:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T02:00:00Z",
"2020-09-08T02:00:00Z",
],
),
(
vec![
"2020-09-08T00:00:00+05",
"2020-09-08T01:00:00+05",
"2020-09-08T02:00:00+05",
"2020-09-08T03:00:00+05",
"2020-09-08T04:00:00+05",
],
Some("+05".into()),
vec![
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
],
),
(
vec![
"2020-09-08T00:00:00+08",
"2020-09-08T01:00:00+08",
"2020-09-08T02:00:00+08",
"2020-09-08T03:00:00+08",
"2020-09-08T04:00:00+08",
],
Some("+08".into()),
vec![
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
],
),
];

cases.iter().for_each(|(original, tz_opt, expected)| {
let input = original
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let right = expected
.iter()
.map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let result = date_trunc(&[
ColumnarValue::Scalar(ScalarValue::Utf8(Some("day".to_string()))),
ColumnarValue::Array(Arc::new(input)),
])
.unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
result.data_type(),
&DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
);
let left = as_primitive_array::<TimestampNanosecondType>(&result);
assert_eq!(left, &right);
} else {
panic!("unexpected column type");
}
});
}

#[test]
fn test_date_bin_single() {
use chrono::Duration;
Expand Down Expand Up @@ -1252,6 +1397,136 @@ mod tests {
);
}

#[test]
fn test_date_bin_timezones() {
    // Each case is a tuple of:
    //   (input timestamps, timezone attached to the arrays and the origin
    //    scalar, bin origin, expected binned timestamps).
    // Every case bins with the same interval, `new_interval_dt(1, 0)`.
    let cases = vec![
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            Some("+00".into()),
            "1970-01-01T00:00:00Z",
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            None,
            "1970-01-01T00:00:00Z",
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
            Some("-02".into()),
            "1970-01-01T00:00:00Z",
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
                "2020-09-08T00:00:00Z",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00+05",
                "2020-09-08T01:00:00+05",
                "2020-09-08T02:00:00+05",
                "2020-09-08T03:00:00+05",
                "2020-09-08T04:00:00+05",
            ],
            Some("+05".into()),
            "1970-01-01T00:00:00+05",
            vec![
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
                "2020-09-08T00:00:00+05",
            ],
        ),
        (
            vec![
                "2020-09-08T00:00:00+08",
                "2020-09-08T01:00:00+08",
                "2020-09-08T02:00:00+08",
                "2020-09-08T03:00:00+08",
                "2020-09-08T04:00:00+08",
            ],
            Some("+08".into()),
            "1970-01-01T00:00:00+08",
            vec![
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
                "2020-09-08T00:00:00+08",
            ],
        ),
    ];

    for (original, tz_opt, origin, expected) in cases.iter() {
        // Parse a list of timestamp strings into a nanosecond array that
        // carries this case's timezone annotation.
        let build_array = |values: &[&str]| {
            values
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone())
        };
        let input = build_array(original.as_slice());
        let right = build_array(expected.as_slice());

        // Arguments in the order date_bin is called with here:
        // interval, timestamp array, origin.
        let args = [
            ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
            ColumnarValue::Array(Arc::new(input)),
            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
                Some(string_to_timestamp_nanos(origin).unwrap()),
                tz_opt.clone(),
            )),
        ];

        // An array input must yield an array output.
        let result = match date_bin(&args).unwrap() {
            ColumnarValue::Array(result) => result,
            _ => panic!("unexpected column type"),
        };

        // The output must keep both the time unit and the timezone of the input.
        assert_eq!(
            result.data_type(),
            &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
        );
        let left = as_primitive_array::<TimestampNanosecondType>(&result);
        assert_eq!(left, &right);
    }
}

#[test]
fn to_timestamp_invalid_input_type() -> Result<()> {
// pass the wrong type of input array to to_timestamp and test
Expand Down
Loading