From 66e804423f44bcc03e04d60393c7322d102183d1 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 22 Mar 2017 19:59:38 -0700 Subject: [PATCH] [SPARK-18364][YARN] Expose metrics for YarnShuffleService Registers the shuffle server's metrics with the Hadoop Node Manager's DefaultMetricsSystem. 2-space indentation on java files; pass ./dev/lint-java Test metric collector gets right converted calls camel-case shuffleService Pass scalastyle Reformat and organize imports With import order specified at http://spark.apache.org/contributing.html --- .../network/yarn/YarnShuffleService.java | 39 +++-- .../yarn/YarnShuffleServiceMetrics.java | 137 ++++++++++++++++++ .../yarn/YarnShuffleServiceMetricsSuite.scala | 75 ++++++++++ 3 files changed, 242 insertions(+), 9 deletions(-) create mode 100644 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index cd67eb28573e8..b0785ce3a827c 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,8 +19,10 @@ import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -34,12 +36,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; -import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,18 +52,19 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; /** * An external shuffle service used by Spark on Yarn. - * + *

* This is intended to be a long-running auxiliary service that runs in the NodeManager process. * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`. * The application also automatically derives the service port through `spark.shuffle.service.port` * specified in the Yarn configuration. This is so that both the clients and the server agree on * the same port to communicate on. - * + *

* The service also optionally supports authentication. This ensures that executors from one * application cannot read the shuffle files written by those from another. This feature can be * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM. @@ -95,7 +99,7 @@ public class YarnShuffleService extends AuxiliaryService { private static final ObjectMapper mapper = new ObjectMapper(); private static final String APP_CREDS_KEY_PREFIX = "AppCreds"; private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider - .StoreVersion(1, 0); + .StoreVersion(1, 0); // just for integration tests that want to look at this file -- in general not sensible as // a static @@ -165,6 +169,23 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + // register metrics on the block handler into the Node Manager's metrics system. + try { + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics( + blockHandler.getAllMetrics()); + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + + Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", + String.class, String.class, MetricsSource.class); + registerSourceMethod.setAccessible(true); + registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + + "Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + } catch (Exception e) { + logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + + "proceeding without metrics", e); + } + // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); @@ -200,7 +221,7 @@ private void createSecretManager() throws IOException { // Make sure this is protected in case its not in the NM recovery dir FileSystem fs = FileSystem.getLocal(_conf); - fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700)); + fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700)); db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper); logger.info("Recovery location is: " + secretsFile.getPath()); @@ -413,8 +434,8 @@ public int hashCode() { @Override public String toString() { return Objects.toStringHelper(this) - .add("appId", appId) - .toString(); + .add("appId", appId) + .toString(); } } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000000..40a19e7afed1d --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import com.codahale.metrics.*; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; + +import java.util.Map; + +/** + * Modeled off of YARN's NodeManagerMetrics. + */ +public class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); + } + } + + public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (metric instanceof Timer) { + Timer t = (Timer) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (metric instanceof Gauge) { + Gauge m = (Gauge) metric; + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } else if (gaugeValue instanceof Float) { + Float floatValue = (Float) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Float value of " + + "gauge " + name), floatValue.floatValue()); + } else if (gaugeValue instanceof Long) { + Long longValue = (Long) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Long value of " + + "gauge " + name), longValue.longValue()); + } else if (gaugeValue instanceof Double) { + Double doubleValue = (Double) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Double value of " + + "gauge " + name), doubleValue.doubleValue()); + } + } + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala new file mode 100644 index 0000000000000..183545c94f329 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.yarn + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.metrics2.MetricsRecordBuilder +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} + +class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { + + val streamManager = mock(classOf[OneForOneStreamManager]) + val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) + when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) + + val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics + + test("metrics named as expected") { + val allMetrics = Set( + "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes", "registeredExecutorsSize") + + metrics.getMetrics.keySet().asScala should be (allMetrics) + } + + // these three metrics have the same effect on the collector + for (testname <- Seq("openBlockRequestLatencyMillis", + "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes")) { + test(s"$testname - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, testname, + metrics.getMetrics.get(testname)) + + verify(builder).addCounter(anyObject(), anyLong()) + verify(builder, times(4)).addGauge(anyObject(), anyDouble()) + } + } + + // this metric writes only one gauge to the collector + test("registeredExecutorsSize - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", + metrics.getMetrics.get("registeredExecutorsSize")) + + // only one + verify(builder).addGauge(anyObject(), anyInt()) + } +}