diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index b4446b1538cd6..200ddc9a20f3d 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -379,6 +379,7 @@ When ANSI mode is on, it throws exceptions for invalid operations. You can use t
   - `try_avg`: identical to the function `avg`, except that it returns `NULL` result instead of throwing an exception on decimal/interval value overflow.
   - `try_element_at`: identical to the function `element_at`, except that it returns `NULL` result instead of throwing an exception on array's index out of bound.
   - `try_to_timestamp`: identical to the function `to_timestamp`, except that it returns `NULL` result instead of throwing an exception on string parsing error.
+  - `try_parse_url`: identical to the function `parse_url`, except that it returns `NULL` result instead of throwing an exception on URL parsing error.
 
 ### SQL Keywords (optional, disabled by default)
 
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 53904718fff6a..bf73fec58280d 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -587,6 +587,7 @@ URL Functions
     :toctree: api/
 
     parse_url
+    try_parse_url
     url_decode
     url_encode
     try_url_decode
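To make the behavior documented by the new bullet concrete: with ANSI mode enabled, `parse_url` raises on a malformed URL, while `try_parse_url` yields NULL. A minimal sketch, assuming a local PySpark session; the sample URL is the malformed one used in this patch's tests:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.sql.ansi.enabled", "true")  # ANSI mode: parse_url raises on bad URLs
    .getOrCreate()
)

# The space makes the scheme invalid, so parsing fails.
bad_url = "inva lid://spark.apache.org/path?query=1"

# parse_url would raise an exception here under ANSI mode;
# try_parse_url returns NULL for the same input instead.
spark.sql(f"SELECT try_parse_url('{bad_url}', 'QUERY') AS q").show()
```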
diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py
index e88dfb2cc8bef..d93cbd71e686e 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -2819,6 +2819,18 @@ def parse_url(
 parse_url.__doc__ = pysparkfuncs.parse_url.__doc__
 
 
+def try_parse_url(
+    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
+) -> Column:
+    if key is not None:
+        return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
+    else:
+        return _invoke_function_over_columns("try_parse_url", url, partToExtract)
+
+
+try_parse_url.__doc__ = pysparkfuncs.try_parse_url.__doc__
+
+
 def printf(format: "ColumnOrName", *cols: "ColumnOrName") -> Column:
     return _invoke_function("printf", _to_col(format), *[_to_col(c) for c in cols])
 
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index 65d8bfde1411f..872411b5bb995 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -13090,6 +13090,122 @@ def substr(
     return _invoke_function_over_columns("substr", str, pos)
 
 
+@_try_remote_functions
+def try_parse_url(
+    url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
+) -> Column:
+    """
+    This is a special version of `parse_url` that performs the same operation, but returns a
+    NULL value instead of raising an error if the parsing cannot be performed.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    url : :class:`~pyspark.sql.Column` or str
+        A column of strings, each representing a URL.
+    partToExtract : :class:`~pyspark.sql.Column` or str
+        A column of strings, each representing the part to extract from the URL.
+    key : :class:`~pyspark.sql.Column` or str, optional
+        A column of strings, each representing the key of a query parameter in the URL.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        A new column of strings, each representing the value of the extracted part from the URL.
+
+    Examples
+    --------
+    Example 1: Extracting the query part from a URL
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [("https://spark.apache.org/path?query=1", "QUERY")],
+    ...     ["url", "part"]
+    ... )
+    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
+    +------------------------+
+    |try_parse_url(url, part)|
+    +------------------------+
+    |                 query=1|
+    +------------------------+
+
+    Example 2: Extracting the value of a specific query parameter from a URL
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [("https://spark.apache.org/path?query=1", "QUERY", "query")],
+    ...     ["url", "part", "key"]
+    ... )
+    >>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
+    +-----------------------------+
+    |try_parse_url(url, part, key)|
+    +-----------------------------+
+    |                            1|
+    +-----------------------------+
+
+    Example 3: Extracting the protocol part from a URL
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [("https://spark.apache.org/path?query=1", "PROTOCOL")],
+    ...     ["url", "part"]
+    ... )
+    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
+    +------------------------+
+    |try_parse_url(url, part)|
+    +------------------------+
+    |                   https|
+    +------------------------+
+
+    Example 4: Extracting the host part from a URL
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [("https://spark.apache.org/path?query=1", "HOST")],
+    ...     ["url", "part"]
+    ... )
+    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
+    +------------------------+
+    |try_parse_url(url, part)|
+    +------------------------+
+    |        spark.apache.org|
+    +------------------------+
+
+    Example 5: Extracting the path part from a URL
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [("https://spark.apache.org/path?query=1", "PATH")],
+    ...     ["url", "part"]
+    ... )
+    >>> df.select(sf.try_parse_url(df.url, df.part)).show()
+    +------------------------+
+    |try_parse_url(url, part)|
+    +------------------------+
+    |                   /path|
+    +------------------------+
+
+    Example 6: Invalid URL
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [("inva lid://spark.apache.org/path?query=1", "QUERY", "query")],
+    ...     ["url", "part", "key"]
+    ... )
+    >>> df.select(sf.try_parse_url(df.url, df.part, df.key)).show()
+    +-----------------------------+
+    |try_parse_url(url, part, key)|
+    +-----------------------------+
+    |                         NULL|
+    +-----------------------------+
+    """
+    if key is not None:
+        return _invoke_function_over_columns("try_parse_url", url, partToExtract, key)
+    else:
+        return _invoke_function_over_columns("try_parse_url", url, partToExtract)
+
+
 @_try_remote_functions
 def parse_url(
     url: "ColumnOrName", partToExtract: "ColumnOrName", key: Optional["ColumnOrName"] = None
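An end-to-end sketch of the Python API added above, assuming a local PySpark session; the data mirrors the docstring examples, and both dispatch paths (with and without `key`) are exercised:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [
        ("https://spark.apache.org/path?query=1", "QUERY", "query"),  # valid -> "1"
        ("inva lid://spark.apache.org/path?query=1", "QUERY", "query"),  # malformed -> NULL
    ],
    ["url", "part", "key"],
)

# Three-argument form: extract the value of a single query parameter.
df.select(F.try_parse_url(df.url, df.part, df.key)).show()

# Two-argument form: extract a whole URL part (here the full query string).
df.select(F.try_parse_url(df.url, df.part)).show()
```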
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index f4f00297c70c7..f6f54aee6283d 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -333,6 +333,20 @@ def test_rand_functions(self):
         rndn2 = df.select("key", F.randn(0)).collect()
         self.assertEqual(sorted(rndn1), sorted(rndn2))
 
+    def test_try_parse_url(self):
+        df = self.spark.createDataFrame(
+            [("https://spark.apache.org/path?query=1", "QUERY", "query")],
+            ["url", "part", "key"],
+        )
+        actual = df.select(F.try_parse_url(df.url, df.part, df.key)).collect()
+        self.assertEqual(actual, [Row("1")])
+        df = self.spark.createDataFrame(
+            [("inva lid://spark.apache.org/path?query=1", "QUERY", "query")],
+            ["url", "part", "key"],
+        )
+        actual = df.select(F.try_parse_url(df.url, df.part, df.key)).collect()
+        self.assertEqual(actual, [Row(None)])
+
     def test_string_functions(self):
         string_functions = [
             "upper",
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
index ece9d638e7c61..d81b9c5060f68 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -4678,6 +4678,24 @@ object functions {
    */
   def substr(str: Column, pos: Column): Column = Column.fn("substr", str, pos)
 
+  /**
+   * Extracts a part from a URL, or NULL if the URL cannot be parsed.
+   *
+   * @group url_funcs
+   * @since 4.0.0
+   */
+  def try_parse_url(url: Column, partToExtract: Column, key: Column): Column =
+    Column.fn("try_parse_url", url, partToExtract, key)
+
+  /**
+   * Extracts a part from a URL, or NULL if the URL cannot be parsed.
+   *
+   * @group url_funcs
+   * @since 4.0.0
+   */
+  def try_parse_url(url: Column, partToExtract: Column): Column =
+    Column.fn("try_parse_url", url, partToExtract)
+
   /**
    * Extracts a part from a URL.
    *
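The two Scala overloads above mirror the optional `key` parameter on the Python side. As a cross-check that the column API and the raw SQL form resolve to the same function, a small sketch (assumes a running session; note that `partToExtract` is a column, so a literal needs `F.lit`):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame([("https://spark.apache.org/path?query=1",)], ["url"])

# Column API (two-argument overload) and the equivalent SQL expression
# should both return the full query string, "query=1".
via_api = df.select(F.try_parse_url("url", F.lit("QUERY")).alias("q"))
via_sql = df.selectExpr("try_parse_url(url, 'QUERY') AS q")
assert via_api.collect() == via_sql.collect()
```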
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index abe61619a2331..3836eabe6bec6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -615,6 +615,7 @@ object FunctionRegistry {
     expression[UrlEncode]("url_encode"),
     expression[UrlDecode]("url_decode"),
     expression[ParseUrl]("parse_url"),
+    expression[TryParseUrl]("try_parse_url"),
 
     // datetime functions
     expression[AddMonths]("add_months"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
index 95f22663eb59a..bf1a788554284 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/urlExpressions.scala
@@ -169,6 +169,36 @@ object ParseUrl {
   private val REGEXSUBFIX = "=([^&]*)"
 }
 
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(url, partToExtract[, key]) - This is a special version of `parse_url` that performs the same operation, but returns a NULL value instead of raising an error if the parsing cannot be performed.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'HOST');
+       spark.apache.org
+      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY');
+       query=1
+      > SELECT _FUNC_('inva lid://spark.apache.org/path?query=1', 'QUERY');
+       NULL
+      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY', 'query');
+       1
+  """,
+  since = "4.0.0",
+  group = "url_funcs")
+// scalastyle:on line.size.limit
+case class TryParseUrl(params: Seq[Expression], replacement: Expression)
+  extends RuntimeReplaceable with InheritAnalysisRules {
+  def this(children: Seq[Expression]) = this(children, ParseUrl(children, failOnError = false))
+
+  override def prettyName: String = "try_parse_url"
+
+  override def parameters: Seq[Expression] = params
+
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(replacement = newChild)
+  }
+}
+
 /**
  * Extracts a part from a URL
  */
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 5ad1380e1fb82..9006a20d13f08 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -356,6 +356,7 @@
 | org.apache.spark.sql.catalyst.expressions.TryElementAt | try_element_at | SELECT try_element_at(array(1, 2, 3), 2) | struct<try_element_at(array(1, 2, 3), 2):int> |
 | org.apache.spark.sql.catalyst.expressions.TryMod | try_mod | SELECT try_mod(3, 2) | struct<try_mod(3, 2):int> |
 | org.apache.spark.sql.catalyst.expressions.TryMultiply | try_multiply | SELECT try_multiply(2, 3) | struct<try_multiply(2, 3):int> |
+| org.apache.spark.sql.catalyst.expressions.TryParseUrl | try_parse_url | SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST') | struct<try_parse_url(http://spark.apache.org/path?query=1, HOST):string> |
 | org.apache.spark.sql.catalyst.expressions.TryReflect | try_reflect | SELECT try_reflect('java.util.UUID', 'randomUUID') | struct<try_reflect(java.util.UUID, randomUUID):string> |
 | org.apache.spark.sql.catalyst.expressions.TrySubtract | try_subtract | SELECT try_subtract(2, 1) | struct<try_subtract(2, 1):int> |
 | org.apache.spark.sql.catalyst.expressions.TryToBinary | try_to_binary | SELECT try_to_binary('abc', 'utf-8') | struct<try_to_binary(abc, utf-8):binary> |
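For reviewers unfamiliar with the pattern: `TryParseUrl` carries no evaluation logic of its own. It is a `RuntimeReplaceable` expression, so the analyzer rewrites it to `ParseUrl(children, failOnError = false)` and the try_ variant shares one code path with `parse_url`. One hedged way to observe this from Python (the exact plan text varies across versions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.sql(
    "SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST') AS h"
)

# Per the RuntimeReplaceable contract, the resolved plan should show the
# expression rewritten to the non-failing form of parse_url.
df.explain(extended=True)
df.show()  # h -> spark.apache.org
```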
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UrlFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UrlFunctionsSuite.scala
index 428065fb6986f..aeb7101371cda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UrlFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UrlFunctionsSuite.scala
@@ -84,6 +84,50 @@ class UrlFunctionsSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  test("url try_parse_url function") {
+
+    def testUrl(url: String, expected: Row): Unit = {
+      checkAnswer(Seq[String]((url)).toDF("url").selectExpr(
+        "try_parse_url(url, 'HOST')", "try_parse_url(url, 'PATH')",
+        "try_parse_url(url, 'QUERY')", "try_parse_url(url, 'REF')",
+        "try_parse_url(url, 'PROTOCOL')", "try_parse_url(url, 'FILE')",
+        "try_parse_url(url, 'AUTHORITY')", "try_parse_url(url, 'USERINFO')",
+        "try_parse_url(url, 'QUERY', 'query')"), expected)
+    }
+
+    testUrl(
+      "http://userinfo@spark.apache.org/path?query=1#Ref",
+      Row("spark.apache.org", "/path", "query=1", "Ref",
+        "http", "/path?query=1", "userinfo@spark.apache.org", "userinfo", "1"))
+
+    testUrl(
+      "https://use%20r:pas%20s@example.com/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two",
+      Row("example.com", "/dir%20/pa%20th.HTML", "query=x%20y&q2=2", "Ref%20two",
+        "https", "/dir%20/pa%20th.HTML?query=x%20y&q2=2", "use%20r:pas%20s@example.com",
+        "use%20r:pas%20s", "x%20y"))
+
+    testUrl(
+      "http://user:pass@host",
+      Row("host", "", null, null, "http", "", "user:pass@host", "user:pass", null))
+
+    testUrl(
+      "http://user:pass@host/",
+      Row("host", "/", null, null, "http", "/", "user:pass@host", "user:pass", null))
+
+    testUrl(
+      "http://user:pass@host/?#",
+      Row("host", "/", "", "", "http", "/?", "user:pass@host", "user:pass", null))
+
+    testUrl(
+      "http://user:pass@host/file;param?query;p2",
+      Row("host", "/file;param", "query;p2", null, "http", "/file;param?query;p2",
+        "user:pass@host", "user:pass", null))
+
+    testUrl(
+      "inva lid://user:pass@host/file;param?query;p2",
+      Row(null, null, null, null, null, null, null, null, null))
+  }
+
   test("url encode/decode function") {
     def testUrl(url: String, fn: String, expected: Row): Unit = {
       checkAnswer(Seq[String]((url)).toDF("url")