Optimize semaphore acquisition in GpuShuffledHashJoinExec #4588

Merged: 14 commits, Feb 8, 2022
@@ -46,15 +46,28 @@ object ConcatAndConsumeAll {
    * @return a single batch with all of them concatenated together.
    */
   def buildNonEmptyBatch(arrayOfBatches: Array[ColumnarBatch],
-      schema: StructType): ColumnarBatch = {
+      schema: StructType): ColumnarBatch =
+    buildNonEmptyBatchFromTypes(
+      arrayOfBatches, GpuColumnVector.extractTypes(schema))
+
+  /**
+   * Build a single batch from the batches collected so far. If the array is empty this
+   * will likely blow up.
+   * @param arrayOfBatches the batches to concat. This will be consumed and you do not need to
+   *                       close any of the batches after this is called.
+   * @param dataTypes the output types.
+   * @return a single batch with all of them concatenated together.
+   */
+  def buildNonEmptyBatchFromTypes(arrayOfBatches: Array[ColumnarBatch],
+      dataTypes: Array[DataType]): ColumnarBatch = {
     if (arrayOfBatches.length == 1) {
       arrayOfBatches(0)
     } else {
       val tables = arrayOfBatches.map(GpuColumnVector.from)
       try {
         val combined = Table.concatenate(tables: _*)
         try {
-          GpuColumnVector.from(combined, GpuColumnVector.extractTypes(schema))
+          GpuColumnVector.from(combined, dataTypes)
         } finally {
           combined.close()
         }
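A minimal usage sketch of the new overload (hypothetical caller: `batchA` and `batchB` stand in for GPU batches built elsewhere whose schema matches `dataTypes`). Note that the inputs are consumed by the call, so only the returned batch needs to be closed:

    import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

    val dataTypes: Array[DataType] = Array(IntegerType, StringType)
    // batchA and batchB are consumed by the call; close only the result.
    val combined = ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(
      Array(batchA, batchB), dataTypes)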

@@ -16,8 +16,12 @@

package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.shims.v2.{GpuHashPartitioning, GpuJoinUtils, ShimBinaryExecNode}

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -129,22 +133,22 @@ case class GpuShuffledHashJoinExec(
     val joinOutputRows = gpuLongMetric(JOIN_OUTPUT_ROWS)
     val targetSize = RapidsConf.GPU_BATCH_SIZE_BYTES.get(conf)
     val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
-    val localBuildOutput: Seq[Attribute] = buildPlan.output
-
     streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) {
       (streamIter, buildIter) => {
-        val stIt = new CollectTimeIterator("shuffled join stream", streamIter, streamTime)
-        val startTime = System.nanoTime()
-
-        withResource(ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter,
-          localBuildOutput)) { builtBatch =>
+        val (builtBatch, maybeBufferedStreamIter) =
+          GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
+            targetSize,
+            buildPlan.output,
+            buildIter,
+            new CollectTimeIterator("shuffled join stream", streamIter, streamTime),
+            allMetrics(SEMAPHORE_WAIT_TIME),
+            buildTime)
+        withResource(builtBatch) { _ =>
           // doJoin will increment the reference counts as needed for the builtBatch
-          val delta = System.nanoTime() - startTime
-          buildTime += delta
           buildDataSize += GpuColumnVector.getTotalDeviceMemoryUsed(builtBatch)
 
-          doJoin(builtBatch, stIt, targetSize, spillCallback,
-            numOutputRows, joinOutputRows, numOutputBatches,
+          doJoin(builtBatch, maybeBufferedStreamIter,
+            targetSize, spillCallback, numOutputRows, joinOutputRows, numOutputBatches,
             opTime, joinTime)
         }
       }
@@ -155,3 +159,169 @@ case class GpuShuffledHashJoinExec(
if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
}
}

object GpuShuffledHashJoinExec extends Arm {
/**
* Helper iterator that wraps a BufferedIterator of AutoCloseable subclasses.
* This iterator also implements AutoCloseable, so it can be closed in case
* of exceptions.
*
* @param wrapped the buffered iterator
* @tparam T an AutoCloseable subclass
*/
class CloseableBufferedIterator[T <: AutoCloseable](wrapped: BufferedIterator[T])
extends BufferedIterator[T] with AutoCloseable {
override def head: T = wrapped.head
override def headOption: Option[T] = wrapped.headOption
override def next: T = wrapped.next
override def hasNext: Boolean = wrapped.hasNext
override def close(): Unit = {
headOption.foreach(_.close())
}
}
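
  // Usage sketch (assuming `batchIter: Iterator[ColumnarBatch]` from the build
  // side): the wrapper lets the code below peek at the first batch without
  // consuming it.
  //   val buffered = new CloseableBufferedIterator(batchIter.buffered)
  //   // Peek without consuming: is the first batch a host-side serialized table?
  //   val firstIsSerialized = buffered.headOption.exists { b =>
  //     b.numCols() == 1 && b.column(0).isInstanceOf[SerializedTableColumn]
  //   }
  //   // On an exception path, buffered.close() releases the batch pinned by head.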

  /**
   * Gets a `ColumnarBatch` and stream Iterator[ColumnarBatch] pair by acquiring
   * the GPU semaphore optimally in the scenario where the build side is relatively
   * small (less than the configured batch size goal).
   *
   * In the optimal case, this function loads the build side on the host up to the
   * configured goal and, if it fits within the goal, allows the stream iterator to
   * also pull its first batch to the host. Once that first stream batch is on the
   * host, the stream iterator acquires the semaphore, and only then is the build
   * side copied to the GPU.
   *
   * Prior to this we would get a build batch on the GPU, acquiring
   * the semaphore in the process, and then begin pulling from the stream iterator,
   * which could include IO (while holding onto the semaphore).
   *
   * The function also handles the case where the build side exceeds the configured
   * batch goal: it concatenates on the host, grabs the semaphore, and continues to
   * pull from the build iterator to build a bigger batch on the GPU. This path is
   * not optimized, because we hold onto the semaphore for the entire time after the
   * goal has been hit.
*
* @param targetBatchSize configured target batch size
* @param buildOutput output attributes of the build plan
* @param buildIter build iterator
* @param streamIter stream iterator
* @param semWait wait GPU metric
* @param buildTime build GPU metric
* @return a pair of `ColumnarBatch` and streamed iterator that can be
* used for the join
*/
def getBuiltBatchAndStreamIter(targetBatchSize: Long,
buildOutput: Seq[Attribute],
buildIter: Iterator[ColumnarBatch],
streamIter: Iterator[ColumnarBatch],
semWait: GpuMetric,
buildTime: GpuMetric): (ColumnarBatch, Iterator[ColumnarBatch]) = {
var bufferedBuildIterator: CloseableBufferedIterator[ColumnarBatch] = null
closeOnExcept(bufferedBuildIterator) { _ =>
val startTime = System.nanoTime()
      // Find out whether the build side is non-empty and whether its first batch
      // is a serialized batch. If either check fails, we fall back to the
      // `getSingleBatchWithVerification` method.
val firstBatchIsSerialized = {
if (!buildIter.hasNext) {
false
} else {
bufferedBuildIterator = new CloseableBufferedIterator(buildIter.buffered)
val firstBatch = bufferedBuildIterator.head
if (firstBatch.numCols() != 1) {
false
} else {
firstBatch.column(0) match {
case _: SerializedTableColumn => true
case _ => false
}
}
}
}

if (!firstBatchIsSerialized || !streamIter.hasNext) {
        // fall back if we failed to find serialized build batches, or if the
        // stream iterator is empty
val builtBatch =
ConcatAndConsumeAll.getSingleBatchWithVerification(
Option(bufferedBuildIterator).getOrElse(buildIter), buildOutput)
val delta = System.nanoTime() - startTime
buildTime += delta
(builtBatch, streamIter)
} else {
val dataTypes = buildOutput.map(_.dataType).toArray
val bufferedStreamIter = new CloseableBufferedIterator(streamIter.buffered)
val hostBuffs = new ArrayBuffer[SerializedTableColumn]()
var buildBatch: ColumnarBatch = null
closeOnExcept(bufferedStreamIter) { _ =>
closeOnExcept(buildBatch) { _ =>
while (bufferedBuildIterator.hasNext) {
withResource(new NvtxRange("get build batches", NvtxColor.ORANGE)) { _ =>
var sizeInHost = 0L
while (bufferedBuildIterator.hasNext && sizeInHost <= targetBatchSize) {
val hostBatch = bufferedBuildIterator.next()
val serializedTableColumn =
hostBatch.column(0).asInstanceOf[SerializedTableColumn]
hostBuffs += serializedTableColumn
sizeInHost += serializedTableColumn.hostBuffer.getLength
}

// concat our host batches on the host
val hostConcat = withResource(hostBuffs) { _ =>
JCudfSerialization.concatToHostBuffer(
hostBuffs.map(_.header).toArray,
hostBuffs.map(_.hostBuffer).toArray)
}
hostBuffs.clear()

            // Optimal case: we drained the build iterator and we didn't have a prior
            // build batch, so the build side is a single batch that sits entirely on
            // the host. We peek at the stream iterator with `hasNext` on the buffered
            // iterator, which will grab the semaphore when putting the first stream
            // batch on the GPU, and then after that we will bring the build batch
            // to the GPU and return.
if (buildBatch == null && !bufferedBuildIterator.hasNext) {
buildTime += System.nanoTime() - startTime
require(bufferedStreamIter.hasNext,
"BufferedStreamIterator was empty, but the input stream iterator was not")
val buildBatchToDeviceTime = System.nanoTime()
withResource(hostConcat) { _ =>
withResource(new NvtxRange("build batch to GPU", NvtxColor.GREEN)) { _ =>
withResource(hostConcat.toContiguousTable) { contiguousTable =>
buildBatch = GpuColumnVectorFromBuffer.from(contiguousTable, dataTypes)
buildTime += System.nanoTime() - buildBatchToDeviceTime
}
}
}
} else {
            // Non-optimal case: we reached our limit and we still have build-side
            // batches that need to be coalesced (and this needs to occur while
            // holding onto the semaphore)
GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWait)
withResource(hostConcat) { _ =>
val buildBatchToDeviceTime = System.nanoTime()
val thisBuildBatch =
withResource(hostConcat.toContiguousTable) { contigTable =>
GpuColumnVectorFromBuffer.from(contigTable, dataTypes)
}

            // concatenate what we have, and keep it to build a bigger batch
buildBatch = if (buildBatch != null) {
// if we had a prior batch, we need to concatenate
ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(
Seq(buildBatch, thisBuildBatch).toArray, dataTypes)
} else {
thisBuildBatch
}
buildTime += System.nanoTime() - buildBatchToDeviceTime
}
}
}
}
(buildBatch, bufferedStreamIter)
}
}
}
}
}
}

@@ -237,6 +237,27 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
p.withNewChildren(p.children.map(optimizeCoalesce))
}

Review comment (Collaborator): This is fine for now, but I really would prefer to have us just build the plan the right way from the beginning. Perhaps we can have a flag in GpuExec that says for this input I know how to handle raw shuffle data so then the rules that insert the host side coalesce can deal with it there.

  /**
   * Removes `GpuCoalesceBatches(GpuShuffleCoalesceExec(build side))` for the build side
   * of the shuffled hash join. The coalesce logic has been moved into
   * `GpuShuffledHashJoinExec`, and is handled differently to prevent holding onto the
   * GPU semaphore for stream IO.
   */
  def optimizeShuffledHashJoin(plan: SparkPlan): SparkPlan = plan match {
    case x @ GpuShuffledHashJoinExec(
        _, _, _, buildSide, _,
        left: GpuShuffleCoalesceExec,
        GpuCoalesceBatches(GpuShuffleCoalesceExec(rc, _), _), _) if buildSide == GpuBuildRight =>
      x.withNewChildren(Seq(optimizeShuffledHashJoin(left), optimizeShuffledHashJoin(rc)))
    case x @ GpuShuffledHashJoinExec(
        _, _, _, buildSide, _,
        GpuCoalesceBatches(GpuShuffleCoalesceExec(lc, _), _),
        right: GpuShuffleCoalesceExec, _) if buildSide == GpuBuildLeft =>
      x.withNewChildren(Seq(optimizeShuffledHashJoin(lc), optimizeShuffledHashJoin(right)))
    case p =>
      p.withNewChildren(p.children.map(optimizeShuffledHashJoin))
  }
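
As an illustration (build-right case; `rc` is the raw shuffle child bound by the pattern above), the rewrite changes the plan shape roughly as follows:

    // Before: the build side is coalesced on the host by dedicated operators.
    GpuShuffledHashJoinExec(buildSide = GpuBuildRight)
    +- GpuShuffleCoalesceExec(...)                     // stream side, kept
    +- GpuCoalesceBatches(GpuShuffleCoalesceExec(rc))  // build side, removed

    // After: the join reads the raw serialized shuffle output directly and
    // coalesces the build side itself in getBuiltBatchAndStreamIter.
    GpuShuffledHashJoinExec(buildSide = GpuBuildRight)
    +- GpuShuffleCoalesceExec(...)                     // stream side, kept
    +- rc                                              // raw shuffle child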

private def insertCoalesce(plans: Seq[SparkPlan], goals: Seq[CoalesceGoal],
disableUntilInput: Boolean): Seq[SparkPlan] = {
plans.zip(goals).map {

@@ -550,6 +571,9 @@
      }
}
updatedPlan = fixupHostColumnarTransitions(updatedPlan)
updatedPlan = optimizeCoalesce(updatedPlan)
if (rapidsConf.optimizeShuffledHashJoin) {
updatedPlan = optimizeShuffledHashJoin(updatedPlan)
}
if (rapidsConf.exportColumnarRdd) {
updatedPlan = detectAndTagFinalColumnarOutput(updatedPlan)
}

sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala (10 additions, 0 deletions)

@@ -470,6 +470,15 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val OPTIMIZE_SHUFFLED_HASH_JOIN = conf("spark.rapids.sql.optimizeShuffledHashJoin")
.doc("Enable or disable an optimization where build side batches are kept on the host " +
"while the first stream batch is loaded onto the GPU. The optimization increases " +
"off-heap host memory usage to avoid holding onto the GPU semaphore while waiting for " +
"stream side IO.")
.internal()
.booleanConf
.createWithDefault(true)
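
The flag is internal and on by default; to rule the optimization in or out while debugging, it can still be set explicitly (hypothetical invocation):

    spark-shell --conf spark.rapids.sql.optimizeShuffledHashJoin=false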

val STABLE_SORT = conf("spark.rapids.sql.stableSort.enabled")
.doc("Enable or disable stable sorting. Apache Spark's sorting is typically a stable " +
"sort, but sort stability cannot be guaranteed in distributed work loads because the " +

@@ -1473,6 +1482,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val isUdfCompilerEnabled: Boolean = get(UDF_COMPILER_ENABLED)

lazy val exportColumnarRdd: Boolean = get(EXPORT_COLUMNAR_RDD)
lazy val optimizeShuffledHashJoin: Boolean = get(OPTIMIZE_SHUFFLED_HASH_JOIN)

lazy val stableSort: Boolean = get(STABLE_SORT)
