From 54eaf76b1ce5587ff54ad2f9b3721ebda948034b Mon Sep 17 00:00:00 2001
From: Meisam Fathi Salmi
Date: Thu, 12 Mar 2015 13:29:01 -0400
Subject: [PATCH] #16: Use Spark metrics system for profiling GPU applications

Task-Url: http://github.com/meisam/spark/issues/issue/16
---
 .../org/apache/spark/rdd/GpuAggregationPartition.scala    | 5 +++++
 .../scala/org/apache/spark/rdd/GpuFilteredPartition.scala | 8 ++++++++
 .../scala/org/apache/spark/rdd/GpuJoinPartition.scala     | 7 +++++--
 .../main/scala/org/apache/spark/rdd/GpuPartition.scala    | 4 ++++
 .../scala/org/apache/spark/rdd/GpuSortPartition.scala     | 3 +++
 5 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuAggregationPartition.scala b/core/src/main/scala/org/apache/spark/rdd/GpuAggregationPartition.scala
index 02321b653e242..ace07f050309e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GpuAggregationPartition.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GpuAggregationPartition.scala
@@ -14,6 +14,7 @@ class GpuAggregationPartition[T <: Product : TypeTag, TP <: Product : TypeTag](
   extends GpuPartition[T](context, capacity) {
 
   def aggregate(): Unit = {
+    val startTime = System.nanoTime()
     parentPartition.inferBestWorkGroupSize
     this.globalSize = parentPartition.globalSize
     this.localSize = parentPartition.localSize
@@ -60,6 +61,7 @@ class GpuAggregationPartition[T <: Product : TypeTag, TP <: Product : TypeTag](
 
     val gbType: Array[Int] = gbColumnIndexes.map(i => columnTypes(i)).map(t => ColumnarTypes
       .getIndex(t.tpe)).toIterator.toArray
+    println(f"gbType.length = ${gbType.length}")
 
     val gpuGbType = createReadBuffer[Int](gbType.length)
     hostToDeviceCopy[Int](pointer(gbType), gpuGbType, gbType.length)
@@ -295,6 +297,9 @@ class GpuAggregationPartition[T <: Product : TypeTag, TP <: Product : TypeTag](
     clReleaseMemObject(gpuGbCount)
     clReleaseMemObject(gpu_hashNum)
 
+    val endTime = System.nanoTime()
+    logInfo(f"aggregation time = ${endTime - startTime}%,d")
+
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala b/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala
index 86abc44f84a41..c75657ee04c93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GpuFilteredPartition.scala
@@ -12,6 +12,7 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
   extends GpuPartition[T](context, capacity) {
 
   def filter(parent: GpuPartition[T]) = {
+    val startFilterTime = System.nanoTime()
 
     if (parent.size == 0) {
       this.size = 0
@@ -117,6 +118,10 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
       case (_, index) => project[String](index, this.size, gpuFilter, gpuPsum, parent)
     })
 
+    val endFilterTime = System.nanoTime()
+
+    logInfo(f"filter time = ${endFilterTime - startFilterTime}%,d")
+
     clReleaseMemObject(gpuFilter)
     clReleaseMemObject(gpuPsum)
 
@@ -125,6 +130,7 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
   }
 
   def project[V: TypeTag](columnIndex: Int, outSize: Int, gpuFilter: cl_mem, gpuPsum: cl_mem, parent: GpuPartition[T]) {
+    val startTime = System.nanoTime()
     if (outSize == 0) return
 
     val colData = parent.getColumn[V](columnIndex)
@@ -161,6 +167,8 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
 
     releaseCol(result)
     releaseCol(scanCol)
 
+    val endTime = System.nanoTime()
+    logInfo(f"projection time = ${endTime - startTime}%,d")
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuJoinPartition.scala b/core/src/main/scala/org/apache/spark/rdd/GpuJoinPartition.scala
index 8653d4e899cad..7aee693d17be9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GpuJoinPartition.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GpuJoinPartition.scala
@@ -226,8 +226,11 @@ U: TypeTag]
   def join(): Int = {
     this.globalSize = leftPartition.globalSize
     this.localSize = leftPartition.localSize
-    buildHashTable
-
+    val startTime = System.nanoTime()
+    val result = buildHashTable
+    val endTime = System.nanoTime()
+    logInfo(f"join time = ${endTime - startTime}%,d")
+    result
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala b/core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
index 728847bd97010..29c762762ac15 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
@@ -213,6 +213,7 @@ class GpuPartition[T <: Product : TypeTag](context: OpenCLContext, val capacity:
    * @param fromIndex the index of first element to read
    */
   def fillFromFiles(paths: Array[String], fromIndex: Int = 0): Unit = {
+    val startTime = System.nanoTime()
     assert(paths.length == columnTypes.size, {
       " %d file paths but only %d columns".format(paths.length, columnTypes.size)
     })
@@ -322,6 +323,9 @@ class GpuPartition[T <: Product : TypeTag](context: OpenCLContext, val capacity:
       context.diskReadTime += diskReadTime
     })
 
+    val endTime = System.nanoTime()
+    logInfo(f"load from disk time= ${endTime - startTime}%,d")
+
   }
 
   def fill(iter: Iterator[T]): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/GpuSortPartition.scala b/core/src/main/scala/org/apache/spark/rdd/GpuSortPartition.scala
index 53fbfc50d303d..3a60c269ea22d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GpuSortPartition.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GpuSortPartition.scala
@@ -12,6 +12,7 @@ class GpuSortPartition[T <: Product : TypeTag]
 Array[SortDirection.Value], capacity: Int) extends GpuPartition[T](context, capacity) {
 
   def sort(): Unit = {
+    val startTime = System.nanoTime()
     /*
     struct timespec start,end;
 
@@ -810,6 +811,8 @@ Array[SortDirection.Value], capacity: Int) extends GpuPartition[T](context, capa
 
     return res;
     */
 
+    val endTime = System.nanoTime()
+    logInfo(f"sort time = ${endTime - startTime}%,d")
   }
 
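Note (illustration only, not part of the patch): every hunk above instruments a GPU operator the same way, by recording System.nanoTime() before and after the operator body and reporting the difference through logInfo with a grouped-digit format (f"... ${endTime - startTime}%,d"). Below is a minimal sketch of that pattern factored into a shared helper, assuming the operator classes mix in Spark's Logging trait (which provides logInfo); the OperatorTiming trait and the timed method are hypothetical names introduced here for illustration, not part of this change set.

    import org.apache.spark.Logging

    // Hypothetical helper sketch: wraps a block in the same System.nanoTime()/logInfo
    // pattern that the patch adds by hand to aggregate(), filter(), project(), join(),
    // fillFromFiles(), and sort().
    trait OperatorTiming extends Logging {

      // Runs `body`, logs the elapsed wall-clock time in nanoseconds under `label`,
      // and returns the body's result (mirroring how join() returns buildHashTable).
      def timed[A](label: String)(body: => A): A = {
        val startTime = System.nanoTime()
        val result = body
        val endTime = System.nanoTime()
        logInfo(f"$label time = ${endTime - startTime}%,d")
        result
      }
    }

With such a helper, the join() change above could be expressed as timed("join") { buildHashTable } and would log the same "join time = ..." line.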