Skip to content

Commit

Permalink
[SPARK-7003] Improve reliability of connection failure detection betw…
Browse files Browse the repository at this point in the history
…een Netty block transfer service endpoints

Currently we rely on the assumption that an exception will be raised and the channel closed if two endpoints cannot communicate over a Netty TCP channel. However, this guarantee does not hold in all network environments, and [SPARK-6962](https://issues.apache.org/jira/browse/SPARK-6962) seems to point to a case where only the server side of the connection detected a fault.

This patch improves robustness of fetch/rpc requests by having an explicit timeout in the transport layer which closes the connection if there is a period of inactivity while there are outstanding requests.
  • Loading branch information
aarondav committed Apr 20, 2015
1 parent d850b4b commit 37ce656
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,6 +107,7 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
.addLast("encoder", encoder)
.addLast("frameDecoder", NettyUtils.createFrameDecoder())
.addLast("decoder", decoder)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
Expand All @@ -126,7 +128,8 @@ private TransportChannelHandler createChannelHandler(Channel channel) {
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs());
}

public TransportConf getConf() { return conf; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -50,13 +50,17 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private AtomicLong timeOfLastRequestNs;

public TransportResponseHandler(Channel channel) {
this.channel = channel;
this.outstandingFetches = new ConcurrentHashMap<StreamChunkId, ChunkReceivedCallback>();
this.outstandingRpcs = new ConcurrentHashMap<Long, RpcResponseCallback>();
this.timeOfLastRequestNs = new AtomicLong(0);
}

public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
timeOfLastRequestNs.set(System.nanoTime());
outstandingFetches.put(streamChunkId, callback);
}

Expand All @@ -65,6 +69,7 @@ public void removeFetchRequest(StreamChunkId streamChunkId) {
}

public void addRpcRequest(long requestId, RpcResponseCallback callback) {
timeOfLastRequestNs.set(System.nanoTime());
outstandingRpcs.put(requestId, callback);
}

Expand Down Expand Up @@ -161,8 +166,12 @@ public void handle(ResponseMessage message) {
}

/** Returns total number of outstanding requests (fetch requests + rpcs) */
@VisibleForTesting
public int numOutstandingRequests() {
return outstandingFetches.size() + outstandingRpcs.size();
}

/** Returns the time in nanoseconds of when the last request was sent out. */
public long getTimeOfLastRequestNs() {
return timeOfLastRequestNs.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,21 +42,29 @@
* Client.
* This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
* for the Client's responses to the Server's requests.
*
* This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
* We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
* on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
* timeout if the client is continuously sending but getting no responses, for simplicity.
*/
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
private final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);

private final TransportClient client;
private final TransportResponseHandler responseHandler;
private final TransportRequestHandler requestHandler;
private final long requestTimeoutNs;

public TransportChannelHandler(
TransportClient client,
TransportResponseHandler responseHandler,
TransportRequestHandler requestHandler) {
TransportRequestHandler requestHandler,
long requestTimeoutMs) {
this.client = client;
this.responseHandler = responseHandler;
this.requestHandler = requestHandler;
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
}

public TransportClient getClient() {
Expand Down Expand Up @@ -93,4 +103,25 @@ public void channelRead0(ChannelHandlerContext ctx, Message request) {
responseHandler.handle((ResponseMessage) request);
}
}

/** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
// See class comment for timeout semantics. In addition to ensuring we only timeout while
// there are outstanding requests, we also do a secondary consistency check to ensure
// there's no race between the idle timeout and incrementing the numOutstandingRequests.
boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && hasInFlightRequests && isActuallyOverdue) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
logger.error("Channel to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
ctx.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import com.google.common.collect.Maps;

import java.util.Map;
import java.util.NoSuchElementException;

/** ConfigProvider based on an immutable Map. */
public class MapConfigProvider extends ConfigProvider {
private final Map<String, String> config;

public MapConfigProvider(Map<String, String> config) {
this.config = Maps.newHashMap(config);
}

@Override
public String get(String name) {
String value = config.get(name);
if (value == null) {
throw new NoSuchElementException(name);
}
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static ByteToMessageDecoder createFrameDecoder() {
return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, -8, 8);
}

/** Returns the remote address on the channel or "&lt;remote address&gt;" if none exists. */
/** Returns the remote address on the channel or "&lt;unknown remote&gt;" if none exists. */
public static String getRemoteAddress(Channel channel) {
if (channel != null && channel.remoteAddress() != null) {
return channel.remoteAddress().toString();
Expand Down
Loading

0 comments on commit 37ce656

Please sign in to comment.