From 0d7a79ce139daa10bd929be1c499240dc2b04dac Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Mon, 31 Oct 2022 08:18:34 +0100 Subject: [PATCH] TEZ-4352: Add a web ui interface for TezChild --- .../apache/tez/dag/api/TezConfiguration.java | 23 +++- .../tez/runtime/api/ExecutionContext.java | 2 +- .../api/impl/ExecutionContextImpl.java | 10 ++ .../org/apache/tez/runtime/task/TezChild.java | 18 ++- .../runtime/web/TezChildWebController.java | 66 +++++++++++ .../tez/runtime/web/TezChildWebUIService.java | 105 ++++++++++++++++++ .../apache/tez/runtime/web/package-info.java | 20 ++++ .../test/java/org/apache/tez/test/TestAM.java | 42 +++++++ 8 files changed, 283 insertions(+), 3 deletions(-) create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebController.java create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebUIService.java create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/package-info.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 8862f4b7d6..353aef40ae 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -305,6 +305,27 @@ public TezConfiguration(boolean loadDefaults) { @ConfigurationProperty public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS = TEZ_MDC_CUSTOM_KEYS + ".conf.props"; + /** + * Boolean value. + * Whether to start the web UI service in task processes. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_TASK_WEBSERVICE_ENABLE = TEZ_TASK_PREFIX + + "webservice.enable"; + public static final boolean TEZ_TASK_WEBSERVICE_ENABLE_DEFAULT = false; + + /** + * String value. Range of ports that the task container can use for the WebUIService. 
Leave blank + * to use all possible ports. Expert level setting. It's hadoop standard range configuration. + * For example 50051-55000 + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type = "string") + public static final String TEZ_TASK_WEBSERVICE_PORT_RANGE = TEZ_TASK_PREFIX + "webservice.port-range"; + + public static final String TEZ_TASK_WEBSERVICE_PORT_RANGE_DEFAULT = "50051-55000"; + /** * double value. Represents ratio of unique failed outputs / number of consumer * tasks. When this condition or value mentioned in {@link @@ -2054,7 +2075,7 @@ public TezConfiguration(boolean loadDefaults) { * For example 50000-50050,50100-50200 */ @ConfigurationScope(Scope.AM) - @ConfigurationProperty(type="boolean") + @ConfigurationProperty(type="string") public static final String TEZ_AM_WEBSERVICE_PORT_RANGE = TEZ_AM_PREFIX + "tez-ui.webservice.port-range"; diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java index fa9a47ff0d..cb970b3307 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ExecutionContext.java @@ -28,5 +28,5 @@ public interface ExecutionContext { * Get the hostname on which the JVM is running. 
* @return the hostname */ - public String getHostName(); + String getHostName(); } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/ExecutionContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/ExecutionContextImpl.java index 6037c7b932..84ede80ee4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/ExecutionContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/ExecutionContextImpl.java @@ -19,6 +19,7 @@ public class ExecutionContextImpl implements ExecutionContext { private final String hostname; + private String containerId = null; /* container id; stays null unless withContainerId is called */ public ExecutionContextImpl(String hostname) { this.hostname = hostname; @@ -28,4 +29,13 @@ public ExecutionContextImpl(String hostname) { public String getHostName() { return hostname; } + + public ExecutionContext withContainerId(String containerId) { /* fluent setter: stores the id and returns this same instance */ + this.containerId = containerId; + return this; + } + + public String getContainerId() { /* may return null when no id was set */ + return containerId; + } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index ed14bd880c..efaa19b9c2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -75,6 +75,7 @@ import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.hook.TezTaskAttemptHook; import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.web.TezChildWebUIService; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezRuntimeShutdownHandler; @@ -133,6 +134,8 @@ public class TezChild { private final TezExecutors sharedExecutor; private ThreadLocalMap mdcContext; + private TezChildWebUIService webUIService; /* created only when the task web UI is enabled; otherwise left null */ + public TezChild(Configuration conf, String 
host, int port, String containerIdentifier, String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs, Map serviceProviderEnvMap, @@ -209,6 +212,9 @@ public TezTaskUmbilicalProtocol run() throws Exception { ownUmbilical = false; } TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit"); + if (isWebUIServiceEnabled(conf)) { /* opt-in: start the web UI only when TEZ_TASK_WEBSERVICE_ENABLE is true */ + this.webUIService = new TezChildWebUIService(conf, executionContext).start(); + } } public ContainerExecutionResult run() throws IOException, InterruptedException, TezException { @@ -437,12 +443,20 @@ public void shutdown() { if (ownUmbilical) { RPC.stopProxy(umbilical); } + if (webUIService != null) { /* null when the web UI was never started */ + webUIService.stop(); + } } TezRuntimeShutdownHandler.shutdown(); LOG.info("TezChild shutdown finished"); } + private boolean isWebUIServiceEnabled(Configuration conf) { /* reads TEZ_TASK_WEBSERVICE_ENABLE, falling back to its declared default */ + return conf.getBoolean(TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE, + TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE_DEFAULT); + } + public static class ContainerExecutionResult { public static enum ExitStatus { SUCCESS(0), @@ -559,7 +573,9 @@ public static void main(String[] args) throws IOException, InterruptedException, TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()), - System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())), + System.getenv(), pid, + new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())) + .withContainerId(System.getenv(Environment.CONTAINER_ID.name())), /* container id is read from the CONTAINER_ID environment variable */ credentials, Runtime.getRuntime().maxMemory(), System .getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim); ContainerExecutionResult result = tezChild.run(); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebController.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebController.java new file mode 100644 index 
0000000000..bc00ebab6c --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebController.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.web; + +import java.io.PrintWriter; + +import org.apache.hadoop.yarn.webapp.Controller; +import org.apache.hadoop.yarn.webapp.MimeType; +import org.apache.hadoop.yarn.webapp.View; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; + +import com.google.inject.Inject; + +public class TezChildWebController extends Controller { /* Controller behind the TezChild web UI landing page. */ + + @Inject + public TezChildWebController(RequestContext requestContext) { + super(requestContext); + } + + @Override + public void index() { /* index simply delegates to ui() */ + ui(); + } + + public void ui() { /* renders StaticTezChildView */ + render(StaticTezChildView.class); + } + + public static class StaticTezChildView extends View { /* View that writes the landing page directly to the response writer. */ + @Inject + private ExecutionContextImpl executionContext; + + @Override + public void render() { /* NOTE(review): the string literals written below look like they lost their HTML markup in this copy of the patch - confirm against the upstream commit */ + response().setContentType(MimeType.HTML); + PrintWriter pw = writer(); + pw.write("TezChild UI"); + pw.write(""); + pw.write(String.format("

TezChild UI

%s, %s

%s :: %s :: %s", executionContext.getHostName(), + executionContext.getContainerId(), getLink("jmx"), getLink("conf"), getLink("stacks"))); + pw.write(""); + pw.flush(); + } + + private String getLink(String path) { /* NOTE(review): presumably builds a link for the given path; the surrounding markup is not visible here - confirm */ + return "" + path + ""; + } + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebUIService.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebUIService.java new file mode 100644 index 0000000000..31b8d3021a --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/TezChildWebUIService.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.runtime.web; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.webapp.WebApp; +import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.tez.common.web.ServletToControllerAdapters.ConfServletController; +import org.apache.tez.common.web.ServletToControllerAdapters.JMXJsonServletController; +import org.apache.tez.common.web.ServletToControllerAdapters.StackServletController; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TezChildWebUIService { /* Hosts the optional per-task web UI; routes registered in setup(): /, /ui, /jmx, /conf, /stacks. */ + private static final Logger LOG = LoggerFactory.getLogger(TezChildWebUIService.class); + + private Configuration conf; + private ExecutionContext executionContext; + private TezChildWebApp tezChildWebApp; + private WebApp webApp; + private String baseUrl = ""; //url without paths, like http://host:port + + public TezChildWebUIService(Configuration conf, ExecutionContext executionContext) { /* only wires fields; nothing is started until start() */ + this.tezChildWebApp = new TezChildWebApp(executionContext); + this.conf = conf; + this.executionContext = executionContext; + } + + public TezChildWebUIService start() { /* Starts the web app using TEZ_TASK_WEBSERVICE_PORT_RANGE and returns this; any failure is logged and rethrown as TezUncheckedException. */ + try { + if (conf.get(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE) == null) { /* key absent: write the default range into the supplied conf */ + conf.set(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE, + TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE_DEFAULT); + LOG.info( + "Using default port range for WebUIService: " + conf.get(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE)); + } + this.webApp = WebApps.$for(this.tezChildWebApp).with(conf) + .withPortRange(conf, TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE).start(this.tezChildWebApp); + InetSocketAddress address = webApp.getListenerAddress(); + if (address != null) { + String hostname = 
executionContext.getHostName(); + InetSocketAddress bindAddress = NetUtils.createSocketAddrForHost(hostname, address.getPort()); + final int port = address.getPort(); + if (bindAddress.getAddress() != null && bindAddress.getAddress().getCanonicalHostName() != null) { /* prefer the canonical host name when it resolves */ + hostname = bindAddress.getAddress().getCanonicalHostName(); + } else { + LOG.warn("Failed to resolve canonical hostname for " + hostname); + } + baseUrl = String.format("http://%s:%d", hostname, port); + LOG.info("Instantiated TezChild WebUIService at " + baseUrl + "/ui"); + } + } catch (Exception e) { + LOG.error("TezChild WebUIService failed to start.", e); + throw new TezUncheckedException(e); + } + return this; + } + + public void stop() { /* no-op when the web app was never started */ + if (this.webApp != null) { + LOG.debug("Stopping WebApp"); + this.webApp.stop(); + } + } + + private static class TezChildWebApp extends WebApp { /* WebApp that binds the execution context and declares the routes. */ + private ExecutionContext executionContext; + + TezChildWebApp(ExecutionContext executionContext) { + this.executionContext = executionContext; + } + + @Override + public void setup() { /* NOTE(review): the cast below assumes the context is an ExecutionContextImpl - confirm no other implementation is passed in */ + bind(ExecutionContextImpl.class).toInstance((ExecutionContextImpl) executionContext); + route("/", TezChildWebController.class, "ui"); + route("/ui", TezChildWebController.class, "ui"); + route("/jmx", JMXJsonServletController.class); + route("/conf", ConfServletController.class); + route("/stacks", StackServletController.class); + } + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/package-info.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/package-info.java new file mode 100644 index 0000000000..17e6cc48af --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/web/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +package org.apache.tez.runtime.web; +import org.apache.hadoop.classification.InterfaceAudience; diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAM.java b/tez-tests/src/test/java/org/apache/tez/test/TestAM.java index a31fa7e8ad..c92ee3b9ab 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAM.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAM.java @@ -140,6 +140,48 @@ public void testAMWebUIService() throws TezException, IOException, InterruptedEx tezSession.stop(); } + @Test(timeout = 60000) + public void testTaskWebUIService() throws TezException, IOException, InterruptedException { /* Runs a single-task sleep DAG with the task web UI enabled on port 50051, then requests /jmx, /conf and /stacks on that port. */ + SleepProcessorConfig spConf = new SleepProcessorConfig(1); + + DAG dag = DAG.create("TezSleepProcessor"); + Vertex vertex = Vertex.create("SleepVertex", + ProcessorDescriptor.create(SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, + Resource.newInstance(1024, 1)); + dag.addVertex(vertex); + + TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig()); + tezConf.setBoolean(TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE, true); + String tezTaskWebUIServicePort = "50051"; + tezConf.set(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE, tezTaskWebUIServicePort); + + TezClient tezSession = TezClient.create("TezSleepProcessor", tezConf, false); + tezSession.start(); + + DAGClient dagClient = 
tezSession.submitDAG(dag); + + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { /* poll every 500 ms until the DAG reports completion */ + Thread.sleep(500L); + dagStatus = dagClient.getDAGStatus(null); + } + + // host: this is a unit test, we can assume that task container runs on the same host as the am + // port: we expect it to be what we configured + String amWebUIAddress = dagClient.getWebUIAddress(); + URL amWebUIAddressUrl = new URL(amWebUIAddress); + URL taskWebUIAddress = new URL(amWebUIAddressUrl.getProtocol(), amWebUIAddressUrl.getHost(), + Integer.parseInt(tezTaskWebUIServicePort), ""); + + LOG.info("TezTask webUI address: " + taskWebUIAddress); + + checkAddress(taskWebUIAddress + "/jmx"); + checkAddress(taskWebUIAddress + "/conf"); + checkAddress(taskWebUIAddress + "/stacks"); + + tezSession.stop(); + } + private void checkAddress(String url) { checkAddress(url, 200); }