diff --git a/sql-plugin/src/main/scala/ai/rapids/cudf/HostConcatResultUtil.scala b/sql-plugin/src/main/scala/ai/rapids/cudf/HostConcatResultUtil.scala new file mode 100644 index 00000000000..30d7289c902 --- /dev/null +++ b/sql-plugin/src/main/scala/ai/rapids/cudf/HostConcatResultUtil.scala @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf + +import ai.rapids.cudf.JCudfSerialization.HostConcatResult +import com.nvidia.spark.rapids.{Arm, GpuColumnVectorFromBuffer} + +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.vectorized.ColumnarBatch + +object HostConcatResultUtil extends Arm { + /** + * Create a rows-only `HostConcatResult`. + */ + def rowsOnlyHostConcatResult(numRows: Int): HostConcatResult = { + new HostConcatResult( + new JCudfSerialization.SerializedTableHeader( + Array.empty, numRows, 0L), + HostMemoryBuffer.allocate(0, false)) + } + + /** + * Given a `HostConcatResult` and a SparkSchema produce a `ColumnarBatch`, + * handling the rows-only case. + * + * @note This function does not consume the `HostConcatResult`, and + * callers are responsible for closing the resulting `ColumnarBatch` + */ + def getColumnarBatch( + hostConcatResult: HostConcatResult, + sparkSchema: Array[DataType]): ColumnarBatch = { + if (hostConcatResult.getTableHeader.getNumColumns == 0) { + // We expect the caller to have acquired the GPU unconditionally before calling + // `getColumnarBatch`, as a downstream exec may need the GPU, and the assumption is + // that it is acquired in the coalesce code. + new ColumnarBatch(Array.empty, hostConcatResult.getTableHeader.getNumRows) + } else { + withResource(hostConcatResult.toContiguousTable) { ct => + GpuColumnVectorFromBuffer.from(ct, sparkSchema) + } + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index d4333f7bc15..26b1c0c4116 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,20 @@ object ConcatAndConsumeAll { * @return a single batch with all of them concated together. */ def buildNonEmptyBatch(arrayOfBatches: Array[ColumnarBatch], - schema: StructType): ColumnarBatch = { + schema: StructType): ColumnarBatch = + buildNonEmptyBatchFromTypes( + arrayOfBatches, GpuColumnVector.extractTypes(schema)) + + /** + * Build a single batch from the batches collected so far. If array is empty this will likely + * blow up. + * @param arrayOfBatches the batches to concat. 
This will be consumed and you do not need to + * close any of the batches after this is called. + * @param dataTypes the output types. + * @return a single batch with all of them concated together. + */ + def buildNonEmptyBatchFromTypes(arrayOfBatches: Array[ColumnarBatch], + dataTypes: Array[DataType]): ColumnarBatch = { if (arrayOfBatches.length == 1) { arrayOfBatches(0) } else { @@ -54,7 +67,7 @@ object ConcatAndConsumeAll { try { val combined = Table.concatenate(tables: _*) try { - GpuColumnVector.from(combined, GpuColumnVector.extractTypes(schema)) + GpuColumnVector.from(combined, dataTypes) } finally { combined.close() } @@ -410,9 +423,8 @@ abstract class AbstractGpuCoalesceIterator( } class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], - schema: StructType, + sparkTypes: Array[DataType], goal: CoalesceSizeGoal, - maxDecompressBatchMemory: Long, numInputRows: GpuMetric, numInputBatches: GpuMetric, numOutputRows: GpuMetric, @@ -422,8 +434,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], opTime: GpuMetric, peakDevMemory: GpuMetric, spillCallback: SpillCallback, - opName: String, - codecConfigs: TableCompressionCodecConfig) + opName: String) extends AbstractGpuCoalesceIterator(iter, goal, numInputRows, @@ -435,8 +446,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], opTime, opName) with Arm { - private val sparkTypes: Array[DataType] = GpuColumnVector.extractTypes(schema) - private val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty + protected val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty private var maxDeviceMemory: Long = 0 override def initNewBatch(batch: ColumnarBatch): Unit = { @@ -448,10 +458,85 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY, spillCallback)) + protected def popAll(): Array[ColumnarBatch] = { + closeOnExcept(batches.toArray.safeMap(_.getColumnarBatch())) { wip => + batches.safeClose() + batches.clear() + wip + } + } + + override def concatAllAndPutOnGPU(): ColumnarBatch = { + val ret = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(popAll(), sparkTypes) + // sum of current batches and concatenating batches. Approximately sizeof(ret * 2). + maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2 + ret + } + + override def cleanupConcatIsDone(): Unit = { + peakDevMemory.set(maxDeviceMemory) + batches.clear() + } + + private var onDeck: Option[SpillableColumnarBatch] = None + + override protected def hasOnDeck: Boolean = onDeck.isDefined + + override protected def saveOnDeck(batch: ColumnarBatch): Unit = { + assert(onDeck.isEmpty) + onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, + spillCallback)) + } + + override protected def clearOnDeck(): Unit = { + onDeck.foreach(_.close()) + onDeck = None + } + + override protected def popOnDeck(): ColumnarBatch = { + val ret = onDeck.get.getColumnarBatch() + clearOnDeck() + ret + } +} + +/** + * Compression codec-aware `GpuCoalesceIterator` subclass which should be used in cases + * where the RAPIDS Shuffle Manager could be configured, as batches to be coalesced + * may be compressed. 
+ */ +class GpuCompressionAwareCoalesceIterator( + iter: Iterator[ColumnarBatch], + sparkTypes: Array[DataType], + goal: CoalesceSizeGoal, + maxDecompressBatchMemory: Long, + numInputRows: GpuMetric, + numInputBatches: GpuMetric, + numOutputRows: GpuMetric, + numOutputBatches: GpuMetric, + collectTime: GpuMetric, + concatTime: GpuMetric, + opTime: GpuMetric, + peakDevMemory: GpuMetric, + spillCallback: SpillCallback, + opName: String, + codecConfigs: TableCompressionCodecConfig) + extends GpuCoalesceIterator( + iter, sparkTypes, goal, + numInputRows = numInputRows, + numInputBatches = numInputBatches, + numOutputRows = numOutputRows, + numOutputBatches = numOutputBatches, + collectTime = collectTime, + concatTime = concatTime, + opTime = opTime, + peakDevMemory = peakDevMemory, + spillCallback, opName) { + private[this] var codec: TableCompressionCodec = _ - private[this] def popAllDecompressed(): Array[ColumnarBatch] = { - closeOnExcept(batches.map(_.getColumnarBatch())) { wip => + override protected def popAll(): Array[ColumnarBatch] = { + closeOnExcept(batches.toArray.safeMap(_.getColumnarBatch())) { wip => batches.safeClose() batches.clear() @@ -487,42 +572,9 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], } } } - wip.toArray + wip } } - - override def concatAllAndPutOnGPU(): ColumnarBatch = { - val ret = ConcatAndConsumeAll.buildNonEmptyBatch(popAllDecompressed(), schema) - // sum of current batches and concatenating batches. Approximately sizeof(ret * 2). - maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2 - ret - } - - override def cleanupConcatIsDone(): Unit = { - peakDevMemory.set(maxDeviceMemory) - batches.clear() - } - - private var onDeck: Option[SpillableColumnarBatch] = None - - override protected def hasOnDeck: Boolean = onDeck.isDefined - - override protected def saveOnDeck(batch: ColumnarBatch): Unit = { - assert(onDeck.isEmpty) - onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY, - spillCallback)) - } - - override protected def clearOnDeck(): Unit = { - onDeck.foreach(_.close()) - onDeck = None - } - - override protected def popOnDeck(): ColumnarBatch = { - val ret = onDeck.get.getColumnarBatch() - clearOnDeck() - ret - } } case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) @@ -579,6 +631,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) // cache in local vars to avoid serializing the plan val outputSchema = schema + val dataTypes = GpuColumnVector.extractTypes(outputSchema) val decompressMemoryTarget = maxDecompressBatchMemory val batches = child.executeColumnar() @@ -593,7 +646,8 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal) goal match { case sizeGoal: CoalesceSizeGoal => batches.mapPartitions { iter => - new GpuCoalesceIterator(iter, outputSchema, sizeGoal, decompressMemoryTarget, + new GpuCompressionAwareCoalesceIterator( + iter, dataTypes, sizeGoal, decompressMemoryTarget, numInputRows, numInputBatches, numOutputRows, numOutputBatches, NoopMetric, concatTime, opTime, peakDevMemory, callback, "GpuCoalesceBatches", codecConfigs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index ba42c2d8f3e..c85d6fe1a60 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 
2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,8 @@ package com.nvidia.spark.rapids import java.util -import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} -import ai.rapids.cudf.JCudfSerialization.SerializedTableHeader +import ai.rapids.cudf.{HostConcatResultUtil, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader} import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode import org.apache.spark.TaskContext @@ -61,10 +61,12 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long) override def doExecuteColumnar(): RDD[ColumnarBatch] = { val metricsMap = allMetrics val targetSize = targetBatchByteSize - val sparkSchema = GpuColumnVector.extractTypes(schema) + val dataTypes = GpuColumnVector.extractTypes(schema) child.executeColumnar().mapPartitions { iter => - new GpuShuffleCoalesceIterator(iter, targetSize, sparkSchema, metricsMap) + new GpuShuffleCoalesceIterator( + new HostShuffleCoalesceIterator(iter, targetSize, dataTypes, metricsMap), + dataTypes, metricsMap) } } } @@ -72,22 +74,18 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long) /** * Iterator that coalesces columnar batches that are expected to only contain * [[SerializedTableColumn]]. The serialized tables within are collected up - * to the target batch size and then concatenated on the host before the data - * is transferred to the GPU. + * to the target batch size and then concatenated on the host before handing + * them to the caller on `.next()` */ -class GpuShuffleCoalesceIterator( +class HostShuffleCoalesceIterator( iter: Iterator[ColumnarBatch], targetBatchByteSize: Long, - sparkSchema: Array[DataType], + dataTypes: Array[DataType], metricsMap: Map[String, GpuMetric]) - extends Iterator[ColumnarBatch] with Arm with AutoCloseable { - private[this] val opTimeMetric = metricsMap(GpuMetric.OP_TIME) + extends Iterator[HostConcatResult] with Arm with AutoCloseable { + private[this] val concatTimeMetric = metricsMap(GpuMetric.CONCAT_TIME) private[this] val inputBatchesMetric = metricsMap(GpuMetric.NUM_INPUT_BATCHES) private[this] val inputRowsMetric = metricsMap(GpuMetric.NUM_INPUT_ROWS) - private[this] val outputBatchesMetric = metricsMap(GpuMetric.NUM_OUTPUT_BATCHES) - private[this] val outputRowsMetric = metricsMap(GpuMetric.NUM_OUTPUT_ROWS) - private[this] val concatTimeMetric = metricsMap(GpuMetric.CONCAT_TIME) - private[this] val semWaitTime = metricsMap(GpuMetric.SEMAPHORE_WAIT_TIME) private[this] val serializedTables = new util.ArrayDeque[SerializedTableColumn] private[this] var numTablesInBatch: Int = 0 private[this] var numRowsInBatch: Int = 0 @@ -95,21 +93,44 @@ class GpuShuffleCoalesceIterator( Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close())) - override def hasNext: Boolean = { - bufferNextBatch() - numTablesInBatch > 0 + override def close(): Unit = { + serializedTables.forEach(_.close()) + serializedTables.clear() } - override def next(): ColumnarBatch = { - if (!hasNext) { - throw new NoSuchElementException("No more columnar batches") + def concatenateTablesInHost(): HostConcatResult = { + val result = withResource(new MetricRange(concatTimeMetric)) { _ => + val firstHeader = serializedTables.peekFirst().header + if 
(firstHeader.getNumColumns == 0) { + (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst()) + HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch) + } else { + val headers = new Array[SerializedTableHeader](numTablesInBatch) + withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers => + headers.indices.foreach { i => + val serializedTable = serializedTables.removeFirst() + headers(i) = serializedTable.header + buffers(i) = serializedTable.hostBuffer + } + JCudfSerialization.concatToHostBuffer(headers, buffers) + } + } } - concatenateBatch() - } - override def close(): Unit = { - serializedTables.forEach(_.close()) - serializedTables.clear() + // update the stats for the next batch in progress + numTablesInBatch = serializedTables.size + + batchByteSize = 0 + numRowsInBatch = 0 + if (numTablesInBatch > 0) { + require(numTablesInBatch == 1, + "should only track at most one buffer that is not in a batch") + val header = serializedTables.peekFirst().header + batchByteSize = header.getDataLen + numRowsInBatch = header.getNumRows + } + + result } private def bufferNextBatch(): Unit = { @@ -120,7 +141,7 @@ class GpuShuffleCoalesceIterator( inputBatchesMetric += 1 // don't bother tracking empty tables if (batch.numRows > 0) { - inputRowsMetric += batch.numRows + inputRowsMetric += batch.numRows() val tableColumn = batch.column(0).asInstanceOf[SerializedTableColumn] batchCanGrow = canAddToBatch(tableColumn.header) serializedTables.addLast(tableColumn) @@ -138,6 +159,18 @@ class GpuShuffleCoalesceIterator( } } + override def hasNext(): Boolean = { + bufferNextBatch() + numTablesInBatch > 0 + } + + override def next(): HostConcatResult = { + if (!hasNext()) { + throw new NoSuchElementException("No more host batches to concatenate") + } + concatenateTablesInHost() + } + private def canAddToBatch(nextTable: SerializedTableHeader): Boolean = { if (batchByteSize + nextTable.getDataLen > targetBatchByteSize) { return false @@ -147,60 +180,41 @@ class GpuShuffleCoalesceIterator( } true } +} - private def concatenateBatch(): ColumnarBatch = { - val firstHeader = serializedTables.peekFirst().header - val batch = withResource(new MetricRange(concatTimeMetric)) { _ => - if (firstHeader.getNumColumns == 0) { - // acquire the GPU unconditionally for now in this case, as a downstream exec - // may need the GPU, and the assumption is that it is acquired in the coalesce - // code. - GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime) - (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst()) - new ColumnarBatch(Array.empty, numRowsInBatch) - } else { - concatenateTablesBatch() - } - } +/** + * Iterator that coalesces columnar batches that are expected to only contain + * [[SerializedTableColumn]]. The serialized tables within are collected up + * to the target batch size and then concatenated on the host before the data + * is transferred to the GPU. 
+ */ +class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult], + dataTypes: Array[DataType], + metricsMap: Map[String, GpuMetric]) + extends Iterator[ColumnarBatch] with Arm { + private[this] val semWaitTime = metricsMap(GpuMetric.SEMAPHORE_WAIT_TIME) + private[this] val opTimeMetric = metricsMap(GpuMetric.OP_TIME) + private[this] val outputBatchesMetric = metricsMap(GpuMetric.NUM_OUTPUT_BATCHES) + private[this] val outputRowsMetric = metricsMap(GpuMetric.NUM_OUTPUT_ROWS) - withResource(new MetricRange(opTimeMetric)) { _ => - outputBatchesMetric += 1 - outputRowsMetric += batch.numRows - - // update the stats for the next batch in progress - numTablesInBatch = serializedTables.size - batchByteSize = 0 - numRowsInBatch = 0 - if (numTablesInBatch > 0) { - require(numTablesInBatch == 1, - "should only track at most one buffer that is not in a batch") - val header = serializedTables.peekFirst().header - batchByteSize = header.getDataLen - numRowsInBatch = header.getNumRows - } + override def hasNext: Boolean = iter.hasNext - batch + override def next(): ColumnarBatch = { + if (!hasNext) { + throw new NoSuchElementException("No more columnar batches") } - } - - private def concatenateTablesBatch(): ColumnarBatch = { - val headers = new Array[SerializedTableHeader](numTablesInBatch) - withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers => - headers.indices.foreach { i => - val serializedTable = serializedTables.removeFirst() - headers(i) = serializedTable.header - buffers(i) = serializedTable.hostBuffer - } - - withResource(new NvtxRange("Concat+Load Batch", NvtxColor.YELLOW)) { _ => - withResource(JCudfSerialization.concatToHostBuffer(headers, buffers)) { hostConcatResult => - // about to start using the GPU in this task - GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime) - withResource(new MetricRange(opTimeMetric)) { _ => - withResource(hostConcatResult.toContiguousTable) { contigTable => - GpuColumnVectorFromBuffer.from(contigTable, sparkSchema) - } - } + withResource(new NvtxRange("Concat+Load Batch", NvtxColor.YELLOW)) { _ => + withResource(iter.next()) { hostConcatResult => + // We acquire the GPU regardless of whether `hostConcatResult` + // is an empty batch or not, because the downstream tasks expect + // the `GpuShuffleCoalesceIterator` to acquire the semaphore and may + // generate GPU data from batches that are empty. 
GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime) + withResource(new MetricRange(opTimeMetric)) { _ => + val batch = HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + outputBatchesMetric += 1 + outputRowsMetric += batch.numRows() + batch } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index 675ec01758e..8c33ede98bc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -16,8 +16,11 @@ package com.nvidia.spark.rapids +import ai.rapids.cudf.{HostConcatResultUtil, NvtxColor, NvtxRange} +import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.shims.v2.{GpuHashPartitioning, GpuJoinUtils, ShimBinaryExecNode} +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} @@ -97,7 +100,9 @@ case class GpuShuffledHashJoinExec( override val outputBatchesLevel: MetricsLevel = MODERATE_LEVEL override lazy val additionalMetrics: Map[String, GpuMetric] = Map( OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME), + CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME), BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE), + PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY), BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME), STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME), JOIN_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_JOIN_TIME), @@ -123,28 +128,39 @@ case class GpuShuffledHashJoinExec( val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS) val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES) val opTime = gpuLongMetric(OP_TIME) - val buildTime = gpuLongMetric(BUILD_TIME) val streamTime = gpuLongMetric(STREAM_TIME) val joinTime = gpuLongMetric(JOIN_TIME) val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS) - val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) + val batchSizeBytes = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf) val spillCallback = GpuMetric.makeSpillCallback(allMetrics) - val localBuildOutput: Seq[Attribute] = buildPlan.output + val localBuildOutput = buildPlan.output + + // Create a map of metrics that can be handed down to the shuffle and coalesce + // iterators, setting to noop the metrics that the coalesce iterators normally + // update but that would record incorrect statistics in the join case (their + // counts conflict with the join's own input/output metrics) + val coalesceMetricsMap = allMetrics + + (GpuMetric.NUM_INPUT_ROWS -> NoopMetric, + GpuMetric.NUM_INPUT_BATCHES -> NoopMetric, + GpuMetric.NUM_OUTPUT_BATCHES -> NoopMetric, + GpuMetric.NUM_OUTPUT_ROWS -> NoopMetric) streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { - val stIt = new CollectTimeIterator("shuffled join stream", streamIter, streamTime) - val startTime = System.nanoTime() + val (builtBatch, maybeBufferedStreamIter) = + GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + batchSizeBytes, + localBuildOutput, + buildIter, + new CollectTimeIterator("shuffled join stream", streamIter, streamTime), + spillCallback, + coalesceMetricsMap) -
withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, - localBuildOutput)) { builtBatch => + withResource(builtBatch) { _ => // doJoin will increment the reference counts as needed for the builtBatch - val delta = System.nanoTime() - startTime - buildTime += delta buildDataSize += GpuColumnVector.getTotalDeviceMemoryUsed(builtBatch) - - doJoin(builtBatch, stIt, targetSize, spillCallback, - numOutputRows, joinOutputRows, numOutputBatches, + doJoin(builtBatch, maybeBufferedStreamIter, + batchSizeBytes, spillCallback, numOutputRows, joinOutputRows, numOutputBatches, opTime, joinTime) } } @@ -155,3 +171,183 @@ case class GpuShuffledHashJoinExec( if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName } } + +object GpuShuffledHashJoinExec extends Arm { + /** + * Helper iterator that wraps a BufferedIterator of AutoCloseable subclasses. + * This iterator also implements AutoCloseable, so it can be closed in case + * of exceptions. + * + * @param wrapped the buffered iterator + * @tparam T an AutoCloseable subclass + */ + class CloseableBufferedIterator[T <: AutoCloseable](wrapped: BufferedIterator[T]) + extends BufferedIterator[T] with AutoCloseable { + // register against task completion to close any leaked buffered items + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close())) + + private[this] var isClosed = false + override def head: T = wrapped.head + override def headOption: Option[T] = wrapped.headOption + override def next: T = wrapped.next + override def hasNext: Boolean = wrapped.hasNext + override def close(): Unit = { + if (!isClosed) { + headOption.foreach(_.close()) + isClosed = true + } + } + } + + /** + * Gets a `ColumnarBatch` and stream Iterator[ColumnarBatch] pair by acquiring + * the GPU semaphore optimally in the scenario where the build side is relatively + * small (less than `hostTargetBatchSize`). + * + * In the optimal case, this function will load the build side on the host up to the + * goal configuration and if it fits entirely, allow the stream iterator + * to also pull to host its first batch. After the first stream batch is on the host, the + * stream iterator acquires the semaphore and then the build side is copied to the GPU. + * + * Prior to this we would get a build batch on the GPU, acquiring + * the semaphore in the process, and then begin pulling from the stream iterator, + * which could include IO (while holding onto the semaphore). + * + * The function handles the case where the build side goes above the configured batch + * goal, in which case it will concat on the host, grab the semaphore, and continue to + * pull the build iterator to build a bigger batch on the GPU. This is not optimized + * because we hold onto the semaphore during the entire time after realizing the goal + * has been hit. 
* @param hostTargetBatchSize target batch size goal on the host + * @param buildOutput output attributes of the build plan + * @param buildIter build iterator + * @param streamIter stream iterator + * @param spillCallback metric updater in case downstream iterators spill + * @param coalesceMetricsMap metrics map with metrics to be used in downstream + * iterators + * @return a pair of `ColumnarBatch` and streamed iterator that can be + * used for the join + */ + def getBuiltBatchAndStreamIter( + hostTargetBatchSize: Long, + buildOutput: Seq[Attribute], + buildIter: Iterator[ColumnarBatch], + streamIter: Iterator[ColumnarBatch], + spillCallback: SpillCallback, + coalesceMetricsMap: Map[String, GpuMetric]): (ColumnarBatch, Iterator[ColumnarBatch]) = { + val semWait = coalesceMetricsMap(GpuMetric.SEMAPHORE_WAIT_TIME) + val buildTime = coalesceMetricsMap(GpuMetric.BUILD_TIME) + var bufferedBuildIterator: CloseableBufferedIterator[ColumnarBatch] = null + closeOnExcept(bufferedBuildIterator) { _ => + val startTime = System.nanoTime() + // Find out whether the build side is non-empty and whether its first batch + // is a serialized batch. If either check fails, we fall back to the + // `getSingleBatchWithVerification` method. + val firstBatchIsSerialized = { + if (!buildIter.hasNext) { + false + } else { + bufferedBuildIterator = new CloseableBufferedIterator(buildIter.buffered) + val firstBatch = bufferedBuildIterator.head + if (firstBatch.numCols() != 1) { + false + } else { + firstBatch.column(0).isInstanceOf[SerializedTableColumn] + } + } + } + + if (!firstBatchIsSerialized) { + // In this scenario the build side produces non host-side batches; + // given the plan rules, we expect it to be a single batch + val builtBatch = + ConcatAndConsumeAll.getSingleBatchWithVerification( + Option(bufferedBuildIterator).getOrElse(buildIter), buildOutput) + val delta = System.nanoTime() - startTime + buildTime += delta + (builtBatch, streamIter) + } else { + val dataTypes = buildOutput.map(_.dataType).toArray + val hostConcatIter = new HostShuffleCoalesceIterator(bufferedBuildIterator, + hostTargetBatchSize, dataTypes, coalesceMetricsMap) + withResource(hostConcatIter) { _ => + closeOnExcept(hostConcatIter.next()) { hostConcatResult => + if (!hostConcatIter.hasNext()) { + // add the time it took to fetch that first host-side build batch + buildTime += System.nanoTime() - startTime + // Optimal case: we drained the build iterator with no batch left over, + // so the build side is a single batch that is entirely on the host. + // We peek at the stream iterator with `hasNext` on the buffered + // iterator, which will grab the semaphore when putting the first stream + // batch on the GPU, and then we bring the build batch to the GPU and return.
val bufferedStreamIter = new CloseableBufferedIterator(streamIter.buffered) + closeOnExcept(bufferedStreamIter) { _ => + withResource(new NvtxRange("first stream batch", NvtxColor.RED)) { _ => + if (bufferedStreamIter.hasNext) { + bufferedStreamIter.head + } else { + GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWait) + } + } + val buildBatch = getBuildBatchOptimized(hostConcatResult, buildOutput, buildTime) + (buildBatch, bufferedStreamIter) + } + } else { + val buildBatch = getBuildBatchFromUnfinished( + Seq(hostConcatResult).iterator ++ hostConcatIter, + buildOutput, spillCallback, coalesceMetricsMap) + buildTime += System.nanoTime() - startTime + (buildBatch, streamIter) + } + } + } + } + } + + private def getBuildBatchFromUnfinished( + iterWithPrior: Iterator[HostConcatResult], + buildOutput: Seq[Attribute], + spillCallback: SpillCallback, + coalesceMetricsMap: Map[String, GpuMetric]): ColumnarBatch = { + // In the fallback case we build the same iterator chain that the Spark plan + // would have produced: + // GpuCoalesceIterator(GpuShuffleCoalesceIterator(shuffled build side)) + // This allows us to make the shuffle batches spillable in case we have a large + // build-side table, since `RequireSingleBatch` imposes virtually no limit and we + // know we are already above `hostTargetBatchSize` (which is 2GB by default) + val dataTypes = buildOutput.map(_.dataType).toArray + val shuffleCoalesce = new GpuShuffleCoalesceIterator( + iterWithPrior, + dataTypes, + coalesceMetricsMap) + val res = ConcatAndConsumeAll.getSingleBatchWithVerification( + new GpuCoalesceIterator(shuffleCoalesce, + dataTypes, + RequireSingleBatch, + NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric, + coalesceMetricsMap(GpuMetric.CONCAT_TIME), + coalesceMetricsMap(GpuMetric.OP_TIME), + coalesceMetricsMap(GpuMetric.PEAK_DEVICE_MEMORY), + spillCallback, + "build batch"), + buildOutput) + res + } + + private def getBuildBatchOptimized( + hostConcatResult: HostConcatResult, + buildOutput: Seq[Attribute], + buildTime: GpuMetric): ColumnarBatch = { + val dataTypes = buildOutput.map(_.dataType).toArray + // We are on the GPU and our build batch is within `hostTargetBatchSize`, + // so we can bring the build batch to the GPU now + withResource(hostConcatResult) { _ => + buildTime.ns { + HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + } + } + } +} + diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 9e9c496e8b4..54bf57e94fa 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -237,6 +237,28 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { p.withNewChildren(p.children.map(optimizeCoalesce)) } + /** + * Removes `GpuCoalesceBatches(GpuShuffleCoalesceExec(build side))` from the build side + * of the shuffled hash join. The coalesce logic has been moved into + * `GpuShuffledHashJoinExec`, which handles it differently to avoid holding onto the + * GPU semaphore while waiting for stream-side IO.
+ */ + def shuffledHashJoinOptimizeShuffle(plan: SparkPlan): SparkPlan = plan match { + case x@GpuShuffledHashJoinExec( + _, _, _, buildSide, _, + left: GpuShuffleCoalesceExec, + GpuCoalesceBatches(GpuShuffleCoalesceExec(rc, _), _),_) if buildSide == GpuBuildRight => + x.withNewChildren( + Seq(shuffledHashJoinOptimizeShuffle(left), shuffledHashJoinOptimizeShuffle(rc))) + case x@GpuShuffledHashJoinExec( + _, _, _, buildSide, _, + GpuCoalesceBatches(GpuShuffleCoalesceExec(lc, _), _), + right: GpuShuffleCoalesceExec, _) if buildSide == GpuBuildLeft => + x.withNewChildren( + Seq(shuffledHashJoinOptimizeShuffle(lc), shuffledHashJoinOptimizeShuffle(right))) + case p => p.withNewChildren(p.children.map(shuffledHashJoinOptimizeShuffle)) + } + private def insertCoalesce(plans: Seq[SparkPlan], goals: Seq[CoalesceGoal], disableUntilInput: Boolean): Seq[SparkPlan] = { plans.zip(goals).map { @@ -550,6 +572,9 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { } updatedPlan = fixupHostColumnarTransitions(updatedPlan) updatedPlan = optimizeCoalesce(updatedPlan) + if (rapidsConf.shuffledHashJoinOptimizeShuffle) { + updatedPlan = shuffledHashJoinOptimizeShuffle(updatedPlan) + } if (rapidsConf.exportColumnarRdd) { updatedPlan = detectAndTagFinalColumnarOutput(updatedPlan) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index aec84a1f4a6..4042a466cbc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -471,6 +471,16 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val SHUFFLED_HASH_JOIN_OPTIMIZE_SHUFFLE = + conf("spark.rapids.sql.shuffledHashJoin.optimizeShuffle") + .doc("Enable or disable an optimization where shuffled build side batches are kept " + + "on the host while the first stream batch is loaded onto the GPU. The optimization " + + "increases off-heap host memory usage to avoid holding onto the GPU semaphore while " + + "waiting for stream side IO.") + .internal() + .booleanConf + .createWithDefault(true) + val STABLE_SORT = conf("spark.rapids.sql.stableSort.enabled") .doc("Enable or disable stable sorting. Apache Spark's sorting is typically a stable " + "sort, but sort stability cannot be guaranteed in distributed work loads because the " + @@ -1484,6 +1494,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val exportColumnarRdd: Boolean = get(EXPORT_COLUMNAR_RDD) + lazy val shuffledHashJoinOptimizeShuffle: Boolean = get(SHUFFLED_HASH_JOIN_OPTIMIZE_SHUFFLE) + lazy val stableSort: Boolean = get(STABLE_SORT) lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala index 5db550ba083..78913447dae 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala @@ -177,34 +177,54 @@ class AdaptiveQueryExecSuite spark.sql("INSERT INTO TABLE t1 SELECT a, b FROM testData").collect() spark.sql("INSERT INTO TABLE t2 SELECT a, b FROM testData").collect() - val df = spark.sql( - "SELECT t1.a, t2.b " + + // This test checks that inputs to the SHJ are coalesced. 
We need to check both sides + // if we are not optimizing the build-side coalescing logic, and only the stream side + // if the optimization is enabled (default). + // See `RapidsConf.SHUFFLED_HASH_JOIN_OPTIMIZE_SHUFFLE` for more information. + Seq(true, false).foreach { shouldOptimizeHashJoinShuffle => + spark.conf.set( + RapidsConf.SHUFFLED_HASH_JOIN_OPTIMIZE_SHUFFLE.key, + shouldOptimizeHashJoinShuffle.toString) + val df = spark.sql( + "SELECT t1.a, t2.b " + "FROM t1 " + "JOIN t2 " + "ON t1.a = t2.a " + "WHERE t2.a = 5" // filter on partition key to force dynamic partition pruning - ) - df.collect() + ) + df.collect() - val isAdaptiveQuery = df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec] - if (cmpSparkVersion(3, 2, 0) < 0) { - // assert that DPP did cause this to run as a non-AQE plan prior to Spark 3.2.0 - assert(!isAdaptiveQuery) - } else { - // In 3.2.0 AQE works with DPP - assert(isAdaptiveQuery) + val isAdaptiveQuery = df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec] + if (cmpSparkVersion(3, 2, 0) < 0) { + // assert that DPP did cause this to run as a non-AQE plan prior to Spark 3.2.0 + assert(!isAdaptiveQuery) + } else { + // In 3.2.0 AQE works with DPP + assert(isAdaptiveQuery) + } + + val shj = TestUtils.findOperator(df.queryExecution.executedPlan, + _.isInstanceOf[GpuShuffledHashJoinExec]).get + .asInstanceOf[GpuShuffledHashJoinExec] + assert(shj.children.length == 2) + val childrenToCheck = if (shouldOptimizeHashJoinShuffle) { + // assert that the stream side of SHJ is coalesced + shj.buildSide match { + case GpuBuildLeft => Seq(shj.right) + case GpuBuildRight => Seq(shj.left) + } + } else { + // assert that both the build and stream side of SHJ are coalesced + // if we are not optimizing the build side shuffle + shj.children + } + assert(childrenToCheck.forall { + case GpuShuffleCoalesceExec(_, _) => true + case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true + case _ => false + }) } - // assert that both inputs to the SHJ are coalesced - val shj = TestUtils.findOperator(df.queryExecution.executedPlan, - _.isInstanceOf[GpuShuffledHashJoinExec]).get - assert(shj.children.length == 2) - assert(shj.children.forall { - case GpuShuffleCoalesceExec(_, _) => true - case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true - case _ => false - }) - }, conf) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index bf29a411989..d5332628c5b 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -475,9 +475,9 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { val schema = new StructType().add("i", LongType) .add("j", DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3)) val dummyMetric = WrappedGpuMetric(new SQLMetric("ignored")) - val coalesceIter = new GpuCoalesceIterator( + val coalesceIter = new GpuCompressionAwareCoalesceIterator( batchIter, - schema, + GpuColumnVector.extractTypes(schema), TargetSize(coalesceTargetBytes), maxCompressedBatchMemoryLimit, dummyMetric, @@ -559,9 +559,9 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { val schema = new StructType().add("i", LongType) .add("j", DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3)) val dummyMetric = WrappedGpuMetric(new SQLMetric("ignored")) - val coalesceIter = new GpuCoalesceIterator( + val coalesceIter = new GpuCompressionAwareCoalesceIterator( batchIter, - schema, + GpuColumnVector.extractTypes(schema), TargetSize(coalesceTargetBytes), maxCompressedBatchMemoryLimit, dummyMetric, diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala new file mode 100644 index 00000000000..ee11387c9ac --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExecSuite.scala @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream} + +import ai.rapids.cudf.{ColumnVector, HostMemoryBuffer, JCudfSerialization, Table} +import org.mockito.ArgumentMatchers._ +import org.mockito.Mockito._ +import org.scalatest.FunSuite +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.vectorized.ColumnarBatch + +class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar { + val metricMap = mock[Map[String, GpuMetric]] + when(metricMap(any())).thenReturn(NoopMetric) + + test("fallback with empty build iterator") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + val mockBuildIter = mock[Iterator[ColumnarBatch]] + when(mockBuildIter.hasNext).thenReturn(false) + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 0, + Seq.empty, + mockBuildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + // we ge an empty batch with no columns or rows + assertResult(builtBatch.numCols())(0) + assertResult(builtBatch.numRows())(0) + // 2 invocations, once in the `getBuiltBatchAndStreamIter` + // method, and a second one in `getSingleBatchWithVerification` + verify(mockBuildIter, times(2)).hasNext + verify(mockBuildIter, times(0)).next + verify(mockStreamIter, times(0)).hasNext + } + } + } + + test("fallback with 0 column build batches") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + withResource(GpuColumnVector.emptyBatchFromTypes(Array.empty)) { + emptyBatch => + val buildIter = mock[Iterator[ColumnarBatch]] + when(buildIter.hasNext).thenReturn(true, false) + val buildBufferedIter = mock[BufferedIterator[ColumnarBatch]] + when(buildBufferedIter.hasNext).thenReturn(true, false) + when(buildBufferedIter.head).thenReturn(emptyBatch) + when(buildBufferedIter.next).thenReturn(emptyBatch) + when(buildIter.buffered).thenReturn(buildBufferedIter) + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 0, + Seq.empty, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + assertResult(builtBatch.numCols())(0) + assertResult(builtBatch.numRows())(0) + // 1 invocation in the `getBuiltBatchAndStreamIter` + // after which a buffered iterator is obtained and used for the fallback case + verify(buildIter, times(1)).hasNext + verify(buildIter, times(1)).buffered + // we ask the buffered iterator for `head` to inspect the number of columns + verify(buildBufferedIter, times(1)).head + // the buffered iterator is passed to `getSingleBatchWithVerification`, + // and that code calls hasNext twice + verify(buildBufferedIter, times(2)).hasNext + // and calls next to get that batch we buffered + verify(buildBufferedIter, times(1)).next + verify(mockStreamIter, times(0)).hasNext + } + } + } + } + + test("fallback with a non-SerializedTableColumn 1 col and 0 rows") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + val emptyBatch = GpuColumnVector.emptyBatchFromTypes(Seq(IntegerType).toArray) + val buildIter = Seq(emptyBatch).iterator + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 0, + 
Seq.empty, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + assertResult(builtBatch.numCols())(1) + assertResult(builtBatch.numRows())(0) + // 2 invocations, once in the `getBuiltBatchAndStreamIter` + // method, and one in `getSingleBatchWithVerification` + verify(mockStreamIter, times(0)).hasNext + // the buffered iterator drained the build iterator + assertResult(buildIter.hasNext)(false) + } + } + } + + test("fallback with a non-SerializedTableColumn") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + closeOnExcept(ColumnVector.fromInts(1, 2, 3, 4, 5)) { cudfCol => + val cv = GpuColumnVector.from(cudfCol, IntegerType) + val batch = new ColumnarBatch(Seq(cv).toArray, 5) + val buildIter = Seq(batch).iterator + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 0, + Seq.empty, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + assertResult(builtBatch.numCols())(1) + assertResult(builtBatch.numRows())(5) + // 2 invocations, once in the `getBuiltBatchAndStreamIter` + // method, and one in `getSingleBatchWithVerification` + verify(mockStreamIter, times(0)).hasNext + // the buffered iterator drained the build iterator + assertResult(buildIter.hasNext)(false) + } + } + } + } + + def getSerializedBatch(tbl: Table): ColumnarBatch = { + val outStream = new ByteArrayOutputStream() + JCudfSerialization.writeToStream(tbl, outStream, 0, tbl.getRowCount) + val dIn = new DataInputStream(new ByteArrayInputStream(outStream.toByteArray)) + val header = new JCudfSerialization.SerializedTableHeader(dIn) + closeOnExcept(HostMemoryBuffer.allocate(header.getDataLen, false)) { hostBuffer => + JCudfSerialization.readTableIntoBuffer(dIn, header, hostBuffer) + SerializedTableColumn.from(header, hostBuffer) + } + } + + def getSerializedBatch(numRows: Int): ColumnarBatch = { + val outStream = new ByteArrayOutputStream() + JCudfSerialization.writeRowsToStream(outStream, numRows) + val dIn = new DataInputStream(new ByteArrayInputStream(outStream.toByteArray)) + val header = new JCudfSerialization.SerializedTableHeader(dIn) + closeOnExcept(HostMemoryBuffer.allocate(header.getDataLen, false)) { hostBuffer => + JCudfSerialization.readTableIntoBuffer(dIn, header, hostBuffer) + SerializedTableColumn.from(header, hostBuffer) + } + } + + test("test a 0-column SerializedTableColumn") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + val serializedBatch = getSerializedBatch(5) + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val mockBufferedStreamIterator = mock[BufferedIterator[ColumnarBatch]] + when(mockStreamIter.hasNext).thenReturn(true) + when(mockStreamIter.buffered).thenReturn(mockBufferedStreamIterator) + when(mockBufferedStreamIterator.hasNext).thenReturn(true) + closeOnExcept(serializedBatch) { _ => + val buildIter = Seq(serializedBatch).iterator + val attrs = AttributeReference("a", IntegerType, false)() :: Nil + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 1024, + attrs, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + verify(mockBufferedStreamIterator, times(1)).hasNext + assertResult(builtBatch.numCols())(0) + assertResult(builtBatch.numRows())(5) + // the buffered iterator drained the build iterator + assertResult(buildIter.hasNext)(false) + } + } + } + } + + test("test a SerializedTableColumn") { +
TestUtils.withGpuSparkSession(new SparkConf()) { _ => + closeOnExcept(ColumnVector.fromInts(1, 2, 3, 4, 5)) { cudfCol => + val cv = GpuColumnVector.from(cudfCol, IntegerType) + val batch = new ColumnarBatch(Seq(cv).toArray, 5) + withResource(GpuColumnVector.from(batch)) { tbl => + val serializedBatch = getSerializedBatch(tbl) + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val mockBufferedStreamIterator = mock[BufferedIterator[ColumnarBatch]] + when(mockStreamIter.hasNext).thenReturn(true) + when(mockStreamIter.buffered).thenReturn(mockBufferedStreamIterator) + when(mockBufferedStreamIterator.hasNext).thenReturn(true) + closeOnExcept(serializedBatch) { _ => + val buildIter = Seq(serializedBatch).iterator + val attrs = AttributeReference("a", IntegerType, false)() :: Nil + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 1024, + attrs, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + verify(mockBufferedStreamIterator, times(1)).hasNext + assertResult(builtBatch.numCols())(1) + assertResult(builtBatch.numRows())(5) + // the buffered iterator drained the build iterator + assertResult(buildIter.hasNext)(false) + } + } + } + } + } + } + + test("test two batches, going over the limit") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + closeOnExcept(ColumnVector.fromInts(1, 2, 3, 4, 5)) { cudfCol => + val cv = GpuColumnVector.from(cudfCol, IntegerType) + val batch = new ColumnarBatch(Seq(cv).toArray, 5) + withResource(GpuColumnVector.from(batch)) { tbl => + val serializedBatch = getSerializedBatch(tbl) + val serializedBatch2 = getSerializedBatch(tbl) + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val mockBufferedStreamIterator = mock[BufferedIterator[ColumnarBatch]] + when(mockStreamIter.hasNext).thenReturn(true) + when(mockStreamIter.buffered).thenReturn(mockBufferedStreamIterator) + when(mockBufferedStreamIterator.hasNext).thenReturn(true) + closeOnExcept(serializedBatch) { _ => + closeOnExcept(serializedBatch2) { _ => + val buildIter = Seq(serializedBatch, serializedBatch2).iterator + val attrs = AttributeReference("a", IntegerType, false)() :: Nil + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 1, + attrs, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + verify(mockBufferedStreamIterator, times(0)).hasNext + assertResult(builtBatch.numCols())(1) + assertResult(builtBatch.numRows())(10) + // the buffered iterator drained the build iterator + assertResult(buildIter.hasNext)(false) + } + } + } + } + } + } + + test("test two batches, staying within the limit") { + TestUtils.withGpuSparkSession(new SparkConf()) { _ => + closeOnExcept(ColumnVector.fromInts(1, 2, 3, 4, 5)) { cudfCol => + val cv = GpuColumnVector.from(cudfCol, IntegerType) + val batch = new ColumnarBatch(Seq(cv).toArray, 5) + withResource(GpuColumnVector.from(batch)) { tbl => + val serializedBatch = getSerializedBatch(tbl) + val serializedBatch2 = getSerializedBatch(tbl) + val mockStreamIter = mock[Iterator[ColumnarBatch]] + val mockBufferedStreamIterator = mock[BufferedIterator[ColumnarBatch]] + when(mockStreamIter.hasNext).thenReturn(true) + when(mockStreamIter.buffered).thenReturn(mockBufferedStreamIterator) + when(mockBufferedStreamIterator.hasNext).thenReturn(true) + closeOnExcept(serializedBatch) { _ => + closeOnExcept(serializedBatch2) { _ => + val buildIter = Seq(serializedBatch, serializedBatch2).iterator + val
attrs = AttributeReference("a", IntegerType, false)() :: Nil + val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter( + 1024, + attrs, + buildIter, + mockStreamIter, + mock[SpillCallback], + metricMap) + withResource(builtBatch) { _ => + verify(mockBufferedStreamIterator, times(1)).hasNext + assertResult(builtBatch.numCols())(1) + assertResult(builtBatch.numRows())(10) + // the buffered iterator drained the build iterator + assertResult(buildIter.hasNext)(false) + } + } + } + } + } + } + } +}
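Reviewer note (illustrative, not part of the patch): the refactor above splits shuffle coalescing into two stages. `HostShuffleCoalesceIterator` buffers serialized tables up to the target byte size and emits host-concatenated `HostConcatResult`s, and `GpuShuffleCoalesceIterator` acquires the semaphore and materializes each result as a `ColumnarBatch` on the device. A minimal sketch of how the two stages compose, mirroring `GpuShuffleCoalesceExec.doExecuteColumnar` in this diff (the helper name `coalescedPartition` is hypothetical, for illustration only):

  import ai.rapids.cudf.JCudfSerialization.HostConcatResult
  import org.apache.spark.sql.types.DataType
  import org.apache.spark.sql.vectorized.ColumnarBatch

  // Host stage first: SerializedTableColumn batches are concatenated on the
  // host; the GPU stage then grabs the semaphore and loads each result onto
  // the device. metricsMap must contain the keys both iterators look up
  // (CONCAT_TIME, NUM_INPUT_*, SEMAPHORE_WAIT_TIME, OP_TIME, NUM_OUTPUT_*).
  def coalescedPartition(
      iter: Iterator[ColumnarBatch],
      targetSize: Long,
      dataTypes: Array[DataType],
      metricsMap: Map[String, GpuMetric]): Iterator[ColumnarBatch] = {
    val hostIter: Iterator[HostConcatResult] =
      new HostShuffleCoalesceIterator(iter, targetSize, dataTypes, metricsMap)
    new GpuShuffleCoalesceIterator(hostIter, dataTypes, metricsMap)
  }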
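A similarly hedged sketch of the rows-only path in `HostConcatResultUtil`: a header with zero columns round-trips to a `ColumnarBatch` that carries only a row count. This assumes an `Arm` scope for `withResource` and, per the `@note` on `getColumnarBatch`, that the GPU has already been acquired by the caller:

  // Build a rows-only result, then materialize it; the batch has no columns.
  withResource(HostConcatResultUtil.rowsOnlyHostConcatResult(numRows = 5)) { r =>
    val batch = HostConcatResultUtil.getColumnarBatch(r, Array.empty)
    assert(batch.numCols() == 0 && batch.numRows() == 5)
    batch.close() // getColumnarBatch does not consume r; close the batch ourselves
  }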
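Finally, the new build-side optimization is internal but can be toggled per session, as the AdaptiveQueryExecSuite change above does (key and default taken from the RapidsConf entry in this diff):

  // Disable the optimized build-side shuffle for shuffled hash join (default: true).
  spark.conf.set("spark.rapids.sql.shuffledHashJoin.optimizeShuffle", "false")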