Skip to content

Commit

Permalink
Merge branch 'apache:master' into SPARK-48582
Browse files Browse the repository at this point in the history
  • Loading branch information
LuciferYang authored Jun 11, 2024
2 parents 20fea5a + 452c1b6 commit 1ce4d86
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ case class StackTrace(elems: Seq[String]) {
override def toString: String = elems.mkString

def html: NodeSeq = {
val withNewLine = elems.foldLeft(NodeSeq.Empty) { (acc, elem) =>
val withNewLine = elems.map(_.stripLineEnd).foldLeft(NodeSeq.Empty) { (acc, elem) =>
if (acc.isEmpty) {
acc :+ Text(elem)
} else {
Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/sql/connect/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ def select(self, __cols: Union[List[Column], List[str]]) -> ParentDataFrame:
def select(self, *cols: "ColumnOrName") -> ParentDataFrame: # type: ignore[misc]
if len(cols) == 1 and isinstance(cols[0], list):
cols = cols[0]
if any(not isinstance(c, (str, Column)) for c in cols):
raise PySparkTypeError(
error_class="NOT_LIST_OF_COLUMN_OR_STR",
message_parameters={"arg_name": "columns"},
)
return DataFrame(
plan.Project(self._plan, [F._to_col(c) for c in cols]),
session=self._session,
Expand Down
11 changes: 11 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pyspark.errors.exceptions.base import SessionNotSameException
from pyspark.sql.types import Row
from pyspark.testing.connectutils import should_test_connect
from pyspark.errors import PySparkTypeError
from pyspark.errors.exceptions.connect import AnalysisException
from pyspark.sql.tests.connect.test_connect_basic import SparkConnectSQLTestCase

Expand Down Expand Up @@ -214,6 +215,16 @@ def test_column_cannot_be_constructed_from_string(self):
with self.assertRaises(TypeError):
Column("col")

def test_select_none(self):
    """Passing ``None`` to ``select`` must be rejected with NOT_LIST_OF_COLUMN_OR_STR."""
    # Expected error payload, spelled out up front so the assertion below reads declaratively.
    expected_error_class = "NOT_LIST_OF_COLUMN_OR_STR"
    expected_parameters = {"arg_name": "columns"}

    with self.assertRaises(PySparkTypeError) as raised:
        self.connect.range(1).select(None)

    self.check_error(
        exception=raised.exception,
        error_class=expected_error_class,
        message_parameters=expected_parameters,
    )


if __name__ == "__main__":
from pyspark.sql.tests.connect.test_connect_error import * # noqa: F401
Expand Down
12 changes: 12 additions & 0 deletions sql/catalyst/benchmarks/EscapePathBenchmark-jdk21-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
================================================================================================
Escape
================================================================================================

OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
Escape Tests: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Legacy 7128 7146 8 0.1 7127.9 1.0X
New 790 795 5 1.3 789.7 9.0X


12 changes: 12 additions & 0 deletions sql/catalyst/benchmarks/EscapePathBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
================================================================================================
Escape
================================================================================================

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
AMD EPYC 7763 64-Core Processor
Escape Tests: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Legacy 6719 6726 6 0.1 6719.3 1.0X
New 735 744 21 1.4 735.3 9.1X


Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ExternalCatalogUtils {
// The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
//////////////////////////////////////////////////////////////////////////////////////////////////

val charToEscape = {
final val (charToEscape, sizeOfCharToEscape) = {
val bitSet = new java.util.BitSet(128)

/**
Expand All @@ -60,28 +60,42 @@ object ExternalCatalogUtils {
Array(' ', '<', '>', '|').foreach(bitSet.set(_))
}

bitSet
(bitSet, bitSet.size)
}

def needsEscaping(c: Char): Boolean = {
c < charToEscape.size() && charToEscape.get(c)
private final val HEX_CHARS = "0123456789ABCDEF".toCharArray

@inline final def needsEscaping(c: Char): Boolean = {
c < sizeOfCharToEscape && charToEscape.get(c)
}

def escapePathName(path: String): String = {
val builder = new StringBuilder()
path.foreach { c =>
if (needsEscaping(c)) {
builder.append('%')
builder.append(f"${c.asInstanceOf[Int]}%02X")
} else {
builder.append(c)
if (path == null || path.isEmpty) {
return path
}
val length = path.length
var firstIndex = 0
while (firstIndex < length && !needsEscaping(path.charAt(firstIndex))) {
firstIndex += 1
}
if (firstIndex == length) {
path
} else {
val sb = new java.lang.StringBuilder(length + 16)
if (firstIndex != 0) sb.append(path, 0, firstIndex)
while(firstIndex < length) {
val c = path.charAt(firstIndex)
if (needsEscaping(c)) {
sb.append('%').append(HEX_CHARS((c & 0xF0) >> 4)).append(HEX_CHARS(c & 0x0F))
} else {
sb.append(c)
}
firstIndex += 1
}
sb.toString
}

builder.toString()
}


def unescapePathName(path: String): String = {
val sb = new StringBuilder
var i = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

/**
 * Benchmark for path escaping: compares the legacy, formatter-based implementation kept in
 * this object ([[EscapePathBenchmark.escapePathNameLegacy]]) against the current
 * `ExternalCatalogUtils.escapePathName`.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark catalyst test jar>
 *   2. build/sbt "catalyst/Test/runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "catalyst/Test/runMain <this class>"
 *      Results will be written to "benchmarks/EscapePathBenchmark-results.txt".
 * }}}
 */
object EscapePathBenchmark extends BenchmarkBase {
  /**
   * Registers and runs the two benchmark cases ("Legacy" and "New") over the same inputs.
   *
   * @param mainArgs command-line arguments; not read by this suite
   */
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    // Number of passes over `paths` performed by each case per benchmark iteration.
    val N = 1000000
    runBenchmark("Escape") {
      val benchmark = new Benchmark("Escape Tests", N, 10, output = output)
      // The same URL-like string with progressively fewer ':' and '/' characters (replaced
      // by '.'); the last entry contains none of them, so the inputs range from several
      // characters to escape down to (presumably) nothing to escape at all.
      val paths = Seq(
        "https://issues.apache.org/jira/browse/SPARK-48551",
        "https...issues.apache.org/jira/browse/SPARK-48551",
        "https...issues.apache.org.jira/browse/SPARK-48551",
        "https...issues.apache.org.jira.browse/SPARK-48551",
        "https...issues.apache.org.jira.browse.SPARK-48551")
      // Baseline: the old implementation preserved below.
      benchmark.addCase("Legacy") { _ =>
        (1 to N).foreach(_ => paths.foreach(escapePathNameLegacy))
      }

      // Candidate: the implementation currently exposed by ExternalCatalogUtils.
      benchmark.addCase("New") { _ =>
        (1 to N).foreach(_ => {
          paths.foreach(ExternalCatalogUtils.escapePathName)
        })
      }
      benchmark.run()
    }
  }

  /**
   * Legacy implementation of escapePathName before Spark 4.0.
   *
   * Kept here verbatim so it can serve as the baseline case of the benchmark; do not
   * optimize or restyle it, since its cost is exactly what is being measured.
   *
   * @param path the string to escape; every character is visited, so it must not be null
   * @return `path` with each character flagged by `ExternalCatalogUtils.needsEscaping`
   *         replaced by '%' followed by its code as upper-case hex, zero-padded to at
   *         least two digits
   */
  def escapePathNameLegacy(path: String): String = {
    val builder = new StringBuilder()
    path.foreach { c =>
      if (ExternalCatalogUtils.needsEscaping(c)) {
        builder.append('%')
        // The f-interpolator renders the character's numeric code with the %02X format.
        builder.append(f"${c.asInstanceOf[Int]}%02X")
      } else {
        builder.append(c)
      }
    }

    builder.toString()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName

/**
 * Unit tests for the path-escaping helpers in `ExternalCatalogUtils`.
 */
class ExternalCatalogUtilsSuite extends SparkFunSuite {

  test("SPARK-48551: escapePathName") {
    // Parity with the old conversion technique: every character flagged in the escape
    // bit set must be rendered as '%' followed by its two-digit upper-case hex code.
    val flagged = ExternalCatalogUtils.charToEscape.stream().toArray.map(_.asInstanceOf[Char])
    for (ch <- flagged) {
      val legacyForm = "%" + f"$ch%02X"
      assert(escapePathName(ch.toString) === legacyForm,
        s"wrong escaping for $ch")
    }

    // Hand-picked inputs paired with their expected escaped form, checked in order:
    // empty input, untouched characters, and escapes at the start, middle and end.
    val expectations = Seq(
      "" -> "",
      " " -> " ",
      "\n" -> "%0A",
      "a b" -> "a b",
      "a:b" -> "a%3Ab",
      ":ab" -> "%3Aab",
      "ab:" -> "ab%3A",
      "a%b" -> "a%25b",
      "a,b" -> "a,b",
      "a/b" -> "a%2Fb")
    expectations.foreach { case (raw, escaped) =>
      assert(escapePathName(raw) === escaped)
    }
  }
}

0 comments on commit 1ce4d86

Please sign in to comment.