Skip to content

Commit

Permalink
Refactoring other classes that use GpuPartition
Browse files Browse the repository at this point in the history
#17: Initialize the OpenCL context properly in GpuPartition

Task-Url: http://github.com/meisam/spark/issues/issue/17
  • Loading branch information
meisam committed Mar 27, 2015
1 parent faaf7d7 commit ca27433
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.apache.spark.rdd

import org.apache.spark.scheduler.OpenCLContext

import scala.reflect.ClassTag

class GpuFilteredPartition[T <: Product : ClassTag]
(columnTypes: Array[String], colIndex: Int, operation: Int, value: Int,
capacity: Int) extends GpuPartition[T](columnTypes, capacity) {
(context: OpenCLContext, columnTypes: Array[String], colIndex: Int, operation: Int, value: Int,
capacity: Int) extends GpuPartition[T](context, columnTypes, capacity) {
override def fill(iter: Iterator[T]): Unit = {
val startTransformDataTime = System.nanoTime
val endTransformDataTime = System.nanoTime
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.apache.spark.rdd

import org.apache.spark.scheduler.OpenCLContext

import scala.reflect.ClassTag

class GpuPartitionIterator[T <: Product : ClassTag]
Expand All @@ -12,7 +14,11 @@ class GpuPartitionIterator[T <: Product : ClassTag]

protected var currentPosition: Int = -1

protected val currentChunk: GpuPartition[T] = new GpuPartition[T](columnTypes, chunkCapacity)
protected val context: OpenCLContext = new OpenCLContext

context.initOpenCL("/org/apache/spark/gpu/kernel.cl")

protected val currentChunk: GpuPartition[T] = new GpuPartition[T](context, columnTypes, chunkCapacity)

override def next(): T = {
guaranteeFill
Expand Down
35 changes: 14 additions & 21 deletions core/src/test/scala/org/apache/spark/gpu/GpuFilteredRDDSuit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.gpu

import org.apache.spark.SharedSparkContext
import org.apache.spark.rdd.{GpuFilteredPartitionIterator, GpuFilteredRDD, GpuPartition}
import org.apache.spark.rdd.{ComparisonOperation, GpuPartition}
import org.apache.spark.scheduler.OpenCLContext
import org.jocl.CL._
import org.jocl.{Pointer, Sizeof}
Expand Down Expand Up @@ -151,8 +151,7 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext {
// the prefix sum should be (0,1,1,1,2,2,2,3,3,3,4,4,4,5,5,5,6,6,6,...)
val testData = (0 until TEST_DATA_SIZE).map(_ % 3).map(_ % 2).zipWithIndex

val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.context = openCLContext
val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.fill(testData.toIterator)

assert(chunk.intData(0) !== null)
Expand All @@ -173,9 +172,8 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext {
val count = 10
val testData = (0 until count).map(_ % 2).zipWithIndex

val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY)
val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY)

chunk.context = openCLContext
chunk.fill(testData.toIterator)
assert(chunk.intData(0) !== null)

Expand All @@ -201,8 +199,7 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext {
val resultSize = prefixSums(prefixSums.length - 1)
val actualResults = Array.ofDim[Int](resultSize)

val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.context = openCLContext
val chunk = new GpuPartition[(Int, Int)](openCLContext,Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.fill(sourceCol.zipWithIndex.iterator)

assert(chunk.intData(0) !== null)
Expand All @@ -222,8 +219,7 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext {
// This crashes the OpenCL device
val testData: IndexedSeq[(Int, Int)] = (0 to 10).reverse.zipWithIndex

val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.context = openCLContext
val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.fill(testData.toIterator)
chunk.filter(1, 1, ComparisonOperation.<)

Expand All @@ -238,18 +234,15 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext {
)
}

test("GpuFilterdRDD(Int, Int) test") {
val testData: IndexedSeq[(Int, Int)] = (0 to 10).zipWithIndex

val rdd = sc.parallelize(testData)
val gpuRdd = rdd.toGpuFilterRDD(Array("INT", "INT"), 0, 0, 1)
val collectedChunks: Array[GpuPartition[Product]] = gpuRdd.collect()
assert(collectedChunks.length === 1)
val chunk = collectedChunks(0)
assert(chunk.size === 1)
assert(chunk.intData(0)(0) === 1, "values do not match")
assert(chunk.intData(1)(1) === 1, "values do not match")
}
val rdd = sc.parallelize(testData)
val gpuRdd = rdd.toGpuFilterRDD(Array("INT", "INT"), 0, 0, 1)
val collectedChunks: Array[GpuPartition[Product]] = gpuRdd.collect()
assert(collectedChunks.length === 1)
val chunk = collectedChunks(0)
assert(chunk.size === 1)
assert(chunk.intData(0)(0) === 1, "values do not match")
assert(chunk.intData(1)(1) === 1, "values do not match")
}

test("compute") {
val testData: IndexedSeq[(Int, Int)] = (0 to 10).zipWithIndex
Expand Down
21 changes: 15 additions & 6 deletions core/src/test/scala/org/apache/spark/gpu/GpuPartitionSuit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.gpu

import org.apache.spark.SharedSparkContext
import org.apache.spark.rdd.GpuPartition
import org.apache.spark.scheduler.OpenCLContext
import org.scalatest.FunSuite

import scala.collection.immutable.IndexedSeq
Expand All @@ -30,10 +31,18 @@ import scala.language.existentials
class GpuPartitionSuit extends FunSuite with SharedSparkContext {

val DEFAULT_CAPACITY = (1 << 10)
val openCLContext = new OpenCLContext

override def beforeAll() {
super.beforeAll()
// setLogLevel(LogLevel.LOG_TRACE)
openCLContext.initOpenCL("/org/apache/spark/gpu/kernel.cl")
}


test("org.apache.spark.rdd.GpuPartition.initArray test") {
val x = new GpuPartition[(Int, String, Float, Double, String)](Array("INT", "STRING", "FLOAT",
"DOUBLE", "STRING"), DEFAULT_CAPACITY)
val x = new GpuPartition[(Int, String, Float, Double, String)](openCLContext,
Array("INT", "STRING", "FLOAT", "DOUBLE", "STRING"), DEFAULT_CAPACITY)
assert(x.intData.length === 1)
assert(x.longData.length === 0)
assert(x.floatData.length === 1)
Expand All @@ -44,7 +53,7 @@ class GpuPartitionSuit extends FunSuite with SharedSparkContext {
test("org.apache.spark.rdd.GpuPartition.fill test") {
val testData = (0 to 10).reverse.zipWithIndex.toIterator

val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY)
val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY)
chunk.fill(testData)
(0 until chunk.capacity).foreach(i =>
if (i <= 10) {
Expand All @@ -61,8 +70,8 @@ class GpuPartitionSuit extends FunSuite with SharedSparkContext {
val testData = (0 to 10).map(x => (x, "STR_I_%d".format(x), 1.5f + x, 2.5d + x, "STR_II_%d".format(x), x - 1, "STR_III_%d".format(x)))

val rdd = sc.parallelize(testData)
val rddChunk = new GpuPartition(Array("INT", "STRING", "FLOAT", "DOUBLE", "STRING", "INT",
"STRING"), DEFAULT_CAPACITY)
val rddChunk = new GpuPartition(openCLContext,
Array("INT", "STRING", "FLOAT", "DOUBLE", "STRING", "INT", "STRING"), DEFAULT_CAPACITY)
assert(rddChunk.toTypeAwareColumnIndex(0) === 0)
assert(rddChunk.toTypeAwareColumnIndex(1) === 0)
assert(rddChunk.toTypeAwareColumnIndex(2) === 0)
Expand All @@ -76,7 +85,7 @@ class GpuPartitionSuit extends FunSuite with SharedSparkContext {
val testData: IndexedSeq[(String, String)] = (0 to 10).reverse.zipWithIndex.map(
x => ("STR_I_%d".format(x._1), "STR_II_%d".format(x._2)))

val chunk = new GpuPartition[(String, String)](Array("STRING", "STRING"),
val chunk = new GpuPartition[(String, String)](openCLContext, Array("STRING", "STRING"),
DEFAULT_CAPACITY)
chunk.fill(testData.toIterator)
(0 until chunk.capacity).foreach(i =>
Expand Down

0 comments on commit ca27433

Please sign in to comment.