diff --git a/datafusion/functions/src/datetime/date_bin.rs b/datafusion/functions/src/datetime/date_bin.rs index e777e5ea95d0..516e0cfd31b8 100644 --- a/datafusion/functions/src/datetime/date_bin.rs +++ b/datafusion/functions/src/datetime/date_bin.rs @@ -16,9 +16,11 @@ // under the License. use std::any::Any; +use std::ops::Sub; use std::sync::Arc; use arrow::array::temporal_conversions::NANOSECONDS; +use arrow::array::timezone::Tz; use arrow::array::types::{ ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, @@ -38,7 +40,7 @@ use datafusion_expr::{ ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD, }; -use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc}; +use chrono::{DateTime, Datelike, Duration, Months, Offset, TimeDelta, TimeZone, Utc}; #[derive(Debug)] pub struct DateBinFunc { @@ -341,6 +343,7 @@ fn date_bin_impl( Millisecond => NANOSECONDS / 1_000, Second => NANOSECONDS, }; + move |x: i64| stride_fn(stride, x * scale, origin) / scale } @@ -348,32 +351,59 @@ fn date_bin_impl( ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + + // TODO chunchun: maybe simplify this part + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); + + let date_binned_ts = v.map(apply_stride_fn); + let adjusted_ts = match (date_binned_ts, tz_opt) { + (Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)), + (_, _) => date_binned_ts, + }; + ColumnarValue::Scalar(ScalarValue::TimestampSecond( - v.map(apply_stride_fn), + adjusted_ts, tz_opt.clone(), )) } @@ -392,7 +422,13 @@ fn date_bin_impl( let array = as_primitive_array::(array)?; let apply_stride_fn = stride_map_fn::(origin, stride, stride_fn); let array: PrimitiveArray = array - .unary(apply_stride_fn) + .unary(|ts| { + let date_binned_ts = apply_stride_fn(ts); + match tz_opt { + Some(tz) => adjust_to_local_time(date_binned_ts, tz), + None => date_binned_ts, + } + }) .with_timezone_opt(tz_opt.clone()); Ok(ColumnarValue::Array(Arc::new(array))) @@ -435,6 +471,23 @@ fn date_bin_impl( }) } +// TODO chunchun: add description +fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { + let tz: Tz = timezone.parse().unwrap(); + + let date_time = DateTime::from_timestamp_nanos(ts).naive_utc(); + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = + date_time.sub(TimeDelta::try_seconds(offset_seconds).unwrap()); + + adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap() +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -673,6 +726,42 @@ mod tests { "2020-09-08T00:00:00Z", ], ), + ( + vec![ + "2024-03-31T00:30:00Z", + "2024-03-31T01:00:00Z", + "2024-03-31T04:20:00Z", + "2024-04-01T04:20:00Z", + "2024-05-01T00:30:00Z", + ], + Some("Europe/Brussels".into()), + "1970-01-01T00:00:00Z", + vec![ + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-04-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + ], + ), + ( + vec![ + "2024-03-31T00:30:00", + "2024-03-31T01:00:00", + "2024-03-31T04:20:00", + "2024-04-01T04:20:00", + "2024-05-01T00:30:00", + ], + Some("Europe/Brussels".into()), + "1970-01-01T00:00:00Z", + vec![ + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-03-31T00:00:00+01:00", + "2024-04-01T00:00:00+02:00", + "2024-05-01T00:00:00+02:00", + ], + ), ( vec![ "2020-09-08T00:00:00Z", @@ -684,11 +773,11 @@ mod tests { Some("-02".into()), "1970-01-01T00:00:00Z", vec![ - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", - "2020-09-08T00:00:00Z", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", + "2020-09-08T00:00:00-02:00", ], ), ( @@ -702,11 +791,11 @@ mod tests { Some("+05".into()), "1970-01-01T00:00:00+05", vec![ - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", - "2020-09-08T00:00:00+05", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", + "2020-09-07T19:00:00+05:00", ], ), ( @@ -720,11 +809,11 @@ mod tests { Some("+08".into()), "1970-01-01T00:00:00+08", vec![ - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", - "2020-09-08T00:00:00+08", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", + "2020-09-07T16:00:00+08:00", ], ), ]; @@ -767,6 +856,8 @@ mod tests { }); } + // TODO chunchun: may need to add test for single: nano, milli, macro, sec, ... + #[test] fn test_date_bin_single() { let cases = vec![ diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 9c2f80856bf8..c47efb322a7f 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -32,6 +32,7 @@ pub mod make_date; pub mod now; pub mod to_char; pub mod to_date; +pub mod to_local_time; pub mod to_timestamp; pub mod to_unixtime; @@ -50,6 +51,7 @@ make_udf_function!( make_udf_function!(now::NowFunc, NOW, now); make_udf_function!(to_char::ToCharFunc, TO_CHAR, to_char); make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date); +make_udf_function!(to_local_time::ToLocalTimeFunc, TO_LOCAL_TIME, to_local_time); make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime); make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp); make_udf_function!( @@ -108,7 +110,14 @@ pub mod expr_fn { ),( now, "returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement", - ),( + ), + // TODO chunchun: add more doc examples + ( + to_local_time, + "converts a timestamp with a timezone to a local time, returns a timestamp without timezone", + args, + ), + ( to_unixtime, "converts a string and optional formats to a Unixtime", args, @@ -277,6 +286,7 @@ pub fn functions() -> Vec> { now(), to_char(), to_date(), + to_local_time(), to_unixtime(), to_timestamp(), to_timestamp_seconds(), diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs new file mode 100644 index 000000000000..7cb1931b48a0 --- /dev/null +++ b/datafusion/functions/src/datetime/to_local_time.rs @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::ops::Add; + +use arrow::array::timezone::Tz; +use arrow::datatypes::DataType; +use arrow::datatypes::DataType::Timestamp; +use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; + +use chrono::{DateTime, Offset, TimeDelta, TimeZone}; +use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +#[derive(Debug)] +pub struct ToLocalTimeFunc { + signature: Signature, +} + +impl Default for ToLocalTimeFunc { + fn default() -> Self { + Self::new() + } +} + +impl ToLocalTimeFunc { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } + + fn to_local_time(&self, args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!( + "to_local_time function requires 1 argument, got {}", + args.len() + ); + } + + let time_value = args[0].clone(); + let arg_type = time_value.data_type(); + match arg_type { + DataType::Timestamp(_, None) => { + // if no timezone specificed, just return the input + Ok(time_value.clone()) + } + // if has timezone, adjust the underlying time value. the current time value + // is stored as i64 in UTC, even though the timezone may not be in UTC, so + // we need to adjust the time value to the local time. see [`adjust_to_local_time`] + // for more details. + // + // Then remove the timezone in return type, i.e. return None + DataType::Timestamp(_, Some(_)) => match time_value { + ColumnarValue::Scalar(ScalarValue::TimestampSecond( + Some(ts), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond( + Some(adjusted_ts), + None, + ))) + } + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(ts), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + Some(adjusted_ts), + None, + ))) + } + ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( + Some(ts), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond( + Some(adjusted_ts), + None, + ))) + } + ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(ts), + Some(tz), + )) => { + let adjusted_ts = adjust_to_local_time(ts, &tz); + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + Some(adjusted_ts), + None, + ))) + } + _ => { + exec_err!( + "to_local_time function requires timestamp argument, got {:?}", + arg_type + ) + } + }, + _ => { + exec_err!( + "to_local_time function requires timestamp argument, got {:?}", + arg_type + ) + } + } + } +} + +pub fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 { + let tz: Tz = timezone.parse().unwrap(); + + let date_time = DateTime::from_timestamp_nanos(ts).naive_utc(); + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = + date_time.add(TimeDelta::try_seconds(offset_seconds).unwrap()); + + // let adjusted_ts = adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap(); + // println!( + // "chunchun - adjust_to_local_time:\ninput timestamp: {:?}\nin NavieDateTime: {:?}\noffset: {:?}\nadjusted_date_time: {:?}\nadjusted_ts: {:?}\n", + // ts, date_time, offset_seconds, adjusted_date_time, adjusted_ts + // ); + adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap() +} + +impl ScalarUDFImpl for ToLocalTimeFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_local_time" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + if arg_types.is_empty() { + return exec_err!("to_local_time function requires 1 arguments, got 0"); + } + + match &arg_types[0] { + Timestamp(Nanosecond, _) => Ok(Timestamp(Nanosecond, None)), + Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)), + Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)), + Timestamp(Second, _) => Ok(Timestamp(Second, None)), + _ => exec_err!( + "The to_local_time function can only accept timestamp as the arg." + ), + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.is_empty() { + return exec_err!( + "to_local_time function requires 1 or more arguments, got 0" + ); + } + + self.to_local_time(args) + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::ScalarValue; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + + use super::ToLocalTimeFunc; + + #[test] + fn test_to_local_time() { + // TODO chunchun: update test cases to assert the result + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampSecond(Some(1), None), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampSecond(Some(1), Some("+01:00".into())), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampNanosecond(Some(1), Some("Europe/Brussels".into())), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampSecond( + Some(2_000_000_000), // 2033-05-18T03:33:20Z + Some("Europe/Brussels".into()), + ), + )]); + assert!(res.is_ok()); + + let res = ToLocalTimeFunc::new().invoke(&[ColumnarValue::Scalar( + ScalarValue::TimestampNanosecond( + Some(1711922401000000000), // 2024-03-31T22:00:01Z + Some("Europe/Brussels".into()), // 2024-04-01T00:00:01+02:00 + ), + )]); + assert!(res.is_ok()); + } +}