From 47be7fab9b33c539a9a5ea1d72666dd4305fe636 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Wed, 27 Oct 2021 21:52:33 +0200 Subject: [PATCH] TEZ-4347: Add some diagnostic endpoints to TezAM's WebUIService --- .../apache/tez/dag/api/TezConfiguration.java | 12 ++ .../apache/tez/dag/api/client/DAGClient.java | 9 ++ .../tez/dag/api/client/DAGClientImpl.java | 7 +- .../tez/dag/api/client/DAGClientInternal.java | 2 + .../dag/api/client/DAGClientTimelineImpl.java | 5 + .../dag/api/client/rpc/DAGClientRPCImpl.java | 12 ++ .../src/main/proto/DAGClientAMProtocol.proto | 8 + .../AbstractServletToControllerAdapter.java | 104 +++++++++++++ .../web/ServletToControllerAdapters.java | 45 ++++++ .../apache/tez/common/web/package-info.java | 22 +++ tez-dag/findbugs-exclude.xml | 7 + .../tez/dag/api/client/DAGClientHandler.java | 3 + ...GClientAMProtocolBlockingPBServerImpl.java | 8 + .../org/apache/tez/dag/app/DAGAppMaster.java | 4 + .../apache/tez/dag/app/web/WebUIService.java | 23 ++- .../tez/dag/api/client/MRDAGClient.java | 6 + .../test/java/org/apache/tez/test/TestAM.java | 146 ++++++++++++++++++ 17 files changed, 421 insertions(+), 2 deletions(-) create mode 100644 tez-common/src/main/java/org/apache/tez/common/web/AbstractServletToControllerAdapter.java create mode 100644 tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java create mode 100644 tez-common/src/main/java/org/apache/tez/common/web/package-info.java create mode 100644 tez-tests/src/test/java/org/apache/tez/test/TestAM.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 16d1dfcb70..6a228da5de 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1964,6 +1964,18 @@ public TezConfiguration(boolean loadDefaults) { + "tez-ui.webservice.enable"; public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true; + /** + * String value. Range of ports that the AM can use for the WebUIService. Leave blank + * to use all possible ports. Expert level setting. It's hadoop standard range configuration. + * For example 50000-50050,50100-50200 + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_AM_WEBSERVICE_PORT_RANGE = TEZ_AM_PREFIX + + "tez-ui.webservice.port-range"; + + public static final String TEZ_AM_WEBSERVICE_PORT_RANGE_DEFAULT = "50000-50050"; + // TODO only validate property here, value can also be validated if necessary public static void validateProperty(String property, Scope usedScope) { Scope validScope = PropertyScope.get(property); diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index ec20ef1de4..944bff3fbd 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -140,4 +140,13 @@ public DAGStatus waitForCompletion(long timeMs) throws IOException, TezException public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set statusGetOpts) throws IOException, TezException, InterruptedException; + /** + * Returns the Tez AM's web ui address if any. + * + * @return The http web UI address + * @throws IOException + * @throws TezException + */ + public abstract String getWebUIAddress() throws IOException, TezException; + } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 3c0de00819..bfea96b998 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -30,10 +30,10 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CachedEntity; import org.apache.tez.common.Preconditions; - import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -670,6 +670,11 @@ public DAGClientInternal getRealClient() { return realClient; } + @Override + public String getWebUIAddress() throws IOException, TezException { + return realClient.getWebUIAddress(); + } + private double getProgress(Progress progress) { return (progress.getTotalTaskCount() == 0 ? 0.0 : (double) (progress.getSucceededTaskCount()) / progress.getTotalTaskCount()); diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java index a3c898a855..8346d53da7 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java @@ -125,4 +125,6 @@ public abstract VertexStatus getVertexStatus(String vertexName, */ public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set statusGetOpts) throws IOException, TezException, InterruptedException; + + public abstract String getWebUIAddress() throws IOException, TezException; } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java index 17d2386860..4ec9c94354 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java @@ -523,4 +523,9 @@ public DAGStatus getDAGStatus(@Nullable Set statusOptions, return getDAGStatus(statusOptions); } + @Override + public String getWebUIAddress() throws IOException, TezException { + throw new TezException("DAGClientTimelineImpl.getWebUIAddress is not supported"); + } + } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 5d5752e6e2..4a6a486270 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -48,6 +48,7 @@ import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetWebUIAddressRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto; import com.google.common.annotations.VisibleForTesting; @@ -303,4 +304,15 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set throw new TezException("not supported"); } + @Override + public String getWebUIAddress() throws IOException, TezException { + LOG.debug("getWebUIAddress via AM for app: {} dag:{}", appId, dagId); + GetWebUIAddressRequestProto.Builder requestProtoBuilder = GetWebUIAddressRequestProto.newBuilder(); + try { + return proxy.getWebUIAddress(null, requestProtoBuilder.build()).getWebUiAddress(); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + throw new TezException(e); + } + } } diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto index 113c9ccfce..f0ff3916ea 100644 --- a/tez-api/src/main/proto/DAGClientAMProtocol.proto +++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto @@ -90,6 +90,13 @@ message GetAMStatusResponseProto { required TezAppMasterStatusProto status = 1; } +message GetWebUIAddressRequestProto { +} + +message GetWebUIAddressResponseProto { + required string web_ui_address = 1; +} + service DAGClientAMProtocol { rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto); rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto); @@ -98,4 +105,5 @@ service DAGClientAMProtocol { rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto); rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto); rpc getAMStatus (GetAMStatusRequestProto) returns (GetAMStatusResponseProto); + rpc getWebUIAddress (GetWebUIAddressRequestProto) returns (GetWebUIAddressResponseProto); } diff --git a/tez-common/src/main/java/org/apache/tez/common/web/AbstractServletToControllerAdapter.java b/tez-common/src/main/java/org/apache/tez/common/web/AbstractServletToControllerAdapter.java new file mode 100644 index 0000000000..b79b5d5d9c --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/web/AbstractServletToControllerAdapter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common.web; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Enumeration; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.yarn.webapp.Controller; + +/** + * AbstractServletToControllerAdapter is a common ancestor for classes + * that wish to adapt servlets to yarn webapp controllers. + * The adapter is responsible for: + * 1. creating a servlet instance + * 2. creating a dummy ServletConfig + * 3. delegating calls to the servlet instance's doGet method + */ +public abstract class AbstractServletToControllerAdapter extends Controller { + private AtomicBoolean initialized = new AtomicBoolean(false); + protected HttpServlet servlet; + + @Override + public void index() { + if (initialized.compareAndSet(false, true)) { + initServlet(); + } + try { + /* + * This reflection workaround is needed because HttpServlet.doGet is protected + * (even if subclasses have it public). + */ + Method doGetMethod = + this.servlet.getClass().getMethod("doGet", HttpServletRequest.class, HttpServletResponse.class); + doGetMethod.setAccessible(true); + doGetMethod.invoke(this.servlet, request(), response()); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException + | SecurityException e) { + throw new RuntimeException(e); + } + } + + /** + * Creates a dummy servlet config which is suitable for initializing a servlet instance. + * @param servletName + * @return a ServletConfig instance initialized with a ServletContext + */ + private ServletConfig getDummyServletConfig(String servletName) { + return new ServletConfig() { + + @Override + public String getServletName() { + return servletName; + } + + @Override + public ServletContext getServletContext() { + return request().getServletContext(); + } + + @Override + public Enumeration getInitParameterNames() { + return null; + } + + @Override + public String getInitParameter(String name) { + return null; + } + }; + } + + private void initServlet() { + try { + servlet.init(getDummyServletConfig(this.servlet.getClass().getSimpleName())); + } catch (ServletException e) { + throw new RuntimeException(e); + } + } +} diff --git a/tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java b/tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java new file mode 100644 index 0000000000..35ca1b6408 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/web/ServletToControllerAdapters.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common.web; + +import javax.servlet.ServletException; + +import org.apache.hadoop.conf.ConfServlet; +import org.apache.hadoop.http.HttpServer2.StackServlet; +import org.apache.hadoop.jmx.JMXJsonServlet; + +public class ServletToControllerAdapters { + public static class JMXJsonServletController extends AbstractServletToControllerAdapter { + public JMXJsonServletController() throws ServletException { + this.servlet = new JMXJsonServlet(); + } + } + + public static class ConfServletController extends AbstractServletToControllerAdapter { + public ConfServletController() throws ServletException { + this.servlet = new ConfServlet(); + } + } + + public static class StackServletController extends AbstractServletToControllerAdapter { + public StackServletController() throws ServletException { + this.servlet = new StackServlet(); + } + } +} diff --git a/tez-common/src/main/java/org/apache/tez/common/web/package-info.java b/tez-common/src/main/java/org/apache/tez/common/web/package-info.java new file mode 100644 index 0000000000..2fbda31fda --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/web/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Private +package org.apache.tez.common.web; + +import org.apache.hadoop.classification.InterfaceAudience.Private; \ No newline at end of file diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml index e8755832f4..bb3d9b976e 100644 --- a/tez-dag/findbugs-exclude.xml +++ b/tez-dag/findbugs-exclude.xml @@ -126,6 +126,13 @@ + + + + + + + diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index 4cdd1ec9d1..1de62012e7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -186,4 +186,7 @@ public long getLastHeartbeatTime() { return lastHeartbeatTime.get(); } + public String getWebUIAddress() { + return dagAppMaster.getWebUIAddress(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index 72cf0d5642..e7e4244568 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -45,6 +45,8 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetWebUIAddressRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetWebUIAddressResponseProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionResponseProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; @@ -226,4 +228,10 @@ public GetAMStatusResponseProto getAMStatus(RpcController controller, } } + @Override + public GetWebUIAddressResponseProto getWebUIAddress(RpcController controller, GetWebUIAddressRequestProto request) + throws ServiceException { + String address = real.getWebUIAddress(); + return GetWebUIAddressResponseProto.newBuilder().setWebUiAddress(address).build(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 972fadf854..690e2858d7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2618,6 +2618,10 @@ private boolean enableWebUIService() { TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT); } + public String getWebUIAddress() { + return webUIService == null ? null : webUIService.getBaseUrl(); + } + @VisibleForTesting static void parseAllPlugins( List taskSchedulerDescriptors, BiMap taskSchedulerPluginMap, diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java index 1670370187..bf94a73002 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java @@ -23,6 +23,10 @@ import java.net.InetSocketAddress; import org.apache.tez.common.Preconditions; +import org.apache.tez.common.web.ServletToControllerAdapters.ConfServletController; +import org.apache.tez.common.web.ServletToControllerAdapters.JMXJsonServletController; +import org.apache.tez.common.web.ServletToControllerAdapters.StackServletController; + import com.google.inject.name.Names; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +36,7 @@ import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.YarnWebParams; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; @@ -51,6 +56,7 @@ public class WebUIService extends AbstractService { private final AppContext context; private TezAMWebApp tezAMWebApp; private WebApp webApp; + private String baseUrl = ""; //url without paths, like http://host:port private String trackingUrl = ""; private String historyUrl = ""; @@ -88,9 +94,16 @@ protected void serviceStart() throws Exception { // certificates, however AM user is not trusted. // ideally the withHttpPolicy should be used, however hadoop 2.2 does not have the api conf.set("yarn.http.policy", "HTTP_ONLY"); + if (conf.get(TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE) == null) { + conf.set(TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE, + TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE_DEFAULT); + LOG.info( + "Using default port range for WebUIService: " + conf.get(TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE)); + } this.webApp = WebApps .$for(this.tezAMWebApp) .with(conf) + .withPortRange(conf, TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE) .start(this.tezAMWebApp); InetSocketAddress address = webApp.getListenerAddress(); if (address != null) { @@ -105,7 +118,8 @@ protected void serviceStart() throws Exception { LOG.warn("Failed to resolve canonical hostname for " + context.getAppMaster().getAppNMHost()); } - trackingUrl = "http://" + hostname + ":" + port + "/ui/"; + baseUrl = "http://" + hostname + ":" + port; + trackingUrl = baseUrl + "/ui/"; LOG.info("Instantiated WebUIService at " + trackingUrl); } } catch (Exception e) { @@ -125,6 +139,10 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + public String getBaseUrl() { + return baseUrl; + } + public String getTrackingURL() { return trackingUrl; } @@ -214,6 +232,9 @@ public void setup() { "getTasksInfo"); route(WS_PREFIX_V2 + pajoin("attemptsInfo", ATTEMPT_ID, DAG_ID), AMWebController.class, "getAttemptsInfo"); + route("/jmx", JMXJsonServletController.class); + route("/conf", ConfServletController.class); + route("/stacks", StackServletController.class); } } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java index 16dc2f8c01..c2646bd819 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java @@ -106,4 +106,10 @@ public DAGStatus getDAGStatus(@Nullable Set statusOptions, long timeout) throws IOException, TezException { return getDAGStatus(statusOptions); } + + @Override + public String getWebUIAddress() throws IOException, TezException { + throw new TezException("MRDAGClient.getWebUIAddress is not supported"); + } + } diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAM.java b/tez-tests/src/test/java/org/apache/tez/test/TestAM.java new file mode 100644 index 0000000000..3e8a58cf7a --- /dev/null +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAM.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.runtime.library.processor.SleepProcessor; +import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestAM { + + private static final Logger LOG = LoggerFactory.getLogger(TestAM.class); + + private static MiniTezCluster tezCluster; + private static MiniDFSCluster dfsCluster; + + private static Configuration conf = new Configuration(); + private static FileSystem remoteFs; + + private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestAM.class.getName() + "-tmpDir"; + + @BeforeClass + public static void setup() throws IOException { + try { + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + + if (tezCluster == null) { + tezCluster = new MiniTezCluster(TestAM.class.getName(), 1, 1, 1); + Configuration tezClusterConf = new Configuration(); + tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS + tezClusterConf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000); + tezClusterConf.setLong(TezConfiguration.TEZ_AM_SLEEP_TIME_BEFORE_EXIT_MILLIS, 1000); + tezClusterConf.set(YarnConfiguration.PROXY_ADDRESS, "localhost"); + tezCluster.init(tezClusterConf); + tezCluster.start(); + } + } + + @AfterClass + public static void tearDown() { + if (tezCluster != null) { + tezCluster.stop(); + tezCluster = null; + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + @Test(timeout = 60000) + public void testAMWebUIService() throws TezException, IOException, InterruptedException { + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", + ProcessorDescriptor.create(SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(1024, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig()); + TezClient tezSession = TezClient.create("TezSleepProcessor", tezConf, false); + tezSession.start(); + + DAGClient dagClient = tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + Thread.sleep(500L); + dagStatus = dagClient.getDAGStatus(null); + } + + String webUIAddress = dagClient.getWebUIAddress(); + assertNotNull("getWebUIAddress should return TezAM's web UI address", webUIAddress); + LOG.info("TezAM webUI address: " + webUIAddress); + + checkAddress(webUIAddress + "/jmx"); + checkAddress(webUIAddress + "/conf"); + checkAddress(webUIAddress + "/stacks"); + + URL url = new URL(webUIAddress); + IntegerRanges portRange = conf.getRange(TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE, + TezConfiguration.TEZ_AM_WEBSERVICE_PORT_RANGE_DEFAULT); + assertTrue("WebUIService port should be in the defined range (got: " + url.getPort() + ")", + portRange.getRangeStart() <= url.getPort()); + + tezSession.stop(); + } + + private void checkAddress(String url) { + boolean success = false; + try { + HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); + connection.connect(); + success = (connection.getResponseCode() == 200); + } catch (Exception e) { + LOG.error("Error while checking url: " + url, e); + } + assertTrue(url + " should be available", success); + } +}