Skip to content

Commit

Permalink
[SPARK-37507][SQL] Add a new SQL function to_binary
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Introduce a SQL function `to_binary`: Converts the input string to a binary value based on the supplied format (of how to interpret the string).

Syntax:
```
to_binary(str_column[, fmt])
```
where

- `fmt` can be a case-insensitive string literal of "hex", "utf-8", "base2", or "base64".
- By default, the binary format for conversion is "hex" if `fmt` is omitted.

### Why are the changes needed?
`to_binary` is a common function available in many DBMSes, for example:
- [TO_VARBYTE function - Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/r_TO_VARBYTE.html)
- [TO_BINARY — Snowflake Documentation](https://docs.snowflake.com/en/sql-reference/functions/to_binary.html)
- [Expressions, functions, and operators  |  BigQuery  |  Google Cloud](https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#format_string_as_bytes)
- [Teradata Online Documentation | Quick access to technical manuals](https://docs.teradata.com/r/kmuOwjp1zEYg98JsB8fu_A/etRo5aTAY9n5fUPjxSEynw)

Introducing it improves compatibility and the ease of migration.

In addition, `to_binary` can unify existing Spark functions: `encode`, `unhex`, `unbase64`, and `binary`, which makes the API easier to remember and use.

### Does this PR introduce _any_ user-facing change?
Yes, a new function for the string to binary conversion with a specified format.

### How was this patch tested?
Unit test.

Closes #35415 from xinrong-databricks/to_binary.

Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
xinrong-meng authored and HyukjinKwon committed Feb 13, 2022
1 parent 25a4c5f commit 25dd425
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ object FunctionRegistry {
expression[Second]("second"),
expression[ParseToTimestamp]("to_timestamp"),
expression[ParseToDate]("to_date"),
expression[ToBinary]("to_binary"),
expression[ToUnixTimestamp]("to_unix_timestamp"),
expression[ToUTCTimestamp]("to_utc_timestamp"),
expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2538,6 +2538,77 @@ case class Encode(value: Expression, charset: Expression)
newLeft: Expression, newRight: Expression): Encode = copy(value = newLeft, charset = newRight)
}

/**
 * Converts the input expression to a binary value based on the supplied format.
 *
 * The auxiliary constructors choose the expression stored in `child` from the format:
 * "hex" maps to [[Unhex]], "utf-8" to [[Encode]] with the "UTF-8" charset, "base64" to
 * [[UnBase64]] and "base2" to a [[Cast]] to [[BinaryType]]. When no format is given,
 * [[Unhex]] is used. A format that evaluates to null yields a null binary literal.
 *
 * When the format is not foldable, or does not evaluate to one of the supported names, the
 * format expression itself is stored as `child` and `checkInputDataTypes` reports the failure.
 *
 * @param expr the expression to convert
 * @param format the format expression, or `None` for the one-argument form
 * @param child the expression selected by the auxiliary constructors
 */
// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = """
    _FUNC_(str[, fmt]) - Converts the input `str` to a binary value based on the supplied `fmt`.
      `fmt` can be a case-insensitive string literal of "hex", "utf-8", "base2", or "base64".
      By default, the binary format for conversion is "hex" if `fmt` is omitted.
      The function returns NULL if at least one of the input parameters is NULL.
  """,
  examples = """
    Examples:
      > SELECT _FUNC_('abc', 'utf-8');
       abc
  """,
  since = "3.3.0",
  group = "string_funcs")
// scalastyle:on line.size.limit
case class ToBinary(expr: Expression, format: Option[Expression], child: Expression)
  extends RuntimeReplaceable {

  def this(expr: Expression, format: Expression) = this(expr, Option(format),
    format match {
      case lit if lit.foldable =>
        lit.eval() match {
          case null => Literal(null, BinaryType)
          // Match on the runtime type instead of casting: a foldable format that is not a
          // string (for example an integer literal) must not raise a ClassCastException here.
          case fmt: UTF8String =>
            fmt.toString.toLowerCase(Locale.ROOT) match {
              case "hex" => Unhex(expr)
              case "utf-8" => Encode(expr, Literal("UTF-8"))
              case "base64" => UnBase64(expr)
              case "base2" => Cast(expr, BinaryType)
              // Unknown format name: rejected later by checkInputDataTypes.
              case _ => lit
            }
          // Non-string, non-null value: rejected later by checkInputDataTypes.
          case _ => lit
        }

      // Non-foldable format: rejected later by checkInputDataTypes.
      case other => other
    }
  )

  def this(expr: Expression) = this(expr, None, Unhex(expr))

  override def flatArguments: Iterator[Any] = Iterator(expr, format)
  override def exprsReplaced: Seq[Expression] = expr +: format.toSeq

  override def prettyName: String = "to_binary"
  override def dataType: DataType = BinaryType

  /**
   * Accepts the expression only when the format is absent, or is foldable and evaluates to
   * null or to one of the supported format names (compared case-insensitively).
   */
  override def checkInputDataTypes(): TypeCheckResult = {
    def checkFormat(lit: Expression): Boolean = {
      lit.foldable && (lit.eval() match {
        case null => true
        case fmt: UTF8String =>
          Seq("hex", "utf-8", "base64", "base2")
            .contains(fmt.toString.toLowerCase(Locale.ROOT))
        // Any other runtime type cannot name a supported format.
        case _ => false
      })
    }

    if (format.forall(checkFormat)) {
      super.checkInputDataTypes()
    } else {
      // `format` is an Option, so it is rendered as `Some(...)` in the message; the text is
      // kept unchanged because the recorded SQL test outputs expect exactly this message.
      TypeCheckResult.TypeCheckFailure(
        s"Unsupported encoding format: $format. The format has to be " +
          s"a case-insensitive string literal of 'hex', 'utf-8', 'base2', or 'base64'")
    }
  }

  override protected def withNewChildInternal(newChild: Expression): ToBinary =
    copy(child = newChild)
}

/**
* Formats the number X to a format like '#,###,###.##', rounded to D decimal places,
* and returns the result as a string. If D is 0, the result has no decimal point or
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<!-- Automatically generated by ExpressionsSchemaSuite -->
## Summary
- Number of queries: 378
- Number of queries: 379
- Number of expressions that missing example: 12
- Expressions missing examples: bigint,binary,boolean,date,decimal,double,float,int,smallint,string,timestamp,tinyint
## Schema of Built-in Functions
Expand Down Expand Up @@ -299,6 +299,7 @@
| org.apache.spark.sql.catalyst.expressions.Tan | tan | SELECT tan(0) | struct<TAN(0):double> |
| org.apache.spark.sql.catalyst.expressions.Tanh | tanh | SELECT tanh(0) | struct<TANH(0):double> |
| org.apache.spark.sql.catalyst.expressions.TimeWindow | window | SELECT a, window.start, window.end, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, start | struct<a:string,start:timestamp,end:timestamp,cnt:bigint> |
| org.apache.spark.sql.catalyst.expressions.ToBinary | to_binary | SELECT to_binary('abc', 'utf-8') | struct<to_binary(abc, utf-8):binary> |
| org.apache.spark.sql.catalyst.expressions.ToDegrees | degrees | SELECT degrees(3.141592653589793) | struct<DEGREES(3.141592653589793):double> |
| org.apache.spark.sql.catalyst.expressions.ToNumber | to_number | SELECT to_number('454', '999') | struct<to_number(454, 999):decimal(3,0)> |
| org.apache.spark.sql.catalyst.expressions.ToRadians | radians | SELECT radians(180) | struct<RADIANS(180):double> |
Expand Down
16 changes: 16 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,19 @@ select to_number('-454', '-000');
select to_number('-454', 'S000');
select to_number('12,454.8-', '00,000.9-');
select to_number('00,454.8-', '00,000.9-');

-- to_binary
-- format argument omitted
select to_binary('abc');
-- each format name listed in the function description, given as a string literal
select to_binary('abc', 'utf-8');
select to_binary('abc', 'base64');
select to_binary('abc', 'base2');
select to_binary('abc', 'hex');
-- format produced by a concat() call instead of a single string literal
select to_binary('abc', concat('utf', '-8'));
select to_binary('abc', concat('base', '64'));
-- format names written with different letter case
select to_binary('abc', 'Hex');
select to_binary('abc', 'UTF-8');
-- NULL as the format, as the input, or as both
select to_binary('abc', null);
select to_binary(null, 'utf-8');
select to_binary(null, null);
select to_binary(null, cast(null as string));
-- format name that is not in the documented list
select to_binary('abc', 'invalidFormat');
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 102
-- Number of queries: 116


-- !query
Expand Down Expand Up @@ -824,3 +824,116 @@ select to_number('00,454.8-', '00,000.9-')
struct<to_number(00,454.8-, 00,000.9-):decimal(6,1)>
-- !query output
-454.8


-- !query
select to_binary('abc')
-- !query schema
struct<to_binary(abc):binary>
-- !query output


-- !query
select to_binary('abc', 'utf-8')
-- !query schema
struct<to_binary(abc, utf-8):binary>
-- !query output
abc


-- !query
select to_binary('abc', 'base64')
-- !query schema
struct<to_binary(abc, base64):binary>
-- !query output
i�


-- !query
select to_binary('abc', 'base2')
-- !query schema
struct<to_binary(abc, base2):binary>
-- !query output
abc


-- !query
select to_binary('abc', 'hex')
-- !query schema
struct<to_binary(abc, hex):binary>
-- !query output


-- !query
select to_binary('abc', concat('utf', '-8'))
-- !query schema
struct<to_binary(abc, concat(utf, -8)):binary>
-- !query output
abc


-- !query
select to_binary('abc', concat('base', '64'))
-- !query schema
struct<to_binary(abc, concat(base, 64)):binary>
-- !query output
i�


-- !query
select to_binary('abc', 'Hex')
-- !query schema
struct<to_binary(abc, Hex):binary>
-- !query output


-- !query
select to_binary('abc', 'UTF-8')
-- !query schema
struct<to_binary(abc, UTF-8):binary>
-- !query output
abc


-- !query
select to_binary('abc', null)
-- !query schema
struct<to_binary(abc, NULL):binary>
-- !query output
NULL


-- !query
select to_binary(null, 'utf-8')
-- !query schema
struct<to_binary(NULL, utf-8):binary>
-- !query output
NULL


-- !query
select to_binary(null, null)
-- !query schema
struct<to_binary(NULL, NULL):binary>
-- !query output
NULL


-- !query
select to_binary(null, cast(null as string))
-- !query schema
struct<to_binary(NULL, CAST(NULL AS STRING)):binary>
-- !query output
NULL


-- !query
select to_binary('abc', 'invalidFormat')
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'to_binary('abc', 'invalidFormat')' due to data type mismatch: Unsupported encoding format: Some(invalidFormat). The format has to be a case-insensitive string literal of 'hex', 'utf-8', 'base2', or 'base64'; line 1 pos 7
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 102
-- Number of queries: 116


-- !query
Expand Down Expand Up @@ -820,3 +820,116 @@ select to_number('00,454.8-', '00,000.9-')
struct<to_number(00,454.8-, 00,000.9-):decimal(6,1)>
-- !query output
-454.8


-- !query
select to_binary('abc')
-- !query schema
struct<to_binary(abc):binary>
-- !query output


-- !query
select to_binary('abc', 'utf-8')
-- !query schema
struct<to_binary(abc, utf-8):binary>
-- !query output
abc


-- !query
select to_binary('abc', 'base64')
-- !query schema
struct<to_binary(abc, base64):binary>
-- !query output
i�


-- !query
select to_binary('abc', 'base2')
-- !query schema
struct<to_binary(abc, base2):binary>
-- !query output
abc


-- !query
select to_binary('abc', 'hex')
-- !query schema
struct<to_binary(abc, hex):binary>
-- !query output


-- !query
select to_binary('abc', concat('utf', '-8'))
-- !query schema
struct<to_binary(abc, concat(utf, -8)):binary>
-- !query output
abc


-- !query
select to_binary('abc', concat('base', '64'))
-- !query schema
struct<to_binary(abc, concat(base, 64)):binary>
-- !query output
i�


-- !query
select to_binary('abc', 'Hex')
-- !query schema
struct<to_binary(abc, Hex):binary>
-- !query output


-- !query
select to_binary('abc', 'UTF-8')
-- !query schema
struct<to_binary(abc, UTF-8):binary>
-- !query output
abc


-- !query
select to_binary('abc', null)
-- !query schema
struct<to_binary(abc, NULL):binary>
-- !query output
NULL


-- !query
select to_binary(null, 'utf-8')
-- !query schema
struct<to_binary(NULL, utf-8):binary>
-- !query output
NULL


-- !query
select to_binary(null, null)
-- !query schema
struct<to_binary(NULL, NULL):binary>
-- !query output
NULL


-- !query
select to_binary(null, cast(null as string))
-- !query schema
struct<to_binary(NULL, CAST(NULL AS STRING)):binary>
-- !query output
NULL


-- !query
select to_binary('abc', 'invalidFormat')
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'to_binary('abc', 'invalidFormat')' due to data type mismatch: Unsupported encoding format: Some(invalidFormat). The format has to be a case-insensitive string literal of 'hex', 'utf-8', 'base2', or 'base64'; line 1 pos 7

0 comments on commit 25dd425

Please sign in to comment.