diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 00425d7e2306c..8653b11c78754 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1514,6 +1514,15 @@ object SQLConf {
     .stringConf
     .createWithDefault("")
 
+  val BUCKET_SORTED_SCAN_ENABLED =
+    buildConf("spark.sql.sources.bucketing.sortedScan.enabled")
+      .doc("When true, the bucketed table scan will respect the table's sort columns, " +
+        "and read multiple sorted files per bucket in a sort-merge way to preserve ordering. " +
+        "Note: tasks might have a larger memory footprint and may OOM with the vectorized " +
+        "reader, because rows or columnar batches from multiple files are read at the same time.")
+      .booleanConf
+      .createWithDefault(false)
+
   object PartitionOverwriteMode extends Enumeration {
     val STATIC, DYNAMIC = Value
   }
@@ -2192,6 +2201,8 @@ class SQLConf extends Serializable with Logging {
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
 
+  def bucketSortedScanEnabled: Boolean = getConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e4b2a1db476b8..5df26b4b2647e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -17,17 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.HashMap
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
@@ -121,6 +121,14 @@ case class RowDataSourceScanExec(
       tableIdentifier = None)
 }
 
+sealed abstract class ScanMode
+
+case object RegularMode extends ScanMode
+
+case class SortedBucketMode(sortOrdering: Ordering[InternalRow]) extends ScanMode {
+  override def toString: String = "SortedBucketMode"
+}
+
 /**
  * Physical plan node for scanning data from HadoopFsRelations.
  *
@@ -148,6 +156,14 @@ case class FileSourceScanExec(
     relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
+  private lazy val scanMode: ScanMode =
+    if (conf.bucketSortedScanEnabled && outputOrdering.nonEmpty && !singleFilePartitions) {
+      val sortOrdering = new LazilyGeneratedOrdering(outputOrdering, output)
+      SortedBucketMode(sortOrdering)
+    } else {
+      RegularMode
+    }
+
   private lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
@@ -185,6 +201,13 @@ case class FileSourceScanExec(
     ret
   }
 
+  private lazy val singleFilePartitions: Boolean = {
+    val files = selectedPartitions.flatMap(partition => partition.files)
+    val bucketToFilesGrouping =
+      files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
+    bucketToFilesGrouping.forall(p => p._2.length <= 1)
+  }
+
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
@@ -225,15 +248,12 @@ case class FileSourceScanExec(
         // In case of bucketing, its possible to have multiple files belonging to the
         // same bucket in a given relation. Each of these files are locally sorted
         // but those files combined together are not globally sorted. Given that,
-        // the RDD partition will not be sorted even if the relation has sort columns set
-        // Current solution is to check if all the buckets have a single file in it
-
-        val files = selectedPartitions.flatMap(partition => partition.files)
-        val bucketToFilesGrouping =
-          files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
-        val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
-
-        if (singleFilePartitions) {
+        // the RDD partition will not be sorted even if the relation has sort columns set.
+        //
+        // 1. When the configuration "spark.sql.sources.bucketing.sortedScan.enabled" is enabled,
+        // output ordering is preserved by reading those sorted files in a sort-merge way.
+        // 2. Otherwise, output ordering is preserved only if each bucket has at most one file.
+        if (conf.bucketSortedScanEnabled || singleFilePartitions) {
           // TODO Currently Spark does not support writing columns sorting in descending order
           // so using Ascending order. This can be fixed in future
           sortColumns.map(attribute => SortOrder(attribute, Ascending))
@@ -269,7 +289,8 @@ case class FileSourceScanExec(
       "PartitionFilters" -> seqToString(partitionFilters),
       "PushedFilters" -> seqToString(pushedDownFilters),
       "DataFilters" -> seqToString(dataFilters),
-      "Location" -> locationDesc)
+      "Location" -> locationDesc,
+      "ScanMode" -> scanMode.toString)
     val withOptPartitionCount =
       relation.partitionSchemaOption.map { _ =>
         metadata + ("PartitionCount" -> selectedPartitions.size.toString)
@@ -408,7 +429,12 @@ case class FileSourceScanExec(
       FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
     }
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    scanMode match {
+      case SortedBucketMode(sortOrdering) =>
+        new FileSortedMergeScanRDD(fsRelation.sparkSession, readFile, filePartitions, sortOrdering)
+      case RegularMode =>
+        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala
new file mode 100644
index 0000000000000..ba0f3cc7d0609
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanIterators.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{FileNotFoundException, IOException}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.parquet.io.ParquetDecodingException
+
+import org.apache.spark.{Partition => RDDPartition, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{QueryExecutionException, RowIterator}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
+
+/**
+ * Holds common logic for iterators that scan files.
+ */
+abstract class BaseFileScanIterator(
+    split: RDDPartition,
+    context: TaskContext,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean,
+    readFunction: PartitionedFile => Iterator[InternalRow])
+  extends Iterator[Object]
+  with AutoCloseable
+  with Logging {
+
+  protected val inputMetrics: InputMetrics = context.taskMetrics().inputMetrics
+  private val existingBytesRead = inputMetrics.bytesRead
+
+  // Find a function that will return the FileSystem bytes read by this thread. Do this before
+  // applying readFunction, because it might read some bytes.
+  private val getBytesReadCallback =
+    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+
+  // We get our input bytes from thread-local Hadoop FileSystem statistics.
+  // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+  // task and in the same thread, in which case we need to avoid override values written by
+  // previous partitions (SPARK-13071).
+  protected def incTaskInputMetricsBytesRead(): Unit = {
+    inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
+  }
+
+  private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+  protected[this] var currentFile: PartitionedFile = null
+  protected[this] var currentIterator: Iterator[Object] = null
+
+  override def hasNext: Boolean = {
+    // Kill the task in case it has been marked as killed. This logic is from
+    // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
+    // to avoid performance overhead.
+    context.killTaskIfInterrupted()
+    (currentIterator != null && currentIterator.hasNext) || nextIterator()
+  }
+
+  override def next(): Object
+
+  private def readFile(file: PartitionedFile): Iterator[InternalRow] = {
+    try {
+      readFunction(file)
+    } catch {
+      case e: FileNotFoundException =>
+        throw new FileNotFoundException(
+          e.getMessage + "\n" +
+            "It is possible the underlying files have been updated. " +
+            "You can explicitly invalidate the cache in Spark by " +
+            "running 'REFRESH TABLE tableName' command in SQL or " +
+            "by recreating the Dataset/DataFrame involved.")
+    }
+  }
+
+  /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+  protected def nextIterator(): Boolean = {
+    if (files.hasNext) {
+      currentFile = files.next()
+      logInfo(s"Reading File $currentFile")
+      // Sets InputFileBlockHolder for the file block's information
+      InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
+
+      if (ignoreMissingFiles || ignoreCorruptFiles) {
+        currentIterator = new NextIterator[Object] {
+          private val file = currentFile
+          // The readFunction may read some bytes before consuming the iterator, e.g.,
+          // vectorized Parquet reader. Here we use lazy val to delay the creation of
+          // iterator so that we will throw exception in `getNext`.
+          private lazy val internalIter = readFile(file)
+
+          override def getNext(): AnyRef = {
+            try {
+              if (internalIter.hasNext) {
+                internalIter.next()
+              } else {
+                finished = true
+                null
+              }
+            } catch {
+              case e: FileNotFoundException if ignoreMissingFiles =>
+                logWarning(s"Skipped missing file: $currentFile", e)
+                finished = true
+                null
+              // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+              case e: FileNotFoundException if !ignoreMissingFiles => throw e
+              case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+                logWarning(
+                  s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                finished = true
+                null
+            }
+          }
+
+          override def close(): Unit = {}
+        }
+      } else {
+        currentIterator = readFile(currentFile)
+      }
+
+      try {
+        // We cannot just call hasNext here since FileSortedBucketScanIterator needs to
+        // override hasNext()
+        context.killTaskIfInterrupted()
+        (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      } catch {
+        case e: SchemaColumnConvertNotSupportedException =>
+          val message = "Parquet column cannot be converted in " +
+            s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+            s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+          throw new QueryExecutionException(message, e)
+        case e: ParquetDecodingException =>
+          if (e.getMessage.contains("Can not read value at")) {
+            val message = "Encounter error while reading parquet files. " +
+              "One possible cause: Parquet column cannot be converted in the " +
+              "corresponding files. Details: "
+            throw new QueryExecutionException(message, e)
+          }
+          throw e
+      }
+    } else {
+      currentFile = null
+      InputFileBlockHolder.unset()
+      false
+    }
+  }
+
+  override def close(): Unit = {
+    incTaskInputMetricsBytesRead()
+    InputFileBlockHolder.unset()
+  }
+}
+
+/**
+ * An iterator that opens all the files of a partition at the same time,
+ * and reads rows from them in a sort-merge fashion based on `sortOrdering`.
+ * It uses the standard Scala priority queue to decide the read order.
+ * This iterator is only used for reading sorted bucketed tables.
+ *
+ * @param sortOrdering The order in which rows are read from the multiple files
+ */
+class FileSortedBucketScanIterator(
+    split: RDDPartition,
+    context: TaskContext,
+    ignoreCorruptFiles: Boolean,
+    ignoreMissingFiles: Boolean,
+    readFunction: PartitionedFile => Iterator[InternalRow],
+    sortOrdering: Ordering[InternalRow])
+  extends BaseFileScanIterator(split, context, ignoreCorruptFiles, ignoreMissingFiles,
+    readFunction) {
+
+  // The priority queue to keep the latest row from each file
+  private val rowHeap = new mutable.PriorityQueue[IteratorWithRow]()(
+    // Reverse the order because the priority queue de-queues the highest priority element
+    Ordering.by[IteratorWithRow, InternalRow](_.getRow)(sortOrdering).reverse)
+  private var heapInitialized: Boolean = false
+  protected var currentIteratorWithRow: IteratorWithRow = _
+
+  override def hasNext: Boolean = {
+    // Kill the task in case it has been marked as killed. This logic is from
+    // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
+    // to avoid performance overhead. This is the same as `BaseFileScanIterator.hasNext()`.
+    context.killTaskIfInterrupted()
+
+    if (!heapInitialized) {
+      initializeHeapWithFirstRows()
+      heapInitialized = true
+    }
+    rowHeap.nonEmpty
+  }
+
+  override def next(): Object = {
+    currentIteratorWithRow = rowHeap.dequeue()
+
+    // Make a copy of the row because we need to enqueue the next row (if there is any) to the heap
+    val nextRow = currentIteratorWithRow.getRow.copy()
+    if (currentIteratorWithRow.advanceNext()) {
+      rowHeap.enqueue(currentIteratorWithRow)
+    }
+
+    // Too costly to update every record
+    if (inputMetrics.recordsRead %
+        SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+      incTaskInputMetricsBytesRead()
+    }
+    inputMetrics.incRecordsRead(1)
+
+    // Set InputFileBlockHolder for the file block's information
+    currentFile = currentIteratorWithRow.getFile
+    InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
+
+    nextRow
+  }
+
+  private def initializeHeapWithFirstRows(): Unit = {
+    while (nextIterator()) {
+      require(currentIterator != null && currentFile != null,
+        "currentIterator and currentFile should not be null if nextIterator() returns true")
+      // In case of a columnar batch, read the batch row by row
+      val convertedIter: Iterator[InternalRow] = currentIterator.flatMap {
+        case batch: ColumnarBatch => batch.rowIterator().asScala
+        case row => Iterator.single(row.asInstanceOf[InternalRow])
+      }
+      currentIteratorWithRow = new IteratorWithRow(convertedIter, currentFile)
+      if (currentIteratorWithRow.advanceNext()) {
+        rowHeap.enqueue(currentIteratorWithRow)
+      }
+    }
+  }
+}
+
+/**
+ * A wrapper around an iterator, its file, and its latest row.
+ * Designed to be instantiated once per file in one thread, and reused.
+ */
+private[execution] class IteratorWithRow(
+    iterator: Iterator[InternalRow],
+    file: PartitionedFile) extends RowIterator {
+  private var row: InternalRow = _
+
+  override def advanceNext(): Boolean = {
+    if (iterator.hasNext) {
+      row = iterator.next()
+      true
+    } else {
+      row = null
+      false
+    }
+  }
+
+  override def getRow: InternalRow = row
+
+  def getFile: PartitionedFile = file
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala
new file mode 100644
index 0000000000000..49d4c0353397f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSortedMergeScanRDD.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+
+class FileSortedMergeScanRDD(
+    @transient sparkSession: SparkSession,
+    readFunction: (PartitionedFile) => Iterator[InternalRow],
+    @transient filePartitions: Seq[FilePartition],
+    val sortOrdering: Ordering[InternalRow]
+) extends FileScanRDD(sparkSession, readFunction, filePartitions) {
+  private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+  private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val iterator = new FileSortedBucketScanIterator(split, context,
+      ignoreCorruptFiles, ignoreMissingFiles, readFunction, sortOrdering)
+    // Register an on-task-completion callback to close the input stream.
+    context.addTaskCompletionListener[Unit](_ => iterator.close())
+
+    iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a2bc651bb2bd5..76aff25b7d930 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -483,50 +483,70 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   test("check sort and shuffle when bucket and sort columns are join keys") {
     // In case of bucketing, its possible to have multiple files belonging to the
     // same bucket in a given relation. Each of these files are locally sorted
-    // but those files combined together are not globally sorted. Given that,
-    // the RDD partition will not be sorted even if the relation has sort columns set
-    // Therefore, we still need to keep the Sort in both sides.
+    // but those files combined together are not globally sorted. When the configuration
+    // "spark.sql.sources.bucketing.sortedScan.enabled" is enabled, sort ordering
+    // is preserved by reading those sorted files in a sort-merge way.
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
 
-    val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
 
-    val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
 
-    val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 50, expectedShuffle = false,
+          expectedSort = !sortedScanEnabled)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
 
-    val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
-      bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
-    testBucketing(
-      bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
-      bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
-      joinCondition = joinCondition(Seq("i", "j"))
-    )
+    Seq(true, false).foreach(sortedScanEnabled => {
+      withSQLConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED.key -> sortedScanEnabled.toString) {
+        val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
+          bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
+        testBucketing(
+          bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
+          bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
+          joinCondition = joinCondition(Seq("i", "j"))
+        )
+      }
+    })
   }
 
   test("avoid shuffle and sort when sort columns are a super set of join keys") {
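The scenario exercised by the new test specs above can also be tried outside the test suite with a minimal sketch along the following lines (illustrative only: the table names "t1" and "t2", the row counts, and the use of repartition(5) are assumptions, not taken from the patch). Writing with repartition(5) produces several files per bucket, which is the case the sorted scan targets; with the flag enabled, the scan preserves each bucket's sort order, so the Sort under the sort-merge join should be elided, matching the expectedSort = !sortedScanEnabled expectations above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sorted-bucket-scan-sketch")
  .getOrCreate()
import spark.implicits._

// Two bucketed, sorted tables with multiple files per bucket.
val df = (0 until 1000).map(i => (i % 13, i % 7)).toDF("i", "j")
df.repartition(5).write.format("parquet")
  .bucketBy(8, "i", "j").sortBy("i", "j").saveAsTable("t1")
df.repartition(5).write.format("parquet")
  .bucketBy(8, "i", "j").sortBy("i", "j").saveAsTable("t2")

// Enable the sorted bucket scan added by this patch and join on the bucket/sort columns.
spark.conf.set("spark.sql.sources.bucketing.sortedScan.enabled", "true")
val joined = spark.table("t1").join(spark.table("t2"), Seq("i", "j"))
// With the flag on, neither side of the SortMergeJoin should need an extra Sort.
joined.explain()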
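The heart of FileSortedBucketScanIterator above is a k-way merge over per-file sorted iterators, built on scala.collection.mutable.PriorityQueue with the row ordering reversed, because the queue pops its largest element while the merge needs the smallest. A stripped-down sketch of the same technique over plain Scala collections (the names mergeSorted and Head are hypothetical, not from the patch):

import scala.collection.mutable

// Each input iterator is assumed to be locally sorted; the heap holds one wrapper
// per non-empty iterator, ordered by that iterator's current head element.
def mergeSorted[T](inputs: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // Plays the role of IteratorWithRow: an iterator plus access to its latest element.
  final class Head(val it: BufferedIterator[T])

  // Reverse the ordering because PriorityQueue dequeues the largest element first.
  val heap = new mutable.PriorityQueue[Head]()(Ordering.by[Head, T](_.it.head).reverse)
  inputs.map(_.buffered).filter(_.hasNext).foreach(b => heap.enqueue(new Head(b)))

  new Iterator[T] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val smallest = heap.dequeue()
      val value = smallest.it.next()   // consume the element being returned
      if (smallest.it.hasNext) {
        heap.enqueue(smallest)         // re-insert with its new head, like advanceNext()
      }
      value
    }
  }
}

// mergeSorted(Seq(Iterator(1, 4, 9), Iterator(2, 3, 10), Iterator(5))).toList
// => List(1, 2, 3, 4, 5, 9, 10)

The patch additionally copies each InternalRow before re-enqueueing (currentIteratorWithRow.getRow.copy()) because Spark's row and columnar readers reuse the same row object across calls; the plain Scala values in this sketch do not need that.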