-
Notifications
You must be signed in to change notification settings - Fork 242
/
Copy pathGpuPartitioning.scala
171 lines (155 loc) · 6.58 KB
/
GpuPartitioning.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids
import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ContiguousTable, Cuda, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
/**
 * Common GPU-side partitioning support for shuffle. Provides helpers to split a batch of
 * GPU columns into per-partition [[ColumnarBatch]]es, either entirely on the GPU (when the
 * RAPIDS shuffle is enabled, optionally compressing each split) or by copying to host memory
 * and slicing on the CPU.
 */
trait GpuPartitioning extends Partitioning with Arm {
  // Upper bound (in bytes) on how much data is batched together per compression call.
  private[this] val maxCompressionBatchSize =
    new RapidsConf(SQLConf.get).shuffleCompressionMaxBatchMemory

  /**
   * Build a batch that is a zero-copy slice `[start, end)` of the given host columns.
   *
   * @param vectors host-side columns to slice
   * @param start   first row of the slice (inclusive)
   * @param end     last row of the slice (exclusive)
   * @return the sliced batch, or `null` when the slice is empty (`end - start <= 0`) —
   *         callers must be prepared for a null entry
   */
  def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, end: Int): ColumnarBatch = {
    var ret: ColumnarBatch = null
    val count = end - start
    if (count > 0) {
      ret = new ColumnarBatch(vectors.map(vec => new SlicedGpuColumnVector(vec, start, end)))
      ret.setNumRows(count)
    }
    ret
  }

  /**
   * Split the columns into per-partition batches entirely on the GPU using
   * `Table.contiguousSplit`, compressing each split when a shuffle codec is configured.
   * Releases the GPU semaphore before returning since the GPU work is done.
   *
   * @param partitionIndexes row offsets delimiting the partitions; the first entry is
   *                         always 0 and is skipped when computing split points
   */
  def sliceInternalOnGpu(numRows: Int, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    // The first index will always be 0, so we need to skip it.
    val batches = if (numRows > 0) {
      val dataTypes = partitionColumns.map(_.dataType())
      val parts = partitionIndexes.slice(1, partitionIndexes.length)
      closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits =>
        val table = new Table(partitionColumns.map(_.getBase).toArray: _*)
        val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
        GpuShuffleEnv.rapidsShuffleCodec match {
          case Some(codec) =>
            compressSplits(splits, codec, contiguousTables, dataTypes)
          case None =>
            withResource(contiguousTables) { cts =>
              cts.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct, dataTypes)) }
            }
        }
        splits.toArray
      }
    } else {
      Array[ColumnarBatch]()
    }
    GpuSemaphore.releaseIfNecessary(TaskContext.get())
    batches
  }

  /**
   * Split the columns into per-partition batches by copying them to host memory and slicing
   * there. The GPU semaphore is released as soon as the copies complete, before slicing.
   * Entries of the returned array may be `null` for empty partitions (see [[sliceBatch]]).
   */
  def sliceInternalOnCpu(numRows: Int, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    // We need to make sure that we have a null count calculated ahead of time.
    // This should be a temp work around.
    partitionColumns.foreach(_.getBase.getNullCount)
    val hostPartColumns = partitionColumns.map(_.copyToHost())
    try {
      // Leaving the GPU for a while
      GpuSemaphore.releaseIfNecessary(TaskContext.get())
      val ret = new Array[ColumnarBatch](numPartitions)
      var start = 0
      for (i <- 1 until Math.min(numPartitions, partitionIndexes.length)) {
        val idx = partitionIndexes(i)
        ret(i - 1) = sliceBatch(hostPartColumns, start, idx)
        start = idx
      }
      // The last partition runs from the final split point to the end of the batch.
      ret(numPartitions - 1) = sliceBatch(hostPartColumns, start, numRows)
      ret
    } finally {
      hostPartColumns.safeClose()
    }
  }

  /**
   * Dispatch to the GPU or CPU slicing path depending on whether the RAPIDS shuffle is
   * enabled, wrapping the work in an NVTX range for profiling.
   */
  def sliceInternalGpuOrCpu(numRows: Int, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    val rapidsShuffleEnabled = GpuShuffleEnv.isRapidsShuffleEnabled
    val nvtxRangeKey = if (rapidsShuffleEnabled) {
      "sliceInternalOnGpu"
    } else {
      "sliceInternalOnCpu"
    }
    // If we are not using the Rapids shuffle we fall back to CPU splits way to avoid the hit
    // for large number of small splits.
    // Use withResource (consistent with the rest of this file) instead of a manual
    // try/finally to guarantee the NVTX range is closed.
    withResource(new NvtxRange(nvtxRangeKey, NvtxColor.CYAN)) { _ =>
      if (rapidsShuffleEnabled) {
        sliceInternalOnGpu(numRows, partitionIndexes, partitionColumns)
      } else {
        sliceInternalOnCpu(numRows, partitionIndexes, partitionColumns)
      }
    }
  }

  /**
   * Compress contiguous tables representing the splits into compressed columnar batches.
   * Contiguous tables corresponding to splits with no data will not be compressed.
   * @param outputBatches where to collect the corresponding columnar batches for the splits
   * @param codec compression codec to use
   * @param contiguousTables contiguous tables to compress
   * @param dataTypes Spark data types of the columns, used to build the output batches
   */
  def compressSplits(
      outputBatches: ArrayBuffer[ColumnarBatch],
      codec: TableCompressionCodec,
      contiguousTables: Array[ContiguousTable],
      dataTypes: Array[DataType]): Unit = {
    withResource(codec.createBatchCompressor(maxCompressionBatchSize,
        Cuda.DEFAULT_STREAM)) { compressor =>
      // tracks batches with no data and the corresponding output index for the batch
      val emptyBatches = new ArrayBuffer[(ColumnarBatch, Int)]
      // add each table either to the batch to be compressed or to the empty batch tracker
      contiguousTables.zipWithIndex.foreach { case (ct, i) =>
        if (ct.getTable.getRowCount == 0) {
          withResource(ct) { _ =>
            emptyBatches.append((GpuColumnVector.from(ct.getTable, dataTypes), i))
          }
        } else {
          compressor.addTableToCompress(ct)
        }
      }
      withResource(compressor.finish()) { compressedTables =>
        // Interleave the compressed splits with the empty ones so the output order matches
        // the original partition order.
        var compressedTableIndex = 0
        var outputIndex = 0
        emptyBatches.foreach { case (emptyBatch, emptyOutputIndex) =>
          require(emptyOutputIndex >= outputIndex)
          // add any compressed batches that need to appear before the next empty batch
          val numCompressedToAdd = emptyOutputIndex - outputIndex
          (0 until numCompressedToAdd).foreach { _ =>
            val compressedTable = compressedTables(compressedTableIndex)
            outputBatches.append(GpuCompressedColumnVector.from(compressedTable, dataTypes))
            compressedTableIndex += 1
          }
          outputBatches.append(emptyBatch)
          outputIndex = emptyOutputIndex + 1
        }
        // add any compressed batches that remain after the last empty batch
        (compressedTableIndex until compressedTables.length).foreach { i =>
          val ct = compressedTables(i)
          outputBatches.append(GpuCompressedColumnVector.from(ct, dataTypes))
        }
      }
    }
  }
}