From e5adc844d8c8468af2e1858795a41abcf2c6cb84 Mon Sep 17 00:00:00 2001 From: tangruilin Date: Thu, 25 Jan 2024 21:34:34 +0800 Subject: [PATCH] [task #8987]add_to_date_function Signed-off-by: tangruilin --- datafusion-examples/examples/to_date.rs | 60 +++++++ datafusion/common/src/scalar/mod.rs | 8 +- datafusion/expr/src/built_in_function.rs | 6 + datafusion/expr/src/expr_fn.rs | 5 + .../physical-expr/src/datetime_expressions.rs | 111 ++++++++++++- datafusion/physical-expr/src/functions.rs | 1 + datafusion/proto/proto/datafusion.proto | 2 + datafusion/proto/src/generated/pbjson.rs | 3 + datafusion/proto/src/generated/prost.rs | 3 + .../proto/src/logical_plan/from_proto.rs | 11 ++ datafusion/proto/src/logical_plan/to_proto.rs | 1 + datafusion/sqllogictest/test_files/dates.slt | 151 ++++++++++++++++++ 12 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 datafusion-examples/examples/to_date.rs diff --git a/datafusion-examples/examples/to_date.rs b/datafusion-examples/examples/to_date.rs new file mode 100644 index 0000000000000..7dcd3a7d2fbc1 --- /dev/null +++ b/datafusion-examples/examples/to_date.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::arrow::array::StringArray; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; +use datafusion::prelude::*; + +/// This example demonstrates how to use the to_timestamp series +/// of functions in the DataFrame API as well as via sql. +#[tokio::main] +async fn main() -> Result<()> { + // define a schema. + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)])); + + // define data. + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec![ + "2020-09-08T13:42:29Z", + "2020-09-08T13:42:29.190855-05:00", + "2020-08-09 12:13:29", + "2020-01-02", + ]))], + )?; + + // declare a new context. In spark API, this corresponds to a new spark SQLsession + let ctx = SessionContext::new(); + + // declare a table in memory. In spark API, this corresponds to createDataFrame(...). + ctx.register_batch("t", batch)?; + let df = ctx.table("t").await?; + + // use to_timestamp function to convert col 'a' to timestamp type using the default parsing + let df = df.with_column("a", to_date(vec![col("a")]))?; + + let df = df.select_columns(&["a"])?; + + // print the results + df.show().await?; + + Ok(()) +} diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 7e53415090e08..acd2535bf7bfd 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -45,7 +45,7 @@ use arrow::{ compute::kernels::cast::{cast_with_options, CastOptions}, datatypes::{ i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType, - Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type, + Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, @@ -3293,6 +3293,12 @@ impl ScalarType for TimestampNanosecondType { } } +impl ScalarType for Date32Type { + fn scalar(r: Option) -> ScalarValue { + ScalarValue::Date32(r) + } +} + #[cfg(test)] mod tests { use std::cmp::Ordering; diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 09077a557a7fc..90785ceaad49f 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -284,6 +284,8 @@ pub enum BuiltinScalarFunction { ToTimestampSeconds, /// from_unixtime FromUnixtime, + /// to_date + ToDate, ///now Now, ///current_date @@ -486,6 +488,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Upper => Volatility::Immutable, BuiltinScalarFunction::Struct => Volatility::Immutable, BuiltinScalarFunction::FromUnixtime => Volatility::Immutable, + BuiltinScalarFunction::ToDate => Volatility::Immutable, BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable, BuiltinScalarFunction::OverLay => Volatility::Immutable, BuiltinScalarFunction::Levenshtein => Volatility::Immutable, @@ -797,6 +800,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)), BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)), BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)), + BuiltinScalarFunction::ToDate => Ok(Date32), BuiltinScalarFunction::Now => { Ok(Timestamp(Nanosecond, Some("+00:00".into()))) } @@ -1096,6 +1100,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::FromUnixtime => { Signature::uniform(1, vec![Int64], self.volatility()) } + BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()), BuiltinScalarFunction::Digest => Signature::one_of( vec![ Exact(vec![Utf8, Utf8]), @@ -1544,6 +1549,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"], BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"], BuiltinScalarFunction::FromUnixtime => &["from_unixtime"], + BuiltinScalarFunction::ToDate => &["to_date"], // hashing functions BuiltinScalarFunction::Digest => &["digest"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 4aa270e6dde6f..18b2b41d593da 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -895,6 +895,11 @@ scalar_expr!( datetime format, "converts a date, time, timestamp or duration to a string based on the provided format" ); +nary_scalar_expr!( + ToDate, + to_date, + "converts string to date according to the given format" +); nary_scalar_expr!( ToTimestamp, to_timestamp, diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index e125c64471546..deb093a000b0c 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -54,7 +54,8 @@ use datafusion_common::cast::{ as_timestamp_nanosecond_array, as_timestamp_second_array, }; use datafusion_common::{ - exec_err, not_impl_err, DataFusionError, Result, ScalarType, ScalarValue, + exec_err, internal_datafusion_err, not_impl_err, DataFusionError, Result, ScalarType, + ScalarValue, }; use datafusion_expr::ColumnarValue; @@ -424,6 +425,84 @@ fn to_timestamp_impl>( } } +/// # Examples +/// +/// ``` +/// # use std::sync::Arc; + +/// # use datafusion::arrow::array::StringArray; +/// # use datafusion::arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion::arrow::record_batch::RecordBatch; +/// # use datafusion::error::Result; +/// # use datafusion::prelude::*; + +/// # #[tokio::main] +/// # async fn main() -> Result<()> { +/// // define a schema. +/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)])); + +/// // define data. +/// let batch = RecordBatch::try_new( +/// schema, +/// vec![Arc::new(StringArray::from(vec![ +/// "2020-09-08T13:42:29Z", +/// "2020-09-08T13:42:29.190855-05:00", +/// "2020-08-09 12:13:29", +/// "2020-01-02", +/// ]))], +/// )?; + +/// // declare a new context. In spark API, this corresponds to a new spark SQLsession +/// let ctx = SessionContext::new(); + +/// // declare a table in memory. In spark API, this corresponds to createDataFrame(...). +/// ctx.register_batch("t", batch)?; +/// let df = ctx.table("t").await?; + +/// // use to_timestamp function to convert col 'a' to timestamp type using the default parsing +/// let df = df.with_column("a", to_date(vec![col("a")]))?; + +/// let df = df.select_columns(&["a"])?; + +/// // print the results +/// df.show().await?; + +/// Ok(()) +/// } +/// ``` +pub fn to_date(args: &[ColumnarValue]) -> Result { + match args.len() { + 1 => handle::( + args, + |s| { + string_to_timestamp_nanos_shim(s) + .map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000)) + .and_then(|v| { + v.try_into().map_err(|_| { + internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed") + }) + }) + }, + "to_date", + ), + n if n >= 2 => handle_multiple::( + args, + |s, format| { + string_to_timestamp_nanos_formatted(s, format) + .map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000)) + .and_then(|v| { + v.try_into().map_err(|_| { + internal_datafusion_err!("Unable to cast to Date32 for converting from i64 to i32 failed") + }) + }) + }, + |n| n, + "to_date", + ), + _ => exec_err!("Unsupported 0 argument count for function to_date"), + } +} + /// to_timestamp SQL function /// /// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**. @@ -1567,6 +1646,36 @@ fn validate_to_timestamp_data_types( None } +/// to_date SQL function implementation +pub fn to_date_invoke(args: &[ColumnarValue]) -> Result { + if args.is_empty() { + return exec_err!( + "to_date function requires 1 or more arguments, got {}", + args.len() + ); + } + + // validate that any args after the first one are Utf8 + if args.len() > 1 { + if let Some(value) = validate_to_timestamp_data_types(args, "to_date") { + return value; + } + } + + match args[0].data_type() { + DataType::Int32 + | DataType::Int64 + | DataType::Null + | DataType::Float64 + | DataType::Date32 + | DataType::Date64 => cast_column(&args[0], &DataType::Date32, None), + DataType::Utf8 => to_date(args), + other => { + exec_err!("Unsupported data type {:?} for function to_date", other) + } + } +} + /// to_timestamp() SQL function implementation pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { if args.is_empty() { diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 8446a65d72c8c..7782fa10ffc00 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -531,6 +531,7 @@ pub fn create_physical_fun( BuiltinScalarFunction::FromUnixtime => { Arc::new(datetime_expressions::from_unixtime_invoke) } + BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke), BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function_inner(string_expressions::initcap::)(args) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e779e29cb8dac..a794d55e032e6 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1,4 +1,5 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -679,6 +680,7 @@ enum ScalarFunction { ArrayReverse = 134; RegexpLike = 135; ToChar = 136; + ToDate = 137; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index f5f15aa3e4288..f25c22d7878d6 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22435,6 +22435,7 @@ impl serde::Serialize for ScalarFunction { Self::ArrayReverse => "ArrayReverse", Self::RegexpLike => "RegexpLike", Self::ToChar => "ToChar", + Self::ToDate => "ToDate", }; serializer.serialize_str(variant) } @@ -22578,6 +22579,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayReverse", "RegexpLike", "ToChar", + "ToDate", ]; struct GeneratedVisitor; @@ -22750,6 +22752,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayReverse" => Ok(ScalarFunction::ArrayReverse), "RegexpLike" => Ok(ScalarFunction::RegexpLike), "ToChar" => Ok(ScalarFunction::ToChar), + "ToDate" => Ok(ScalarFunction::ToDate), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 69d035239cb87..f23b706bca476 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2766,6 +2766,7 @@ pub enum ScalarFunction { ArrayReverse = 134, RegexpLike = 135, ToChar = 136, + ToDate = 137, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2906,6 +2907,7 @@ impl ScalarFunction { ScalarFunction::ArrayReverse => "ArrayReverse", ScalarFunction::RegexpLike => "RegexpLike", ScalarFunction::ToChar => "ToChar", + ScalarFunction::ToDate => "ToDate", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -3043,6 +3045,7 @@ impl ScalarFunction { "ArrayReverse" => Some(Self::ArrayReverse), "RegexpLike" => Some(Self::RegexpLike), "ToChar" => Some(Self::ToChar), + "ToDate" => Some(Self::ToDate), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index f1ee84a8221dd..5ddc5f8e9609c 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -574,6 +574,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Levenshtein => Self::Levenshtein, ScalarFunction::SubstrIndex => Self::SubstrIndex, ScalarFunction::FindInSet => Self::FindInSet, + ScalarFunction::ToDate => Self::ToDate, } } } @@ -1822,6 +1823,16 @@ pub fn parse_expr( ScalarFunction::StructFun => { Ok(struct_fun(parse_expr(&args[0], registry)?)) } + ScalarFunction::ToDate => { + let args: Vec<_> = args + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + BuiltinScalarFunction::ToDate, + args, + ))) + } } } ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => { diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index a6348e909cb07..7fab055c31dda 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1552,6 +1552,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Levenshtein => Self::Levenshtein, BuiltinScalarFunction::SubstrIndex => Self::SubstrIndex, BuiltinScalarFunction::FindInSet => Self::FindInSet, + BuiltinScalarFunction::ToDate => Self::ToDate, }; Ok(scalar_function) diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index a93a7ff7e73cd..d7b65d850afd2 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -107,3 +107,154 @@ query ? SELECT '2023-01-01T00:00:00'::timestamp - DATE '2021-01-01'; ---- 730 days 0 hours 0 mins 0.000000000 secs + +# to_date_test +statement ok +create table to_date_t1(ts bigint) as VALUES + (1235865600000), + (1235865660000), + (1238544000000); + +# query_cast_timestamp_millis +query D +SELECT to_date(ts / 100000000) FROM to_date_t1 LIMIT 3 +---- +2003-11-02 +2003-11-02 +2003-11-29 + +query D +SELECT to_date('01-14-2023 01:01:30+05:30', '%q', '%d-%m-%Y %H/%M/%S', '%+', '%m-%d-%Y %H:%M:%S%#z'); +---- +2023-01-13 + +statement error DataFusion error: Execution error: to_date function unsupported data type at index 1: List +SELECT to_date('2022-08-03T14:38:50+05:30', make_array('%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+')); + +# query date with arrow_cast +query D +select to_date(arrow_cast(123, 'Int64')) +---- +1970-05-04 + +statement error DataFusion error: Arrow error: +SELECT to_date('21311111'); + +# verify date cast with integer input +query DDDDDD +SELECT to_date(null), to_date(0), to_date(19266320), to_date(1), to_date(-1), to_date(0-1) +---- +NULL 1970-01-01 +54719-05-25 1970-01-02 1969-12-31 1969-12-31 + +# verify date output types +query TTT +SELECT arrow_typeof(to_date(1)), arrow_typeof(to_date(null)), arrow_typeof(to_date('2023-01-10 12:34:56.000')) +---- +Date32 Date32 Date32 + +# verify date data with formatting options +query DDDDDD +SELECT to_date(null, '%+'), to_date(0, '%s'), to_date(192663, '%s'), to_date(1, '%+', '%s'), to_date(-1, '%c', '%+', '%s'), to_date(0-1, '%c', '%+', '%s') +---- +NULL 1970-01-01 2497-06-29 1970-01-02 1969-12-31 1969-12-31 + +# verify date data with formatting options +query DDDDDD +SELECT to_date(null, '%+'), to_date(0, '%s'), to_date(192663, '%s'), to_date(1, '%+', '%s'), to_date(-1, '%c', '%+', '%s'), to_date(0-1, '%c', '%+', '%s') +---- +NULL 1970-01-01 2497-06-29 1970-01-02 1969-12-31 1969-12-31 + +# verify date output types with formatting options +query TTT +SELECT arrow_typeof(to_date(1, '%c', '%s')), arrow_typeof(to_date(null, '%+', '%s')), arrow_typeof(to_date('2023-01-10 12:34:56.000', '%Y-%m-%d %H:%M:%S%.f')) +---- +Date32 Date32 Date32 + +# to_date with invalid formatting +query error input contains invalid characters +SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_date with invalid formatting +query error input contains invalid characters +SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_date with invalid formatting +query error input contains invalid characters +SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_date with invalid formatting +query error input contains invalid characters +SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_date with invalid formatting +query error input contains invalid characters +SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_date with broken formatting +query error bad or unsupported format string +SELECT to_date('2020-09-08 12/00/00+00:00', '%q') + +# to_date with broken formatting +query error bad or unsupported format string +SELECT to_date('2020-09-08 12/00/00+00:00', '%q') + +# to_date with broken formatting +query error bad or unsupported format string +SELECT to_date('2020-09-08 12/00/00+00:00', '%q') + +# to_date with broken formatting +query error bad or unsupported format string +SELECT to_date('2020-09-08 12/00/00+00:00', '%q') + +# to_date with broken formatting +query error bad or unsupported format string +SELECT to_date('2020-09-08 12/00/00+00:00', '%q') + +statement ok +create table ts_utf8_data(ts varchar(100), format varchar(100)) as values + ('2020-09-08 12/00/00+00:00', '%Y-%m-%d %H/%M/%S%#z'), + ('2031-01-19T23:33:25+05:00', '%+'), + ('08-09-2020 12:00:00+00:00', '%d-%m-%Y %H:%M:%S%#z'), + ('1926632005', '%s'), + ('2000-01-01T01:01:01+07:00', '%+'); + +# verify date data using tables with formatting options +query D +SELECT to_date(t.ts, t.format) from ts_utf8_data as t +---- +2020-09-08 +2031-01-19 +2020-09-08 +2031-01-19 +1999-12-31 + +# verify date data using tables with formatting options +query D +SELECT to_date(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%+', '%s', '%d-%m-%Y %H:%M:%S%#z') from ts_utf8_data as t +---- +2020-09-08 +2031-01-19 +2020-09-08 +2031-01-19 +1999-12-31 + +# verify date data using tables with formatting options where at least one column cannot be parsed +query error Error parsing timestamp from '1926632005' using format '%d-%m-%Y %H:%M:%S%#z': input contains invalid characters +SELECT to_date(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%+', '%d-%m-%Y %H:%M:%S%#z') from ts_utf8_data as t + +# verify date data using tables with formatting options where one of the formats is invalid +query D +SELECT to_date(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+') from ts_utf8_data as t +---- +2020-09-08 +2031-01-19 +2020-09-08 +2031-01-19 +1999-12-31 + +# timestamp data using tables with formatting options in an array is not supported at this time +query error function unsupported data type at index 1: +SELECT to_date(t.ts, make_array('%Y-%m-%d %H/%M/%S%#z', '%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+')) from ts_utf8_data as t + +statement ok +drop table ts_utf8_data