Skip to content

Commit

Permalink
[SPARK-16711] YarnShuffleService doesn't re-init properly on YARN rol…
Browse files Browse the repository at this point in the history
…ling upgrade
  • Loading branch information
Thomas Graves committed Aug 18, 2016
1 parent b72bb62 commit f0a5c56
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 129 deletions.
16 changes: 16 additions & 0 deletions common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

/**
* LevelDB utility class available in the network package.
*/
public class LevelDBProvider {
private static final Logger logger = LoggerFactory.getLogger(LevelDBProvider.class);

public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws
IOException {
DB tmpDb = null;
if (dbFile != null) {
Options options = new Options();
options.createIfMissing(false);
options.logger(new LevelDBLogger());
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
logger.info("Creating state database at " + dbFile);
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
} else {
// the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
// one, so we can keep processing new apps
logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
"recover state for existing applications", dbFile, e);
if (dbFile.isDirectory()) {
for (File f : dbFile.listFiles()) {
if (!f.delete()) {
logger.warn("error deleting {}", f.getPath());
}
}
}
if (!dbFile.delete()) {
logger.warn("error deleting {}", dbFile.getPath());
}
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}

}
}
// if there is a version mismatch, we throw an exception, which means the service is unusable
checkVersion(tmpDb, version, mapper);
}
return tmpDb;
}

private static class LevelDBLogger implements org.iq80.leveldb.Logger {
private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);

@Override
public void log(String message) {
LOG.info(message);
}
}

/**
* Simple major.minor versioning scheme. Any incompatible changes should be across major
* versions. Minor version differences are allowed -- meaning we should be able to read
* dbs that are either earlier *or* later on the minor version.
*/
public static void checkVersion(DB db, StoreVersion newversion, ObjectMapper mapper) throws
IOException {
byte[] bytes = db.get(StoreVersion.KEY);
if (bytes == null) {
storeVersion(db, newversion, mapper);
} else {
StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
if (version.major != newversion.major) {
throw new IOException("cannot read state DB with version " + version + ", incompatible " +
"with current version " + newversion);
}
storeVersion(db, newversion, mapper);
}
}

public static void storeVersion(DB db, StoreVersion version, ObjectMapper mapper)
throws IOException {
db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version));
}

public static class StoreVersion {

final static byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);

public final int major;
public final int minor;

@JsonCreator
public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
this.major = major;
this.minor = minor;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

StoreVersion that = (StoreVersion) o;

return major == that.major && minor == that.minor;
}

@Override
public int hashCode() {
int result = major;
result = 31 * result + minor;
return result;
}
}

}
16 changes: 0 additions & 16 deletions common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<!-- Provided dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
Expand Down Expand Up @@ -114,52 +116,10 @@ public ShuffleIndexInformation load(File file) throws IOException {
};
shuffleIndexCache = CacheBuilder.newBuilder()
.maximumSize(indexCacheEntries).build(indexCacheLoader);
if (registeredExecutorFile != null) {
Options options = new Options();
options.createIfMissing(false);
options.logger(new LevelDBLogger());
DB tmpDb;
try {
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
logger.info("Creating state database at " + registeredExecutorFile);
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
} catch (NativeDB.DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}
} else {
// the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
// one, so we can keep processing new apps
logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
"recover state for existing applications", registeredExecutorFile, e);
if (registeredExecutorFile.isDirectory()) {
for (File f : registeredExecutorFile.listFiles()) {
if (!f.delete()) {
logger.warn("error deleting {}", f.getPath());
}
}
}
if (!registeredExecutorFile.delete()) {
logger.warn("error deleting {}", registeredExecutorFile.getPath());
}
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
} catch (NativeDB.DBException dbExc) {
throw new IOException("Unable to create state store", dbExc);
}

}
}
// if there is a version mismatch, we throw an exception, which means the service is unusable
checkVersion(tmpDb);
executors = reloadRegisteredExecutors(tmpDb);
db = tmpDb;
db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
if (db != null) {
executors = reloadRegisteredExecutors(db);
} else {
db = null;
executors = Maps.newConcurrentMap();
}
this.directoryCleaner = directoryCleaner;
Expand Down Expand Up @@ -384,76 +344,11 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
break;
}
AppExecId id = parseDbAppExecKey(key);
logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
}
return registeredExecutors;
}

private static class LevelDBLogger implements org.iq80.leveldb.Logger {
private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);

@Override
public void log(String message) {
LOG.info(message);
}
}

/**
* Simple major.minor versioning scheme. Any incompatible changes should be across major
* versions. Minor version differences are allowed -- meaning we should be able to read
* dbs that are either earlier *or* later on the minor version.
*/
private static void checkVersion(DB db) throws IOException {
byte[] bytes = db.get(StoreVersion.KEY);
if (bytes == null) {
storeVersion(db);
} else {
StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
if (version.major != CURRENT_VERSION.major) {
throw new IOException("cannot read state DB with version " + version + ", incompatible " +
"with current version " + CURRENT_VERSION);
}
storeVersion(db);
}
}

private static void storeVersion(DB db) throws IOException {
db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
}


public static class StoreVersion {

static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);

public final int major;
public final int minor;

@JsonCreator public StoreVersion(
@JsonProperty("major") int major,
@JsonProperty("minor") int minor) {
this.major = major;
this.minor = minor;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

StoreVersion that = (StoreVersion) o;

return major == that.major && minor == that.minor;
}

@Override
public int hashCode() {
int result = major;
result = 31 * result + minor;
return result;
}
}

}
Loading

0 comments on commit f0a5c56

Please sign in to comment.