[SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join #35657

Closed
wants to merge 13 commits
@@ -59,7 +59,7 @@ class AvroRowReaderSuite

val df = spark.read.format("avro").load(dir.getCanonicalPath)
val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: AvroScan, _) => f
case BatchScanExec(_, f: AvroScan, _, _) => f
}
val filePath = fileScan.get.fileIndex.inputFiles(0)
val fileSize = new File(new URI(filePath)).length
@@ -2335,7 +2335,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
})

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: AvroScan, _) => f
case BatchScanExec(_, f: AvroScan, _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.nonEmpty)
@@ -2368,7 +2368,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
assert(filterCondition.isDefined)

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: AvroScan, _) => f
case BatchScanExec(_, f: AvroScan, _, _) => f
}
assert(fileScan.nonEmpty)
assert(fileScan.get.partitionFilters.isEmpty)
@@ -2449,7 +2449,7 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper {
.where("value = 'a'")

val fileScan = df.queryExecution.executedPlan collectFirst {
case BatchScanExec(_, f: AvroScan, _) => f
case BatchScanExec(_, f: AvroScan, _, _) => f
}
assert(fileScan.nonEmpty)
if (filtersPushdown) {
@@ -372,7 +372,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

private def checkAggregatePushed(df: DataFrame, funcName: String): Unit = {
df.queryExecution.optimizedPlan.collect {
case DataSourceV2ScanRelation(_, scan, _) =>
case DataSourceV2ScanRelation(_, scan, _, _) =>
assert(scan.isInstanceOf[V1ScanWrapper])
val wrapper = scan.asInstanceOf[V1ScanWrapper]
assert(wrapper.pushedDownOperators.aggregation.isDefined)
@@ -36,4 +36,13 @@ private[spark] object Utils {
}
ordering.leastOf(input.asJava, num).iterator.asScala
}

/**
* Returns `Some` iff all elements in `input` are defined; in that case, the result is
* equivalent to `Some(input.flatten)`.
*
* Otherwise, returns `None`.
*/
def sequenceToOption[T](input: Seq[Option[T]]): Option[Seq[T]] =
if (input.forall(_.isDefined)) Some(input.flatten) else None
}
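
For readers skimming the hunk above, here is a minimal, hypothetical sketch of the new helper's contract (not part of the diff; it assumes the private[spark] `Utils` object is visible from the calling package):

// All elements defined: the result is the flattened sequence wrapped in Some.
assert(Utils.sequenceToOption(Seq(Some(1), Some(2), Some(3))) == Some(Seq(1, 2, 3)))

// A single None makes the whole result None.
assert(Utils.sequenceToOption(Seq(Some(1), None, Some(3))).isEmpty)

// Vacuously Some for an empty input, since `forall` holds on an empty Seq.
assert(Utils.sequenceToOption(Seq.empty[Option[Int]]) == Some(Seq.empty))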
8 changes: 7 additions & 1 deletion project/MimaExcludes.scala
@@ -64,7 +64,13 @@ object MimaExcludes {
// [SPARK-37600][BUILD] Upgrade to Hadoop 3.3.2
ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4Compressor"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4Factory"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4SafeDecompressor")
ProblemFilters.exclude[MissingClassProblem]("org.apache.hadoop.shaded.net.jpountz.lz4.LZ4SafeDecompressor"),

// [SPARK-37377][SQL] Initial implementation of Storage-Partitioned Join
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.connector.read.partitioning.Distribution"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.connector.read.partitioning.Partitioning.*"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.connector.read.partitioning.Partitioning.*")
)

// Exclude rules for 3.2.x from 3.1.1

This file was deleted.

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.partitioning;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Expression;

/**
* Represents a partitioning where rows are split across partitions based on the
* partition transform expressions returned by {@link KeyGroupedPartitioning#keys}.
* <p>
* Note: Data source implementations should make sure that, within a single partition, all rows
* evaluate to the same partition value once the {@link KeyGroupedPartitioning#keys} expressions
* are applied. Different partitions can share the same partition value: Spark will group them
* into a single logical partition during the planning phase.
*
* @since 3.3.0
*/
@Evolving
public class KeyGroupedPartitioning implements Partitioning {
private final Expression[] keys;
private final int numPartitions;

public KeyGroupedPartitioning(Expression[] keys, int numPartitions) {
this.keys = keys;
this.numPartitions = numPartitions;
}

/**
* Returns the partition transform expressions for this partitioning.
*/
public Expression[] keys() {
return keys;
}

@Override
public int numPartitions() {
return numPartitions;
}
}
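
As a rough illustration of how a connector might report this new partitioning, here is a hypothetical Scala scan (not part of this PR; the class name `DaysPartitionedScan`, the `ts` column, and the constructor parameters are made up, and the `Expressions.days` factory from the DSv2 expressions API is used to build the transform):

import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.SupportsReportPartitioning
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
import org.apache.spark.sql.types.StructType

// Hypothetical scan over a table whose files are grouped by days(ts): every input
// partition holds a single days(ts) value, so the scan advertises a key-grouped
// layout and Spark can plan a storage-partitioned join without shuffling this side.
class DaysPartitionedScan(schema: StructType, numInputPartitions: Int)
    extends SupportsReportPartitioning {

  override def readSchema(): StructType = schema

  override def outputPartitioning(): Partitioning =
    new KeyGroupedPartitioning(Array[Expression](Expressions.days("ts")), numInputPartitions)
}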
@@ -18,33 +18,25 @@
package org.apache.spark.sql.connector.read.partitioning;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.SupportsReportPartitioning;

/**
* An interface to represent the output data partitioning for a data source, which is returned by
* {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work
* like a snapshot. Once created, it should be deterministic and always report the same number of
* partitions and the same "satisfy" result for a certain distribution.
* {@link SupportsReportPartitioning#outputPartitioning()}.
* <p>
* Note: implementors <b>should NOT</b> directly implement this interface. Instead, they should
* use one of the following subclasses:
* <ul>
* <li>{@link KeyGroupedPartitioning}</li>
* <li>{@link UnknownPartitioning}</li>
* </ul>
*
* @since 3.0.0
*/
@Evolving
public interface Partitioning {

/**
* Returns the number of partitions(i.e., {@link InputPartition}s) the data source outputs.
* Returns the number of partitions that the data is split across.
*/
int numPartitions();

/**
* Returns true if this partitioning can satisfy the given distribution, which means Spark does
* not need to shuffle the output data of this data source for some certain operations.
* <p>
* Note that, Spark may add new concrete implementations of {@link Distribution} in new releases.
* This method should be aware of it and always return false for unrecognized distributions. It's
* recommended to check every Spark new release and support new distributions if possible, to
* avoid shuffle at Spark side for more cases.
*/
boolean satisfy(Distribution distribution);
}
@@ -18,24 +18,22 @@
package org.apache.spark.sql.connector.read.partitioning;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.read.PartitionReader;

/**
* A concrete implementation of {@link Distribution}. Represents a distribution where records that
* share the same values for the {@link #clusteredColumns} will be produced by the same
* {@link PartitionReader}.
* Represents a partitioning where rows are split across partitions in an unknown pattern.
*
* @since 3.0.0
* @since 3.3.0
*/
@Evolving
public class ClusteredDistribution implements Distribution {
public class UnknownPartitioning implements Partitioning {
private final int numPartitions;

/**
* The names of the clustered columns. Note that they are order insensitive.
*/
public final String[] clusteredColumns;
public UnknownPartitioning(int numPartitions) {
this.numPartitions = numPartitions;
}

public ClusteredDistribution(String[] clusteredColumns) {
this.clusteredColumns = clusteredColumns;
@Override
public int numPartitions() {
return numPartitions;
}
}
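
For the fallback case, a scan that cannot guarantee any co-location simply reports how many partitions it produces, and Spark shuffles that side of a join as needed. A hypothetical one-liner mirroring the sketch above:

import org.apache.spark.sql.connector.read.partitioning.{Partitioning, UnknownPartitioning}

// No co-location guarantee: report only the partition count.
def unknownLayout(numInputPartitions: Int): Partitioning =
  new UnknownPartitioning(numInputPartitions)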
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Murmur3HashFunction, RowOrdering}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

/**
* A mutable Set with [[InternalRow]] as its element type. It uses Spark's internal murmur hash to
* compute a hash code from a row, and uses [[RowOrdering]] to perform equality checks.
*
* @param dataTypes the data types for the row keys this set holds
*/
class InternalRowSet(val dataTypes: Seq[DataType]) extends mutable.Set[InternalRow] {
private val baseSet = new mutable.HashSet[InternalRowContainer]

private val structType = StructType(dataTypes.map(t => StructField("f", t)))
private val ordering = RowOrdering.createNaturalAscendingOrdering(dataTypes)

override def contains(row: InternalRow): Boolean =
baseSet.contains(new InternalRowContainer(row))

private class InternalRowContainer(val row: InternalRow) {
override def hashCode(): Int = Murmur3HashFunction.hash(row, structType, 42L).toInt

override def equals(other: Any): Boolean = other match {
case r: InternalRowContainer => ordering.compare(row, r.row) == 0
case _ => false // a non-container is never equal; returning `this == r` here would recurse
}
}

override def +=(row: InternalRow): InternalRowSet.this.type = {
val rowKey = new InternalRowContainer(row)
baseSet += rowKey
this
}

override def -=(row: InternalRow): InternalRowSet.this.type = {
val rowKey = new InternalRowContainer(row)
baseSet -= rowKey
this
}

override def iterator: Iterator[InternalRow] = {
baseSet.iterator.map(_.row)
}
}
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Murmur3HashFunction, RowOrdering}
import org.apache.spark.sql.types.{DataType, StructField, StructType}

/**
* A mutable Set with [[InternalRow]] as its element type. It uses Spark's internal murmur hash to
* compute a hash code from a row, and uses [[RowOrdering]] to perform equality checks.
*
* @param dataTypes the data types for the row keys this set holds
*/
class InternalRowSet(val dataTypes: Seq[DataType]) extends mutable.Set[InternalRow] {
private val baseSet = new mutable.HashSet[InternalRowContainer]

private val structType = StructType(dataTypes.map(t => StructField("f", t)))
private val ordering = RowOrdering.createNaturalAscendingOrdering(dataTypes)

override def contains(row: InternalRow): Boolean =
baseSet.contains(new InternalRowContainer(row))

private class InternalRowContainer(val row: InternalRow) {
override def hashCode(): Int = Murmur3HashFunction.hash(row, structType, 42L).toInt

override def equals(other: Any): Boolean = other match {
case r: InternalRowContainer => ordering.compare(row, r.row) == 0
case _ => false // a non-container is never equal; returning `this == r` here would recurse
}
}

override def addOne(row: InternalRow): InternalRowSet.this.type = {
val rowKey = new InternalRowContainer(row)
baseSet += rowKey
this
}

override def subtractOne(row: InternalRow): InternalRowSet.this.type = {
val rowKey = new InternalRowContainer(row)
baseSet -= rowKey
this
}

override def clear(): Unit = {
baseSet.clear()
}

override def iterator: Iterator[InternalRow] = {
baseSet.iterator.map(_.row)
}
}
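
A small, hypothetical usage sketch of the value-based semantics described in the Scaladoc above: two distinct `InternalRow` instances with equal contents hash and compare equal, so the set keeps only one of them (the `rows` variable is made up for illustration):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.InternalRowSet
import org.apache.spark.sql.types.IntegerType

val rows = new InternalRowSet(Seq(IntegerType))
rows += InternalRow(1)
rows += InternalRow(1)   // equal by value, so the set does not grow
assert(rows.size == 1)
assert(rows.contains(InternalRow(1)))
rows -= InternalRow(1)
assert(rows.isEmpty)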