Skip to content

Commit

Permalink
#16: Use Spark metrics system for profiling GPU applications
Browse files Browse the repository at this point in the history
  • Loading branch information
meisam committed Mar 27, 2015
1 parent 6019e0c commit 3f4ddc5
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class GpuAggregationPartition[T <: Product : TypeTag, TP <: Product : TypeTag](
extends GpuPartition[T](context, capacity) {

def aggregate(): Unit = {
val startTime = System.nanoTime()
parentPartition.inferBestWorkGroupSize
this.globalSize = parentPartition.globalSize
this.localSize = parentPartition.localSize
Expand Down Expand Up @@ -60,6 +61,7 @@ class GpuAggregationPartition[T <: Product : TypeTag, TP <: Product : TypeTag](
val gbType: Array[Int] = gbColumnIndexes.map(i => columnTypes(i)).map(t => ColumnarTypes
.getIndex(t.tpe)).toIterator.toArray

println(f"gbType.length = ${gbType.length}")
val gpuGbType = createReadBuffer[Int](gbType.length)
hostToDeviceCopy[Int](pointer(gbType), gpuGbType, gbType.length)

Expand Down Expand Up @@ -295,6 +297,9 @@ class GpuAggregationPartition[T <: Product : TypeTag, TP <: Product : TypeTag](

clReleaseMemObject(gpuGbCount)
clReleaseMemObject(gpu_hashNum)
val endTime = System.nanoTime()
logInfo(f"aggregation time = ${endTime - startTime}%,d")

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
extends GpuPartition[T](context, capacity) {

def filter(parent: GpuPartition[T]) = {
val startFilterTime = System.nanoTime()

if (parent.size == 0) {
this.size = 0
Expand Down Expand Up @@ -117,6 +118,10 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
case (_, index) => project[String](index, this.size, gpuFilter, gpuPsum, parent)
})

val endFilterTime = System.nanoTime()

logInfo(f"filter time = ${endFilterTime - startFilterTime}%,d")

clReleaseMemObject(gpuFilter)
clReleaseMemObject(gpuPsum)

Expand All @@ -125,6 +130,7 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon
}

def project[V: TypeTag](columnIndex: Int, outSize: Int, gpuFilter: cl_mem, gpuPsum: cl_mem, parent: GpuPartition[T]) {
val startTime = System.nanoTime()
if (outSize == 0)
return
val colData = parent.getColumn[V](columnIndex)
Expand Down Expand Up @@ -161,6 +167,8 @@ class GpuFilteredPartition[T <: Product: TypeTag, U: TypeTag](context: OpenCLCon

releaseCol(result)
releaseCol(scanCol)
val endTime = System.nanoTime()
logInfo(f"projection time = ${endTime - startTime}%,d")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ U: TypeTag]
def join(): Int = {
this.globalSize = leftPartition.globalSize
this.localSize = leftPartition.localSize
buildHashTable

val startTime = System.nanoTime()
val result = buildHashTable
val endTime = System.nanoTime()
logInfo(f"join time = ${endTime - startTime}%,d")
result
}

}
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/GpuPartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class GpuPartition[T <: Product : TypeTag](context: OpenCLContext, val capacity:
* @param fromIndex the index of first element to read
*/
def fillFromFiles(paths: Array[String], fromIndex: Int = 0): Unit = {
val startTime = System.nanoTime()
assert(paths.length == columnTypes.size, {
" %d file paths but only %d columns".format(paths.length, columnTypes.size)
})
Expand Down Expand Up @@ -322,6 +323,9 @@ class GpuPartition[T <: Product : TypeTag](context: OpenCLContext, val capacity:
context.diskReadTime += diskReadTime

})
val endTime = System.nanoTime()
logInfo(f"load from disk time= ${endTime - startTime}%,d")

}

def fill(iter: Iterator[T]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class GpuSortPartition[T <: Product : TypeTag]
Array[SortDirection.Value], capacity: Int) extends GpuPartition[T](context, capacity) {

def sort(): Unit = {
val startTime = System.nanoTime()
/*
struct timespec start,end;
Expand Down Expand Up @@ -810,6 +811,8 @@ Array[SortDirection.Value], capacity: Int) extends GpuPartition[T](context, capa
return res;
*/
val endTime = System.nanoTime()
logInfo(f"sort time = ${endTime - startTime}%,d")
}


Expand Down

0 comments on commit 3f4ddc5

Please sign in to comment.