Skip to content

Commit

Permalink
Update Kitten to run against Hadoop 2.2.0.
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Wills committed Nov 30, 2013
1 parent 9faaf1e commit 11527be
Show file tree
Hide file tree
Showing 31 changed files with 347 additions and 648 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
.settings
target
build
*.iml
.idea
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@ To build Kitten, run:

from this directory. That will build the common, master, and client subprojects.

Kitten is developed against CDH4, which ships with an experimental YARN
module. [Cloudera Manager](https://ccp.cloudera.com/display/SUPPORT/Cloudera+Manager+Downloads)
is the easiest way to get a Hadoop cluster with YARN up and running.

The `java/examples/distshell` directory contains an example configuration file
that can be used to run the Kitten version of the Distributed Shell example
application that ships with Hadoop 2.0.0. To run the example, execute:
application that ships with Hadoop 2.2.0. To run the example, execute:

hadoop jar kitten-client-0.1.0-jar-with-dependencies.jar distshell.lua distshell
hadoop jar kitten-client-0.2.0-jar-with-dependencies.jar distshell.lua distshell

where the jar file is in the `java/client/target` directory. You should also copy the
application master jar file from `java/master/target` to a directory where it can be
Expand Down
2 changes: 1 addition & 1 deletion java/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kitten-java</artifactId>
<groupId>com.cloudera.kitten</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public String getApplicationName() {

@Override
public String getQueue() {
return env.isNil(LuaFields.QUEUE) ? "" : env.getString(LuaFields.QUEUE);
return env.isNil(LuaFields.QUEUE) ? "default" : env.getString(LuaFields.QUEUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,30 @@
*/
package com.cloudera.kitten.client.service;

import java.net.InetSocketAddress;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;

import com.cloudera.kitten.MasterConnectionFactory;
import com.google.common.base.Preconditions;

public class ApplicationsManagerConnectionFactory implements
MasterConnectionFactory<ClientRMProtocol> {
public class YarnClientFactory implements MasterConnectionFactory<YarnClient> {

private static final Log LOG = LogFactory.getLog(ApplicationsManagerConnectionFactory.class);
private static final Log LOG = LogFactory.getLog(YarnClientFactory.class);

private final Configuration conf;
private final YarnRPC rpc;

public ApplicationsManagerConnectionFactory(Configuration conf) {

public YarnClientFactory(Configuration conf) {
this.conf = Preconditions.checkNotNull(conf);
this.rpc = YarnRPC.create(conf);
}

@Override
public ClientRMProtocol connect() {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at: " + rmAddress);
return ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
public YarnClient connect() {
YarnClient client = YarnClient.createYarnClient();
client.init(new YarnConfiguration(conf));
client.start();
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,23 @@
*/
package com.cloudera.kitten.client.service;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;

import com.cloudera.kitten.ContainerLaunchContextFactory;
import com.cloudera.kitten.ContainerLaunchParameters;
Expand All @@ -58,48 +54,45 @@ public class YarnClientServiceImpl extends AbstractScheduledService
private static final Log LOG = LogFactory.getLog(YarnClientServiceImpl.class);

private final YarnClientParameters parameters;
private final MasterConnectionFactory<ClientRMProtocol> applicationsManagerFactory;
private final MasterConnectionFactory<YarnClient> yarnClientFactory;
private final Stopwatch stopwatch;

private ClientRMProtocol applicationsManager;
private YarnClient yarnClient;
private ApplicationId applicationId;
private ApplicationReport finalReport;
private boolean timeout = false;

public YarnClientServiceImpl(YarnClientParameters params) {
this(params, new ApplicationsManagerConnectionFactory(params.getConfiguration()),
this(params, new YarnClientFactory(params.getConfiguration()),
new Stopwatch());
}

public YarnClientServiceImpl(YarnClientParameters parameters,
MasterConnectionFactory<ClientRMProtocol> applicationsManagerFactory,
MasterConnectionFactory<YarnClient> yarnClientFactory,
Stopwatch stopwatch) {
this.parameters = Preconditions.checkNotNull(parameters);
this.applicationsManagerFactory = applicationsManagerFactory;
this.yarnClientFactory = yarnClientFactory;
this.stopwatch = stopwatch;
}

@Override
protected void startUp() {
this.applicationsManager = applicationsManagerFactory.connect();
this.yarnClient = yarnClientFactory.connect();
YarnClientApplication clientApp = getNewApplication();
GetNewApplicationResponse newApp = clientApp.getNewApplicationResponse();
ContainerLaunchContextFactory clcFactory = new ContainerLaunchContextFactory(newApp.getMaximumResourceCapability());

GetNewApplicationResponse newApp = getNewApplication();
this.applicationId = newApp.getApplicationId();
ContainerLaunchContextFactory clcFactory = new ContainerLaunchContextFactory(
newApp.getMinimumResourceCapability(), newApp.getMaximumResourceCapability());

LOG.info("Setting up application submission context for the application master");
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(applicationId);
ApplicationSubmissionContext appContext = clientApp.getApplicationSubmissionContext();
this.applicationId = appContext.getApplicationId();
appContext.setApplicationName(parameters.getApplicationName());

// Setup the container for the application master.
ContainerLaunchParameters appMasterParams = parameters.getApplicationMasterParameters(applicationId);
ContainerLaunchContext clc = clcFactory.create(appMasterParams);
appContext.setResource(clcFactory.createResource(appMasterParams));
appContext.setAMContainerSpec(clc);
appContext.setUser(appMasterParams.getUser());
appContext.setQueue(parameters.getQueue());
appContext.setPriority(ContainerLaunchContextFactory.createPriority(appMasterParams.getPriority()));
appContext.setPriority(clcFactory.createPriority(appMasterParams.getPriority()));
submitApplication(appContext);

// Make sure we stop the application in the case that it isn't done already.
Expand All @@ -116,25 +109,28 @@ public void run() {
}

private void submitApplication(ApplicationSubmissionContext appContext) {
SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);

LOG.info("Submitting application to the applications manager");
try {
applicationsManager.submitApplication(appRequest);
} catch (YarnRemoteException e) {
yarnClient.submitApplication(appContext);
} catch (YarnException e) {
LOG.error("Exception thrown submitting application", e);
stop();
}
} catch (IOException e) {
LOG.error("IOException thrown submitting application", e);
stop();
}
}

private GetNewApplicationResponse getNewApplication() {
private YarnClientApplication getNewApplication() {
try {
return applicationsManager.getNewApplication(Records.newRecord(GetNewApplicationRequest.class));
} catch (YarnRemoteException e) {
return yarnClient.createApplication();
} catch (YarnException e) {
LOG.error("Exception thrown getting new application", e);
stop();
return null;
} catch (IOException e) {
stop();
return null;
}
}

Expand All @@ -143,30 +139,34 @@ protected void shutDown() {
if (finalReport != null) {
YarnApplicationState state = finalReport.getYarnApplicationState();
FinalApplicationStatus status = finalReport.getFinalApplicationStatus();
String diagnostics = finalReport.getDiagnostics();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == status) {
LOG.info("Application completed successfully.");
}
else {
LOG.info("Application finished unsuccessfully."
+ " State=" + state.toString() + ", FinalStatus=" + status.toString());
+ " State = " + state.toString() + ", FinalStatus = " + status.toString());
}
}
else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
LOG.info("Application did not complete successfully."
+ " State=" + state.toString() + ", FinalStatus=" + status.toString());
+ " State = " + state.toString() + ", FinalStatus = " + status.toString());
if (diagnostics != null) {
LOG.info("Diagnostics = " + diagnostics);
}
}
} else {
// Otherwise, we need to kill the application, if it was created.
if (applicationId != null) {
LOG.info("Killing application id = " + applicationId);
KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
try {
applicationsManager.forceKillApplication(request);
} catch (YarnRemoteException e) {
yarnClient.killApplication(applicationId);
} catch (YarnException e) {
LOG.error("Exception thrown killing application", e);
} catch (IOException e) {
LOG.error("IOException thrown killing application", e);
}
LOG.info("Application was killed.");
}
Expand Down Expand Up @@ -198,14 +198,14 @@ public ApplicationReport getFinalReport() {

@Override
public ApplicationReport getApplicationReport() {
GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(applicationId);
try {
GetApplicationReportResponse response = applicationsManager.getApplicationReport(reportRequest);
return response.getApplicationReport();
} catch (YarnRemoteException e) {
return yarnClient.getApplicationReport(applicationId);
} catch (YarnException e) {
LOG.error("Exception occurred requesting application report", e);
return null;
} catch (IOException e) {
LOG.error("IOException occurred requesting application report", e);
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static void setup() throws InterruptedException, IOException {
1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
conf = yarnCluster.getConfig();
}
try {
Thread.sleep(2000);
Expand All @@ -71,17 +72,20 @@ public void testKittenShell() throws Exception {
String config = "/lua/distshell.lua";

// For the outputs
File tmpFile = File.createTempFile("distshell", ".txt");
tmpFile.deleteOnExit();
File tmpFile = new File("/tmp/distshell.out");
if (tmpFile.exists()) {
tmpFile.delete();
}
//tmpFile.deleteOnExit();

KittenClient client = new KittenClient(
ImmutableMap.<String, Object>of(
"TEST_FILE", tmpFile.getAbsolutePath(),
"PWD", (new File(".")).getAbsolutePath()));
conf.set(LocalDataHelper.APP_BASE_DIR, "file:///tmp/");
client.setConf(conf);

assertEquals(0, client.run(new String[] { config, "distshell" }));
System.out.println("Running...");
assertEquals(0, client.run(new String[]{config, "distshell"}));
assertEquals(12, Files.readLines(tmpFile, Charsets.UTF_8).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,17 @@

import com.cloudera.kitten.ContainerLaunchParameters;
import com.cloudera.kitten.client.YarnClientParameters;
import com.cloudera.kitten.client.params.lua.LuaYarnClientParameters;
import com.cloudera.kitten.util.LocalDataHelper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;

public class BasicLuaConfigTest {

Resource clusterMin;
Resource clusterMax;
Configuration conf;

@Before
public void setUp() throws Exception {
clusterMin = Records.newRecord(Resource.class);
clusterMin.setMemory(50);
clusterMax = Records.newRecord(Resource.class);
clusterMax.setMemory(90);
conf = new Configuration();
Expand All @@ -59,14 +55,12 @@ public void testBasicClient() throws Exception {
conf);
assertEquals("Distributed Shell", params.getApplicationName());
assertEquals(86400L, params.getClientTimeoutMillis());
assertEquals("", params.getQueue());
assertEquals("default", params.getQueue());

ContainerLaunchParameters clp = params.getApplicationMasterParameters(null);
assertEquals("josh", clp.getUser());
assertEquals(1, clp.getPriority());
assertEquals(100, clp.getMemory());
// clusterMax = 90 < 100
assertEquals(clusterMax, clp.getContainerResource(clusterMin, clusterMax));
assertEquals(clusterMax, clp.getContainerResource(clusterMax));
Map<String, String> expEnv = ImmutableMap.of(
"zs", "10", "a", "b", "fiz", "faz", "foo", "foo", "biz", "baz");
Map<String, String> actEnv = clp.getEnvironment();
Expand Down
5 changes: 3 additions & 2 deletions java/client/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# ***** Set root logger level to INFO and its only appender to A.
# ***** Set root logger level to WARN and its only appender to A.
log4j.rootLogger=warn, A
log4j.logger.com.cloudera.kitten=info, A
log4j.logger.org.apache.hadoop=error, A
log4j.additivity.com.cloudera.kitten=false

# ***** A is set to be a ConsoleAppender.
log4j.appender.A=org.apache.log4j.ConsoleAppender
Expand Down
5 changes: 5 additions & 0 deletions java/client/src/test/resources/lua/distshell.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ CONTAINER_INSTANCES = 2
base_env = {
-- PWD is specified by the test case.
CLASSPATH = table.concat({"${CLASSPATH}", PWD .. "/target/classes/", PWD .. "/target/lib/*"}, ":"),

}

distshell = yarn {
name = "Distributed Shell",
timeout = 60000,
memory = 256,
cores = 1,

master = {
env = base_env,
command = {
base = "java -Xmx128m com.cloudera.kitten.appmaster.ApplicationMaster",
args = { "-conf job.xml", "1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr" }, -- job.xml contains the client configuration info.
},
resources = {
["log4j.properties"] = {file = PWD .. "/src/test/resources/log4j.properties"}
}
},

Expand Down
2 changes: 1 addition & 1 deletion java/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kitten-java</artifactId>
<groupId>com.cloudera.kitten</groupId>
<version>0.1.0</version>
<version>0.2.0</version>
<relativePath>../</relativePath>
</parent>

Expand Down
Loading

0 comments on commit 11527be

Please sign in to comment.