Skip to content

Commit

Permalink
Performance tuning for TimeoutTask in DefaultFuture (#4129)
Browse files Browse the repository at this point in the history
Performance tuning for TimeoutTask in DefaultFuture, manually merge #4085 and #4087
  • Loading branch information
chickenlj authored and carryxyh committed May 23, 2019
1 parent 5285952 commit 1a66206
Showing 1 changed file with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,19 @@ public class DefaultFuture extends CompletableFuture<Object> {

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>();

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
30,
TimeUnit.MILLISECONDS);

// invoke id.
private final long id;
private final Long id;
private final Channel channel;
private final Request request;
private final int timeout;
private final long start = System.currentTimeMillis();
private volatile long sent;
private Timeout timeoutCheckTask;

private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
Expand All @@ -79,9 +78,8 @@ private DefaultFuture(Channel channel, Request request, int timeout) {
* check time out of the future
*/
private static void timeoutCheck(DefaultFuture future) {
TimeoutCheckTask task = new TimeoutCheckTask(future);
Timeout t = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
PENDING_TASKS.put(future.getId(), t);
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -140,15 +138,19 @@ public static void closeChannel(Channel channel) {
}

public static void received(Channel channel, Response response) {
received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
Timeout t = PENDING_TASKS.remove(future.getId());
if (t != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
// decrease Time
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
Expand Down Expand Up @@ -229,14 +231,15 @@ private String getTimeoutMessage(boolean scan) {

private static class TimeoutCheckTask implements TimerTask {

private DefaultFuture future;
private final Long requestID;

TimeoutCheckTask(DefaultFuture future) {
this.future = future;
TimeoutCheckTask(Long requestID) {
this.requestID = requestID;
}

@Override
public void run(Timeout timeout) {
DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
return;
}
Expand All @@ -246,7 +249,7 @@ public void run(Timeout timeout) {
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);
DefaultFuture.received(future.getChannel(), timeoutResponse, true);

}
}
Expand Down

0 comments on commit 1a66206

Please sign in to comment.