From 0d4e3ce9a7a649cd1923f4107fc00d4b47d6638c Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 19 Dec 2024 11:26:23 -0800 Subject: [PATCH 1/4] Don't assert about shared publish routing distribution --- .../software/amazon/awssdk/crt/test/Mqtt5ClientTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index fda1139f5..db57c4fd5 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -2171,11 +2171,6 @@ public void Op_SharedSubscription() { // Wait a little longer just to ensure that no packets beyond expectations are arrived. publishEvents.afterCompletionFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - // Check that both clients received packets. - // PublishEvents_Futured_Counted also checks for duplicated packets, so this one assert is enough - // to ensure that AWS IoT Core sent different packets to different subscribers. - assertTrue(publishEvents.clientsReceived.size() == 2); - subscriberOneClient.stop(); subscriberTwoClient.stop(); publisherClient.stop(); From c6eb4ff3a3c816f576f6cff13df6036c2b65b72d Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 19 Dec 2024 12:07:28 -0800 Subject: [PATCH 2/4] Merge test futures to try and avoid potential race condition in evaluation; don't let an interrupting client connection failure fail the entire interrupt(clientid) test --- .../awssdk/crt/test/Mqtt5ClientTest.java | 7 +++--- .../awssdk/crt/test/SelfPubSubTest.java | 24 +++++++------------ 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index db57c4fd5..5d2dc4c7d 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -1132,9 +1132,10 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on @Override public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { - connectedFuture.completeExceptionally(new Exception( - "[" + client_name + "] Could not connect! Error code is: " + onConnectionFailureReturn.getErrorCode() - )); + // failing the connected future here is not valid from a race condition standpoint. It is possible that + // the interrupting client itself gets interrupted and fails to fully connect due to the original client + // interrupting it. Eventually it will succeed (briefly) as the two clients fight over the client id + // with increasing reconnect backoff. } @Override diff --git a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java index 18906a2f8..5cae3a5e8 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java @@ -64,8 +64,7 @@ public void testPubSub() { }; CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE, - messageHandler); - subscribed.thenApply(unused -> subsAcked++); + messageHandler).thenApply(unused -> subsAcked++); int packetId = subscribed.get(); assertNotSame(0, packetId); @@ -73,15 +72,13 @@ public void testPubSub() { MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE, false); - CompletableFuture published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); + CompletableFuture published = connection.publish(message).thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); assertEquals("Published", 1, pubsAcked); - published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); + published = connection.publish(message).thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); @@ -93,8 +90,7 @@ public void testPubSub() { assertEquals("Received", message.getQos(), received.getQos()); assertEquals("Received", message.getRetain(), received.getRetain()); - CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); - unsubscribed.thenApply(unused -> subsAcked--); + CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC).thenApply(unused -> subsAcked--); packetId = unsubscribed.get(); assertNotSame(0, packetId); @@ -141,30 +137,26 @@ public void testPubSubOnMessage() { null); try { - CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE); - subscribed.thenApply(unused -> subsAcked++); + CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE).thenApply(unused -> subsAcked++); int packetId = subscribed.get(); assertNotSame(0, packetId); assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE); - CompletableFuture published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); + CompletableFuture published = connection.publish(message).thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); assertEquals("Published", 1, pubsAcked); - published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); + published = connection.publish(message).thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); assertEquals("Published", 2, pubsAcked); - CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); - unsubscribed.thenApply(unused -> subsAcked--); + CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC).thenApply(unused -> subsAcked--); packetId = unsubscribed.get(); assertNotSame(0, packetId); From eb94c9929e75f3d9f474d4a19225032068e8ab17 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 19 Dec 2024 12:15:48 -0800 Subject: [PATCH 3/4] On second thought, get rid of the counting checks entirely; they're pointless and unsafe --- .../awssdk/crt/test/SelfPubSubTest.java | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java index 5cae3a5e8..5de86f2ed 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java @@ -34,9 +34,6 @@ public SelfPubSubTest() { static final String TEST_TOPIC = "publish/me/senpai/" + UUID.randomUUID().toString(); static final String TEST_PAYLOAD = "PUBLISH ME! SHINY AND CHROME!"; - int pubsAcked = 0; - int subsAcked = 0; - @Test public void testPubSub() { skipIfNetworkUnavailable(); @@ -64,25 +61,22 @@ public void testPubSub() { }; CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE, - messageHandler).thenApply(unused -> subsAcked++); + messageHandler); int packetId = subscribed.get(); assertNotSame(0, packetId); - assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE, false); - CompletableFuture published = connection.publish(message).thenApply(unused -> pubsAcked++); + CompletableFuture published = connection.publish(message); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 1, pubsAcked); - published = connection.publish(message).thenApply(unused -> pubsAcked++); + published = connection.publish(message); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 2, pubsAcked); MqttMessage received = receivedFuture.get(); assertEquals("Received", message.getTopic(), received.getTopic()); @@ -90,11 +84,10 @@ public void testPubSub() { assertEquals("Received", message.getQos(), received.getQos()); assertEquals("Received", message.getRetain(), received.getRetain()); - CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC).thenApply(unused -> subsAcked--); + CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); packetId = unsubscribed.get(); assertNotSame(0, packetId); - assertEquals("No Subscriptions", 0, subsAcked); } catch (Exception ex) { fail(ex.getMessage()); } @@ -137,30 +130,26 @@ public void testPubSubOnMessage() { null); try { - CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE).thenApply(unused -> subsAcked++); + CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE); int packetId = subscribed.get(); assertNotSame(0, packetId); - assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE); - CompletableFuture published = connection.publish(message).thenApply(unused -> pubsAcked++); + CompletableFuture published = connection.publish(message); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 1, pubsAcked); - published = connection.publish(message).thenApply(unused -> pubsAcked++); + published = connection.publish(message); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 2, pubsAcked); - CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC).thenApply(unused -> subsAcked--); + CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); packetId = unsubscribed.get(); assertNotSame(0, packetId); - assertEquals("No Subscriptions", 0, subsAcked); } catch (Exception ex) { fail(ex.getMessage()); } From 620c5e109a01701c608564075b4a29690c7be32b Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 6 Jan 2025 09:52:25 -0800 Subject: [PATCH 4/4] Add a delay before triggering the will test disconnect --- .../java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index 5d2dc4c7d..1fc36ea73 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -2084,6 +2084,9 @@ public void Op_UC4() { eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + // Paranoid about service-side eventual consistency. Add a wait to reduce chances of a missed will publish. + Thread.sleep(2000); + publisher.stop(disconnectPacketBuilder.build()); publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);