Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize heartbeat #3299

Merged
merged 4 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ public class Constants {
public static final long LEAST_HEARTBEAT_DURATION = 1000;

/**
* ticks per wheel. Currently only contains two tasks, so 16 locations are enough
* ticks per wheel.
*/
public static final int TICKS_PER_WHEEL = 16;
public static final int TICKS_PER_WHEEL = 128;

public static final String HEARTBEAT_TIMEOUT_KEY = "heartbeat.timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public abstract class AbstractTimerTask implements TimerTask {

private final Long tick;

protected volatile boolean cancel = false;

AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
if (channelProvider == null || tick == null) {
throw new IllegalArgumentException();
Expand All @@ -54,11 +56,19 @@ static Long now() {
return System.currentTimeMillis();
}

public void cancel() {
this.cancel = true;
}

private void reput(Timeout timeout, Long tick) {
if (timeout == null || tick == null) {
throw new IllegalArgumentException();
}

if (cancel) {
return;
}

Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ public class HeaderExchangeClient implements ExchangeClient {
private int heartbeat;
private int idleTimeout;

private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);

private HeartbeatTimerTask heartBeatTimerTask;

private ReconnectTimerTask reconnectTimerTask;

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
Assert.notNull(client, "Client can't be null");
this.client = client;
Expand Down Expand Up @@ -182,12 +186,17 @@ private void startIdleCheckTask() {
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);

this.heartBeatTimerTask = heartBeatTimerTask;
this.reconnectTimerTask = reconnectTimerTask;

// init task and start timer.
idleCheckTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
idleCheckTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

private void doClose() {
heartBeatTimerTask.cancel();
reconnectTimerTask.cancel();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ public class HeaderExchangeServer implements ExchangeServer {
private int idleTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);

private static HashedWheelTimer idleCheckTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL);

private CloseTimerTask closeTimerTask;

public HeaderExchangeServer(Server server) {
Assert.notNull(server, "server == null");
this.server = server;
Expand Down Expand Up @@ -148,6 +150,11 @@ private void doClose() {
if (!closed.compareAndSet(false, true)) {
return;
}
cancelCloseTask();
}

private void cancelCloseTask() {
closeTimerTask.cancel();
}

@Override
Expand Down Expand Up @@ -214,6 +221,8 @@ public void reset(URL url) {
heartbeat = h;
idleTimeout = t;

// we need cancel the exist closeTimeout first.
cancelCloseTask();
startIdleCheckTask();
}
}
Expand Down Expand Up @@ -262,9 +271,10 @@ private void startIdleCheckTask() {

long idleTimeoutTick = calculateLeastDuration(idleTimeout);
CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
this.closeTimerTask = closeTimerTask;

// init task and start timer.
idleCheckTimer.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
}

}