Skip to content

Commit

Permalink
#15: Find the optimal size for global and local number of threads
Browse files: browse the repository at this point in the history
  • Loading branch information
meisam committed Jan 15, 2015
1 parent 879a498 commit 7639006
Showing 1 changed file with 6 additions and 15 deletions.
21 changes: 6 additions & 15 deletions core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
Original file line number | Diff line number | Diff line change
Expand Up @@ -58,6 +58,7 @@ class GpuPartition[T <: Product : ClassTag](val columnTypes: Array[String], val
}
}
}
inferBestWorkGroupSize
}

def getStringData(typeAwareColumnIndex: Int, rowIndex: Int): String = {
Expand Down Expand Up @@ -219,10 +220,8 @@ class GpuPartition[T <: Product : ClassTag](val columnTypes: Array[String], val
clSetKernelArg(kernel, 7, Sizeof.cl_int, Pointer.to(Array[Int](tmp)))
clSetKernelArg(kernel, 8, Sizeof.cl_int, Pointer.to(Array[Int](same)))
clSetKernelArg(kernel, 9, sharedMemSize, null)
var global_work_size = Array[Long](1)
global_work_size(0) = globalSize
var local_work_size = Array[Long](1)
local_work_size(0) = localSize
var global_work_size = Array[Long](globalSize)
var local_work_size = Array[Long](localSize)
clEnqueueNDRangeKernel(context.queue, kernel, 1, null, global_work_size, local_work_size, 0, null, null)
if (np2LastBlock != 0) {
kernel = clCreateKernel(context.program, "prescan", null)
Expand Down Expand Up @@ -370,8 +369,7 @@ class GpuPartition[T <: Product : ClassTag](val columnTypes: Array[String], val
}
}

def compute[T: ClassTag : TypeTag](col: Array[T], tupleNum: Long, value: T, comp: Int, globalSize: Long,
localSize: Long): Int = {
def compute[T: ClassTag : TypeTag](col: Array[T], tupleNum: Long, value: T, comp: Int): Int = {
if (context == null) {
context = new OpenCLContext
context.initOpenCL("/org/apache/spark/gpu/kernel.cl")
Expand Down Expand Up @@ -413,7 +411,7 @@ class GpuPartition[T <: Product : ClassTag](val columnTypes: Array[String], val
clEnqueueNDRangeKernel(context.getOpenCLQueue, kernel, 1, null, global_work_size, local_work_size, 0, null, null)

val startPsum = System.nanoTime()
scanImpl(gpuCount, globalSize.asInstanceOf[Int], gpuPsum)
scanImpl(gpuCount, globalSize, gpuPsum)
val endPsum = System.nanoTime()

val tmp1 = Array[Int](0)
Expand Down Expand Up @@ -626,21 +624,14 @@ class GpuPartition[T <: Product : ClassTag](val columnTypes: Array[String], val
val waitEvents = Array(new cl_event)

val startInitTime = System.nanoTime
val globalSize = POW_2_S.filter(_ >= columnData.length).head

val localSize = Math.min(globalSize, 256)
val global_work_size = Array[Long](globalSize)

val local_work_size = Array[Long](localSize)
if (context == null) {
context = new OpenCLContext
context.initOpenCL("/org/apache/spark/gpu/kernel.cl")
}

val endInitTime = System.nanoTime

val startTransferTime = System.nanoTime
val columnBuffer = createReadBuffer[Int](globalSize.toInt)
val columnBuffer = createReadBuffer[Int](globalSize)
hostToDeviceCopy[Int](Pointer.to(columnData), columnBuffer, columnData.length)

val endTransferTime = System.nanoTime
Expand Down

0 comments on commit 7639006

Please sign in to comment.