Skip to content

Commit

Permalink
move one test for receving from one session from ServiceBusSessionMan…
Browse files Browse the repository at this point in the history
…agerIntegrationTest to ServiceBusReceiverAsyncClientIntegrationTest
  • Loading branch information
anuchandy committed Jan 10, 2025
1 parent bee48de commit fae62c5
Showing 1 changed file with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,52 @@ void transactionReceiveCompleteCommitMixClient(MessagingEntityType entityType) {
StepVerifier.create(sender.commitTransaction(transaction.get())).expectComplete().verify(TIMEOUT);
}

@ParameterizedTest
@MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider")
void receivesMessagesFromSession(MessagingEntityType entityType) {
assertNotNull(sessionId, "'sessionId' should have been set.");
// Arrange
final int entityIndex = TestUtils.USE_CASE_SINGLE_SESSION;
final boolean isSessionEnabled = true;
final String messageId = "session-message";
final String contents = "some-contents";
final int numberToSend = 5;

this.sender
= toClose(getSenderBuilder(entityType, entityIndex, isSessionEnabled, false).buildAsyncClient());

final Disposable subscription = Flux.interval(Duration.ofMillis(500)).take(numberToSend).flatMap(index -> {
final ServiceBusMessage message = getServiceBusMessage(contents, messageId).setSessionId(sessionId);
return sender.sendMessage(message).thenReturn(index);
})
.subscribe(number -> logger.info("sessionId[{}] sent[{}] Message sent.", sessionId, number),
error -> logger.error("sessionId[{}] Error encountered.", sessionId, error),
() -> logger.info("sessionId[{}] Finished sending.", sessionId));
toClose(subscription);

this.sessionReceiver
= toClose(getSessionReceiverBuilder(entityType, entityIndex, false, DEFAULT_RETRY_OPTIONS)
.buildAsyncClient());
this.receiver = toClose(sessionReceiver.acceptSession(sessionId).block());

// Act & Assert
StepVerifier
.create(receiver.receiveMessages()
.concatMap(receivedMessage -> receiver.complete(receivedMessage).thenReturn(receivedMessage)))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.thenCancel()
.verify(Duration.ofMinutes(2));
}

/**
* Verifies that we can send and receive two messages.
*/
Expand Down

0 comments on commit fae62c5

Please sign in to comment.