From ca274331977b8d9b6e8ad4f1fa386e686d80511c Mon Sep 17 00:00:00 2001 From: Meisam Fathi Salmi Date: Wed, 29 Oct 2014 16:37:22 -0400 Subject: [PATCH] Refactoring other classes that use GpuPartition #17: Initialize the OpenCL context properly in GpuPartition Task-Url: http://github.com/meisam/spark/issues/issue/17 --- .../spark/rdd/GpuFilteredPartition.scala | 6 ++-- .../spark/rdd/GpuPartitionIterator.scala | 8 ++++- .../apache/spark/gpu/GpuFilteredRDDSuit.scala | 35 ++++++++----------- .../apache/spark/gpu/GpuPartitionSuit.scala | 21 +++++++---- 4 files changed, 40 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala b/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala index 657a42ea7d4aa..3d0db1755373a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala +++ b/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala @@ -1,10 +1,12 @@ package org.apache.spark.rdd +import org.apache.spark.scheduler.OpenCLContext + import scala.reflect.ClassTag class GpuFilteredPartition[T <: Product : ClassTag] -(columnTypes: Array[String], colIndex: Int, operation: Int, value: Int, - capacity: Int) extends GpuPartition[T](columnTypes, capacity) { +(context: OpenCLContext, columnTypes: Array[String], colIndex: Int, operation: Int, value: Int, + capacity: Int) extends GpuPartition[T](context, columnTypes, capacity) { override def fill(iter: Iterator[T]): Unit = { val startTransformDataTime = System.nanoTime val endTransformDataTime = System.nanoTime diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuPartitionIterator.scala b/core/src/main/scala/org/apache/spark/rdd/GpuPartitionIterator.scala index 9ecba23289beb..035df58f8b4f5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/GpuPartitionIterator.scala +++ b/core/src/main/scala/org/apache/spark/rdd/GpuPartitionIterator.scala @@ -1,5 +1,7 @@ package org.apache.spark.rdd +import 
org.apache.spark.scheduler.OpenCLContext + import scala.reflect.ClassTag class GpuPartitionIterator[T <: Product : ClassTag] @@ -12,7 +14,11 @@ class GpuPartitionIterator[T <: Product : ClassTag] protected var currentPosition: Int = -1 - protected val currentChunk: GpuPartition[T] = new GpuPartition[T](columnTypes, chunkCapacity) + protected val context: OpenCLContext = new OpenCLContext + + context.initOpenCL("/org/apache/spark/gpu/kernel.cl") + + protected val currentChunk: GpuPartition[T] = new GpuPartition[T](context, columnTypes, chunkCapacity) override def next(): T = { guaranteeFill diff --git a/core/src/test/scala/org/apache/spark/gpu/GpuFilteredRDDSuit.scala b/core/src/test/scala/org/apache/spark/gpu/GpuFilteredRDDSuit.scala index 728a25e4e1e40..f4a21b6e263b1 100644 --- a/core/src/test/scala/org/apache/spark/gpu/GpuFilteredRDDSuit.scala +++ b/core/src/test/scala/org/apache/spark/gpu/GpuFilteredRDDSuit.scala @@ -18,7 +18,7 @@ package org.apache.spark.gpu import org.apache.spark.SharedSparkContext -import org.apache.spark.rdd.{GpuFilteredPartitionIterator, GpuFilteredRDD, GpuPartition} +import org.apache.spark.rdd.{ComparisonOperation, GpuPartition} import org.apache.spark.scheduler.OpenCLContext import org.jocl.CL._ import org.jocl.{Pointer, Sizeof} @@ -151,8 +151,7 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext { // the prefix sum should be (0,1,1,1,2,2,2,3,3,3,4,4,4,5,5,5,6,6,6,...) 
val testData = (0 until TEST_DATA_SIZE).map(_ % 3).map(_ % 2).zipWithIndex - val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY) - chunk.context = openCLContext + val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY) chunk.fill(testData.toIterator) assert(chunk.intData(0) !== null) @@ -173,9 +172,8 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext { val count = 10 val testData = (0 until count).map(_ % 2).zipWithIndex - val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY) + val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY) - chunk.context = openCLContext chunk.fill(testData.toIterator) assert(chunk.intData(0) !== null) @@ -201,8 +199,7 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext { val resultSize = prefixSums(prefixSums.length - 1) val actualResults = Array.ofDim[Int](resultSize) - val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY) - chunk.context = openCLContext + val chunk = new GpuPartition[(Int, Int)](openCLContext,Array("INT", "INT"), DEFAULT_CAPACITY) chunk.fill(sourceCol.zipWithIndex.iterator) assert(chunk.intData(0) !== null) @@ -222,8 +219,7 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext { // This crashes the OpenCL device val testData: IndexedSeq[(Int, Int)] = (0 to 10).reverse.zipWithIndex - val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY) - chunk.context = openCLContext + val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY) chunk.fill(testData.toIterator) chunk.filter(1, 1, ComparisonOperation.<) @@ -238,18 +234,15 @@ class GpuFilteredRDDSuit extends FunSuite with SharedSparkContext { ) } - test("GpuFilterdRDD(Int, Int) test") { - val testData: IndexedSeq[(Int, Int)] = (0 to 10).zipWithIndex - - val rdd = sc.parallelize(testData) - val gpuRdd = 
rdd.toGpuFilterRDD(Array("INT", "INT"), 0, 0, 1) - val collectedChunks: Array[GpuPartition[Product]] = gpuRdd.collect() - assert(collectedChunks.length === 1) - val chunk = collectedChunks(0) - assert(chunk.size === 1) - assert(chunk.intData(0)(0) === 1, "values do not match") - assert(chunk.intData(1)(1) === 1, "values do not match") - } + val rdd = sc.parallelize(testData) + val gpuRdd = rdd.toGpuFilterRDD(Array("INT", "INT"), 0, 0, 1) + val collectedChunks: Array[GpuPartition[Product]] = gpuRdd.collect() + assert(collectedChunks.length === 1) + val chunk = collectedChunks(0) + assert(chunk.size === 1) + assert(chunk.intData(0)(0) === 1, "values do not match") + assert(chunk.intData(1)(1) === 1, "values do not match") + } test("compute") { val testData: IndexedSeq[(Int, Int)] = (0 to 10).zipWithIndex diff --git a/core/src/test/scala/org/apache/spark/gpu/GpuPartitionSuit.scala b/core/src/test/scala/org/apache/spark/gpu/GpuPartitionSuit.scala index 34a0ce36adffd..4fc27552b8802 100644 --- a/core/src/test/scala/org/apache/spark/gpu/GpuPartitionSuit.scala +++ b/core/src/test/scala/org/apache/spark/gpu/GpuPartitionSuit.scala @@ -19,6 +19,7 @@ package org.apache.spark.gpu import org.apache.spark.SharedSparkContext import org.apache.spark.rdd.GpuPartition +import org.apache.spark.scheduler.OpenCLContext import org.scalatest.FunSuite import scala.collection.immutable.IndexedSeq @@ -30,10 +31,18 @@ import scala.language.existentials class GpuPartitionSuit extends FunSuite with SharedSparkContext { val DEFAULT_CAPACITY = (1 << 10) + val openCLContext = new OpenCLContext + + override def beforeAll() { + super.beforeAll() + // setLogLevel(LogLevel.LOG_TRACE) + openCLContext.initOpenCL("/org/apache/spark/gpu/kernel.cl") + } + test("org.apache.spark.rdd.GpuPartition.initArray test") { - val x = new GpuPartition[(Int, String, Float, Double, String)](Array("INT", "STRING", "FLOAT", - "DOUBLE", "STRING"), DEFAULT_CAPACITY) + val x = new GpuPartition[(Int, String, Float, Double, 
String)](openCLContext, + Array("INT", "STRING", "FLOAT", "DOUBLE", "STRING"), DEFAULT_CAPACITY) assert(x.intData.length === 1) assert(x.longData.length === 0) assert(x.floatData.length === 1) @@ -44,7 +53,7 @@ class GpuPartitionSuit extends FunSuite with SharedSparkContext { test("org.apache.spark.rdd.GpuPartition.fill test") { val testData = (0 to 10).reverse.zipWithIndex.toIterator - val chunk = new GpuPartition[(Int, Int)](Array("INT", "INT"), DEFAULT_CAPACITY) + val chunk = new GpuPartition[(Int, Int)](openCLContext, Array("INT", "INT"), DEFAULT_CAPACITY) chunk.fill(testData) (0 until chunk.capacity).foreach(i => if (i <= 10) { @@ -61,8 +70,8 @@ class GpuPartitionSuit extends FunSuite with SharedSparkContext { val testData = (0 to 10).map(x => (x, "STR_I_%d".format(x), 1.5f + x, 2.5d + x, "STR_II_%d".format(x), x - 1, "STR_III_%d".format(x))) val rdd = sc.parallelize(testData) - val rddChunk = new GpuPartition(Array("INT", "STRING", "FLOAT", "DOUBLE", "STRING", "INT", - "STRING"), DEFAULT_CAPACITY) + val rddChunk = new GpuPartition(openCLContext, + Array("INT", "STRING", "FLOAT", "DOUBLE", "STRING", "INT", "STRING"), DEFAULT_CAPACITY) assert(rddChunk.toTypeAwareColumnIndex(0) === 0) assert(rddChunk.toTypeAwareColumnIndex(1) === 0) assert(rddChunk.toTypeAwareColumnIndex(2) === 0) @@ -76,7 +85,7 @@ class GpuPartitionSuit extends FunSuite with SharedSparkContext { val testData: IndexedSeq[(String, String)] = (0 to 10).reverse.zipWithIndex.map( x => ("STR_I_%d".format(x._1), "STR_II_%d".format(x._2))) - val chunk = new GpuPartition[(String, String)](Array("STRING", "STRING"), + val chunk = new GpuPartition[(String, String)](openCLContext, Array("STRING", "STRING"), DEFAULT_CAPACITY) chunk.fill(testData.toIterator) (0 until chunk.capacity).foreach(i =>