From 5593e88454b6106d1eca210e9a43404a06c0e530 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Mon, 6 Jan 2025 13:02:05 -0800 Subject: [PATCH] Flaky test pass1 (#855) * Update shared subscription test to remove invalid assertions on a property that does not necessarily hold (all subscriptions receive messages) * Update SelfPubSub test to not use counting variables that are pointless (they won't ever be wrong) as well as unsafe from race conditions (they are set and read from different threads with no explicit synchronizations, also the future that was waited upon strictly happens-before the setting of the counter which leads to consistency issues) * Update the double client id connection failure test to be more forgiving to potential race conditions between the two clients fighting over the client id * Adds a delay between subscribe and disconnect in the will test. Did not get an encouraging answer when the service team was queried about whether subscribe completion had any eventual consistency issues relative to a subsequent publish (connection will). Co-authored-by: Bret Ambrose --- .../awssdk/crt/test/Mqtt5ClientTest.java | 15 +++++++-------- .../awssdk/crt/test/SelfPubSubTest.java | 19 ------------------- 2 files changed, 7 insertions(+), 27 deletions(-) diff --git a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java index fda1139f5..1fc36ea73 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java @@ -1132,9 +1132,10 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on @Override public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { - connectedFuture.completeExceptionally(new Exception( - "[" + client_name + "] Could not connect! Error code is: " + onConnectionFailureReturn.getErrorCode() - )); + // failing the connected future here is not valid from a race condition standpoint. It is possible that + // the interrupting client itself gets interrupted and fails to fully connect due to the original client + // interrupting it. Eventually it will succeed (briefly) as the two clients fight over the client id + // with increasing reconnect backoff. } @Override @@ -2083,6 +2084,9 @@ public void Op_UC4() { eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); + // Paranoid about service-side eventual consistency. Add a wait to reduce chances of a missed will publish. + Thread.sleep(2000); + publisher.stop(disconnectPacketBuilder.build()); publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); @@ -2171,11 +2175,6 @@ public void Op_SharedSubscription() { // Wait a little longer just to ensure that no packets beyond expectations are arrived. publishEvents.afterCompletionFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS); - // Check that both clients received packets. - // PublishEvents_Futured_Counted also checks for duplicated packets, so this one assert is enough - // to ensure that AWS IoT Core sent different packets to different subscribers. - assertTrue(publishEvents.clientsReceived.size() == 2); - subscriberOneClient.stop(); subscriberTwoClient.stop(); publisherClient.stop(); diff --git a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java index 18906a2f8..5de86f2ed 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java @@ -34,9 +34,6 @@ public SelfPubSubTest() { static final String TEST_TOPIC = "publish/me/senpai/" + UUID.randomUUID().toString(); static final String TEST_PAYLOAD = "PUBLISH ME! SHINY AND CHROME!"; - int pubsAcked = 0; - int subsAcked = 0; - @Test public void testPubSub() { skipIfNetworkUnavailable(); @@ -65,27 +62,21 @@ public void testPubSub() { CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE, messageHandler); - subscribed.thenApply(unused -> subsAcked++); int packetId = subscribed.get(); assertNotSame(0, packetId); - assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE, false); CompletableFuture published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 1, pubsAcked); published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 2, pubsAcked); MqttMessage received = receivedFuture.get(); assertEquals("Received", message.getTopic(), received.getTopic()); @@ -94,11 +85,9 @@ public void testPubSub() { assertEquals("Received", message.getRetain(), received.getRetain()); CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); - unsubscribed.thenApply(unused -> subsAcked--); packetId = unsubscribed.get(); assertNotSame(0, packetId); - assertEquals("No Subscriptions", 0, subsAcked); } catch (Exception ex) { fail(ex.getMessage()); } @@ -142,33 +131,25 @@ public void testPubSubOnMessage() { try { CompletableFuture subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE); - subscribed.thenApply(unused -> subsAcked++); int packetId = subscribed.get(); assertNotSame(0, packetId); - assertEquals("Single subscription", 1, subsAcked); MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE); CompletableFuture published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 1, pubsAcked); published = connection.publish(message); - published.thenApply(unused -> pubsAcked++); packetId = published.get(); assertNotSame(0, packetId); - assertEquals("Published", 2, pubsAcked); CompletableFuture unsubscribed = connection.unsubscribe(TEST_TOPIC); - unsubscribed.thenApply(unused -> subsAcked--); packetId = unsubscribed.get(); assertNotSame(0, packetId); - assertEquals("No Subscriptions", 0, subsAcked); } catch (Exception ex) { fail(ex.getMessage()); }