Skip to content

Commit

Permalink
apacheGH-268: Fix ClientConnectionService.sendHeartBeat()
Browse files Browse the repository at this point in the history
If a reply is requested, but none arrives within the timeout, throw an
exception and terminate the connection.

This rolls back the changes made in ClientConnectionService in commit
de2f8fe. It's quite all right to use the synchronous implementation of
Session.request() because heartbeats are not sent from I/O threads.

Bug: apache#268
  • Loading branch information
tomaswolf committed Nov 22, 2022
1 parent 45fd3a4 commit 5805f62
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,7 +28,6 @@
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.GlobalRequestFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractConnectionService;
Expand Down Expand Up @@ -128,33 +125,11 @@ protected boolean sendHeartBeat() {
buf.putBoolean(withReply);

if (withReply) {
Instant start = Instant.now();
CountDownLatch replyReceived = new CountDownLatch(1);
GlobalRequestFuture writeFuture = session.request(buf, heartbeatRequest, (cmd, reply) -> {
replyReceived.countDown();
Buffer reply = session.request(heartbeatRequest, buf, heartbeatReplyMaxWait);
if (reply != null) {
if (log.isTraceEnabled()) {
log.trace("sendHeartBeat({}) received reply={} size={} for request={}", session,
SshConstants.getCommandMessageName(cmd), reply.available(), heartbeatRequest);
}
});
writeFuture.await(heartbeatReplyMaxWait);
Throwable t = writeFuture.getException();
if (t != null) {
// We couldn't even send the request.
throw new IOException(t.getMessage(), t);
}
Duration elapsed = Duration.between(start, Instant.now());
if (elapsed.compareTo(heartbeatReplyMaxWait) < 0) {
long toWait = heartbeatReplyMaxWait.minus(elapsed).toMillis();
if (toWait > 0) {
try {
replyReceived.await(toWait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (log.isTraceEnabled()) {
log.trace("sendHeartBeat({}) interrupted waiting for reply to request={}", session,
heartbeatRequest);
}
}
log.trace("sendHeartBeat({}) received reply size={} for request={}",
session, reply.available(), heartbeatRequest);
}
}
} else {
Expand Down
31 changes: 31 additions & 0 deletions sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,37 @@ public Result process(
}
}

@Test // see GH-268
public void testTimeoutOnMissingHeartbeatResponse() throws Exception {
CoreModuleProperties.IDLE_TIMEOUT.set(sshd, Duration.ofSeconds(30));
List<RequestHandler<ConnectionService>> globalHandlers = sshd.getGlobalRequestHandlers();
sshd.setGlobalRequestHandlers(Collections.singletonList(new AbstractConnectionServiceRequestHandler() {
@Override
public Result process(ConnectionService connectionService, String request, boolean wantReply, Buffer buffer)
throws Exception {
// Never reply;
return Result.Replied;
}
}));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, HEARTBEAT);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(1));
try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(CONNECT_TIMEOUT)
.getSession()) {
session.addPasswordIdentity(getCurrentTestName());
session.auth().verify(AUTH_TIMEOUT);

try (ClientChannel channel = session.createChannel(Channel.CHANNEL_SHELL)) {
long waitStart = System.currentTimeMillis();
Collection<ClientChannelEvent> result = channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), TIMEOUT);
long waitEnd = System.currentTimeMillis();
assertTrue("Wrong channel state after wait of " + (waitEnd - waitStart) + " ms: " + result,
result.contains(ClientChannelEvent.CLOSED));
}
} finally {
sshd.setGlobalRequestHandlers(globalHandlers); // restore original
}
}

public static class TestEchoShellFactory extends EchoShellFactory {
public TestEchoShellFactory() {
super();
Expand Down

0 comments on commit 5805f62

Please sign in to comment.