From 5805f6277dbd72149eb9284a18aa65e9a0cff6d4 Mon Sep 17 00:00:00 2001 From: Thomas Wolf Date: Fri, 11 Nov 2022 21:07:38 +0100 Subject: [PATCH] GH-268: Fix ClientConnectionService.sendHeartBeat() If a reply is requested, but none arrives within the timeout, throw an exception and terminate the connection. This rolls back the changes made in ClientConnectionService in commit de2f8fef. It's quite all right to use the synchronous implementation of Session.request() because heartbeats are not sent from I/O threads. Bug: https://github.com/apache/mina-sshd/issues/268 --- .../session/ClientConnectionService.java | 33 +++---------------- .../java/org/apache/sshd/KeepAliveTest.java | 31 +++++++++++++++++ 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java index 25e79baa4..6017d3feb 100644 --- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java +++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.time.Duration; -import java.time.Instant; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -30,7 +28,6 @@ import org.apache.sshd.common.FactoryManager; import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.SshException; -import org.apache.sshd.common.future.GlobalRequestFuture; import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.session.Session; import org.apache.sshd.common.session.helpers.AbstractConnectionService; @@ -128,33 +125,11 @@ protected boolean sendHeartBeat() { buf.putBoolean(withReply); if (withReply) { - Instant start = Instant.now(); - CountDownLatch replyReceived = new CountDownLatch(1); - GlobalRequestFuture writeFuture = session.request(buf, heartbeatRequest, (cmd, reply) -> { - replyReceived.countDown(); + Buffer reply = session.request(heartbeatRequest, buf, heartbeatReplyMaxWait); + if (reply != null) { if (log.isTraceEnabled()) { - log.trace("sendHeartBeat({}) received reply={} size={} for request={}", session, - SshConstants.getCommandMessageName(cmd), reply.available(), heartbeatRequest); - } - }); - writeFuture.await(heartbeatReplyMaxWait); - Throwable t = writeFuture.getException(); - if (t != null) { - // We couldn't even send the request. - throw new IOException(t.getMessage(), t); - } - Duration elapsed = Duration.between(start, Instant.now()); - if (elapsed.compareTo(heartbeatReplyMaxWait) < 0) { - long toWait = heartbeatReplyMaxWait.minus(elapsed).toMillis(); - if (toWait > 0) { - try { - replyReceived.await(toWait, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - if (log.isTraceEnabled()) { - log.trace("sendHeartBeat({}) interrupted waiting for reply to request={}", session, - heartbeatRequest); - } - } + log.trace("sendHeartBeat({}) received reply size={} for request={}", + session, reply.available(), heartbeatRequest); } } } else { diff --git a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java index 3d5e16fe2..42bdeb015 100644 --- a/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java +++ b/sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java @@ -217,6 +217,37 @@ public Result process( } } + @Test // see GH-268 + public void testTimeoutOnMissingHeartbeatResponse() throws Exception { + CoreModuleProperties.IDLE_TIMEOUT.set(sshd, Duration.ofSeconds(30)); + List> globalHandlers = sshd.getGlobalRequestHandlers(); + sshd.setGlobalRequestHandlers(Collections.singletonList(new AbstractConnectionServiceRequestHandler() { + @Override + public Result process(ConnectionService connectionService, String request, boolean wantReply, Buffer buffer) + throws Exception { + // Never reply; + return Result.Replied; + } + })); + CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, HEARTBEAT); + CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(1)); + try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(CONNECT_TIMEOUT) + .getSession()) { + session.addPasswordIdentity(getCurrentTestName()); + session.auth().verify(AUTH_TIMEOUT); + + try (ClientChannel channel = session.createChannel(Channel.CHANNEL_SHELL)) { + long waitStart = System.currentTimeMillis(); + Collection result = channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), TIMEOUT); + long waitEnd = System.currentTimeMillis(); + assertTrue("Wrong channel state after wait of " + (waitEnd - waitStart) + " ms: " + result, + result.contains(ClientChannelEvent.CLOSED)); + } + } finally { + sshd.setGlobalRequestHandlers(globalHandlers); // restore original + } + } + public static class TestEchoShellFactory extends EchoShellFactory { public TestEchoShellFactory() { super();