Skip to content

Commit

Permalink
refactor: parse timezone once and update examples in description
Browse files Browse the repository at this point in the history
  • Loading branch information
appletreeisyellow committed Jul 9, 2024
1 parent ae9f1ad commit 043c997
Showing 1 changed file with 144 additions and 157 deletions.
301 changes: 144 additions & 157 deletions datafusion/functions/src/datetime/to_local_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,76 +42,6 @@ use datafusion_expr::{
/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
/// timezone information). In other words, this function strips off the timezone from the timestamp,
/// while keeping the display value of the timestamp the same.
///
/// # Example 1
///
/// ```
/// # use datafusion_common::ScalarValue;
/// # use datafusion_expr::ColumnarValue;
/// # use datafusion_functions::datetime::to_local_time::ToLocalTimeFunc;
/// # use datafusion_expr::ScalarUDFImpl;
///
/// // 2019-03-31 01:00:00 +01:00
/// let res = ToLocalTimeFunc::new()
/// .invoke(&[ColumnarValue::Scalar(ScalarValue::TimestampSecond(
/// Some(1_553_990_400),
/// Some("Europe/Brussels".into()),
/// ))])
/// .unwrap();
///
/// // 2019-03-31 01:00:00 <-- this timestamp no longer has +01:00 offset
/// let expected = ScalarValue::TimestampSecond(Some(1_553_994_000), None);
///
/// match res {
/// ColumnarValue::Scalar(res) => {
/// assert_eq!(res, expected);
/// }
/// _ => panic!("unexpected return type"),
/// }
/// ```
///
/// # Example 2
///
/// ```
/// # use datafusion_common::ScalarValue;
/// # use datafusion_expr::ColumnarValue;
/// # use chrono::NaiveDateTime;
/// # use datafusion_functions::datetime::to_local_time::ToLocalTimeFunc;
/// # use datafusion_expr::ScalarUDFImpl;
///
/// let timestamp_str = "2020-03-31T13:40:00";
/// let timezone_str = "America/New_York";
/// let tz: arrow::array::timezone::Tz =
/// timezone_str.parse().expect("Invalid timezone");
///
/// let timestamp = timestamp_str
/// .parse::<NaiveDateTime>()
/// .unwrap()
/// .and_local_timezone(tz) // this is in a local timezone
/// .unwrap()
/// .timestamp_nanos_opt()
/// .unwrap();
///
/// let expected_timestamp = timestamp_str
/// .parse::<NaiveDateTime>()
/// .unwrap()
/// .and_utc() // this is in UTC
/// .timestamp_nanos_opt()
/// .unwrap();
///
/// let input =
/// ScalarValue::TimestampNanosecond(Some(timestamp), Some(timezone_str.into()));
/// let res = ToLocalTimeFunc::new()
/// .invoke(&[ColumnarValue::Scalar(input)])
/// .unwrap();
/// let expected = ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
/// match res {
/// ColumnarValue::Scalar(res) => {
/// assert_eq!(res, expected);
/// }
/// _ => panic!("unexpected return type"),
/// }
/// ```
#[derive(Debug)]
pub struct ToLocalTimeFunc {
signature: Signature,
Expand Down Expand Up @@ -158,101 +88,105 @@ impl ToLocalTimeFunc {
// if no timezone specified, just return the input
Ok(time_value.clone())
}
// if has timezone, adjust the underlying time value. the current time value
// is stored as i64 in UTC, even though the timezone may not be in UTC, so
// we need to adjust the time value to the local time. see [`adjust_to_local_time`]
// If the input has a timezone, adjust the underlying time value. The current time
// value is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
// we need to adjust the time value to the local time. See [`adjust_to_local_time`]
// for more details.
//
// Then remove the timezone in return type, i.e. return None
DataType::Timestamp(_, Some(_)) => match time_value {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(ts),
Some(tz),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampNanosecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
Some(ts),
Some(tz),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
Some(ts),
Some(tz),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampMillisecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(
Some(ts),
Some(tz),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampSecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Array(array) => {
fn transform_array<T>(
array: &ArrayRef,
tz: &str,
) -> Result<ColumnarValue>
where
T: ArrowTimestampType,
{
let array = as_primitive_array::<T>(array)?;
let array: PrimitiveArray<T> =
array.unary(|ts| adjust_to_local_time::<T>(ts, tz));

Ok(ColumnarValue::Array(Arc::new(array)))
DataType::Timestamp(_, Some(timezone)) => {
let tz: Tz = timezone.parse().unwrap(); // TODO chunchun: remove unwrap

match time_value {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(ts),
Some(_),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampNanosecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(adjusted_ts),
None,
)))
}

match array.data_type() {
Timestamp(_, None) => {
// if no timezone specified, just return the input
Ok(ColumnarValue::Array(Arc::clone(array)))
}
Timestamp(Nanosecond, Some(tz)) => {
transform_array::<TimestampNanosecondType>(array, tz)
}
Timestamp(Microsecond, Some(tz)) => {
transform_array::<TimestampMicrosecondType>(array, tz)
}
Timestamp(Millisecond, Some(tz)) => {
transform_array::<TimestampMillisecondType>(array, tz)
}
Timestamp(Second, Some(tz)) => {
transform_array::<TimestampSecondType>(array, tz)
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
Some(ts),
Some(_),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
Some(ts),
Some(_),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampMillisecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(
Some(ts),
Some(_),
)) => {
let adjusted_ts =
adjust_to_local_time::<TimestampSecondType>(*ts, tz);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
Some(adjusted_ts),
None,
)))
}
ColumnarValue::Array(array) => {
/// Shifts every timestamp in `array` to local time for the timezone `tz`.
///
/// `array` is downcast to a `PrimitiveArray<T>`; the `?` propagates the
/// error if that downcast fails. Each value is then mapped through
/// [`adjust_to_local_time`] and the result is wrapped as a
/// `ColumnarValue::Array`.
fn transform_array<T: ArrowTimestampType>(
    array: &ArrayRef,
    tz: Tz,
) -> Result<ColumnarValue> {
    let timestamps = as_primitive_array::<T>(array)?;
    // The timezone is parsed once by the caller and reused for every element.
    let shifted: PrimitiveArray<T> =
        timestamps.unary(|value| adjust_to_local_time::<T>(value, tz));
    let shifted = Arc::new(shifted);

    Ok(ColumnarValue::Array(shifted))
}
_ => {
exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())

match array.data_type() {
Timestamp(_, None) => {
// if no timezone specified, just return the input
Ok(ColumnarValue::Array(Arc::clone(array)))
}
Timestamp(Nanosecond, Some(_)) => {
transform_array::<TimestampNanosecondType>(array, tz)
}
Timestamp(Microsecond, Some(_)) => {
transform_array::<TimestampMicrosecondType>(array, tz)
}
Timestamp(Millisecond, Some(_)) => {
transform_array::<TimestampMillisecondType>(array, tz)
}
Timestamp(Second, Some(_)) => {
transform_array::<TimestampSecondType>(array, tz)
}
_ => {
exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
}
}
}
}
_ => {
exec_err!(
_ => {
exec_err!(
"to_local_time function requires timestamp argument, got {:?}",
time_value.data_type()
)
}
}
},
}
_ => {
exec_err!(
"to_local_time function requires timestamp argument, got {:?}",
Expand Down Expand Up @@ -314,15 +248,43 @@ impl ToLocalTimeFunc {
/// ```text
/// 2019-03-31T01:00:00
/// ```
fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, timezone: &str) -> i64 {
///
/// # Example
///
/// Not compiled as a doctest: this function is private, so a doctest cannot call it.
///
/// ```ignore
/// # use chrono::NaiveDateTime;
/// # use arrow::array::types::TimestampNanosecondType;
///
/// let timestamp_str = "2020-03-31T13:40:00";
/// let tz: arrow::array::timezone::Tz =
/// "America/New_York".parse().expect("Invalid timezone");
///
/// let timestamp = timestamp_str
/// .parse::<NaiveDateTime>()
/// .unwrap()
/// .and_local_timezone(tz) // this is in a local timezone
/// .unwrap()
/// .timestamp_nanos_opt()
/// .unwrap();
///
/// let expected_timestamp = timestamp_str
/// .parse::<NaiveDateTime>()
/// .unwrap()
/// .and_utc() // this is in UTC
/// .timestamp_nanos_opt()
/// .unwrap();
///
/// let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz);
/// assert_eq!(res, expected_timestamp);
/// ```
fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> i64 {
let date_time = match T::UNIT {
Nanosecond => Utc.timestamp_nanos(ts),
Microsecond => Utc.timestamp_micros(ts).unwrap(), // TODO chunchun: replace unwrap
Millisecond => Utc.timestamp_millis_opt(ts).unwrap(),
Second => Utc.timestamp_opt(ts, 0).unwrap(),
};

let tz: Tz = timezone.parse().unwrap();
let offset_seconds: i64 = tz
.offset_from_utc_datetime(&date_time.naive_utc())
.fix()
Expand Down Expand Up @@ -395,7 +357,32 @@ mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};

use super::ToLocalTimeFunc;
use super::{adjust_to_local_time, ToLocalTimeFunc};

/// Checks that a wall-clock time interpreted in `America/New_York` is
/// adjusted to the timestamp of the same wall-clock time interpreted in UTC.
#[test]
fn test_adjust_to_local_time() {
    let timestamp_str = "2020-03-31T13:40:00";
    let tz: arrow::array::timezone::Tz =
        "America/New_York".parse().expect("Invalid timezone");

    // Parse the wall-clock time once; both interpretations start from it.
    let wall_clock = timestamp_str.parse::<NaiveDateTime>().unwrap();

    // Input: the wall-clock time read in the local timezone.
    let zoned = wall_clock.and_local_timezone(tz).unwrap();
    let timestamp = zoned.timestamp_nanos_opt().unwrap();

    // Expected: the same wall-clock time read as UTC.
    let expected_timestamp = wall_clock.and_utc().timestamp_nanos_opt().unwrap();

    let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz);
    assert_eq!(res, expected_timestamp);
}

#[test]
fn test_to_local_time() {
Expand Down

0 comments on commit 043c997

Please sign in to comment.