Skip to content

Commit

Permalink
Deleting GpuPartition#selection
Browse files Browse the repository at this point in the history
The GpuPartition@filter method replaces GpuPartition#selection

#23: Delete unnecessary methods from GpuPartition

Task-Url: http://github.com/meisam/spark/issues/issue/23
  • Loading branch information
meisam committed Jan 15, 2015
1 parent 8522d56 commit e9e1e08
Showing 1 changed file with 0 additions and 104 deletions.
104 changes: 0 additions & 104 deletions core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -618,110 +618,6 @@ class GpuPartition[T <: Product : ClassTag]
resCount
}

def selection(columnData: Array[Int], value: Int, isBlocking: Boolean = true): Array[Int] = {

val waitEvents = Array(new cl_event)

val startInitTime = System.nanoTime

val global_work_size = Array[Long](globalSize)
val local_work_size = Array[Long](localSize)

val endInitTime = System.nanoTime

val startTransferTime = System.nanoTime
val columnBuffer = createReadBuffer[Int](globalSize)
hostToDeviceCopy[Int](Pointer.to(columnData), columnBuffer, columnData.length)

val endTransferTime = System.nanoTime

val startFilterTime = System.nanoTime
val filterBuffer = clCreateBuffer(context.getOpenCLContext, CL_MEM_READ_WRITE, Sizeof.cl_int * globalSize, null, null)

val filterKernel = clCreateKernel(context.getOpenCLProgram, "genScanFilter_init_int_eq", null)
clSetKernelArg(filterKernel, 0, Sizeof.cl_mem, Pointer.to(columnBuffer))
clSetKernelArg(filterKernel, 1, Sizeof.cl_long, Pointer.to(Array[Long](columnData.length.toLong)))
clSetKernelArg(filterKernel, 2, Sizeof.cl_int, Pointer.to(Array[Int](value)))
clSetKernelArg(filterKernel, 3, Sizeof.cl_mem, Pointer.to(filterBuffer))
clEnqueueNDRangeKernel(context.getOpenCLQueue, filterKernel, 1, null, global_work_size,
local_work_size, 0, null, waitEvents(0))
clWaitForEvents(1, waitEvents)

val endFilterTime = System.nanoTime
val startPrefixSumTime = System.nanoTime
// using double buffers to avoid copying data
val prefixSumBuffer1 = createReadWriteBuffer[Int](globalSize.toInt)

val prefixSumBuffer2 = createReadWriteBuffer[Int](globalSize.toInt)
val copyKernel = clCreateKernel(context.getOpenCLProgram, "copy_buffer", null)
clSetKernelArg(copyKernel, 0, Sizeof.cl_mem, Pointer.to(filterBuffer))
clSetKernelArg(copyKernel, 1, Sizeof.cl_mem, Pointer.to(prefixSumBuffer1))
clEnqueueNDRangeKernel(context.getOpenCLQueue, copyKernel, 1, null, global_work_size,
local_work_size, 0, null, waitEvents(0))
clWaitForEvents(1, waitEvents)

val prefixSumKernel = clCreateKernel(context.getOpenCLProgram, "prefix_sum_stage", null)

var stride: Int = 0

var switchedBuffers = true
while (stride <= columnData.length) {
clSetKernelArg(prefixSumKernel, if (switchedBuffers) 0 else 1, Sizeof.cl_mem, Pointer.to(prefixSumBuffer1))
clSetKernelArg(prefixSumKernel, if (switchedBuffers) 1 else 0, Sizeof.cl_mem, Pointer.to(prefixSumBuffer2))
clSetKernelArg(prefixSumKernel, 2, Sizeof.cl_int, Pointer.to(Array[Int](stride)))
clEnqueueNDRangeKernel(context.getOpenCLQueue, prefixSumKernel, 1, null,
global_work_size, local_work_size, 0, null, waitEvents(0))
switchedBuffers = !switchedBuffers
stride = if (stride == 0) 1 else stride << 1
clWaitForEvents(1, waitEvents)
}
val prefixSumBuffer = if (switchedBuffers) prefixSumBuffer1 else prefixSumBuffer2

val endPrefixSumTime = System.nanoTime

val startFetchSizeTime = System.nanoTime
val resultSize = Array(columnData.length)

deviceToHostCopy[Int](prefixSumBuffer, Pointer.to(resultSize), columnData.length, 0)
val endFetchSizeTime = System.nanoTime

val d_destColumn = clCreateBuffer(context.getOpenCLContext, CL_MEM_READ_WRITE,
Sizeof.cl_int * resultSize.head, null, null)

val startScanTime = System.nanoTime
val scanKernel = clCreateKernel(context.getOpenCLProgram, "scan", null)

clSetKernelArg(scanKernel, 0, Sizeof.cl_mem, Pointer.to(columnBuffer))
clSetKernelArg(scanKernel, 1, Sizeof.cl_mem, Pointer.to(filterBuffer))
clSetKernelArg(scanKernel, 2, Sizeof.cl_mem, Pointer.to(prefixSumBuffer))
clSetKernelArg(scanKernel, 3, Sizeof.cl_mem, Pointer.to(d_destColumn))
clSetKernelArg(scanKernel, 4, Sizeof.cl_int, Pointer.to(Array[Int](resultSize.head)))
clEnqueueNDRangeKernel(context.getOpenCLQueue, scanKernel, 1, null, global_work_size,
local_work_size, 0, null, waitEvents(0))
clWaitForEvents(1, waitEvents)

val endScanTime = System.nanoTime

val startCopyResultTime = System.nanoTime
val destColumn = new Array[Int](resultSize.head)
deviceToHostCopy[Int](d_destColumn, Pointer.to(destColumn), resultSize.head)

val endCopyResultTime = System.nanoTime

println("Times (%12s | %12s | %12s | %12s | %12s | %12s)".format(
"Transfer2GPU", "Filter", "PrefixSum", "FetchSize", "LastScan", "Transfer2Host"))
println("Times (%,12d | %,12d | %,12d | %,12d | %,12d | %,12d)".format(
-(startTransferTime - endTransferTime),
-(startFilterTime - endFilterTime),
-(startPrefixSumTime - endPrefixSumTime),
-(startFetchSizeTime - endFetchSizeTime),
-(startScanTime - endScanTime),
-(startCopyResultTime - endCopyResultTime)))
destColumn
}

def nonBlockingSelection(columnData: Array[Int], value: Int) = selection(columnData, value, false)

private def createReadBuffer[V: ClassTag](elementCount: Int): cl_mem = {
val size = elementCount * baseSize[V]
clCreateBuffer(context.getOpenCLContext, CL_MEM_READ_ONLY, size, null, null)
Expand Down

0 comments on commit e9e1e08

Please sign in to comment.