Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7003] Improve reliability of connection failure detection between Netty block transfer service endpoints #5584

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -106,6 +107,7 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
.addLast("encoder", encoder)
.addLast("frameDecoder", NettyUtils.createFrameDecoder())
.addLast("decoder", decoder)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
Expand All @@ -126,7 +128,8 @@ private TransportChannelHandler createChannelHandler(Channel channel) {
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs());
}

/** Returns the transport configuration held in this object's {@code conf} field. */
public TransportConf getConf() {
  return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -50,13 +50,17 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {

private final Map<Long, RpcResponseCallback> outstandingRpcs;

/**
 * {@link System#nanoTime()} reading taken when the most recent fetch or RPC request was
 * registered via {@code addFetchRequest} / {@code addRpcRequest}; starts at 0 before any
 * request has been added. Exposed through {@code getTimeOfLastRequestNs()}, which
 * TransportChannelHandler compares against the request timeout when an idle event fires.
 */
private AtomicLong timeOfLastRequestNs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u document the semantics of this, and how it'd be used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and final?


/**
 * Creates a response handler for {@code channel} with empty tables of outstanding fetches
 * and RPCs.
 */
public TransportResponseHandler(Channel channel) {
  // No request has been registered yet, so the last-request timestamp starts at zero.
  this.timeOfLastRequestNs = new AtomicLong();
  this.outstandingRpcs = new ConcurrentHashMap<Long, RpcResponseCallback>();
  this.outstandingFetches = new ConcurrentHashMap<StreamChunkId, ChunkReceivedCallback>();
  this.channel = channel;
}

/**
 * Registers {@code callback} under {@code streamChunkId} in the outstanding-fetch table and
 * records the current {@link System#nanoTime()} as the time of the last request.
 */
public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
// Stamp the time before registering the callback, so the timestamp is already current by
// the time this request is counted as outstanding.
timeOfLastRequestNs.set(System.nanoTime());
outstandingFetches.put(streamChunkId, callback);
}

Expand All @@ -65,6 +69,7 @@ public void removeFetchRequest(StreamChunkId streamChunkId) {
}

/**
 * Registers {@code callback} under {@code requestId} in the outstanding-RPC table and
 * records the current {@link System#nanoTime()} as the time of the last request.
 */
public void addRpcRequest(long requestId, RpcResponseCallback callback) {
// Stamp the time before registering the callback, so the timestamp is already current by
// the time this request is counted as outstanding.
timeOfLastRequestNs.set(System.nanoTime());
outstandingRpcs.put(requestId, callback);
}

Expand Down Expand Up @@ -161,8 +166,12 @@ public void handle(ResponseMessage message) {
}

/**
 * Returns the total number of outstanding requests (fetch requests + rpcs).
 *
 * This is not test-only: TransportChannelHandler calls it when an idle event fires to decide
 * whether any requests are still in flight. The two table sizes are read one after the
 * other, so the sum is not an atomic snapshot of both tables.
 */
public int numOutstandingRequests() {
  return outstandingFetches.size() + outstandingRpcs.size();
}

/**
 * Returns the {@link System#nanoTime()} reading recorded when the most recent fetch or RPC
 * request was registered, or 0 if no request has been registered yet.
 */
public long getTimeOfLastRequestNs() {
  final long lastRequestNs = timeOfLastRequestNs.get();
  return lastRequestNs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,21 +42,29 @@
* Client.
* This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
* for the Client's responses to the Server's requests.
*
* This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
* We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
* on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; for simplicity,
* we will not time out if the client is continuously sending but getting no responses.
*/
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
private final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);

private final TransportClient client;
private final TransportResponseHandler responseHandler;
private final TransportRequestHandler requestHandler;
private final long requestTimeoutNs;

public TransportChannelHandler(
TransportClient client,
TransportResponseHandler responseHandler,
TransportRequestHandler requestHandler) {
TransportRequestHandler requestHandler,
long requestTimeoutMs) {
this.client = client;
this.responseHandler = responseHandler;
this.requestHandler = requestHandler;
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
}

public TransportClient getClient() {
Expand Down Expand Up @@ -93,4 +103,25 @@ public void channelRead0(ChannelHandlerContext ctx, Message request) {
responseHandler.handle((ResponseMessage) request);
}
}

/** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
// See class comment for timeout semantics. In addition to ensuring we only timeout while
// there are outstanding requests, we also do a secondary consistency check to ensure
// there's no race between the idle timeout and incrementing the numOutstandingRequests.
boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
if (e.state() == IdleState.ALL_IDLE && hasInFlightRequests && isActuallyOverdue) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
logger.error("Channel to {} has been quiet for {} ms while there are outstanding " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Channel -> Connection?

quiet -> idle?

"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
ctx.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import com.google.common.collect.Maps;

import java.util.Map;
import java.util.NoSuchElementException;

/** ConfigProvider backed by a private copy of the Map passed to its constructor. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove immutable? I don't think you actually care about mutability of the map here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well we make a copy, was the point

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case -- maybe just say we will make a copy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's like twice as many characters -- do you even tweet, bro?

public class MapConfigProvider extends ConfigProvider {
  /** Snapshot of the settings handed to the constructor. */
  private final Map<String, String> config;

  /**
   * Builds a provider over a copy of {@code config}; the copy is taken once, at construction.
   *
   * @param config settings to copy, keyed by configuration name
   */
  public MapConfigProvider(Map<String, String> config) {
    this.config = Maps.newHashMap(config);
  }

  /**
   * Looks up the value stored for {@code name}.
   *
   * @param name configuration key to look up
   * @return the value mapped to {@code name}
   * @throws NoSuchElementException if the lookup for {@code name} yields null
   */
  @Override
  public String get(String name) {
    final String found = config.get(name);
    if (found != null) {
      return found;
    }
    throw new NoSuchElementException(name);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static ByteToMessageDecoder createFrameDecoder() {
return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, -8, 8);
}

/** Returns the remote address on the channel or "&lt;remote address&gt;" if none exists. */
/** Returns the remote address on the channel or "&lt;unknown remote&gt;" if none exists. */
public static String getRemoteAddress(Channel channel) {
if (channel != null && channel.remoteAddress() != null) {
return channel.remoteAddress().toString();
Expand Down
Loading