-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-7003] Improve reliability of connection failure detection between Netty block transfer service endpoints #5584
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
|
||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.SimpleChannelInboundHandler; | ||
import io.netty.handler.timeout.IdleState; | ||
import io.netty.handler.timeout.IdleStateEvent; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -40,21 +42,29 @@ | |
* Client. | ||
* This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler, | ||
* for the Client's responses to the Server's requests. | ||
* | ||
* This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}. | ||
* We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic | ||
* on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not | ||
* time out if the client is continuously sending but getting no responses, for simplicity. | ||
*/ | ||
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> { | ||
private final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class); | ||
|
||
private final TransportClient client; | ||
private final TransportResponseHandler responseHandler; | ||
private final TransportRequestHandler requestHandler; | ||
private final long requestTimeoutNs; | ||
|
||
/**
 * Creates a channel handler that keeps references to the given client and to the handlers
 * used for response and request messages.
 *
 * @param client the client stored by this handler
 * @param responseHandler the handler stored for response messages
 * @param requestHandler the handler stored for request messages
 * @param requestTimeoutMs request timeout in milliseconds; converted to nanoseconds for storage
 */
public TransportChannelHandler(
    TransportClient client,
    TransportResponseHandler responseHandler,
    TransportRequestHandler requestHandler,
    long requestTimeoutMs) {
  this.client = client;
  this.responseHandler = responseHandler;
  this.requestHandler = requestHandler;
  // Stored in nanoseconds so it can be compared directly against System.nanoTime() deltas.
  this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
}
|
||
public TransportClient getClient() { | ||
|
@@ -93,4 +103,25 @@ public void channelRead0(ChannelHandlerContext ctx, Message request) { | |
responseHandler.handle((ResponseMessage) request); | ||
} | ||
} | ||
|
||
/** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */ | ||
@Override | ||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | ||
if (evt instanceof IdleStateEvent) { | ||
IdleStateEvent e = (IdleStateEvent) evt; | ||
// See class comment for timeout semantics. In addition to ensuring we only timeout while | ||
// there are outstanding requests, we also do a secondary consistency check to ensure | ||
// there's no race between the idle timeout and incrementing the numOutstandingRequests. | ||
boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0; | ||
boolean isActuallyOverdue = | ||
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs; | ||
if (e.state() == IdleState.ALL_IDLE && hasInFlightRequests && isActuallyOverdue) { | ||
String address = NettyUtils.getRemoteAddress(ctx.channel()); | ||
logger.error("Channel to {} has been quiet for {} ms while there are outstanding " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Channel -> Connection? quiet -> idle? |
||
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " + | ||
"is wrong.", address, requestTimeoutNs / 1000 / 1000); | ||
ctx.close(); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.network.util; | ||
|
||
import com.google.common.collect.Maps; | ||
|
||
import java.util.Map; | ||
import java.util.NoSuchElementException; | ||
|
||
/** ConfigProvider backed by a private copy of the Map passed to the constructor. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. remove immutable? I don't think you actually care about mutability of the map here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. well we make a copy, was the point. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. in that case -- maybe just say we will make a copy? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. that's like twice as many characters -- do you even tweet, bro? |
||
public class MapConfigProvider extends ConfigProvider { | ||
private final Map<String, String> config; | ||
|
||
public MapConfigProvider(Map<String, String> config) { | ||
this.config = Maps.newHashMap(config); | ||
} | ||
|
||
@Override | ||
public String get(String name) { | ||
String value = config.get(name); | ||
if (value == null) { | ||
throw new NoSuchElementException(name); | ||
} | ||
return value; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can u document the semantics of this, and how it'd be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and final?