diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index 25aeb9e0cca50..0965f0d477fe4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -269,6 +269,52 @@ void transactionReceiveCompleteCommitMixClient(MessagingEntityType entityType) { StepVerifier.create(sender.commitTransaction(transaction.get())).expectComplete().verify(TIMEOUT); } + @ParameterizedTest + @MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider") + void receivesMessagesFromSession(MessagingEntityType entityType) { + assertNotNull(sessionId, "'sessionId' should have been set."); + // Arrange + final int entityIndex = TestUtils.USE_CASE_SINGLE_SESSION; + final boolean isSessionEnabled = true; + final String messageId = "session-message"; + final String contents = "some-contents"; + final int numberToSend = 5; + + this.sender + = toClose(getSenderBuilder(entityType, entityIndex, isSessionEnabled, false).buildAsyncClient()); + + final Disposable subscription = Flux.interval(Duration.ofMillis(500)).take(numberToSend).flatMap(index -> { + final ServiceBusMessage message = getServiceBusMessage(contents, messageId).setSessionId(sessionId); + return sender.sendMessage(message).thenReturn(index); + }) + .subscribe(number -> logger.info("sessionId[{}] sent[{}] Message sent.", sessionId, number), + error -> logger.error("sessionId[{}] Error encountered.", sessionId, error), + () -> logger.info("sessionId[{}] Finished sending.", sessionId)); + toClose(subscription); + + this.sessionReceiver + = toClose(getSessionReceiverBuilder(entityType, entityIndex, false, DEFAULT_RETRY_OPTIONS) + .buildAsyncClient()); + this.receiver = toClose(sessionReceiver.acceptSession(sessionId).block()); + + // Act & Assert + StepVerifier + .create(receiver.receiveMessages() + .concatMap(receivedMessage -> receiver.complete(receivedMessage).thenReturn(receivedMessage))) + .assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents, + serviceBusReceivedMessage)) + .assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents, + serviceBusReceivedMessage)) + .assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents, + serviceBusReceivedMessage)) + .assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents, + serviceBusReceivedMessage)) + .assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents, + serviceBusReceivedMessage)) + .thenCancel() + .verify(Duration.ofMinutes(2)); + } + /** * Verifies that we can send and receive two messages. */