/**
 * Runs the GPU filter pipeline over one column and returns the resulting count.
 *
 * Steps, in order:
 *  1. Copies the column at `columnIndex` into a freshly created device buffer (`gpuCol`).
 *  2. Launches the kernel named `genScanFilter_init_<type>_<operation>` with the column,
 *     the tuple count, the comparison `value` and the `gpuFilter` output buffer.
 *  3. Launches the `countScanNum` kernel with `gpuFilter`, the tuple count and `gpuCount`.
 *  4. Calls `scanImpl(gpuCount, globalSize, gpuPsum)`.
 *  5. Copies one `Int` from `gpuCount` and one from `gpuPsum` back to the host and adds them.
 *
 * Side effects: reassigns `gpuCol`, `gpuFilter`, `gpuPsum`, `gpuCount` and `resCount`, all of
 * which are declared outside this method. The device buffers created here are NOT released
 * by this method.
 *
 * @tparam V          element type of the column; also selects the kernel name (via
 *                    `typeNameString[V]()`) and the size of kernel argument 2 (via `baseSize[V]`)
 * @param columnIndex index of the column handed to `getColumn`
 * @param value       comparison operand passed to the filter kernel as argument 2
 * @param operation   comparison operation; its `toString` becomes the kernel-name suffix
 * @return the value stored in `resCount` (sum of the two `Int`s read back from the device)
 */
def filter[V: ClassTag : TypeTag](columnIndex: Int, value: V,
    operation: ComparisonOperation.Value): Int = {
  val tupleNum = this.size

  // Upload the input column to the device.
  gpuCol = createReadWriteBuffer[V](tupleNum)
  val col = getColumn[V](columnIndex)
  hostToDeviceCopy[V](pointer(col), gpuCol, tupleNum)

  // Device-side scratch buffers: one slot per tuple for the filter output, one slot per
  // work-item (globalSize) for the counts and their scan.
  gpuFilter = createReadWriteBuffer[Int](tupleNum)
  gpuPsum = createReadWriteBuffer[Int](globalSize)
  gpuCount = createReadWriteBuffer[Int](globalSize)

  val global_work_size = Array[Long](globalSize)
  val local_work_size = Array[Long](localSize)

  // The filter kernel is specialised per element type and per comparison operation, so its
  // name is assembled at run time.
  val operationName = operation.toString
  val typeName = typeNameString[V]()
  val kernelName = "genScanFilter_init_%s_%s".format(typeName, operationName)

  val filterKernel = clCreateKernel(context.getOpenCLProgram, kernelName, null)
  try {
    clSetKernelArg(filterKernel, 0, Sizeof.cl_mem, Pointer.to(gpuCol))
    clSetKernelArg(filterKernel, 1, Sizeof.cl_long, Pointer.to(Array[Long](tupleNum)))
    clSetKernelArg(filterKernel, 2, baseSize[V], pointer(Array(value)))
    clSetKernelArg(filterKernel, 3, Sizeof.cl_mem, Pointer.to(gpuFilter))
    clEnqueueNDRangeKernel(context.getOpenCLQueue, filterKernel, 1, null,
      global_work_size, local_work_size, 0, null, null)
  } finally {
    // Releasing only drops our reference; OpenCL keeps the kernel object alive until the
    // commands already enqueued with it have finished. Without this the native handle leaks
    // on every call. Fully qualified so no additional import is required.
    org.jocl.CL.clReleaseKernel(filterKernel)
  }

  val countKernel = clCreateKernel(context.getOpenCLProgram, "countScanNum", null)
  try {
    clSetKernelArg(countKernel, 0, Sizeof.cl_mem, Pointer.to(gpuFilter))
    clSetKernelArg(countKernel, 1, Sizeof.cl_long, Pointer.to(Array[Long](tupleNum)))
    clSetKernelArg(countKernel, 2, Sizeof.cl_mem, Pointer.to(gpuCount))
    clEnqueueNDRangeKernel(context.getOpenCLQueue, countKernel, 1, null,
      global_work_size, local_work_size, 0, null, null)
  } finally {
    org.jocl.CL.clReleaseKernel(countKernel)
  }

  scanImpl(gpuCount, globalSize, gpuPsum)

  // Read back one Int from each buffer; their sum is the result.
  // NOTE(review): no offset is passed here, so this presumably reads slot 0 of each buffer.
  // If scanImpl produces an exclusive prefix sum, the total would live in the LAST slot
  // (index globalSize - 1) instead -- confirm against deviceToHostCopy's signature.
  val tmp1 = Array[Int](0)
  val tmp2 = Array[Int](0)
  deviceToHostCopy[Int](gpuCount, pointer(tmp1), 1)
  deviceToHostCopy[Int](gpuPsum, pointer(tmp2), 1)

  resCount = tmp1(0) + tmp2(0)
  resCount
}