Skip to content

Commit

Permalink
Merge pull request apache#70 from ashangit/criteo-2.2
Browse files Browse the repository at this point in the history
[SPARK-18364][YARN] Expose metrics for YarnShuffleService
  • Loading branch information
Willymontaz authored Feb 23, 2018
2 parents b4bb70d + 66e8044 commit 4119fed
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;

Expand All @@ -34,12 +36,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.apache.spark.network.util.LevelDBProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,18 +52,19 @@
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

/**
* An external shuffle service used by Spark on Yarn.
*
* <p>
* This is intended to be a long-running auxiliary service that runs in the NodeManager process.
* A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
* The application also automatically derives the service port through `spark.shuffle.service.port`
* specified in the Yarn configuration. This is so that both the clients and the server agree on
* the same port to communicate on.
*
* <p>
* The service also optionally supports authentication. This ensures that executors from one
* application cannot read the shuffle files written by those from another. This feature can be
* enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
Expand Down Expand Up @@ -95,7 +99,7 @@ public class YarnShuffleService extends AuxiliaryService {
private static final ObjectMapper mapper = new ObjectMapper();
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
.StoreVersion(1, 0);
.StoreVersion(1, 0);

// just for integration tests that want to look at this file -- in general not sensible as
// a static
Expand Down Expand Up @@ -165,6 +169,23 @@ protected void serviceInit(Configuration conf) throws Exception {
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);

// register metrics on the block handler into the Node Manager's metrics system.
try {
YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(
blockHandler.getAllMetrics());
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();

Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource",
String.class, String.class, MetricsSource.class);
registerSourceMethod.setAccessible(true);
registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " +
"Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
} catch (Exception e) {
logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " +
"proceeding without metrics", e);
}

// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
Expand Down Expand Up @@ -200,7 +221,7 @@ private void createSecretManager() throws IOException {

// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));

db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
logger.info("Recovery location is: " + secretsFile.getPath());
Expand Down Expand Up @@ -413,8 +434,8 @@ public int hashCode() {
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("appId", appId)
.toString();
.add("appId", appId)
.toString();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.yarn;

import com.codahale.metrics.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;

import java.util.Map;

/**
* Modeled off of YARN's NodeManagerMetrics.
*/
public class YarnShuffleServiceMetrics implements MetricsSource {

private final MetricSet metricSet;

public YarnShuffleServiceMetrics(MetricSet metricSet) {
this.metricSet = metricSet;
}

/**
* Get metrics from the source
*
* @param collector to contain the resulting metrics snapshot
* @param all if true, return all metrics even if unchanged.
*/
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService");

for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
}
}

public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {

// The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics
if (metric instanceof Timer) {
Timer t = (Timer) metric;
metricsRecordBuilder
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
t.getCount())
.addGauge(
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
t.getFifteenMinuteRate())
.addGauge(
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
t.getFiveMinuteRate())
.addGauge(
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
t.getOneMinuteRate())
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
t.getMeanRate());
} else if (metric instanceof Meter) {
Meter m = (Meter) metric;
metricsRecordBuilder
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
m.getCount())
.addGauge(
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
m.getFifteenMinuteRate())
.addGauge(
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
m.getFiveMinuteRate())
.addGauge(
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
m.getOneMinuteRate())
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
m.getMeanRate());
} else if (metric instanceof Gauge) {
Gauge m = (Gauge) metric;
Object gaugeValue = m.getValue();
if (gaugeValue instanceof Integer) {
Integer intValue = (Integer) gaugeValue;
metricsRecordBuilder
.addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " +
"gauge " + name), intValue.intValue());
} else if (gaugeValue instanceof Float) {
Float floatValue = (Float) gaugeValue;
metricsRecordBuilder
.addGauge(new ShuffleServiceMetricsInfo(name, "Float value of " +
"gauge " + name), floatValue.floatValue());
} else if (gaugeValue instanceof Long) {
Long longValue = (Long) gaugeValue;
metricsRecordBuilder
.addGauge(new ShuffleServiceMetricsInfo(name, "Long value of " +
"gauge " + name), longValue.longValue());
} else if (gaugeValue instanceof Double) {
Double doubleValue = (Double) gaugeValue;
metricsRecordBuilder
.addGauge(new ShuffleServiceMetricsInfo(name, "Double value of " +
"gauge " + name), doubleValue.doubleValue());
}
}
}

private static class ShuffleServiceMetricsInfo implements MetricsInfo {

private final String name;
private final String description;

ShuffleServiceMetricsInfo(String name, String description) {
this.name = name;
this.description = description;
}

@Override
public String name() {
return name;
}

@Override
public String description() {
return description;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.yarn

import scala.collection.JavaConverters._

import org.apache.hadoop.metrics2.MetricsRecordBuilder
import org.mockito.Matchers._
import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.network.server.OneForOneStreamManager
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}

class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {

val streamManager = mock(classOf[OneForOneStreamManager])
val blockResolver = mock(classOf[ExternalShuffleBlockResolver])
when(blockResolver.getRegisteredExecutorsSize).thenReturn(42)

val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics

test("metrics named as expected") {
val allMetrics = Set(
"openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
"blockTransferRateBytes", "registeredExecutorsSize")

metrics.getMetrics.keySet().asScala should be (allMetrics)
}

// these three metrics have the same effect on the collector
for (testname <- Seq("openBlockRequestLatencyMillis",
"registerExecutorRequestLatencyMillis",
"blockTransferRateBytes")) {
test(s"$testname - collector receives correct types") {
val builder = mock(classOf[MetricsRecordBuilder])
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
when(builder.addGauge(any(), anyDouble())).thenReturn(builder)

YarnShuffleServiceMetrics.collectMetric(builder, testname,
metrics.getMetrics.get(testname))

verify(builder).addCounter(anyObject(), anyLong())
verify(builder, times(4)).addGauge(anyObject(), anyDouble())
}
}

// this metric writes only one gauge to the collector
test("registeredExecutorsSize - collector receives correct types") {
val builder = mock(classOf[MetricsRecordBuilder])
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
when(builder.addGauge(any(), anyDouble())).thenReturn(builder)

YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize",
metrics.getMetrics.get("registeredExecutorsSize"))

// only one
verify(builder).addGauge(anyObject(), anyInt())
}
}

0 comments on commit 4119fed

Please sign in to comment.