Skip to content

Commit

Permalink
move logic to createSecretManager and some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Graves committed Aug 31, 2016
1 parent 0e39687 commit c4f58e8
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.network.util;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -27,10 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
* LevelDB utility class available in the network package.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ public class YarnShuffleService extends AuxiliaryService {
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// just for testing when you want to find an open port
@VisibleForTesting
static int boundPort = -1;
private static final ObjectMapper mapper = new ObjectMapper();
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
.StoreVersion(1, 0);

// just for integration tests that want to look at this file -- in general not sensible as
// a static
@VisibleForTesting
static YarnShuffleService instance;

// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
private ShuffleSecretManager secretManager;
Expand All @@ -110,22 +123,7 @@ public class YarnShuffleService extends AuxiliaryService {
@VisibleForTesting
File registeredExecutorFile;

// just for testing when you want to find an open port
@VisibleForTesting
static int boundPort = -1;

private static final ObjectMapper mapper = new ObjectMapper();
private DB db;
private File secretsFile;
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
.StoreVersion(1, 0);


// just for integration tests that want to look at this file -- in general not sensible as
// a static
@VisibleForTesting
static YarnShuffleService instance;

public YarnShuffleService() {
super("spark_shuffle");
Expand All @@ -152,11 +150,6 @@ protected void serviceInit(Configuration conf) throws Exception {
boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

try {
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
secretManager = new ShuffleSecretManager();
}

// In case this NM was killed while there were running spark applications, we need to restore
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
// If we don't find one, then we choose a file to use to save the state next time. Even if
Expand All @@ -165,25 +158,15 @@ protected void serviceInit(Configuration conf) throws Exception {
registeredExecutorFile =
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);

secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);

// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(conf);
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));

db = LevelDBProvider.initLevelDB(this.secretsFile, CURRENT_VERSION, mapper);
logger.info("Recovery location is: " + secretsFile.getPath());
if (db != null) {
reloadCreds(db);
}

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);

// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
createSecretManager();
bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
}

Expand All @@ -207,8 +190,17 @@ protected void serviceInit(Configuration conf) throws Exception {
}
}

private void reloadCreds(DB db) throws IOException {
if (isAuthenticationEnabled()) {
private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
File secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME);

// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));

db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
logger.info("Recovery location is: " + secretsFile.getPath());
if (db != null) {
logger.info("Going to reload spark shuffle data");
DBIterator itr = db.iterator();
itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
Expand Down

0 comments on commit c4f58e8

Please sign in to comment.