Skip to content

Commit

Permalink
Improve ShuffleReader metrics (apache#12)
Browse files Browse the repository at this point in the history
Co-authored-by: zhaokuo03 <[email protected]>
  • Loading branch information
kecookier and kecookier authored Sep 25, 2023
1 parent 96c2c70 commit 6387e56
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ class MetricsHandler extends MetricsApi with Logging {
"compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to compress"),
"prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to prepare"),
"decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_decompress"),
"ipcTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_ipc"),
"deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_deserialize"),
"avgReadBatchNumRows" -> SQLMetrics
.createAverageMetric(sparkContext, "avg read batch num rows"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,15 @@ class SparkPlanExecHandler extends SparkPlanExecApi {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val decompressTime = metrics("decompressTime")
val ipcTime = metrics("ipcTime")
val deserializeTime = metrics("deserializeTime")
if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[StructType], classOf[SQLMetric], classOf[SQLMetric])
constructor.newInstance(schema, readBatchNumRows, numOutputRows).asInstanceOf[Serializer]
} else {
new ColumnarBatchSerializer(schema, readBatchNumRows, numOutputRows, decompressTime)
new ColumnarBatchSerializer(schema, readBatchNumRows, numOutputRows, decompressTime, ipcTime, deserializeTime)
}
}

Expand Down
11 changes: 7 additions & 4 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ static jmethodID veloxColumnarBatchScannerNext;

static jclass shuffleReaderMetricsClass;
static jmethodID shuffleReaderMetricsSetDecompressTime;
static jmethodID shuffleReaderMetricsSetIpcTime;
static jmethodID shuffleReaderMetricsSetDeserializeTime;

class JavaInputStreamAdaptor final : public arrow::io::InputStream {
public:
Expand Down Expand Up @@ -293,6 +295,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
createGlobalClassReferenceOrError(env, "Lio/glutenproject/vectorized/ShuffleReaderMetrics;");
shuffleReaderMetricsSetDecompressTime =
getMethodIdOrError(env, shuffleReaderMetricsClass, "setDecompressTime", "(J)V");
shuffleReaderMetricsSetIpcTime = getMethodIdOrError(env, shuffleReaderMetricsClass, "setIpcTime", "(J)V");
shuffleReaderMetricsSetDeserializeTime =
getMethodIdOrError(env, shuffleReaderMetricsClass, "SetDeserializeTime", "(J)V");

return jniVersion;
}
Expand Down Expand Up @@ -1056,10 +1061,8 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ShuffleReaderJniWrapper_

auto reader = executionCtx->getShuffleReader(shuffleReaderHandle);
env->CallVoidMethod(metrics, shuffleReaderMetricsSetDecompressTime, reader->getDecompressTime());

// TODO zhaokuo
// env->CallVoidMethod(metrics, shuffleReaderMetricsSetIpcTime, reader->getIpcTime());
// env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getDeserializeTime());
env->CallVoidMethod(metrics, shuffleReaderMetricsSetIpcTime, reader->getIpcTime());
env->CallVoidMethod(metrics, shuffleReaderMetricsSetDeserializeTime, reader->getDeserializeTime());

checkException(env);
JNI_METHOD_END()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

/**
 * Holder for shuffle-read timing metrics (nanoseconds) populated from native code.
 *
 * <p>The setters are invoked via JNI from JniWrapper.cc, which caches their method IDs
 * by exact name and signature "(J)V". Renaming any setter therefore requires updating
 * the corresponding GetMethodID lookup in the native code as well.
 */
public class ShuffleReaderMetrics {
  // Time spent decompressing shuffle blocks.
  private long decompressTime;
  // Time spent in Arrow IPC reading.
  private long ipcTime;
  // Time spent deserializing rows/batches.
  private long deserializeTime;

  public void setDecompressTime(long decompressTime) {
    this.decompressTime = decompressTime;
  }

  public long getDecompressTime() {
    return decompressTime;
  }

  public void setIpcTime(long ipcTime) {
    this.ipcTime = ipcTime;
  }

  public long getIpcTime() {
    return ipcTime;
  }

  // NOTE: intentionally capital-S "Set..." — the JNI side looks this method up by the
  // literal name "SetDeserializeTime"; keep the two in sync if either is ever renamed.
  public void SetDeserializeTime(long deserializeTime) {
    // Bug fix: the parameter was previously named ipcTime while the assignment read
    // "this.deserializeTime = deserializeTime", a self-assignment that left the field 0.
    this.deserializeTime = deserializeTime;
  }

  public long getDeserializeTime() {
    return deserializeTime;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class ColumnarBatchSerializer(
schema: StructType,
readBatchNumRows: SQLMetric,
numOutputRows: SQLMetric,
decompressTime: SQLMetric)
decompressTime: SQLMetric,
ipcTime: SQLMetric,
deserializeTime: SQLMetric)
extends Serializer
with Serializable {

Expand All @@ -54,7 +56,8 @@ class ColumnarBatchSerializer(

/** Creates a new [[SerializerInstance]]. */
override def newInstance(): SerializerInstance = {
  // Every instance shares this serializer's metric accumulators, so reads from
  // concurrent tasks all roll up into the same SQL metrics.
  new ColumnarBatchSerializerInstance(
    schema,
    readBatchNumRows,
    numOutputRows,
    decompressTime,
    ipcTime,
    deserializeTime)
}

override def supportsRelocationOfSerializedObjects: Boolean = supportsRelocation
Expand All @@ -64,7 +67,9 @@ private class ColumnarBatchSerializerInstance(
schema: StructType,
readBatchNumRows: SQLMetric,
numOutputRows: SQLMetric,
decompressTime: SQLMetric)
decompressTime: SQLMetric,
ipcTime: SQLMetric,
deserializeTime: SQLMetric)
extends SerializerInstance
with Logging {

Expand Down Expand Up @@ -106,6 +111,8 @@ private class ColumnarBatchSerializerInstance(
shuffleReaderHandle,
readerMetrics)
decompressTime += readerMetrics.getDecompressTime
ipcTime += readerMetrics.getIpcTime
deserializeTime += readerMetrics.getDeserializeTime

cSchema.close()
ShuffleReaderJniWrapper.INSTANCE.close(executionCtxHandle, shuffleReaderHandle)
Expand Down

0 comments on commit 6387e56

Please sign in to comment.