Rewriting the filter method.
The old implementation was slow; this rewrite makes it significantly faster.

The new version is based on Yuan's implementation, which is itself based on
the CUDA implementation.

#14: Refactor GpuFilteredRDD and FilteredChunkItrator

Task-Url: http://github.com/meisam/spark/issues/issue/14
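For reference, the rewritten method computes the match count in three passes: mark matching tuples, count matches per work-group, then take an exclusive prefix sum of the counts. A minimal CPU sketch of that scheme follows; countMatches and groupSize are illustrative names, not code from this patch:

  // CPU reference for the three GPU passes; illustrative only.
  def countMatches(col: Array[Int], value: Int, groupSize: Int): Int = {
    val flags  = col.map(v => if (v == value) 1 else 0)       // pass 1: genScanFilter
    val counts = flags.grouped(groupSize).map(_.sum).toArray  // pass 2: countScanNum
    val psum   = counts.scanLeft(0)(_ + _).dropRight(1)       // pass 3: exclusive scan (scanImpl)
    counts.last + psum.last                                   // total = last count + last prefix sum
  }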
meisam committed Mar 27, 2015
1 parent a82aef5 commit c9284cf
Showing 1 changed file with 32 additions and 42 deletions.

core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
@@ -567,62 +567,52 @@ class GpuPartition[T <: Product : ClassTag](val columnTypes: Array[String], val
def filter[V: ClassTag : TypeTag](columnIndex: Int, value: V, operation: ComparisonOperation.Value):
Int = {

  val start: Long = System.nanoTime
  val tupleNum = this.size

  // Copy the column to be filtered to the device.
  gpuCol = createReadWriteBuffer[V](tupleNum)
  val col = getColumn[V](columnIndex)
  hostToDeviceCopy[V](pointer(col), gpuCol, tupleNum)

  // Allocate the filter bitmap plus the per-work-group count and prefix-sum buffers.
  gpuFilter = createReadWriteBuffer[Int](tupleNum)
  gpuCount = createReadWriteBuffer[Int](globalSize)
  gpuPsum = createReadWriteBuffer[Int](globalSize)

  val global_work_size = Array[Long](globalSize)
  val local_work_size = Array[Long](localSize)

  // The scan-filter kernel is specialized by column type and comparison
  // operation, e.g. genScanFilter_init_int_eq.
  val typeName = typeNameString[V]()
  val operationName = operation.toString
  val kernelName = "genScanFilter_init_%s_%s".format(typeName, operationName)

  // Pass 1: write a 1 into gpuFilter for every tuple that satisfies the predicate.
  var kernel = clCreateKernel(context.getOpenCLProgram, kernelName, null)
  clSetKernelArg(kernel, 0, Sizeof.cl_mem, Pointer.to(gpuCol))
  clSetKernelArg(kernel, 1, Sizeof.cl_long, Pointer.to(Array[Long](tupleNum)))
  clSetKernelArg(kernel, 2, baseSize[V], pointer(Array(value)))
  clSetKernelArg(kernel, 3, Sizeof.cl_mem, Pointer.to(gpuFilter))
  clEnqueueNDRangeKernel(context.getOpenCLQueue, kernel, 1, null, global_work_size, local_work_size, 0, null, null)

  // Pass 2: count the matching tuples seen by each work-group.
  kernel = clCreateKernel(context.getOpenCLProgram, "countScanNum", null)
  clSetKernelArg(kernel, 0, Sizeof.cl_mem, Pointer.to(gpuFilter))
  clSetKernelArg(kernel, 1, Sizeof.cl_long, Pointer.to(Array[Long](tupleNum)))
  clSetKernelArg(kernel, 2, Sizeof.cl_mem, Pointer.to(gpuCount))
  clEnqueueNDRangeKernel(context.getOpenCLQueue, kernel, 1, null, global_work_size, local_work_size, 0, null, null)

  // Pass 3: exclusive prefix sum over the per-work-group counts.
  scanImpl(gpuCount, globalSize, gpuPsum)

  // Total matches = the last work-group's count + the exclusive prefix sum
  // at the last position.
  val tmp1 = Array[Int](0)
  val tmp2 = Array[Int](0)
  deviceToHostCopy[Int](gpuCount, pointer(tmp1), 1, globalSize - 1)
  deviceToHostCopy[Int](gpuPsum, pointer(tmp2), 1, globalSize - 1)

  resCount = tmp1(0) + tmp2(0)
  val end: Long = System.nanoTime
  resCount
}
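  // Hypothetical call site (not part of this patch): `partition` and the
  // ComparisonOperation member name are assumptions for illustration, since
  // the enumeration's members are not shown in this diff.
  //
  //   val matched: Int = partition.filter(0, 1998, ComparisonOperation.eq)
  //   println(s"$matched tuples satisfied the predicate")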

def selection(columnData: Array[Int], value: Int, isBlocking: Boolean = true): Array[Int] = {
