
[SPARK-50031][SQL] Add the TryParseUrl expression #48500

Closed · wants to merge 12 commits
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -379,6 +379,7 @@ When ANSI mode is on, it throws exceptions for invalid operations. You can use t
- `try_avg`: identical to the function `avg`, except that it returns `NULL` result instead of throwing an exception on decimal/interval value overflow.
- `try_element_at`: identical to the function `element_at`, except that it returns `NULL` result instead of throwing an exception on array's index out of bound.
- `try_to_timestamp`: identical to the function `to_timestamp`, except that it returns `NULL` result instead of throwing an exception on string parsing error.
- `try_parse_url`: identical to the function `parse_url`, except that it returns `NULL` result instead of throwing an exception on URL parsing error.
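For illustration, a minimal sketch of the behavioral difference in a Spark 4.0+ spark-shell session (the `SET` line and the commented-out failing query are assumptions for demonstration, not part of this diff):

```scala
// With ANSI mode on, parse_url raises an error on a malformed URL:
spark.sql("SET spark.sql.ansi.enabled=true")
// spark.sql("SELECT parse_url('inva lid://spark.apache.org', 'HOST')").show()
//   would throw a URL-parsing error here.
// try_parse_url returns NULL for the same input instead:
spark.sql("SELECT try_parse_url('inva lid://spark.apache.org', 'HOST') AS host").show()
// +----+
// |host|
// +----+
// |NULL|
// +----+
```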

### SQL Keywords (optional, disabled by default)

1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -587,6 +587,7 @@ URL Functions
:toctree: api/

parse_url
try_parse_url
url_decode
url_encode
try_url_decode
12 changes: 12 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
@@ -2816,6 +2816,18 @@ def parse_url(
parse_url.__doc__ = pysparkfuncs.parse_url.__doc__


def try_parse_url(
    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
    if key is not None:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
    else:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract)


try_parse_url.__doc__ = pysparkfuncs.try_parse_url.__doc__


def printf(format: "ColumnOrName", *cols: "ColumnOrName") -> Column:
return _invoke_function("printf", _to_col(format), *[_to_col(c) for c in cols])

116 changes: 116 additions & 0 deletions python/pyspark/sql/functions/builtin.py
@@ -13090,6 +13090,122 @@ def substr(
return _invoke_function_over_columns("substr", str, pos)


@_try_remote_functions
Review comment (Member): Can we add a unit test in, e.g., test_functions.py? Then the tests will be reused in both Spark Connect and classic.

def try_parse_url(
    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
) -> Column:
    """
    This is a special version of `parse_url` that performs the same operation, but returns a
    NULL value instead of raising an error if the URL cannot be parsed.

    .. versionadded:: 4.0.0

    Parameters
    ----------
    url : :class:`~pyspark.sql.Column` or str
        A column of strings, each representing a URL.
    partToExtract : :class:`~pyspark.sql.Column` or str
        A column of strings, each representing the part to extract from the URL.
    key : :class:`~pyspark.sql.Column` or str, optional
        A column of strings, each representing the key of a query parameter in the URL.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        A new column of strings, each representing the value of the extracted part from the URL.

    Examples
    --------
    Example 1: Extracting the query part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [("https://spark.apache.org/path?query=1", "QUERY")],
    ...     ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                 query=1|
    +------------------------+

    Example 2: Extracting the value of a specific query parameter from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [("https://spark.apache.org/path?query=1", "QUERY", "query")],
    ...     ["url", "part", "key"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
    +-----------------------------+
    |try_parse_url(url, part, key)|
    +-----------------------------+
    |                            1|
    +-----------------------------+

    Example 3: Extracting the protocol part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [("https://spark.apache.org/path?query=1", "PROTOCOL")],
    ...     ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                   https|
    +------------------------+

    Example 4: Extracting the host part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [("https://spark.apache.org/path?query=1", "HOST")],
    ...     ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |        spark.apache.org|
    +------------------------+

    Example 5: Extracting the path part from a URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [("https://spark.apache.org/path?query=1", "PATH")],
    ...     ["url", "part"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
    +------------------------+
    |try_parse_url(url, part)|
    +------------------------+
    |                   /path|
    +------------------------+

    Example 6: Invalid URL

    >>> from pyspark.sql import functions as sf
    >>> df = spark.createDataFrame(
    ...     [("inva lid://spark.apache.org/path?query=1", "QUERY", "query")],
    ...     ["url", "part", "key"]
    ... )
    >>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
    +-----------------------------+
    |try_parse_url(url, part, key)|
    +-----------------------------+
    |                         NULL|
    +-----------------------------+
    """
    if key is not None:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
    else:
        return _invoke_function_over_columns("try_parse_url", url, partToExtract)


@_try_remote_functions
def parse_url(
url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
14 changes: 14 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -333,6 +333,20 @@ def test_rand_functions(self):
        rndn2 = df.select("key", F.randn(0)).collect()
        self.assertEqual(sorted(rndn1), sorted(rndn2))

    def test_try_parse_url(self):
Review comment (Contributor): Could we also add a case here that returns null? This is to make sure Python works with invalid URLs as well.

        df = self.spark.createDataFrame(
            [("https://spark.apache.org/path?query=1", "QUERY", "query")],
            ["url", "part", "key"],
        )
        actual = df.select(F.try_parse_url(df.url, df.part, df.key)).collect()
        self.assertEqual(actual, [Row("1")])
        df = self.spark.createDataFrame(
            [("inva lid://spark.apache.org/path?query=1", "QUERY", "query")],
            ["url", "part", "key"],
        )
        actual = df.select(F.try_parse_url(df.url, df.part, df.key)).collect()
        self.assertEqual(actual, [Row(None)])

    def test_string_functions(self):
        string_functions = [
            "upper",
18 changes: 18 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4676,6 +4676,24 @@
   */
  def substr(str: Column, pos: Column): Column = Column.fn("substr", str, pos)

  /**
   * Extracts a part from a URL, returning NULL instead of raising an error if the URL cannot
   * be parsed.
   *
   * @group url_funcs
   * @since 4.0.0
   */
  def try_parse_url(url: Column, partToExtract: Column, key: Column): Column =
    Column.fn("try_parse_url", url, partToExtract, key)

  /**
   * Extracts a part from a URL, returning NULL instead of raising an error if the URL cannot
   * be parsed.
   *
   * @group url_funcs
   * @since 4.0.0
   */
  def try_parse_url(url: Column, partToExtract: Column): Column =
    Column.fn("try_parse_url", url, partToExtract)

  /**
   * Extracts a part from a URL.
   *
@@ -615,6 +615,7 @@ object FunctionRegistry {
    expression[UrlEncode]("url_encode"),
    expression[UrlDecode]("url_decode"),
    expression[ParseUrl]("parse_url"),
    expression[TryParseUrl]("try_parse_url"),

    // datetime functions
    expression[AddMonths]("add_months"),
@@ -169,6 +169,37 @@ object ParseUrl {
  private val REGEXSUBFIX = "=([^&]*)"
}

/**
 * Extracts a part from a URL
Review comment (Member): This comment is useless. Let's remove it.

 */
@ExpressionDescription(
  usage = "_FUNC_(url, partToExtract[, key]) - Extracts a part from a URL.",
Review comment (Member): ParseUrl has the same description. Could you write a few words on what distinguishes the new expression from ParseUrl?

examples = """
Examples:
> SELECT _FUNC_('http://spark.apache.org/path?query=1', 'HOST');
spark.apache.org
> SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY');
query=1
> SELECT _FUNC_('inva lid://spark.apache.org/path?query=1', 'QUERY');
NULL
> SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY', 'query');
1
""",
since = "4.0.0",
group = "url_funcs")
case class TryParseUrl(params: Seq[Expression], replacement: Expression)
  extends RuntimeReplaceable with InheritAnalysisRules {
  def this(children: Seq[Expression]) = this(children, ParseUrl(children, failOnError = false))

  override def prettyName: String = "try_parse_url"

  override def parameters: Seq[Expression] = params

  override protected def withNewChildInternal(newChild: Expression): Expression = {
    copy(replacement = newChild)
  }
}
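For context on the design: a `RuntimeReplaceable` expression is never evaluated itself; the analyzer substitutes its `replacement`, so `TryParseUrl` needs no evaluation logic of its own. A minimal sketch of what the auxiliary constructor wires up, assuming only the constructors shown in this diff:

```scala
// Conceptual sketch: the parser binds try_parse_url(...) to TryParseUrl(children),
// whose replacement is a ParseUrl that returns null instead of throwing.
val children = Seq(Literal("https://spark.apache.org/path?query=1"), Literal("HOST"))
val expr = new TryParseUrl(children)
// expr.replacement == ParseUrl(children, failOnError = false)
// The analyzer rewrites the plan to evaluate the replacement, so an invalid
// URL produces null rather than an error.
```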

/**
 * Extracts a part from a URL
 */
@@ -356,6 +356,7 @@
| org.apache.spark.sql.catalyst.expressions.TryElementAt | try_element_at | SELECT try_element_at(array(1, 2, 3), 2) | struct<try_element_at(array(1, 2, 3), 2):int> |
| org.apache.spark.sql.catalyst.expressions.TryMod | try_mod | SELECT try_mod(3, 2) | struct<try_mod(3, 2):int> |
| org.apache.spark.sql.catalyst.expressions.TryMultiply | try_multiply | SELECT try_multiply(2, 3) | struct<try_multiply(2, 3):int> |
| org.apache.spark.sql.catalyst.expressions.TryParseUrl | try_parse_url | SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST') | struct<try_parse_url(http://spark.apache.org/path?query=1, HOST):string> |
| org.apache.spark.sql.catalyst.expressions.TryReflect | try_reflect | SELECT try_reflect('java.util.UUID', 'randomUUID') | struct<try_reflect(java.util.UUID, randomUUID):string> |
| org.apache.spark.sql.catalyst.expressions.TrySubtract | try_subtract | SELECT try_subtract(2, 1) | struct<try_subtract(2, 1):int> |
| org.apache.spark.sql.catalyst.expressions.TryToBinary | try_to_binary | SELECT try_to_binary('abc', 'utf-8') | struct<try_to_binary(abc, utf-8):binary> |
@@ -84,6 +84,50 @@ class UrlFunctionsSuite extends QueryTest with SharedSparkSession {
    }
  }

test("url try_parse_url function") {

def testUrl(url: String, expected: Row): Unit = {
checkAnswer(Seq[String]((url)).toDF("url").selectExpr(
"try_parse_url(url, 'HOST')", "try_parse_url(url, 'PATH')",
"try_parse_url(url, 'QUERY')", "try_parse_url(url, 'REF')",
"try_parse_url(url, 'PROTOCOL')", "try_parse_url(url, 'FILE')",
"try_parse_url(url, 'AUTHORITY')", "try_parse_url(url, 'USERINFO')",
"try_parse_url(url, 'QUERY', 'query')"), expected)
}

testUrl(
"http://[email protected]/path?query=1#Ref",
Row("spark.apache.org", "/path", "query=1", "Ref",
"http", "/path?query=1", "[email protected]", "userinfo", "1"))

testUrl(
"https://use%20r:pas%[email protected]/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two",
Row("example.com", "/dir%20/pa%20th.HTML", "query=x%20y&q2=2", "Ref%20two",
"https", "/dir%20/pa%20th.HTML?query=x%20y&q2=2", "use%20r:pas%[email protected]",
"use%20r:pas%20s", "x%20y"))

testUrl(
"http://user:pass@host",
Row("host", "", null, null, "http", "", "user:pass@host", "user:pass", null))

testUrl(
"http://user:pass@host/",
Row("host", "/", null, null, "http", "/", "user:pass@host", "user:pass", null))

testUrl(
"http://user:pass@host/?#",
Row("host", "/", "", "", "http", "/?", "user:pass@host", "user:pass", null))

testUrl(
"http://user:pass@host/file;param?query;p2",
Row("host", "/file;param", "query;p2", null, "http", "/file;param?query;p2",
"user:pass@host", "user:pass", null))

testUrl(
"inva lid://user:pass@host/file;param?query;p2",
Row(null, null, null, null, null, null, null, null, null))
}

test("url encode/decode function") {
def testUrl(url: String, fn: String, expected: Row): Unit = {
checkAnswer(Seq[String]((url)).toDF("url")