MapR [SPARK-434] Move absent commits from 2.3.2 branch (apache#425)
* MapR [SPARK-352] Spark shell fails with "NoClassDefFoundError: org/apache/hadoop/fs/FSDataInputStream" if java is not available in PATH

* MapR [SPARK-350] Deprecate Spark Kafka-09 package

* MapR [SPARK-326] Investigate possibility of writing Java example for the MapRDB OJAI connector

* [SPARK-356] Merge mapr changes from kafka-09 package into the kafka-10

* SPARK-319 Fix for sparkR version check

* MapR [SPARK-349] Update OJAI client to v3 for Spark MapR-DB JSON connector

* MapR [SPARK-367] Move absent commits from 2.3.1 branch

* MapR [SPARK-137] Analyze the warning during compilation of OJAI connector

* MapR [SPARK-369] Spark 2.3.2 fails with error related to zookeeper

* [MAPR-26258] hbasecontext.HBaseDistributedScanExample fails

* [SPARK-24355] Spark external shuffle server improvement to better handle block fetch requests

* MapR [SPARK-374] Spark Hive example fails when we submit a job from another (simple) cluster user

* MapR [SPARK-434] Move absent commits from 2.3.2 branch

* MapR [SPARK-434] Move absent commits from 2.3.2 branch

* MapR [SPARK-373] Unexpected behavior during job running in standalone cluster mode

* MapR [SPARK-419] Update hive-maprdb-json-handler jar for spark 2.3.2.0 and spark 2.2.1

* MapR [SPARK-396] Interface change of sendToKafka

* MapR [SPARK-357] consumer groups are prepended with a "service_" prefix

* MapR [SPARK-429] Changes in maprdb connector are the cause of broken backward compatibility

* MapR [SPARK-427] Update kafka in Spark-2.4.0 to the 1.1.1-mapr

* MapR [SPARK-434] Move absent commits from 2.3.2 branch

* Move absent commits from 2.3.2 branch

* MapR [SPARK-434] Move absent commits from 2.3.2 branch

* Move absent commits from 2.3.2 branch

* Move absent commits from 2.3.2 branch
ekrivokonmapr committed Sep 19, 2019
1 parent cee169b commit a00fdca
Showing 93 changed files with 1,619 additions and 842 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -437,7 +437,7 @@ sparkR.session <- function(
jvmVersionStrip <- gsub("-SNAPSHOT", "", jvmVersion)
rPackageVersion <- paste0(packageVersion("SparkR"))

if (jvmVersionStrip != rPackageVersion) {
if (!grepl(paste("^", rPackageVersion, sep=""), jvmVersionStrip)) {
warning(paste("Version mismatch between Spark JVM and SparkR package. JVM version was",
jvmVersion, ", while R package version was", rPackageVersion))
}
5 changes: 5 additions & 0 deletions assembly/pom.xml
@@ -191,6 +191,11 @@
<artifactId>spark-streaming-kafka-0-9_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
5 changes: 5 additions & 0 deletions bin/mapr-classpath.sh
@@ -1,4 +1,9 @@
#!/usr/bin/env bash

if [ ! "$(command -v java)" ]; then
echo "JAVA_HOME is not set" >&2
exit 1
fi

SPARK_HOME=$(readlink -f "/usr/local/spark")
java -cp $SPARK_HOME'/jars/*' org.apache.spark.classpath.ClasspathFilter $(mapr classpath) $SPARK_HOME'/conf/dep-blacklist.txt'
@@ -21,6 +21,8 @@
import java.util.List;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
@@ -32,11 +34,13 @@
import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.MessageDecoder;
import org.apache.spark.network.protocol.MessageEncoder;
import org.apache.spark.network.server.ChunkFetchRequestHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportChannelHandler;
import org.apache.spark.network.server.TransportRequestHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.util.IOMode;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.TransportFrameDecoder;
@@ -61,6 +65,7 @@ public class TransportContext {
private final TransportConf conf;
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
private final boolean isClientOnly;

/**
* Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
@@ -77,17 +82,54 @@ public class TransportContext {
private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;

// Separate thread pool for handling ChunkFetchRequest messages. This helps to throttle the
// max number of TransportServer worker threads that are blocked on writing the response
// of a ChunkFetchRequest message back to the client via the underlying channel.
private static EventLoopGroup chunkFetchWorkers;

public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
this(conf, rpcHandler, false);
this(conf, rpcHandler, false, false);
}

public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this(conf, rpcHandler, closeIdleConnections, false);
}

/**
* Enables TransportContext initialization for underlying client and server.
*
* @param conf TransportConf
* @param rpcHandler RpcHandler responsible for handling requests and responses.
* @param closeIdleConnections Close idle connections if it is set to true.
* @param isClientOnly Indicates that this TransportContext is only used by a client.
*                     This matters mainly when the external shuffle service is enabled:
*                     it stops shuffle clients from creating the extra event loop and
*                     thread pool used to handle chunked fetch requests.
*/
public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections,
boolean isClientOnly) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;
this.isClientOnly = isClientOnly;

synchronized(TransportContext.class) {
if (chunkFetchWorkers == null &&
conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle") &&
!isClientOnly) {
chunkFetchWorkers = NettyUtils.createEventLoop(
IOMode.valueOf(conf.ioMode()),
conf.chunkFetchHandlerThreads(),
"shuffle-chunk-fetch-handler");
}
}
}

/**
@@ -144,14 +186,23 @@ public TransportChannelHandler initializePipeline(
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
ChunkFetchRequestHandler chunkFetchHandler =
createChunkFetchHandler(channelHandler, channelRpcHandler);
ChannelPipeline pipeline = channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
.addLast("idleStateHandler",
new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
// Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
if (conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle")
&& !isClientOnly) {
pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
}
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
@@ -173,5 +224,14 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
conf.connectionTimeoutMs(), closeIdleConnections);
}

/**
* Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
*/
private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler,
RpcHandler rpcHandler) {
return new ChunkFetchRequestHandler(channelHandler.getClient(),
rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
}

public TransportConf getConf() { return conf; }
}
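The isClientOnly flag introduced above controls whether the shared "shuffle-chunk-fetch-handler" EventLoopGroup is created at all. Below is a minimal usage sketch, not part of this patch, showing how a caller might pick the flag for the shuffle server versus a pure shuffle client; the class and method names are illustrative only.

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;

// Illustrative sketch only, not from the commit.
public final class TransportContextUsageSketch {

  // Shuffle server: isClientOnly = false, so the dedicated chunk-fetch event loop is created.
  public static TransportContext forShuffleServer(TransportConf conf, RpcHandler handler) {
    return new TransportContext(conf, handler, /* closeIdleConnections */ true, /* isClientOnly */ false);
  }

  // Shuffle client (e.g. an executor fetching blocks): isClientOnly = true skips the extra
  // event loop and its thread pool, since the client never serves ChunkFetchRequest messages.
  public static TransportContext forShuffleClient(TransportConf conf, RpcHandler handler) {
    return new TransportContext(conf, handler, /* closeIdleConnections */ true, /* isClientOnly */ true);
  }
}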
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.server;

import java.net.SocketAddress;

import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.Encodable;

import static org.apache.spark.network.util.NettyUtils.*;

/**
* A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending responses
* to ChunkFetchRequest messages, the thread performing the I/O on the underlying channel could
* potentially be blocked due to disk contention. If several hundred clients send ChunkFetchRequest
* messages to the server at the same time, they could occupy all threads of TransportServer's
* default EventLoopGroup while waiting for disk reads, before the block data can be sent back to
* the clients as part of ChunkFetchSuccess messages. As a result, no threads would be left to
* process other RPC messages, which take much less time to process, and clients could time out
* while performing SASL authentication, registering executors, or waiting for a response to an
* OpenBlocks message.
*/
public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);

private final TransportClient client;
private final StreamManager streamManager;
/** The max number of chunks being transferred and not finished yet. */
private final long maxChunksBeingTransferred;

public ChunkFetchRequestHandler(
TransportClient client,
StreamManager streamManager,
Long maxChunksBeingTransferred) {
this.client = client;
this.streamManager = streamManager;
this.maxChunksBeingTransferred = maxChunksBeingTransferred;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause);
ctx.close();
}

@Override
protected void channelRead0(
ChannelHandlerContext ctx,
final ChunkFetchRequest msg) throws Exception {
Channel channel = ctx.channel();
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
msg.streamChunkId);
}
long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
chunksBeingTransferred, maxChunksBeingTransferred);
channel.close();
return;
}
ManagedBuffer buf;
try {
streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
streamManager.registerChannel(channel, msg.streamChunkId.streamId);
buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format("Error opening block %s for request from %s",
msg.streamChunkId, getRemoteAddress(channel)), e);
respond(channel, new ChunkFetchFailure(msg.streamChunkId,
Throwables.getStackTraceAsString(e)));
return;
}

streamManager.chunkBeingSent(msg.streamChunkId.streamId);
respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener(
(ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId));
}

/**
* The invocation to channel.writeAndFlush is async, and the actual I/O on the
* channel will be handled by the EventLoop the channel is registered to. So even
* though we are processing the ChunkFetchRequest in a separate thread pool, the actual I/O,
* which is the potentially blocking call that could deplete server handler threads, is still
* being processed by TransportServer's default EventLoopGroup. In order to throttle the max
* number of threads performing channel I/O for responses to ChunkFetchRequest messages, the
* thread calling channel.writeAndFlush waits for the response to be fully sent back to the
* client by invoking await(). This will throttle the rate at which threads from the
* ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to TransportServer's
* default EventLoopGroup, thus making sure that we can reserve some threads in
* TransportServer's default EventLoopGroup for handling other RPC messages.
*/
private ChannelFuture respond(
final Channel channel,
final Encodable result) throws InterruptedException {
final SocketAddress remoteAddress = channel.remoteAddress();
return channel.writeAndFlush(result).await().addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
});
}
}
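The respond() comment above relies on writeAndFlush() being asynchronous while await() blocks the calling chunk-fetch worker until the write future completes. Below is a self-contained sketch of that behavior, illustrative only and not part of this patch; it uses Netty's EmbeddedChannel and a plain String payload instead of a ChunkFetchSuccess message, so the write completes inline rather than on a socket.

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.embedded.EmbeddedChannel;

// Illustrative sketch only, not from the commit.
public final class AwaitThrottleSketch {
  public static void main(String[] args) throws InterruptedException {
    Channel channel = new EmbeddedChannel();
    // writeAndFlush() is asynchronous and returns a future; await() blocks the calling
    // thread until that future completes. On a real socket channel this is what keeps a
    // chunk-fetch worker parked until the I/O event loop has sent the response; with
    // EmbeddedChannel the write completes inline, so await() returns immediately.
    ChannelFuture future = channel.writeAndFlush("chunk payload").await();
    System.out.println("write finished, success = " + future.isSuccess());
  }
}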
@@ -18,14 +18,16 @@
package org.apache.spark.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.ResponseMessage;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -47,7 +49,7 @@
* on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
* timeout if the client is continuously sending but getting no responses, for simplicity.
*/
public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);

private final TransportClient client;
@@ -112,8 +114,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

/**
* Override acceptInboundMessage to properly delegate ChunkFetchRequest messages
* to ChunkFetchRequestHandler.
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
public boolean acceptInboundMessage(Object msg) throws Exception {
if (msg instanceof ChunkFetchRequest) {
return false;
} else {
return super.acceptInboundMessage(msg);
}
}

@Override
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
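The acceptInboundMessage override above is what lets ChunkFetchRequest messages flow past this handler to the dedicated ChunkFetchRequestHandler later in the pipeline. Below is a self-contained sketch of that Netty dispatch rule, illustrative only and not part of this patch; hypothetical String messages stand in for Spark's Message and ChunkFetchRequest types.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;

// Illustrative sketch only, not from the commit.
public final class AcceptInboundSketch {
  public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel(
        // Stands in for TransportChannelHandler: refuses "chunk" messages so they
        // propagate to the next handler instead of being consumed here.
        new SimpleChannelInboundHandler<String>() {
          @Override
          public boolean acceptInboundMessage(Object msg) {
            return !((String) msg).startsWith("chunk");
          }
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            System.out.println("generic handler: " + msg);
          }
        },
        // Stands in for ChunkFetchRequestHandler: only ever sees the refused messages.
        new SimpleChannelInboundHandler<String>() {
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            System.out.println("chunk handler: " + msg);
          }
        });

    channel.writeInbound("rpc ping");     // printed by the generic handler
    channel.writeInbound("chunk fetch");  // skipped by the generic handler, printed by the chunk handler
  }
}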
@@ -24,6 +24,7 @@
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -21,6 +21,7 @@
import java.util.Properties;

import com.google.common.primitives.Ints;
import io.netty.util.NettyRuntime;

/**
* A central location that tracks all the settings we expose to users.
@@ -281,4 +282,31 @@ public Properties cryptoConf() {
public long maxChunksBeingTransferred() {
return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);
}

/**
* Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
* Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
* Although when calling the async writeAndFlush on the underlying channel to send
* response back to client, the I/O on the channel is still being handled by
* {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
* that's registered with the Channel, by waiting inside the ChunkFetchRequest handler
* threads for the completion of sending back responses, we are able to put a limit on
* the max number of threads from TransportServer's default EventLoopGroup that are
* going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive
* and could take a long time to process due to disk contention. By configuring a slightly
* higher number of shuffle server threads, we are able to reserve some threads for
* handling other RPC messages, thus making the client less likely to experience timeouts
* when sending RPC messages to the shuffle server. Defaults to 0, which is 2*#cores
* or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
* which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
*/
public int chunkFetchHandlerThreads() {
if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
return 0;
}
int chunkFetchHandlerThreadsPercent =
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
return this.serverThreads() > 0 ? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100:
(2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100;
}
}
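The percentage arithmetic in chunkFetchHandlerThreads() is easier to follow with concrete numbers. Below is a worked sketch, not part of this patch, using hypothetical values: 16 cores, io.serverThreads unset, and a percent setting of 90.

// Illustrative sketch only, not from the commit; the numbers are hypothetical.
public final class ChunkFetchThreadsExample {
  public static void main(String[] args) {
    int availableProcessors = 16;  // stands in for NettyRuntime.availableProcessors()
    int serverThreads = 0;         // io.serverThreads not configured
    int percent = 90;              // spark.shuffle.server.chunkFetchHandlerThreadsPercent

    // Same formula as chunkFetchHandlerThreads() above.
    int threads = serverThreads > 0
        ? (serverThreads * percent) / 100
        : (2 * availableProcessors * percent) / 100;

    System.out.println(threads);   // 28, i.e. 90% of 2 * 16 cores (integer division)
  }
}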