Skip to content

Commit

Permalink
Flaky test pass1 (#855)
Browse files Browse the repository at this point in the history
* Update shared subscription test to remove invalid assertions on a property that does not necessarily hold (all subscriptions receive messages)
* Update SelfPubSub test to not use counting variables that are pointless (they won't ever be wrong) as well as unsafe from race conditions (they are set and read from different threads with no explicit synchronizations, also the future that was waited upon strictly happens-before the setting of the counter which leads to consistency issues)
* Update the double client id connection failure test to be more forgiving to potential race conditions between the two clients fighting over the client id
* Adds a delay between subscribe and disconnect in the will test. Did not get an encouraging answer when the service team was queried about whether subscribe completion had any eventual consistency issues relative to a subsequent publish (connection will).


Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Jan 6, 2025
1 parent 20dd76c commit 5593e88
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1132,9 +1132,10 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on

@Override
public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
connectedFuture.completeExceptionally(new Exception(
"[" + client_name + "] Could not connect! Error code is: " + onConnectionFailureReturn.getErrorCode()
));
// failing the connected future here is not valid from a race condition standpoint. It is possible that
// the interrupting client itself gets interrupted and fails to fully connect due to the original client
// interrupting it. Eventually it will succeed (briefly) as the two clients fight over the client id
// with increasing reconnect backoff.
}

@Override
Expand Down Expand Up @@ -2083,6 +2084,9 @@ public void Op_UC4() {
eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Paranoid about service-side eventual consistency. Add a wait to reduce chances of a missed will publish.
Thread.sleep(2000);

publisher.stop(disconnectPacketBuilder.build());

publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
Expand Down Expand Up @@ -2171,11 +2175,6 @@ public void Op_SharedSubscription() {
// Wait a little longer just to ensure that no packets beyond expectations are arrived.
publishEvents.afterCompletionFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Check that both clients received packets.
// PublishEvents_Futured_Counted also checks for duplicated packets, so this one assert is enough
// to ensure that AWS IoT Core sent different packets to different subscribers.
assertTrue(publishEvents.clientsReceived.size() == 2);

subscriberOneClient.stop();
subscriberTwoClient.stop();
publisherClient.stop();
Expand Down
19 changes: 0 additions & 19 deletions src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public SelfPubSubTest() {
static final String TEST_TOPIC = "publish/me/senpai/" + UUID.randomUUID().toString();
static final String TEST_PAYLOAD = "PUBLISH ME! SHINY AND CHROME!";

int pubsAcked = 0;
int subsAcked = 0;

@Test
public void testPubSub() {
skipIfNetworkUnavailable();
Expand Down Expand Up @@ -65,27 +62,21 @@ public void testPubSub() {

CompletableFuture<Integer> subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE,
messageHandler);
subscribed.thenApply(unused -> subsAcked++);
int packetId = subscribed.get();

assertNotSame(0, packetId);
assertEquals("Single subscription", 1, subsAcked);

MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE,
false);
CompletableFuture<Integer> published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 1, pubsAcked);

published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 2, pubsAcked);

MqttMessage received = receivedFuture.get();
assertEquals("Received", message.getTopic(), received.getTopic());
Expand All @@ -94,11 +85,9 @@ public void testPubSub() {
assertEquals("Received", message.getRetain(), received.getRetain());

CompletableFuture<Integer> unsubscribed = connection.unsubscribe(TEST_TOPIC);
unsubscribed.thenApply(unused -> subsAcked--);
packetId = unsubscribed.get();

assertNotSame(0, packetId);
assertEquals("No Subscriptions", 0, subsAcked);
} catch (Exception ex) {
fail(ex.getMessage());
}
Expand Down Expand Up @@ -142,33 +131,25 @@ public void testPubSubOnMessage() {

try {
CompletableFuture<Integer> subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE);
subscribed.thenApply(unused -> subsAcked++);
int packetId = subscribed.get();

assertNotSame(0, packetId);
assertEquals("Single subscription", 1, subsAcked);

MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE);
CompletableFuture<Integer> published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 1, pubsAcked);

published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 2, pubsAcked);

CompletableFuture<Integer> unsubscribed = connection.unsubscribe(TEST_TOPIC);
unsubscribed.thenApply(unused -> subsAcked--);
packetId = unsubscribed.get();

assertNotSame(0, packetId);
assertEquals("No Subscriptions", 0, subsAcked);
} catch (Exception ex) {
fail(ex.getMessage());
}
Expand Down

0 comments on commit 5593e88

Please sign in to comment.