Skip to content

Commit

Permalink
Fix TopicsRecoveryTests
Browse files Browse the repository at this point in the history
  • Loading branch information
thegridman committed Feb 20, 2025
1 parent f6eeb5f commit be1c222
Showing 1 changed file with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,30 +368,31 @@ public void shouldSubscribeWithAnonymousSubscriberAfterServiceRestart() throws E
}

// A latch to catch the subscriber disconnect
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latchConnect = new CountDownLatch(1);
CountDownLatch latchDisconnect = new CountDownLatch(1);
subscriber.addStateListener((subscriber1, nNewState, nPrevState) ->
{
if (nNewState == NamedTopicSubscriber.STATE_DISCONNECTED)
{
latch.countDown();
latchDisconnect.countDown();
}
if (nNewState == NamedTopicSubscriber.STATE_DISCONNECTED)
{
latchConnect.countDown();
}
});

restartService(topic);

// The subscriber should have disconnected at least once, it may already be reconnected
// so we cannot just check its state
assertThat(latch.await(5, TimeUnit.MINUTES), is(true));
assertThat(latchDisconnect.await(5, TimeUnit.MINUTES), is(true));
// Should eventually be reconnected
Eventually.assertDeferred(subscriber::getState, is(NamedTopicSubscriber.STATE_CONNECTED));

Logger.info(">>>> Subscriber receiving remaining " + (cMsgTotal - m) + " messages of " + cbMessage + " bytes");
for ( ; m < cMsgTotal; m++)
{
element = subscriber.receive().get(1, TimeUnit.MINUTES);
assertThat(element, is(notNullValue()));
Message message = element.getValue();
assertThat(message, is(notNullValue()));
assertThat(message.m_id, is(m));
}
Logger.info(">>>> Subscriber receiving remaining messages");
element = subscriber.receive().get(1, TimeUnit.MINUTES);
assertThat(element, is(notNullValue()));
}
}
}
Expand Down

0 comments on commit be1c222

Please sign in to comment.