Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-50031][SQL] Add the TryParseUrl expression #48500

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3207,7 +3207,7 @@
},
"INVALID_URL" : {
"message" : [
"The url is invalid: <url>. If necessary set <ansiConfig> to \"false\" to bypass this error."
"The url is invalid: <url>. Use \"try_parse_url\" to tolerate invalid URL and return NULL instead."
],
"sqlState" : "22P02"
},
Expand Down
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ When ANSI mode is on, it throws exceptions for invalid operations. You can use t
- `try_avg`: identical to the function `avg`, except that it returns `NULL` result instead of throwing an exception on decimal/interval value overflow.
- `try_element_at`: identical to the function `element_at`, except that it returns `NULL` result instead of throwing an exception on array's index out of bound.
- `try_to_timestamp`: identical to the function `to_timestamp`, except that it returns `NULL` result instead of throwing an exception on string parsing error.
- `try_parse_url`: identical to the function `parse_url`, except that it returns `NULL` result instead of throwing an exception on url parsing error.

### SQL Keywords (optional, disabled by default)

Expand Down
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ URL Functions
:toctree: api/

parse_url
try_parse_url
url_decode
url_encode
try_url_decode
Expand Down
12 changes: 12 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,18 @@ def parse_url(
parse_url.__doc__ = pysparkfuncs.parse_url.__doc__


def try_parse_url(
    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
    # Forward to the server-side try_parse_url; the optional key argument is
    # only sent along when the caller actually supplied it.
    args = [url, partToExtract] if key is None else [url, partToExtract, key]
    return _invoke_function_over_columns("try_parse_url", *args)


try_parse_url.__doc__ = pysparkfuncs.try_parse_url.__doc__


def printf(format: "ColumnOrName", *cols: "ColumnOrName") -> Column:
return _invoke_function("printf", _to_col(format), *[_to_col(c) for c in cols])

Expand Down
102 changes: 102 additions & 0 deletions python/pyspark/sql/functions/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12912,6 +12912,108 @@ def substr(
return _invoke_function_over_columns("substr", str, pos)


@_try_remote_functions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add one unit test in, e.g., test_functions.py? Then the tests will be reused in both Spark Connect and classic.

def try_parse_url(
    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
    """
    URL function: Extracts a specified part from a URL. If a key is provided,
    it returns the associated query parameter value. This is a special version of
    `parse_url` that performs the same operation, but returns NULL instead of
    raising an error if the URL cannot be parsed.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    url : :class:`~pyspark.sql.Column` or str
        A column of strings, each representing a URL.
    partToExtract : :class:`~pyspark.sql.Column` or str
        A column of strings, each representing the part to extract from the URL.
    key : :class:`~pyspark.sql.Column` or str, optional
        A column of strings, each representing the key of a query parameter in the URL.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        A new column of strings, each representing the value of the extracted part from
        the URL, or NULL if the URL is invalid.

    Examples
    --------
    Example 1: Extracting the query part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...   [("https://spark.apache.org/path?query=1", "QUERY")],
    ...   ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                 query=1|
    +------------------------+

    Example 2: Extracting the value of a specific query parameter from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...   [("https://spark.apache.org/path?query=1", "QUERY", "query")],
    ...   ["url", "part", "key"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
    +-----------------------------+
    |try_parse_url(url, part, key)|
    +-----------------------------+
    |                            1|
    +-----------------------------+

    Example 3: Extracting the protocol part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...   [("https://spark.apache.org/path?query=1", "PROTOCOL")],
    ...   ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                   https|
    +------------------------+

    Example 4: Extracting the host part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...   [("https://spark.apache.org/path?query=1", "HOST")],
    ...   ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |        spark.apache.org|
    +------------------------+

    Example 5: Extracting the path part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...   [("https://spark.apache.org/path?query=1", "PATH")],
    ...   ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                   /path|
    +------------------------+

    Example 6: Invalid URL returns NULL instead of raising an error

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...   [("inva lid://spark.apache.org/path?query=1", "QUERY")],
    ...   ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                    NULL|
    +------------------------+
    """
    if key is not None:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
    else:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract)


@_try_remote_functions
def parse_url(
url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
Expand Down
18 changes: 18 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4638,6 +4638,24 @@ object functions {
*/
def substr(str: Column, pos: Column): Column = Column.fn("substr", str, pos)

/**
 * Extracts a part from a URL. This is a special version of `parse_url` that performs the same
 * operation, but returns NULL instead of raising an error if the URL cannot be parsed.
 *
 * @group url_funcs
 * @since 4.0.0
 */
def try_parse_url(url: Column, partToExtract: Column, key: Column): Column =
Column.fn("try_parse_url", url, partToExtract, key)

/**
 * Extracts a part from a URL. This is a special version of `parse_url` that performs the same
 * operation, but returns NULL instead of raising an error if the URL cannot be parsed.
 *
 * @group url_funcs
 * @since 4.0.0
 */
def try_parse_url(url: Column, partToExtract: Column): Column =
Column.fn("try_parse_url", url, partToExtract)

/**
* Extracts a part from a URL.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ object FunctionRegistry {
expression[UrlEncode]("url_encode"),
expression[UrlDecode]("url_decode"),
expression[ParseUrl]("parse_url"),
expression[TryParseUrl]("try_parse_url"),

// datetime functions
expression[AddMonths]("add_months"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,35 @@ object ParseUrl {
private val REGEXSUBFIX = "=([^&]*)"
}

@ExpressionDescription(
  usage = """_FUNC_(url, partToExtract[, key]) - This is a special version of `parse_url` that
    performs the same operation, but returns NULL instead of raising an error if the URL
    cannot be parsed.""",
  examples = """
    Examples:
      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'HOST');
       spark.apache.org
      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY');
       query=1
      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY', 'query');
       1
      > SELECT _FUNC_('inva lid://spark.apache.org/path?query=1', 'QUERY');
       NULL
  """,
  since = "4.0.0",
  group = "url_funcs")
case class TryParseUrl(params: Seq[Expression], replacement: Expression)
  extends RuntimeReplaceable with InheritAnalysisRules {
  // Delegates to ParseUrl with failOnError disabled, so parsing failures yield NULL
  // instead of an INVALID_URL error.
  def this(children: Seq[Expression]) = this(children, ParseUrl(children, failOnError = false))

  override def prettyName: String = "try_parse_url"

  override def parameters: Seq[Expression] = params

  override protected def withNewChildInternal(newChild: Expression): Expression = {
    copy(replacement = newChild)
  }
}

/**
* Extracts a part from a URL
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
/**
 * Builds the INVALID_URL error raised when a URL string cannot be parsed.
 * The error message directs users to `try_parse_url` as the NULL-returning alternative,
 * so the ANSI-config parameter of the old message is no longer needed.
 */
def invalidUrlError(url: UTF8String, e: URISyntaxException): SparkIllegalArgumentException = {
  new SparkIllegalArgumentException(
    errorClass = "INVALID_URL",
    messageParameters = Map("url" -> url.toString),
    cause = e)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@
| org.apache.spark.sql.catalyst.expressions.TryElementAt | try_element_at | SELECT try_element_at(array(1, 2, 3), 2) | struct<try_element_at(array(1, 2, 3), 2):int> |
| org.apache.spark.sql.catalyst.expressions.TryMod | try_mod | SELECT try_mod(3, 2) | struct<try_mod(3, 2):int> |
| org.apache.spark.sql.catalyst.expressions.TryMultiply | try_multiply | SELECT try_multiply(2, 3) | struct<try_multiply(2, 3):int> |
| org.apache.spark.sql.catalyst.expressions.TryParseUrl | try_parse_url | SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST') | struct<try_parse_url(http://spark.apache.org/path?query=1, HOST):string> |
| org.apache.spark.sql.catalyst.expressions.TryReflect | try_reflect | SELECT try_reflect('java.util.UUID', 'randomUUID') | struct<try_reflect(java.util.UUID, randomUUID):string> |
| org.apache.spark.sql.catalyst.expressions.TrySubtract | try_subtract | SELECT try_subtract(2, 1) | struct<try_subtract(2, 1):int> |
| org.apache.spark.sql.catalyst.expressions.TryToBinary | try_to_binary | SELECT try_to_binary('abc', 'utf-8') | struct<try_to_binary(abc, utf-8):binary> |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,50 @@ class UrlFunctionsSuite extends QueryTest with SharedSparkSession {
}
}

test("url try_parse_url function") {
  // Each entry pairs an input URL with the expected results of extracting, in order:
  // HOST, PATH, QUERY, REF, PROTOCOL, FILE, AUTHORITY, USERINFO, and the value of
  // the 'query' parameter.
  val urlCases: Seq[(String, Row)] = Seq(
    "http://[email protected]/path?query=1#Ref" ->
      Row("spark.apache.org", "/path", "query=1", "Ref",
        "http", "/path?query=1", "[email protected]", "userinfo", "1"),
    "https://use%20r:pas%[email protected]/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two" ->
      Row("example.com", "/dir%20/pa%20th.HTML", "query=x%20y&q2=2", "Ref%20two",
        "https", "/dir%20/pa%20th.HTML?query=x%20y&q2=2", "use%20r:pas%[email protected]",
        "use%20r:pas%20s", "x%20y"),
    "http://user:pass@host" ->
      Row("host", "", null, null, "http", "", "user:pass@host", "user:pass", null),
    "http://user:pass@host/" ->
      Row("host", "/", null, null, "http", "/", "user:pass@host", "user:pass", null),
    "http://user:pass@host/?#" ->
      Row("host", "/", "", "", "http", "/?", "user:pass@host", "user:pass", null),
    "http://user:pass@host/file;param?query;p2" ->
      Row("host", "/file;param", "query;p2", null, "http", "/file;param?query;p2",
        "user:pass@host", "user:pass", null),
    // Invalid URL: every part comes back NULL instead of raising INVALID_URL.
    "inva lid://user:pass@host/file;param?query;p2" ->
      Row(null, null, null, null, null, null, null, null, null))

  urlCases.foreach { case (url, expected) =>
    checkAnswer(Seq[String]((url)).toDF("url").selectExpr(
      "try_parse_url(url, 'HOST')", "try_parse_url(url, 'PATH')",
      "try_parse_url(url, 'QUERY')", "try_parse_url(url, 'REF')",
      "try_parse_url(url, 'PROTOCOL')", "try_parse_url(url, 'FILE')",
      "try_parse_url(url, 'AUTHORITY')", "try_parse_url(url, 'USERINFO')",
      "try_parse_url(url, 'QUERY', 'query')"), expected)
  }
}

test("url encode/decode function") {
def testUrl(url: String, fn: String, expected: Row): Unit = {
checkAnswer(Seq[String]((url)).toDF("url")
Expand Down