Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4352: Add a web ui interface for TezChild #248

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,27 @@ public TezConfiguration(boolean loadDefaults) {
@ConfigurationProperty
public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS = TEZ_MDC_CUSTOM_KEYS + ".conf.props";

/**
* String value
* Whether to start web ui service in task processes.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
public static final String TEZ_TASK_WEBSERVICE_ENABLE = TEZ_TASK_PREFIX
+ "webservice.enable";
public static final boolean TEZ_TASK_WEBSERVICE_ENABLE_DEFAULT = false;

/**
* String value. Range of ports that the task container can use for the WebUIService. Leave blank
* to use all possible ports. Expert level setting. It's hadoop standard range configuration.
* For example 50051-55000
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type = "string")
public static final String TEZ_TASK_WEBSERVICE_PORT_RANGE = TEZ_AM_PREFIX + "webservice.port-range";

public static final String TEZ_TASK_WEBSERVICE_PORT_RANGE_DEFAULT = "50051-55000";

/**
* double value. Represents ratio of unique failed outputs / number of consumer
* tasks. When this condition or value mentioned in {@link
Expand Down Expand Up @@ -2054,7 +2075,7 @@ public TezConfiguration(boolean loadDefaults) {
* For example 50000-50050,50100-50200
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
@ConfigurationProperty(type="string")
public static final String TEZ_AM_WEBSERVICE_PORT_RANGE = TEZ_AM_PREFIX
+ "tez-ui.webservice.port-range";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ public interface ExecutionContext {
* Get the hostname on which the JVM is running.
* @return the hostname
*/
public String getHostName();
String getHostName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class ExecutionContextImpl implements ExecutionContext {

private final String hostname;
private String containerId = null;

public ExecutionContextImpl(String hostname) {
this.hostname = hostname;
Expand All @@ -28,4 +29,13 @@ public ExecutionContextImpl(String hostname) {
public String getHostName() {
return hostname;
}

public ExecutionContext withContainerId(String containerId) {
this.containerId = containerId;
return this;
}

public String getContainerId() {
return containerId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.web.TezChildWebUIService;
import org.apache.tez.util.LoggingUtils;

import org.apache.tez.util.TezRuntimeShutdownHandler;
Expand Down Expand Up @@ -133,6 +134,8 @@ public class TezChild {
private final TezExecutors sharedExecutor;
private ThreadLocalMap mdcContext;

private TezChildWebUIService webUIService;

public TezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
Map<String, String> serviceProviderEnvMap,
Expand Down Expand Up @@ -209,6 +212,9 @@ public TezTaskUmbilicalProtocol run() throws Exception {
ownUmbilical = false;
}
TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit");
if (isWebUIServiceEnabled(conf)) {
this.webUIService = new TezChildWebUIService(conf, executionContext).start();
}
}

public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
Expand Down Expand Up @@ -437,12 +443,20 @@ public void shutdown() {
if (ownUmbilical) {
RPC.stopProxy(umbilical);
}
if (webUIService != null) {
webUIService.stop();
}
}

TezRuntimeShutdownHandler.shutdown();
LOG.info("TezChild shutdown finished");
}

private boolean isWebUIServiceEnabled(Configuration conf) {
return conf.getBoolean(TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE,
TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE_DEFAULT);
}

public static class ContainerExecutionResult {
public static enum ExitStatus {
SUCCESS(0),
Expand Down Expand Up @@ -559,7 +573,9 @@ public static void main(String[] args) throws IOException, InterruptedException,

TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
System.getenv(), pid,
new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name()))
.withContainerId(System.getenv(Environment.CONTAINER_ID.name())),
credentials, Runtime.getRuntime().maxMemory(), System
.getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim);
ContainerExecutionResult result = tezChild.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.web;

import java.io.PrintWriter;

import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.View;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;

import com.google.inject.Inject;

public class TezChildWebController extends Controller {

@Inject
public TezChildWebController(RequestContext requestContext) {
super(requestContext);
}

@Override
public void index() {
ui();
}

public void ui() {
render(StaticTezChildView.class);
}

public static class StaticTezChildView extends View {
@Inject
private ExecutionContextImpl executionContext;

@Override
public void render() {
response().setContentType(MimeType.HTML);
PrintWriter pw = writer();
pw.write("<html><head><meta charset=\\\"utf-8\\\"><title>TezChild UI</title>");
pw.write("</head><body>");
pw.write(String.format("<h1>TezChild UI</h1> <h2>%s, %s</h2> %s :: %s :: %s", executionContext.getHostName(),
executionContext.getContainerId(), getLink("jmx"), getLink("conf"), getLink("stacks")));
pw.write("</body></html>");
pw.flush();
}

private String getLink(String path) {
return "<a href=\"/" + path + "\">" + path + "</a>";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.web;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.tez.common.web.ServletToControllerAdapters.ConfServletController;
import org.apache.tez.common.web.ServletToControllerAdapters.JMXJsonServletController;
import org.apache.tez.common.web.ServletToControllerAdapters.StackServletController;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TezChildWebUIService {
private static final Logger LOG = LoggerFactory.getLogger(TezChildWebUIService.class);

private Configuration conf;
private ExecutionContext executionContext;
private TezChildWebApp tezChildWebApp;
private WebApp webApp;
private String baseUrl = ""; //url without paths, like http://host:port

public TezChildWebUIService(Configuration conf, ExecutionContext executionContext) {
this.tezChildWebApp = new TezChildWebApp(executionContext);
this.conf = conf;
this.executionContext = executionContext;
}

public TezChildWebUIService start() {
try {
if (conf.get(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE) == null) {
conf.set(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE,
TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE_DEFAULT);
LOG.info(
"Using default port range for WebUIService: " + conf.get(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE));
}
this.webApp = WebApps.$for(this.tezChildWebApp).with(conf)
.withPortRange(conf, TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE).start(this.tezChildWebApp);
InetSocketAddress address = webApp.getListenerAddress();
if (address != null) {
String hostname = executionContext.getHostName();
InetSocketAddress bindAddress = NetUtils.createSocketAddrForHost(hostname, address.getPort());
final int port = address.getPort();
if (bindAddress.getAddress() != null && bindAddress.getAddress().getCanonicalHostName() != null) {
hostname = bindAddress.getAddress().getCanonicalHostName();
} else {
LOG.warn("Failed to resolve canonical hostname for " + hostname);
}
baseUrl = String.format("http://%s:%d", hostname, port);
LOG.info("Instantiated TezChild WebUIService at " + baseUrl + "/ui");
}
} catch (Exception e) {
LOG.error("TezChild WebUIService failed to start.", e);
throw new TezUncheckedException(e);
}
return this;
}

public void stop() {
if (this.webApp != null) {
LOG.debug("Stopping WebApp");
this.webApp.stop();
}
}

private static class TezChildWebApp extends WebApp {
private ExecutionContext executionContext;

TezChildWebApp(ExecutionContext executionContext) {
this.executionContext = executionContext;
}

@Override
public void setup() {
bind(ExecutionContextImpl.class).toInstance((ExecutionContextImpl) executionContext);
route("/", TezChildWebController.class, "ui");
route("/ui", TezChildWebController.class, "ui");
route("/jmx", JMXJsonServletController.class);
route("/conf", ConfServletController.class);
route("/stacks", StackServletController.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.tez.runtime.web;
import org.apache.hadoop.classification.InterfaceAudience;
42 changes: 42 additions & 0 deletions tez-tests/src/test/java/org/apache/tez/test/TestAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,48 @@ public void testAMWebUIService() throws TezException, IOException, InterruptedEx
tezSession.stop();
}

@Test(timeout = 60000)
public void testTaskWebUIService() throws TezException, IOException, InterruptedException {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);

DAG dag = DAG.create("TezSleepProcessor");
Vertex vertex = Vertex.create("SleepVertex",
ProcessorDescriptor.create(SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
Resource.newInstance(1024, 1));
dag.addVertex(vertex);

TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig());
tezConf.setBoolean(TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE, true);
String tezTaskWebUIServicePort = "50051";
tezConf.set(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE, tezTaskWebUIServicePort);

TezClient tezSession = TezClient.create("TezSleepProcessor", tezConf, false);
tezSession.start();

DAGClient dagClient = tezSession.submitDAG(dag);

DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
Thread.sleep(500L);
dagStatus = dagClient.getDAGStatus(null);
}

// host: this is a unit test, we can assume that task container runs on the same host as the am
// port: we expect it to be what we configured
String amWebUIAddress = dagClient.getWebUIAddress();
URL amWebUIAddressUrl = new URL(amWebUIAddress);
URL taskWebUIAddress = new URL(amWebUIAddressUrl.getProtocol(), amWebUIAddressUrl.getHost(),
Integer.parseInt(tezTaskWebUIServicePort), "");

LOG.info("TezTask webUI address: " + taskWebUIAddress);

checkAddress(taskWebUIAddress + "/jmx");
checkAddress(taskWebUIAddress + "/conf");
checkAddress(taskWebUIAddress + "/stacks");

tezSession.stop();
}

private void checkAddress(String url) {
checkAddress(url, 200);
}
Expand Down
Loading