
Merge remote-tracking branch 'upstream/master' into SPARK-28461
# Conflicts:
#	docs/sql-migration-guide-upgrade.md
wangyum committed Aug 6, 2019
2 parents bd6b1d2 + bab88c4 commit 141c3d4
Showing 131 changed files with 1,872 additions and 810 deletions.
@@ -22,8 +22,11 @@

import com.codahale.metrics.MetricSet;

-/** Provides an interface for reading shuffle files, either from an Executor or external service. */
-public abstract class ShuffleClient implements Closeable {
+/**
+ * Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
+ * or external service.
+ */
+public abstract class BlockStoreClient implements Closeable {

/**
* Fetch a sequence of blocks from a remote node asynchronously,
@@ -51,7 +54,7 @@ public abstract void fetchBlocks(
DownloadFileManager downloadFileManager);

/**
-   * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to
+   * Get the shuffle MetricsSet from BlockStoreClient, this will be used in MetricsSystem to
* get the Shuffle related metrics.
*/
public MetricSet shuffleMetrics() {
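To make the renamed API concrete, here is a minimal, hypothetical sketch of driving the new interface through ExternalBlockStoreClient (the host, port, executor ID, and block IDs below are placeholders; the constructor arguments mirror the test suites later in this commit):

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;

public class FetchSketch {
  static void fetch(TransportConf conf) throws Exception {
    // SASL disabled (null secret holder), 5000 ms registration timeout, as in the suites below.
    try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000)) {
      client.init("app-20190806-0001"); // must precede any other call, per the javadoc above
      client.fetchBlocks("shuffle-host", 7337, "exec-1",
          new String[] {"shuffle_0_0_0", "rdd_42_0"}, // one shuffle block, one RDD block
          new BlockFetchingListener() {
            @Override
            public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
              // Consume the buffer here, or retain() it to use after the callback returns.
            }
            @Override
            public void onBlockFetchFailure(String blockId, Throwable t) {
              // Log and retry, or fail the task.
            }
          },
          null /* no DownloadFileManager: fetched blocks stay in memory */);
    }
  }
}

The mixed block IDs are the point of the rename: the same client and handler now serve both shuffle_* and rdd_* blocks.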
@@ -47,21 +47,22 @@
import org.apache.spark.network.util.TransportConf;

/**
- * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
+ * RPC Handler for a server which can serve both RDD blocks and shuffle blocks from outside
+ * of an Executor process.
*
* Handles registering executors and opening shuffle or disk persisted RDD blocks from them.
* Blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
* is equivalent to one block.
*/
-public class ExternalShuffleBlockHandler extends RpcHandler {
-  private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
+public class ExternalBlockHandler extends RpcHandler {
+  private static final Logger logger = LoggerFactory.getLogger(ExternalBlockHandler.class);

@VisibleForTesting
final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
private final ShuffleMetrics metrics;

-public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
+public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile)
throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
@@ -74,7 +75,7 @@ public ExternalShuffleBlockResolver getBlockResolver() {

/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
-public ExternalShuffleBlockHandler(
+public ExternalBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockResolver blockManager) {
this.metrics = new ShuffleMetrics();
@@ -41,13 +41,12 @@
import org.apache.spark.network.util.TransportConf;

/**
- * Client for reading shuffle blocks which points to an external (outside of executor) server.
- * This is instead of reading shuffle blocks directly from other executors (via
- * BlockTransferService), which has the downside of losing the shuffle data if we lose the
- * executors.
+ * Client for reading both RDD blocks and shuffle blocks which points to an external
+ * (outside of executor) server. This is instead of reading blocks directly from other executors
+ * (via BlockTransferService), which has the downside of losing the data if we lose the executors.
*/
-public class ExternalShuffleClient extends ShuffleClient {
-  private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
+public class ExternalBlockStoreClient extends BlockStoreClient {
+  private static final Logger logger = LoggerFactory.getLogger(ExternalBlockStoreClient.class);

private final TransportConf conf;
private final boolean authEnabled;
@@ -61,7 +60,7 @@ public class ExternalShuffleClient extends ShuffleClient {
* Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
* then secretKeyHolder may be null.
*/
-public ExternalShuffleClient(
+public ExternalBlockStoreClient(
TransportConf conf,
SecretKeyHolder secretKeyHolder,
boolean authEnabled,
@@ -77,8 +76,8 @@ protected void checkInit() {
}

/**
-   * Initializes the ShuffleClient, specifying this Executor's appId.
-   * Must be called before any other method on the ShuffleClient.
+   * Initializes the BlockStoreClient, specifying this Executor's appId.
+   * Must be called before any other method on the BlockStoreClient.
*/
public void init(String appId) {
this.appId = appId;
@@ -23,11 +23,12 @@
import io.netty.buffer.Unpooled;

import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;

/**
- * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
+ * Messages handled by the {@link ExternalBlockHandler}, or
* by Spark's NettyBlockTransferService.
*
* At a high level:
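These protocol messages are plain Encodable values prefixed with a one-byte type tag. A minimal sketch of the encode/decode round trip the handler relies on, using the existing OpenBlocks message (unchanged by this commit; the IDs are placeholders):

import java.nio.ByteBuffer;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;

public class ProtocolRoundTrip {
  public static void main(String[] args) {
    // Client side: request two blocks, one shuffle block and one RDD block.
    OpenBlocks open = new OpenBlocks(
        "app-20190806-0001", "exec-1", new String[] {"shuffle_0_0_0", "rdd_42_0"});
    ByteBuffer wire = open.toByteBuffer(); // type tag + encoded fields

    // Server side: ExternalBlockHandler decodes by the tag and dispatches.
    BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire);
    assert decoded instanceof OpenBlocks;
  }
}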
@@ -45,7 +45,7 @@
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.shuffle.BlockFetchingListener;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.OneForOneBlockFetcher;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
@@ -173,7 +173,7 @@ public void testNoSaslServer() {
public void testAppIsolation() throws Exception {
// Start a new server with the correct RPC handler to serve block data.
ExternalShuffleBlockResolver blockResolver = mock(ExternalShuffleBlockResolver.class);
-ExternalShuffleBlockHandler blockHandler = new ExternalShuffleBlockHandler(
+ExternalBlockHandler blockHandler = new ExternalBlockHandler(
new OneForOneStreamManager(), blockResolver);
TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);

@@ -44,7 +44,7 @@
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.shuffle.protocol.UploadBlock;

-public class ExternalShuffleBlockHandlerSuite {
+public class ExternalBlockHandlerSuite {
TransportClient client = mock(TransportClient.class);

OneForOneStreamManager streamManager;
@@ -59,7 +59,7 @@ public class ExternalShuffleBlockHandlerSuite {
public void beforeEach() {
streamManager = mock(OneForOneStreamManager.class);
blockResolver = mock(ExternalShuffleBlockResolver.class);
-handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
+handler = new ExternalBlockHandler(streamManager, blockResolver);
}

@Test
@@ -74,7 +74,7 @@ public void testRegisterExecutor() {
verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
verify(callback, never()).onFailure(any(Throwable.class));
// Verify register executor request latency metrics
-Timer registerExecutorRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
+Timer registerExecutorRequestLatencyMillis = (Timer) ((ExternalBlockHandler) handler)
.getAllMetrics()
.getMetrics()
.get("registerExecutorRequestLatencyMillis");
@@ -168,13 +168,13 @@ private void checkOpenBlocksReceive(BlockTransferMessage msg, ManagedBuffer[] bl
}

private void verifyOpenBlockLatencyMetrics() {
-Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
+Timer openBlockRequestLatencyMillis = (Timer) ((ExternalBlockHandler) handler)
.getAllMetrics()
.getMetrics()
.get("openBlockRequestLatencyMillis");
assertEquals(1, openBlockRequestLatencyMillis.getCount());
// Verify block transfer metrics
-Meter blockTransferRateBytes = (Meter) ((ExternalShuffleBlockHandler) handler)
+Meter blockTransferRateBytes = (Meter) ((ExternalBlockHandler) handler)
.getAllMetrics()
.getMetrics()
.get("blockTransferRateBytes");
@@ -67,7 +67,7 @@ public class ExternalShuffleIntegrationSuite {
// Executor 0 is sort-based
static TestShuffleDataContext dataContext0;

-static ExternalShuffleBlockHandler handler;
+static ExternalBlockHandler handler;
static TransportServer server;
static TransportConf conf;
static TransportContext transportContext;
@@ -109,7 +109,7 @@ public static void beforeAll() throws IOException {
config.put("spark.shuffle.io.maxRetries", "0");
config.put(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "true");
conf = new TransportConf("shuffle", new MapConfigProvider(config));
-handler = new ExternalShuffleBlockHandler(
+handler = new ExternalBlockHandler(
new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, null) {
@Override
@@ -176,7 +176,8 @@ private FetchResult fetchBlocks(

final Semaphore requestsRemaining = new Semaphore(0);

-try (ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000)) {
+try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(
+    clientConf, null, false, 5000)) {
client.init(APP_ID);
client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
new BlockFetchingListener() {
@@ -271,7 +272,7 @@ public void testRemoveRddBlocks() throws Exception {
String validBlockIdToRemove = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_VALID_BLOCK_TO_RM;
String missingBlockIdToRemove = "rdd_" + RDD_ID +"_" + SPLIT_INDEX_MISSING_BLOCK_TO_RM;

-try (ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, 5000)) {
+try (ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000)) {
client.init(APP_ID);
Future<Integer> numRemovedBlocks = client.removeBlocks(
TestUtils.getLocalHost(),
@@ -331,7 +332,7 @@ public void testFetchNoServer() throws Exception {

private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
throws IOException, InterruptedException {
-ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, 5000);
+ExternalBlockStoreClient client = new ExternalBlockStoreClient(conf, null, false, 5000);
client.init(APP_ID);
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
executorId, executorInfo);
@@ -45,7 +45,7 @@ public class ExternalShuffleSecuritySuite {

@Before
public void beforeEach() throws IOException {
-transportContext = new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
+transportContext = new TransportContext(conf, new ExternalBlockHandler(conf, null));
TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
new TestSecretKeyHolder("my-app-id", "secret"));
this.server = transportContext.createServer(Arrays.asList(bootstrap));
@@ -91,7 +91,7 @@ public void testEncryption() throws IOException, InterruptedException {
validate("my-app-id", "secret", true);
}

-/** Creates an ExternalShuffleClient and attempts to register with the server. */
+/** Creates an ExternalBlockStoreClient and attempts to register with the server. */
private void validate(String appId, String secretKey, boolean encrypt)
throws IOException, InterruptedException {
TransportConf testConf = conf;
@@ -100,8 +100,8 @@ private void validate(String appId, String secretKey, boolean encrypt)
ImmutableMap.of("spark.authenticate.enableSaslEncryption", "true")));
}

-try (ExternalShuffleClient client =
-    new ExternalShuffleClient(
+try (ExternalBlockStoreClient client =
+    new ExternalBlockStoreClient(
testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000)) {
client.init(appId);
// Registration either succeeds or throws an exception.
@@ -51,7 +51,7 @@
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.shuffle.ExternalBlockHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

@@ -123,7 +123,7 @@ public class YarnShuffleService extends AuxiliaryService {

// Handles registering executors and opening shuffle blocks
@VisibleForTesting
-ExternalShuffleBlockHandler blockHandler;
+ExternalBlockHandler blockHandler;

// Where to store & reload executor info for recovering state after an NM restart
@VisibleForTesting
@@ -170,7 +170,7 @@ protected void serviceInit(Configuration conf) throws Exception {
}

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
-blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
+blockHandler = new ExternalBlockHandler(transportConf, registeredExecutorFile);

// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
@@ -26,7 +26,7 @@
import org.apache.hadoop.metrics2.MetricsSource;

/**
- * Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}
+ * Forward {@link org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics}
* to hadoop metrics system.
* NodeManager by default exposes JMX endpoint where can be collected.
*/
@@ -55,7 +55,7 @@ public void getMetrics(MetricsCollector collector, boolean all) {

/**
* The metric types used in
- * {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}.
+ * {@link org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics}.
* Visible for testing.
*/
public static void collectMetric(
@@ -60,7 +60,7 @@ private CountMinSketchImpl() {}
this.eps = eps;
this.confidence = confidence;
this.width = (int) Math.ceil(2 / eps);
-this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
+this.depth = (int) Math.ceil(-Math.log1p(-confidence) / Math.log(2));
initTablesWith(depth, width, seed);
}

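The switch from Math.log(1 - confidence) to Math.log1p(-confidence) is a floating-point accuracy fix: log1p evaluates log(1 + x) accurately for x near zero, so the subtraction 1 - confidence is never rounded away when confidence is tiny. A quick demonstration of the difference (the extreme confidence value is chosen purely to expose the rounding):

public class Log1pDemo {
  public static void main(String[] args) {
    double confidence = 1e-17;

    // Naive form: 1 - 1e-17 rounds to exactly 1.0 in double precision,
    // so the log is 0 and the sketch would be given zero hash rows.
    int naiveDepth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));

    // log1p form keeps the tiny term, yielding at least one row.
    int fixedDepth = (int) Math.ceil(-Math.log1p(-confidence) / Math.log(2));

    System.out.println(naiveDepth + " vs " + fixedDepth); // prints "0 vs 1"
  }
}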
@@ -621,13 +621,13 @@ public void writeToOutputStreamOverflow() throws IOException {
public void writeToOutputStream() throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
EMPTY_UTF8.writeTo(outputStream);
assertEquals("", outputStream.toString("UTF-8"));
assertEquals("", outputStream.toString(StandardCharsets.UTF_8.name()));
outputStream.reset();

fromString("数据砖很重").writeTo(outputStream);
assertEquals(
"数据砖很重",
outputStream.toString("UTF-8"));
outputStream.toString(StandardCharsets.UTF_8.name()));
outputStream.reset();
}

@@ -651,7 +651,7 @@ public void writeToOutputStreamIntArray() throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
.writeTo(outputStream);
assertEquals("大千世界", outputStream.toString("UTF-8"));
assertEquals("大千世界", outputStream.toString(StandardCharsets.UTF_8.name()));
}

@Test
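The motivation for dropping the "UTF-8" string literal: StandardCharsets.UTF_8.name() is derived from a compile-time constant that can never name a missing charset, while a hand-typed name only fails at runtime. A small hypothetical illustration:

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class CharsetNameDemo {
  public static void main(String[] args) throws UnsupportedEncodingException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] bytes = "数据".getBytes(StandardCharsets.UTF_8); // two 3-byte code points
    out.write(bytes, 0, bytes.length);

    // toString(String) still declares UnsupportedEncodingException, but the
    // constant-derived name cannot be misspelled the way a "UTF-8" literal can.
    System.out.println(out.toString(StandardCharsets.UTF_8.name()));
  }
}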
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
-import org.apache.spark.network.shuffle.ExternalShuffleClient
+import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
@@ -331,7 +331,7 @@ object SparkEnv extends Logging {

val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
-Some(new ExternalShuffleClient(transConf, securityManager,
+Some(new ExternalBlockStoreClient(transConf, securityManager,
securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)))
} else {
None