Skip to content

Commit

Permalink
Deleting GpuPartition#prefixSum and its unit test cases
Browse files Browse the repository at this point in the history
#23: Delete unnecessary methods from GpuPartition

Task-Url: http://github.com/meisam/spark/issues/issue/23
  • Loading branch information
meisam committed Jan 15, 2015
1 parent fa98785 commit 0431ace
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 76 deletions.
32 changes: 0 additions & 32 deletions core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -400,38 +400,6 @@ class GpuPartition[T <: Product : ClassTag]
clReleaseMemObject(col)
}

def prefixSum(counts: Array[Int], prefixSums: Array[Int]): Unit = {

if (counts.length != prefixSums.length) {
throw new IllegalArgumentException("Input and output arrays should have the same size (%,12d != %,12d)".format(counts.length, prefixSums.length))
}

val globalSize = POW_2_S.filter(_ >= counts.length).head
val localSize = Math.min(globalSize, BLOCK_SIZE)
val global_work_size = Array[Long](globalSize)
val local_work_size = Array[Long](localSize)

// using double buffers to avoid copying data
val buffer1 = createReadWriteBuffer[Int](globalSize.toInt)
val buffer2 = createReadWriteBuffer[Int](globalSize.toInt)
hostToDeviceCopy[Int](Pointer.to(counts), buffer1, counts.length)
val kernel = clCreateKernel(context.getOpenCLProgram, "prefix_sum_stage", null)
var stride: Int = 0

var switchedBuffers = true

while (stride <= counts.length) {
clSetKernelArg(kernel, if (switchedBuffers) 0 else 1, Sizeof.cl_mem, Pointer.to(buffer1))
clSetKernelArg(kernel, if (switchedBuffers) 1 else 0, Sizeof.cl_mem, Pointer.to(buffer2))
clSetKernelArg(kernel, 2, Sizeof.cl_int, Pointer.to(Array[Int](stride)))
clEnqueueNDRangeKernel(context.getOpenCLQueue, kernel, 1, null, global_work_size, local_work_size, 0, null, null)
switchedBuffers = !switchedBuffers
stride = if (stride == 0) 1 else stride << 1
}
val results = if (switchedBuffers) buffer1 else buffer2
deviceToHostCopy[Int](results, Pointer.to(prefixSums), globalSize)
}

def getColumn[V: ClassTag](columnIndex: Int): Array[V] = {
val typeAwareColumnIndex = toTypeAwareColumnIndex(columnIndex)

Expand Down
44 changes: 0 additions & 44 deletions core/src/test/scala/org/apache/spark/gpu/GpuFilteredRDDSuit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,50 +144,6 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext {
}
}

test("kernel.my prefixSum test") {
val TEST_DATA_SIZE = 3 + (1 << 14)

// the test sequence is (0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,1,0,0,...)
// the prefix sum should be (0,1,1,1,2,2,2,3,3,3,4,4,4,5,5,5,6,6,6,...)
val testData = (0 until TEST_DATA_SIZE).map(_ % 3).map(_ % 2).zipWithIndex

val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.fill(testData.toIterator)

assert(chunk.intData(0) !== null)

val expectedResults = (0 until TEST_DATA_SIZE).map(x => (2 + x) / 3).toArray
val actualResults = new Array[Int](chunk.intData(0).length)

chunk.prefixSum(chunk.intData(0), actualResults)

assert(actualResults !== null)
assert(actualResults.length !== expectedResults.length)
expectedResults.zip(actualResults).zipWithIndex.foreach { case ((expected, actual), i) =>
assert(expected === actual, "The %sths expected %,12d <> %,12d actual".format(i, expected, actual))
}
}

test("kernel.my prefixSum small range test test") {
val count = 10
val testData = (0 until count).map(_ % 2).zipWithIndex

val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY)

chunk.fill(testData.toIterator)
assert(chunk.intData(0) !== null)

val expectedResults = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5)
val actualResults = new Array[Int](chunk.intData(0).length)
chunk.prefixSum(chunk.intData(0), actualResults)

assert(actualResults !== null)
assert(actualResults.length !== expectedResults.length)
expectedResults.zip(actualResults).zipWithIndex.foreach { case ((expected, actual), i) =>
assert(expected === actual, "The %sths expected %,12d <> %,12d actual".format(i, expected, actual))
}
}

test("org.apache.spark.rdd.GpuRDD.filter test") {
// This crashes the OpenCL device
val testData: IndexedSeq[(Int, Int)] = (0 to 10).reverse.zipWithIndex
Expand Down

0 comments on commit 0431ace

Please sign in to comment.