# [SPARK-39313][SQL] toCatalystOrdering should fail if V2Expression can not be translated

### What changes were proposed in this pull request?

After reading the code changes in #35657, I guess the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` was that, for reading, Spark can ignore unrecognized distributions and orderings, while for writing it should always be strict.

Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail when a V2 expression cannot be translated, instead of returning an empty Seq.
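
As a rough illustration of that split (a standalone sketch; `V2Expr`, `translateOpt`, and `translate` are made-up names for illustration, not Spark APIs):

```scala
// Minimal sketch of the lenient-vs-strict translation pattern described above.
sealed trait V2Expr
case class FieldRef(name: String) extends V2Expr
case class ApplyFn(name: String, args: Seq[V2Expr]) extends V2Expr

object Translator {
  // Lenient path (reads): an unrecognized expression yields None and can be ignored.
  def translateOpt(e: V2Expr): Option[String] = e match {
    case FieldRef(n) => Some(n)
    case _: ApplyFn  => None // function not resolvable by the engine
  }

  // Strict path (writes): an unrecognized expression must fail the query.
  def translate(e: V2Expr): String =
    translateOpt(e).getOrElse(
      throw new IllegalArgumentException(s"$e is not currently supported"))
}
```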

### Why are the changes needed?

`V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`; the current behavior breaks the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases (see the new UT).
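
To make the breakage concrete, here is a hedged sketch of a connector `Write` that declares a required ordering through the public DSv2 interfaces (the `SortedWrite` class and the column name `a` are invented for illustration):

```scala
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expressions, SortDirection, SortOrder}
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}

// A write that declares it needs rows sorted by column "a". If
// toCatalystOrdering silently returned Seq.empty for an untranslatable
// sort key, Spark would skip the sort entirely instead of rejecting
// the plan, violating the connector's contract.
class SortedWrite extends Write with RequiresDistributionAndOrdering {
  override def requiredDistribution(): Distribution = Distributions.unspecified()
  override def requiredOrdering(): Array[SortOrder] = Array(
    Expressions.sort(Expressions.column("a"), SortDirection.ASCENDING))
}
```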

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

Closes #36697 from pan3793/SPARK-39313.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
pan3793 authored and sunchao committed Jun 1, 2022
1 parent 5a3ba9b commit ef0b87a
Showing 6 changed files with 85 additions and 90 deletions.

#### V2ExpressionUtils.scala

```diff
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.Utils.sequenceToOption
 
 /**
  * A utility class that converts public connector expressions into Catalyst expressions.
@@ -54,19 +53,25 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
    * Converts the array of input V2 [[V2SortOrder]] into their counterparts in catalyst.
    */
   def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan): Seq[SortOrder] = {
-    sequenceToOption(ordering.map(toCatalyst(_, query))).asInstanceOf[Option[Seq[SortOrder]]]
-      .getOrElse(Seq.empty)
+    ordering.map(toCatalyst(_, query).asInstanceOf[SortOrder])
   }
 
   def toCatalyst(
       expr: V2Expression,
       query: LogicalPlan,
+      funCatalogOpt: Option[FunctionCatalog] = None): Expression =
+    toCatalystOpt(expr, query, funCatalogOpt)
+      .getOrElse(throw new AnalysisException(s"$expr is not currently supported"))
+
+  def toCatalystOpt(
+      expr: V2Expression,
+      query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
     expr match {
       case t: Transform =>
-        toCatalystTransform(t, query, funCatalogOpt)
+        toCatalystTransformOpt(t, query, funCatalogOpt)
       case SortValue(child, direction, nullOrdering) =>
-        toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+        toCatalystOpt(child, query, funCatalogOpt).map { catalystChild =>
          SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
        }
      case ref: FieldReference =>
@@ -76,7 +81,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
     }
   }
 
-  def toCatalystTransform(
+  def toCatalystTransformOpt(
       trans: Transform,
       query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = trans match {
@@ -89,7 +94,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
       // look up the V2 function.
       val numBucketsRef = AttributeReference("numBuckets", IntegerType, nullable = false)()
       funCatalogOpt.flatMap { catalog =>
-        loadV2Function(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
+        loadV2FunctionOpt(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
           TransformExpression(bound, resolvedRefs, Some(numBuckets))
         }
       }
@@ -99,15 +104,15 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
         resolveRef[NamedExpression](r, query)
       }
       funCatalogOpt.flatMap { catalog =>
-        loadV2Function(catalog, name, resolvedRefs).map { bound =>
+        loadV2FunctionOpt(catalog, name, resolvedRefs).map { bound =>
           TransformExpression(bound, resolvedRefs)
         }
       }
     case _ =>
       throw new AnalysisException(s"Transform $trans is not currently supported")
   }
 
-  private def loadV2Function(
+  private def loadV2FunctionOpt(
       catalog: FunctionCatalog,
       name: String,
       args: Seq[Expression]): Option[BoundFunction] = {
```
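
The deleted `sequenceToOption` call is where the old all-or-nothing behavior came from: it turns a `Seq[Option[_]]` into `None` as soon as any element is `None`, and the trailing `.getOrElse(Seq.empty)` then discarded every sort key, including the translatable ones. A standalone sketch of that semantics (the helper is re-implemented here for illustration; Spark's version lives in `org.apache.spark.util.collection.Utils`):

```scala
object SequenceToOptionDemo extends App {
  // Re-implemented for illustration; matches the behavior of the helper removed above.
  def sequenceToOption[A](input: Seq[Option[A]]): Option[Seq[A]] =
    if (input.forall(_.isDefined)) Some(input.flatten) else None

  // One supported sort key, one unsupported key (None), as in the new UT.
  val translated: Seq[Option[String]] = Seq(Some("a ASC NULLS FIRST"), None)

  // Old behavior: the single None poisons the whole sequence, so even the
  // supported key was silently dropped from the required ordering.
  assert(sequenceToOption(translated).getOrElse(Seq.empty).isEmpty)
}
```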

#### V2ExpressionUtilsSuite.scala (new file)

```diff
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.types.StringType
+
+class V2ExpressionUtilsSuite extends SparkFunSuite {
+
+  test("SPARK-39313: toCatalystOrdering should fail if V2Expression can not be translated") {
+    val supportedV2Sort = SortValue(
+      FieldReference("a"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
+    val unsupportedV2Sort = supportedV2Sort.copy(
+      expression = ApplyTransform("v2Fun", FieldReference("a") :: Nil))
+    val exc = intercept[AnalysisException] {
+      V2ExpressionUtils.toCatalystOrdering(
+        Array(supportedV2Sort, unsupportedV2Sort),
+        LocalRelation.apply(AttributeReference("a", StringType)()))
+    }
+    assert(exc.message.contains("v2Fun(a) ASC NULLS FIRST is not currently supported"))
+  }
+}
```

#### InMemoryTable.scala

```diff
@@ -29,7 +29,7 @@ import org.scalatest.Assertions._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
-import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, Distribution, Distributions}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read._
@@ -292,9 +292,12 @@ class InMemoryTable(
     }
 
     override def outputPartitioning(): Partitioning = {
-      InMemoryTable.this.distribution match {
-        case cd: ClusteredDistribution => new KeyGroupedPartitioning(cd.clustering(), data.size)
-        case _ => new UnknownPartitioning(data.size)
+      if (InMemoryTable.this.partitioning.nonEmpty) {
+        new KeyGroupedPartitioning(
+          InMemoryTable.this.partitioning.map(_.asInstanceOf[Expression]),
+          data.size)
+      } else {
+        new UnknownPartitioning(data.size)
       }
     }
 
```
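
In other words, the shared in-memory test table now reports a `KeyGroupedPartitioning` derived from its declared partition transforms rather than from its write distribution, and falls back to `UnknownPartitioning` when the table is unpartitioned.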

#### DistributionAndOrderingUtils.scala

```diff
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, RepartitionByExpression, Sort}
 import org.apache.spark.sql.connector.distributions._
 import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.util.collection.Utils.sequenceToOption
 
 object DistributionAndOrderingUtils {
 
@@ -33,9 +32,7 @@ object DistributionAndOrderingUtils {
 
     val distribution = write.requiredDistribution match {
       case d: OrderedDistribution => toCatalystOrdering(d.ordering(), query)
-      case d: ClusteredDistribution =>
-        sequenceToOption(d.clustering.map(e => toCatalyst(e, query)))
-          .getOrElse(Seq.empty[Expression])
+      case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query)).toSeq
       case _: UnspecifiedDistribution => Seq.empty[Expression]
     }
 
```
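
With this change both write-side branches are strict: `OrderedDistribution` goes through the now-failing `toCatalystOrdering`, and `ClusteredDistribution` calls the throwing `toCatalyst` directly, so an untranslatable clustering expression fails the write instead of silently requesting no distribution at all.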

#### V2ScanPartitioning.scala

```diff
@@ -32,15 +32,15 @@ import org.apache.spark.util.collection.Utils.sequenceToOption
  */
 object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, _) =>
+    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None) =>
       val funCatalogOpt = relation.catalog.flatMap {
         case c: FunctionCatalog => Some(c)
         case _ => None
       }
 
       val catalystPartitioning = scan.outputPartitioning() match {
         case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
-          V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt)))
+          V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))
         case _: UnknownPartitioning => None
         case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
           "type: " + p.getClass.getSimpleName)
```
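
Note the asymmetry this preserves: the read-side rule above keeps the lenient `toCatalystOpt` plus `sequenceToOption` combination, so a scan whose `KeyGroupedPartitioning` keys cannot all be translated simply reports no partitioning, while the write path now fails fast.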