diff --git a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
index cf74b98b31b0c..0d4b401ea2eb6 100644
--- a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
+++ b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import scala.collection.mutable
+import scala.collection.{mutable, View}
 import scala.collection.mutable.ArrayBuffer
 
 object ExpressionSet {
@@ -86,6 +86,12 @@ class ExpressionSet protected(
     }
   }
 
+  override def concat(elems: IterableOnce[Expression]): Set[Expression] = {
+    val newSet = new ExpressionSet(baseSet.clone(), originals.clone())
+    elems.iterator.foreach(newSet.add)
+    newSet
+  }
+
   override def iterator: Iterator[Expression] = originals.iterator
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 477863a1b86d0..fdadcccea29ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1281,12 +1281,12 @@ class Analyzer(
         }
 
         if (attrMapping.isEmpty) {
-          newPlan -> attrMapping
+          newPlan -> attrMapping.toSeq
         } else {
           assert(!attrMapping.groupBy(_._1.exprId)
             .exists(_._2.map(_._2.exprId).distinct.length > 1),
             "Found duplicate rewrite attributes")
-          val attributeRewrites = AttributeMap(attrMapping)
+          val attributeRewrites = AttributeMap(attrMapping.toSeq)
           // Using attrMapping from the children plans to rewrite their parent node.
           // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
           newPlan.transformExpressions {
@@ -1294,7 +1294,7 @@ class Analyzer(
               dedupAttr(a, attributeRewrites)
             case s: SubqueryExpression =>
               s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
-          } -> attrMapping
+          } -> attrMapping.toSeq
         }
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 74c9b12a109d0..d9de72e1b217b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -3050,6 +3050,7 @@ trait ArraySetLike {
   @transient protected lazy val nullValueHolder = et match {
     case ByteType => "(byte) 0"
     case ShortType => "(short) 0"
+    case LongType => "(long) 0"
     case _ => "0"
   }
 
@@ -3155,7 +3156,7 @@ case class ArrayDistinct(child: Expression)
           }
         }
       }
-      new GenericArrayData(arrayBuffer)
+      new GenericArrayData(arrayBuffer.toSeq)
     }
   }
 
@@ -3313,7 +3314,7 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi
            i += 1
          }
        }
-       new GenericArrayData(arrayBuffer)
+       new GenericArrayData(arrayBuffer.toSeq)
     } else {
       (array1, array2) =>
         val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
@@ -3344,7 +3345,7 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi
             arrayBuffer += elem
           }
         }))
-        new GenericArrayData(arrayBuffer)
+        new GenericArrayData(arrayBuffer.toSeq)
   }
 }
 
@@ -3476,7 +3477,7 @@ object ArrayUnion {
          arrayBuffer += elem
        }
      }))
-    new GenericArrayData(arrayBuffer)
+    new GenericArrayData(arrayBuffer.toSeq)
   }
 }
 
@@ -3538,7 +3539,7 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
           }
           i += 1
         }
-        new GenericArrayData(arrayBuffer)
+        new GenericArrayData(arrayBuffer.toSeq)
       } else {
         new GenericArrayData(Array.emptyObjectArray)
       }
@@ -3586,7 +3587,7 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
           }
           i += 1
         }
-        new GenericArrayData(arrayBuffer)
+        new GenericArrayData(arrayBuffer.toSeq)
       } else {
         new GenericArrayData(Array.emptyObjectArray)
       }
@@ -3777,7 +3778,7 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
           }
           i += 1
         }
-        new GenericArrayData(arrayBuffer)
+        new GenericArrayData(arrayBuffer.toSeq)
       } else {
         (array1, array2) =>
           val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
@@ -3822,7 +3823,7 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
          }
          i += 1
        }
-       new GenericArrayData(arrayBuffer)
+       new GenericArrayData(arrayBuffer.toSeq)
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index e5cf8c0a023d9..9fef8e9415e72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -501,7 +501,7 @@ case class ArrayFilter(
       }
       i += 1
     }
-    new GenericArrayData(buffer)
+    new GenericArrayData(buffer.toSeq)
   }
 
   override def prettyName: String = "filter"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 4f339986a44e5..0614cc966734b 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -2370,8 +2370,8 @@ case class Sentences(
         widx = wi.current
         if (Character.isLetterOrDigit(word.charAt(0))) words += UTF8String.fromString(word)
       }
-      result += new GenericArrayData(words)
+      result += new GenericArrayData(words.toSeq)
     }
-    new GenericArrayData(result)
+    new GenericArrayData(result.toSeq)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index da76f8eeaf350..0da2baf24fbcb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -465,7 +465,7 @@ class JacksonParser(
       case null => None
       case _ => rootConverter.apply(parser) match {
         case null => throw new RuntimeException("Root converter returned null")
-        case rows => rows
+        case rows => rows.toSeq
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 045070f334ee8..bd259a41c50a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -971,7 +971,7 @@ object CombineUnions extends Rule[LogicalPlan] {
         flattened += child
       }
     }
-    union.copy(children = flattened)
+    union.copy(children = flattened.toSeq)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fe99a8ea3cc12..716ceb4141130 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1612,13 +1612,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     // Create the function call.
     val name = ctx.functionName.getText
     val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
-    val arguments = ctx.argument.asScala.map(expression) match {
+    // Call `toSeq`, otherwise `ctx.argument.asScala.map(expression)` is `Buffer` in Scala 2.13
+    val arguments = ctx.argument.asScala.map(expression).toSeq match {
       case Seq(UnresolvedStar(None))
         if name.toLowerCase(Locale.ROOT) == "count" && !isDistinct =>
         // Transform COUNT(*) into COUNT(1).
         Seq(Literal(1))
       case expressions =>
-        expressions.toSeq
+        expressions
     }
     val filter = Option(ctx.where).map(expression(_))
     val function = UnresolvedFunction(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index e161dc604b589..4c406a5492ac9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -275,8 +275,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case s: Seq[_] => s.map(mapChild)
     case m: Map[_, _] =>
+      // `map.mapValues().view.force` returns a `Map` in Scala 2.12 but an `IndexedSeq` in Scala
+      // 2.13; call `toMap` manually to stay compatible with both Scala 2.12 and Scala 2.13.
       // `mapValues` is lazy and we need to force it to materialize
-      m.mapValues(mapChild).view.force
+      m.mapValues(mapChild).view.force.toMap
     case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg)
     case Some(child) => Some(mapChild(child))
     case nonChild: AnyRef => nonChild
@@ -411,6 +413,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       } else {
         Some(arg)
       }
+    // `map.mapValues().view.force` returns a `Map` in Scala 2.12 but an `IndexedSeq` in Scala
+    // 2.13; call `toMap` manually to stay compatible with both Scala 2.12 and Scala 2.13.
     case m: Map[_, _] => m.mapValues {
       case arg: TreeNode[_] if containsChild(arg) =>
         val newChild = f(arg.asInstanceOf[BaseType])
@@ -421,7 +425,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
         arg
       }
       case other => other
-    }.view.force // `mapValues` is lazy and we need to force it to materialize
+    }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
     case d: DataType => d // Avoid unpacking Structs
     case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
     case args: Iterable[_] => args.map(mapChild)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 982f6244f8a0d..bedf6ccf44c3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -202,8 +202,10 @@ object Metadata {
   /** Computes the hash code for the types we support. */
   private def hash(obj: Any): Int = {
     obj match {
+      // `map.mapValues` returns a `Map` in Scala 2.12 but a `MapView` in Scala 2.13; call
+      // `toMap` for Scala version compatibility.
       case map: Map[_, _] =>
-        map.mapValues(hash).##
+        map.mapValues(hash).toMap.##
       case arr: Array[_] =>
         // Seq.empty[T] has the same hashCode regardless of T.
         arr.toSeq.map(hash).##
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index c83cd52250702..4aae7bd7dc408 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -118,7 +118,7 @@ private[spark] object SchemaUtils {
         case (x, ys) if ys.length > 1 => s"`$x`"
       }
       throw new AnalysisException(
-        s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
+        s"Found duplicate column(s) $colType: ${duplicateColumns.toSeq.sorted.mkString(", ")}")
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 9fb8b0f351d51..6bd7a27ac11f1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -360,7 +360,7 @@ object RandomDataGenerator {
             arr += gen()
             i += 1
           }
-          arr
+          arr.toSeq
         }
         fields += data
       case StructType(children) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
index 02ee634dba1b6..75caab4145938 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
@@ -41,7 +41,7 @@ class SchemaUtilsSuite extends SparkFunSuite {
     test(s"Check column name duplication in $testType cases") {
       def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
         val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
-          duplicatedColumns.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
+          duplicatedColumns.sorted.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
         val schema = StructType.fromDDL(schemaStr)
         var msg = intercept[AnalysisException] {
           SchemaUtils.checkSchemaColumnNameDuplication(
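
Background for the changes above (illustrative only, not part of the patch): they compensate for two Scala 2.13 collection-library differences that the patch's own comments call out. First, `scala.Seq` now aliases `scala.collection.immutable.Seq`, so a `mutable.ArrayBuffer` (or the `Buffer` produced by `asScala.map(...)`) no longer satisfies a `Seq` parameter without an explicit `.toSeq`. Second, `Map.mapValues` now yields a lazy `MapView` rather than a strict `Map`, hence the appended `.toMap` calls. A minimal standalone sketch of both behaviours follows; `Scala213CompatSketch` and `takesSeq` are made-up names for the example.

import scala.collection.mutable.ArrayBuffer

object Scala213CompatSketch {
  // Stand-in for an API that expects a Seq, such as the GenericArrayData constructor above.
  def takesSeq(values: Seq[Any]): Int = values.length

  def main(args: Array[String]): Unit = {
    val buffer = ArrayBuffer[Any](1, 2, 3)
    // takesSeq(buffer) compiles on Scala 2.12 (where scala.Seq is collection.Seq) but not on
    // 2.13, where scala.Seq is immutable.Seq; the explicit .toSeq works on both versions.
    println(takesSeq(buffer.toSeq)) // 3

    // mapValues returns a strict Map on 2.12 but a lazy MapView on 2.13 (where it is also
    // deprecated), so the patch appends .toMap wherever a real Map or a stable hashCode is needed.
    val m = Map("a" -> 1, "b" -> 2)
    val doubled: Map[String, Int] = m.mapValues(_ * 2).toMap
    println(doubled) // Map(a -> 2, b -> 4)
  }
}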