From 65105c1e65487952407fe771b04562d4f6e2725b Mon Sep 17 00:00:00 2001 From: Hemant Tanwar Date: Thu, 25 Mar 2021 15:02:06 -0700 Subject: [PATCH 1/5] ServiceBus - cross entity transaction feature (#19863) New Feature: Cross entity transaction API --- eng/jacoco-test-coverage/pom.xml | 2 +- eng/versioning/version_client.txt | 4 +- sdk/core/azure-core-amqp/CHANGELOG.md | 3 + sdk/core/azure-core-amqp/README.md | 4 +- sdk/core/azure-core-amqp/pom.xml | 2 +- .../java/com/azure/core/amqp/AmqpSession.java | 18 + .../core/amqp/AmqpTransactionCoordinator.java | 36 ++ .../amqp/implementation/ReactorSender.java | 4 +- .../amqp/implementation/ReactorSession.java | 19 +- .../TransactionCoordinator.java | 9 +- .../implementation/ReactorSessionTest.java | 44 ++ .../TransactionCoordinatorTest.java | 8 +- .../azure-resourcemanager-samples/pom.xml | 2 +- .../pom.xml | 2 +- .../azure-messaging-servicebus/CHANGELOG.md | 4 + .../azure-messaging-servicebus/README.md | 6 +- .../azure-messaging-servicebus/pom.xml | 6 +- .../servicebus/ServiceBusClientBuilder.java | 26 +- .../ServiceBusCreateSessionOptions.java | 30 ++ .../ServiceBusReactorAmqpConnection.java | 20 +- .../ServiceBusReactorSession.java | 33 +- .../ServiceBusMixClientIntegrationTest.java | 459 ++++++++++++++++++ ...ceBusSenderAsyncClientIntegrationTest.java | 4 +- .../azure/messaging/servicebus/TestUtils.java | 4 +- .../ServiceBusReactorSessionTest.java | 44 +- .../ServiceBusReceiveLinkProcessorTest.java | 3 +- sdk/servicebus/pom.xml | 2 +- .../pom.xml | 2 +- 28 files changed, 753 insertions(+), 47 deletions(-) create mode 100644 sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpTransactionCoordinator.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusCreateSessionOptions.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMixClientIntegrationTest.java diff --git a/eng/jacoco-test-coverage/pom.xml b/eng/jacoco-test-coverage/pom.xml index baafd4465976..58e4d5616dd3 100644 --- a/eng/jacoco-test-coverage/pom.xml +++ b/eng/jacoco-test-coverage/pom.xml @@ -209,7 +209,7 @@ com.azure azure-messaging-servicebus - 7.1.0 + 7.2.0-beta.1 com.azure diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 096a95febd45..478edaf58fee 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -80,7 +80,7 @@ com.azure:azure-iot-deviceupdate;1.0.0-beta.1;1.0.0-beta.1 com.azure:azure-messaging-eventgrid;2.0.0-beta.4;2.0.0-beta.5 com.azure:azure-messaging-eventhubs;5.5.0;5.6.0-beta.1 com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.5.0;1.6.0-beta.1 -com.azure:azure-messaging-servicebus;7.0.2;7.1.0 +com.azure:azure-messaging-servicebus;7.1.0;7.2.0-beta.1 com.azure:azure-messaging-servicebus-track1-perf;1.0.0-beta.1;1.0.0-beta.1 com.azure:azure-messaging-servicebus-track2-perf;1.0.0-beta.1;1.0.0-beta.1 com.azure:azure-mixedreality-authentication;1.0.0-beta.1;1.0.0 @@ -225,3 +225,5 @@ com.azure.resourcemanager:azure-resourcemanager-datadog;1.0.0-beta.1;1.0.0-beta. # beta_:;dependency-version # note: Released beta versions will not be manipulated with the automatic PR creation code. beta_com.azure:azure-security-keyvault-keys;4.3.0-beta.1 +beta_com.azure:azure-core-amqp;2.1.0-beta.1 +beta_com.azure:azure-messaging-servicebus;7.2.0-beta.1 diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index fa68d82a0bb7..985141767003 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,6 +1,9 @@ # Release History ## 2.1.0-beta.1 (Unreleased) +### New Features +- Exposes 'AmqpTransactionCoordinator' via AmqpSession. +- Added API in interface 'AmqpSession.getOrCreateTransactionCoordinator()'. ## 2.0.3 (2021-03-09) diff --git a/sdk/core/azure-core-amqp/README.md b/sdk/core/azure-core-amqp/README.md index 1af004424b0d..6c595d480f60 100644 --- a/sdk/core/azure-core-amqp/README.md +++ b/sdk/core/azure-core-amqp/README.md @@ -11,12 +11,12 @@ own AMQP client library that abstracts from the underlying transport library's i ### Include the package -[//]: # ({x-version-update-start;com.azure:azure-core-amqp;current}) +[//]: # ({x-version-update-start;beta_com.azure:azure-core-amqp;dependency}) ```xml com.azure azure-core-amqp - 2.0.3 + 2.1.0-beta.1 ``` [//]: # ({x-version-update-end}) diff --git a/sdk/core/azure-core-amqp/pom.xml b/sdk/core/azure-core-amqp/pom.xml index e83281073d09..733de4489c58 100644 --- a/sdk/core/azure-core-amqp/pom.xml +++ b/sdk/core/azure-core-amqp/pom.xml @@ -58,7 +58,7 @@ com.azure azure-core - 1.15.0-beta.1 + 1.14.0 com.microsoft.azure diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java index a28b346d3b3b..a80451f8e930 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java @@ -91,4 +91,22 @@ public interface AmqpSession extends Disposable { * @return A completable mono. */ Mono rollbackTransaction(AmqpTransaction transaction); + + /** + * Creates the {@link AmqpTransactionCoordinator} on the {@link AmqpSession} which is used to create/commit or + * rollback the transaction. A transaction can span over one or more message broker entities. The interface + * {@link AmqpSession} provides default implementation for back-word compatibility but it throws + * {@link RuntimeException} to warn what that an implementing class must override and provide implementation of this + * API. Azure SDK already provides implementation for this API. + * + * @return newly created {@link AmqpTransactionCoordinator}. + * @throws UnsupportedOperationException Indicting implementation not found error. Azure SDK should provide + * implementation of this API but if runtime is not able to find it in its classpath or version mismatch can cause + * this exception. + * + * @see Transaction Coordination + */ + default Mono getOrCreateTransactionCoordinator() { + return Mono.error(new UnsupportedOperationException("Implementation not found error.")); + } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpTransactionCoordinator.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpTransactionCoordinator.java new file mode 100644 index 000000000000..70e30f0aee7f --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpTransactionCoordinator.java @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import reactor.core.publisher.Mono; + +/** + * Provides an API to manage AMQP transaction on message broker. A transaction is used where one or more operation in + * messaging broker is part of one unit of work. In general a transaction involve with many operations on one message + * broker entity. + *

+ * Distributed Transactions: A distributed transaction where operations spans over different message broker entities. + * + * @see Distributed Transactions + * @see Transactions + */ +public interface AmqpTransactionCoordinator { + + /** + * Completes the transaction. All the work in this transaction will either rollback or committed as one unit of + * work. + * @param transaction that needs to be completed on message broker. + * @param isCommit this flag indicates that the work associated with this transaction should commit or rollback. + * @return a completable {@link Mono}. + */ + Mono discharge(AmqpTransaction transaction, boolean isCommit); + + /** + * Creates the transaction in message broker. Successful completion of this API indicates that a transaction + * identifier has successfully been created on the message broker. Once a transaction has been created, it must be + * completed by using {@link AmqpTransactionCoordinator#discharge(AmqpTransaction, boolean)} API. + * @return the created transaction id represented by {@link AmqpTransaction}. + */ + Mono declare(); +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index f9ec16619512..98dfc2d582b3 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -304,7 +304,9 @@ void dispose(ErrorCondition errorCondition) { } subscriptions.dispose(); - tokenManager.close(); + if (tokenManager != null) { + tokenManager.close(); + } if (sender.getLocalState() == EndpointState.CLOSED) { return; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 45cae61bdfad..33c8e78255f5 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.AmqpSession; import com.azure.core.amqp.AmqpTransaction; +import com.azure.core.amqp.AmqpTransactionCoordinator; import com.azure.core.amqp.ClaimsBasedSecurityNode; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; import com.azure.core.amqp.implementation.handler.SendLinkHandler; @@ -171,8 +172,8 @@ public Duration getOperationTimeout() { */ @Override public Mono createTransaction() { - return createTransactionCoordinator() - .flatMap(coordinator -> coordinator.createTransaction()); + return getOrCreateTransactionCoordinator() + .flatMap(coordinator -> coordinator.declare()); } /** @@ -180,8 +181,8 @@ public Mono createTransaction() { */ @Override public Mono commitTransaction(AmqpTransaction transaction) { - return createTransactionCoordinator() - .flatMap(coordinator -> coordinator.completeTransaction(transaction, true)); + return getOrCreateTransactionCoordinator() + .flatMap(coordinator -> coordinator.discharge(transaction, true)); } /** @@ -189,8 +190,8 @@ public Mono commitTransaction(AmqpTransaction transaction) { */ @Override public Mono rollbackTransaction(AmqpTransaction transaction) { - return createTransactionCoordinator() - .flatMap(coordinator -> coordinator.completeTransaction(transaction, false)); + return getOrCreateTransactionCoordinator() + .flatMap(coordinator -> coordinator.discharge(transaction, false)); } /** @@ -220,10 +221,10 @@ public boolean removeLink(String linkName) { } /** - * - * @return {@link Mono} of {@link TransactionCoordinator} + * {@inheritDoc} */ - private Mono createTransactionCoordinator() { + @Override + public Mono getOrCreateTransactionCoordinator() { if (isDisposed()) { return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format( "Cannot create coordinator send link '%s' from a closed session.", TRANSACTION_LINK_NAME)))); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/TransactionCoordinator.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/TransactionCoordinator.java index 2ba984fc145b..65f8d8ac0a88 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/TransactionCoordinator.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/TransactionCoordinator.java @@ -4,6 +4,7 @@ package com.azure.core.amqp.implementation; import com.azure.core.amqp.AmqpTransaction; +import com.azure.core.amqp.AmqpTransactionCoordinator; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; @@ -21,7 +22,7 @@ /** * Encapsulates transaction functions. */ -final class TransactionCoordinator { +final class TransactionCoordinator implements AmqpTransactionCoordinator { private final ClientLogger logger = new ClientLogger(TransactionCoordinator.class); @@ -42,7 +43,8 @@ final class TransactionCoordinator { * * @return a completable {@link Mono} which represent {@link DeliveryState}. */ - Mono completeTransaction(AmqpTransaction transaction, boolean isCommit) { + @Override + public Mono discharge(AmqpTransaction transaction, boolean isCommit) { final Message message = Proton.message(); Discharge discharge = new Discharge(); discharge.setFail(!isCommit); @@ -74,7 +76,8 @@ Mono completeTransaction(AmqpTransaction transaction, boolean isCommit) { * * @return a completable {@link Mono} which represent {@link DeliveryState}. */ - Mono createTransaction() { + @Override + public Mono declare() { final Message message = Proton.message(); Declare declare = new Declare(); message.setBody(new AmqpValue(declare)); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index 8cdebd9c16db..19819b29810f 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -8,6 +8,7 @@ import com.azure.core.amqp.AmqpRetryMode; import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; +import com.azure.core.amqp.AmqpTransactionCoordinator; import com.azure.core.amqp.ClaimsBasedSecurityNode; import com.azure.core.amqp.FixedAmqpRetryPolicy; import com.azure.core.amqp.exception.AmqpErrorCondition; @@ -15,6 +16,7 @@ import com.azure.core.amqp.implementation.handler.SendLinkHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; @@ -40,6 +42,7 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -233,6 +236,47 @@ void createProducerAgainAfterException() { sendLinkHandler.onLinkRemoteClose(closeSendEvent); } + /** + * Verifies that we can create coordinator. + */ + @Test + void getOrCreateTransactionCoordinator() { + // Arrange + final String TRANSACTION_LINK_NAME = "coordinator"; + final String linkName = TRANSACTION_LINK_NAME; + final String entityPath = TRANSACTION_LINK_NAME; + + final TokenManager tokenManager = mock(TokenManager.class); + final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); + + when(session.sender(linkName)).thenReturn(sender); + when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager); + when(tokenManager.authorize()).thenReturn(Mono.just(1000L)); + when(tokenManager.getAuthorizationResults()) + .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED))); + when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath)) + .thenReturn(sendLinkHandler); + + StepVerifier.create( + reactorSession.getOrCreateTransactionCoordinator()) + .then(() -> handler.onSessionRemoteOpen(event)) + .thenAwait(Duration.ofSeconds(2)) + .assertNext(Assertions::assertNotNull) + .verifyComplete(); + + verify(session).sender(TRANSACTION_LINK_NAME); + verify(sender).setTarget(any(Coordinator.class)); + verify(session).open(); + + final AmqpTransactionCoordinator coordinator1 = reactorSession.getOrCreateTransactionCoordinator() + .block(TIMEOUT); + + final AmqpTransactionCoordinator coordinator2 = reactorSession.getOrCreateTransactionCoordinator() + .block(TIMEOUT); + + assertSame(coordinator1, coordinator2); + } + @Test void createConsumer() { // Arrange diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/TransactionCoordinatorTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/TransactionCoordinatorTest.java index db37ccfa3955..313477a828c2 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/TransactionCoordinatorTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/TransactionCoordinatorTest.java @@ -64,7 +64,7 @@ public void testCompleteTransactionRejected(boolean isCommit) { doReturn(Mono.just(outcome)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); - StepVerifier.create(transactionCoordinator.completeTransaction(transaction, isCommit)) + StepVerifier.create(transactionCoordinator.discharge(transaction, isCommit)) .verifyError(IllegalArgumentException.class); verify(sendLink, times(1)).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); @@ -81,7 +81,7 @@ public void testCompleteTransaction(boolean isCommit) { doReturn(Mono.just(outcome)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); - StepVerifier.create(transactionCoordinator.completeTransaction(transaction, isCommit)) + StepVerifier.create(transactionCoordinator.discharge(transaction, isCommit)) .verifyComplete(); verify(sendLink, times(1)).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); @@ -95,7 +95,7 @@ public void testCreateTransactionRejected() { doReturn(Mono.just(outcome)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); - StepVerifier.create(transactionCoordinator.createTransaction()) + StepVerifier.create(transactionCoordinator.declare()) .verifyError(IllegalArgumentException.class); verify(sendLink, times(1)).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); @@ -111,7 +111,7 @@ public void testCreateTransaction() { doReturn(Mono.just(transactionState)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull()); - StepVerifier.create(transactionCoordinator.createTransaction()) + StepVerifier.create(transactionCoordinator.declare()) .assertNext(actual -> { Assertions.assertNotNull(actual); Assertions.assertArrayEquals(transactionId, actual.getTransactionId().array()); diff --git a/sdk/resourcemanager/azure-resourcemanager-samples/pom.xml b/sdk/resourcemanager/azure-resourcemanager-samples/pom.xml index f8b5d80965f3..c20bbcab1fcb 100644 --- a/sdk/resourcemanager/azure-resourcemanager-samples/pom.xml +++ b/sdk/resourcemanager/azure-resourcemanager-samples/pom.xml @@ -118,7 +118,7 @@ com.azure azure-messaging-servicebus - 7.0.2 + 7.1.0 io.fabric8 diff --git a/sdk/servicebus/azure-messaging-servicebus-track2-perf/pom.xml b/sdk/servicebus/azure-messaging-servicebus-track2-perf/pom.xml index 76b77081bc91..5fe6eb982203 100644 --- a/sdk/servicebus/azure-messaging-servicebus-track2-perf/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus-track2-perf/pom.xml @@ -25,7 +25,7 @@ com.azure azure-messaging-servicebus - 7.1.0 + 7.2.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index 622ae73f2f96..f8b42c4d8c4a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -1,5 +1,9 @@ # Release History +## 7.2.0-beta.1 (2021-03-18) +### New Features +- Added support for distributed transactions across entities via API 'ServiceBusClientBuilder.enableCrossEntityTransactions()'. + ## 7.1.0 (2021-03-10) ### Bug Fixes - Continue to receive messages regardless of user not settling the received message in PEEK_LOCK mode [#19247](https://github.com/Azure/azure-sdk-for-java/issues/19247). diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 9c1c961fcea1..7c680fa11b7a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -32,12 +32,12 @@ To quickly create the needed Service Bus resources in Azure and to receive a con ### Include the package -[//]: # ({x-version-update-start;com.azure:azure-messaging-servicebus;current}) +[//]: # ({x-version-update-start;beta_com.azure:azure-messaging-servicebus;dependency}) ```xml com.azure azure-messaging-servicebus - 7.1.0 + 7.2.0-beta.1 ``` [//]: # ({x-version-update-end}) @@ -86,7 +86,7 @@ platform. First, add the package: com.azure azure-identity - 1.2.3 + 1.2.4 ``` [//]: # ({x-version-update-end}) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index ea05280134e8..6539333beda8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -14,7 +14,7 @@ com.azure azure-messaging-servicebus - 7.1.0 + 7.2.0-beta.1 Microsoft Azure client library for Service Bus Libraries built on Microsoft Azure Service Bus @@ -34,7 +34,7 @@ - 0.18 + 0.20 0.09 @@ -47,7 +47,7 @@ com.azure azure-core-amqp - 2.0.3 + 2.1.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 4d2edd6cb4e9..dd6f442972d0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -134,6 +134,7 @@ public final class ServiceBusClientBuilder { private Scheduler scheduler; private AmqpTransportType transport = AmqpTransportType.AMQP; private SslDomain.VerifyMode verifyMode; + private boolean crossEntityTransactions; /** * Keeps track of the open clients that were created from this builder when there is a shared connection. @@ -187,6 +188,28 @@ public ServiceBusClientBuilder connectionString(String connectionString) { return credential(properties.getEndpoint().getHost(), tokenCredential); } + /** + * Enable cross entity transaction on the connection to Service bus. Use this feature only when your transaction + * scope spans across different Service Bus entities. + * + *

Avoid using non-transaction API on this client

+ * Since this feature will set up connection to Service Bus optimised to enable this feature. Once all the clients + * have been setup, the first receiver or sender used will initialize 'send-via' queue as a single message transfer + * entity. All the messages will flow via this queue. Thus this client is not suitable for any non-transaction API. + * + *

When not to enable this feature

+ * If your transaction involved in one Service bus entity only. For example you are receiving from one + * queue/subscription and you want to settle your own messages which are part of one transaction. + * + * @return The updated {@link ServiceBusSenderClientBuilder} object. + * + * @see Service Bus transactions + */ + public ServiceBusClientBuilder enableCrossEntityTransactions() { + this.crossEntityTransactions = true; + return this; + } + private TokenCredential getTokenCredential(ConnectionStringProperties properties) { TokenCredential tokenCredential; if (properties.getSharedAccessSignature() == null) { @@ -391,7 +414,8 @@ private ServiceBusConnectionProcessor getOrCreateConnectionProcessor(MessageSeri final String connectionId = StringUtil.getRandomString("MF"); return (ServiceBusAmqpConnection) new ServiceBusReactorAmqpConnection(connectionId, - connectionOptions, provider, handlerProvider, tokenManagerProvider, serializer); + connectionOptions, provider, handlerProvider, tokenManagerProvider, serializer, + crossEntityTransactions); }).repeat(); sharedConnection = connectionFlux.subscribeWith(new ServiceBusConnectionProcessor( diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusCreateSessionOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusCreateSessionOptions.java new file mode 100644 index 000000000000..6c7c34c15372 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusCreateSessionOptions.java @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus.implementation; + +import com.azure.core.annotation.Immutable; + +/** + * An option bag to provide configuration required to create a {@link ServiceBusReactorSession}. + */ +@Immutable +public final class ServiceBusCreateSessionOptions { + private final boolean distributedTransactionsSupport; + + /** + * Constructor to create {@link ServiceBusCreateSessionOptions}. + * @param distributedTransactionsSupport if session supports distributed transaction across different entities. + */ + public ServiceBusCreateSessionOptions(boolean distributedTransactionsSupport) { + this.distributedTransactionsSupport = distributedTransactionsSupport; + } + + /** + * Determine is distributed transactions are supported across different entities. + * @return true if distributed transactions across different entities are supported. + */ + public boolean isDistributedTransactionsSupported() { + return this.distributedTransactionsSupport; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java index 47edc565f7bf..5876add1e5a7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorAmqpConnection.java @@ -37,6 +37,7 @@ public class ServiceBusReactorAmqpConnection extends ReactorConnection implement private static final String MANAGEMENT_SESSION_NAME = "mgmt-session"; private static final String MANAGEMENT_LINK_NAME = "mgmt"; private static final String MANAGEMENT_ADDRESS = "$management"; + private static final String CROSS_ENTITY_TRANSACTIONS_LINK_NAME = "crossentity-coordinator"; private final ClientLogger logger = new ClientLogger(ServiceBusReactorAmqpConnection.class); /** @@ -54,6 +55,7 @@ public class ServiceBusReactorAmqpConnection extends ReactorConnection implement private final Scheduler scheduler; private final String fullyQualifiedNamespace; private final CbsAuthorizationType authorizationType; + private final boolean distributedTransactionsSupport; /** * Creates a new AMQP connection that uses proton-j. @@ -64,10 +66,13 @@ public class ServiceBusReactorAmqpConnection extends ReactorConnection implement * @param handlerProvider Provides {@link BaseHandler} to listen to proton-j reactor events. * @param tokenManagerProvider Provides a token manager for authorizing with CBS node. * @param messageSerializer Serializes and deserializes proton-j messages. + * @param distributedTransactionsSupport indicate if distributed transaction across different entities is required + * for this connection. */ public ServiceBusReactorAmqpConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, - TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) { + TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, + boolean distributedTransactionsSupport) { super(connectionId, connectionOptions, reactorProvider, handlerProvider, tokenManagerProvider, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST); @@ -80,6 +85,7 @@ public ServiceBusReactorAmqpConnection(String connectionId, ConnectionOptions co this.messageSerializer = messageSerializer; this.scheduler = connectionOptions.getScheduler(); this.fullyQualifiedNamespace = connectionOptions.getFullyQualifiedNamespace(); + this.distributedTransactionsSupport = distributedTransactionsSupport; } @Override @@ -141,11 +147,11 @@ public Mono getManagementNode(String entityPath, Messa public Mono createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions, String transferEntityPath) { - return createSession(entityPath).cast(ServiceBusSession.class).flatMap(session -> { + return createSession(linkName).cast(ServiceBusSession.class).flatMap(session -> { logger.verbose("Get or create sender link : '{}'", linkName); final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); - return session.createProducer(linkName, entityPath, retryOptions.getTryTimeout(), + return session.createProducer(linkName + entityPath, entityPath, retryOptions.getTryTimeout(), retryPolicy, transferEntityPath).cast(AmqpSendLink.class); }); } @@ -176,6 +182,11 @@ public Mono createReceiveLink(String linkName, String ent }); } + @Override + public Mono createSession(String sessionName) { + return super.createSession(distributedTransactionsSupport ? CROSS_ENTITY_TRANSACTIONS_LINK_NAME : sessionName); + } + /** * Creates or gets an existing receive link. The same link is returned if there is an existing receive link with the * same {@code linkName}. Otherwise, a new link is created and returned. @@ -215,6 +226,7 @@ public void dispose() { @Override protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) { return new ServiceBusReactorSession(session, handler, sessionName, reactorProvider, handlerProvider, - getClaimsBasedSecurityNode(), tokenManagerProvider, messageSerializer, retryOptions); + getClaimsBasedSecurityNode(), tokenManagerProvider, messageSerializer, retryOptions, + new ServiceBusCreateSessionOptions(distributedTransactionsSupport)); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java index 33eb66fb092d..6d24a6c3f872 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.java @@ -53,6 +53,7 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi private final TokenManagerProvider tokenManagerProvider; private final Mono cbsNodeSupplier; private final AmqpRetryOptions retryOptions; + private final boolean distributedTransactionsSupport; /** * Creates a new AMQP session using proton-j. @@ -66,16 +67,19 @@ class ServiceBusReactorSession extends ReactorSession implements ServiceBusSessi * @param tokenManagerProvider Provides {@link TokenManager} that authorizes the client when performing * operations on the message broker. * @param retryOptions Retry options. + * @param createOptions the options to create {@link ServiceBusReactorSession}. */ ServiceBusReactorSession(Session session, SessionHandler sessionHandler, String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider, Mono cbsNodeSupplier, - TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions) { + TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions, + ServiceBusCreateSessionOptions createOptions) { super(session, sessionHandler, sessionName, provider, handlerProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, retryOptions); this.retryOptions = retryOptions; this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions); this.tokenManagerProvider = tokenManagerProvider; this.cbsNodeSupplier = cbsNodeSupplier; + this.distributedTransactionsSupport = createOptions.isDistributedTransactionsSupported(); } @Override @@ -125,6 +129,22 @@ public Mono createProducer(String linkName, String entityPath, Duratio } } + @Override + public Mono createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) { + return this.createProducer(linkName, entityPath, timeout, retry, (Map) null); + } + + @Override + protected Mono createProducer(String linkName, String entityPath, Duration timeout, + AmqpRetryPolicy retry, Map linkProperties) { + if (distributedTransactionsSupport) { + return getOrCreateTransactionCoordinator().flatMap(coordinator -> super.createProducer(linkName, entityPath, + timeout, retry, linkProperties)); + } else { + return super.createProducer(linkName, entityPath, timeout, retry, linkProperties); + } + } + @Override protected ReactorReceiver createConsumer(String entityPath, Receiver receiver, ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) { @@ -164,7 +184,14 @@ private Mono createConsumer(String linkName, String entit return Mono.error(new RuntimeException("ReceiveMode is not supported: " + receiveMode)); } - return createConsumer(linkName, entityPath, timeout, retry, filter, linkProperties, null, - senderSettleMode, receiverSettleMode).cast(ServiceBusReceiveLink.class); + if (distributedTransactionsSupport) { + return getOrCreateTransactionCoordinator().flatMap(transactionCoordinator -> createConsumer(linkName, + entityPath, timeout, retry, filter, linkProperties, null, senderSettleMode, + receiverSettleMode) + .cast(ServiceBusReceiveLink.class)); + } else { + return createConsumer(linkName, entityPath, timeout, retry, filter, linkProperties, + null, senderSettleMode, receiverSettleMode).cast(ServiceBusReceiveLink.class); + } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMixClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMixClientIntegrationTest.java new file mode 100644 index 000000000000..88121551064e --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMixClientIntegrationTest.java @@ -0,0 +1,459 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.MessagingEntityType; +import com.azure.messaging.servicebus.models.CompleteOptions; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * Test where various clients are involved for example Sender, Receiver and Processor client. + */ +public class ServiceBusMixClientIntegrationTest extends IntegrationTestBase { + private ServiceBusSenderAsyncClient sender; + private ServiceBusReceiverAsyncClient receiver; + private final AtomicInteger messagesPending = new AtomicInteger(); + + ServiceBusMixClientIntegrationTest() { + super(new ClientLogger(ServiceBusSenderAsyncClientIntegrationTest.class)); + } + + @Override + protected void beforeTest() { + sessionId = UUID.randomUUID().toString(); + } + + @Override + protected void afterTest() { + dispose(sender); + + final int numberOfMessages = messagesPending.get(); + if (numberOfMessages < 1) { + dispose(receiver); + return; + } + + try { + if (receiver == null) { + return; + } + receiver.receiveMessages() + .take(numberOfMessages) + .map(message -> { + logger.info("Message received: {}", message.getSequenceNumber()); + return message; + }) + .timeout(Duration.ofSeconds(5), Mono.empty()) + .blockLast(); + } catch (Exception e) { + logger.warning("Error occurred when draining queue.", e); + } finally { + dispose(receiver); + } + } + + /** + * Use case: Test cross entity transaction using processor client and sender. + * 1. Read messages from entity A. + * 2. complete the messages from entity A and write to entity B. + * 2. commit the transaction. + */ + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + void crossEntityQueueTransaction(boolean isSessionEnabled) throws InterruptedException { + + // Arrange + final boolean useCredentials = false; + final MessagingEntityType entityType = MessagingEntityType.QUEUE; + final int receiveQueueAIndex = TestUtils.USE_CASE_TXN_1; + final int sendQueueBIndex = TestUtils.USE_CASE_TXN_2; + final String queueA = isSessionEnabled ? getSessionQueueName(receiveQueueAIndex) : getQueueName(receiveQueueAIndex); + final String queueB = isSessionEnabled ? getSessionQueueName(sendQueueBIndex) : getQueueName(sendQueueBIndex); + final AtomicBoolean transactionComplete = new AtomicBoolean(); + final CountDownLatch countdownLatch = new CountDownLatch(1); + final AtomicInteger receivedMessages = new AtomicInteger(); + + final String messageId = UUID.randomUUID().toString(); + ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS_BYTES, messageId); + message.setSessionId(sessionId); + final List messages = Arrays.asList(message); + + ServiceBusClientBuilder builder = getBuilder(useCredentials).enableCrossEntityTransactions(); + + final ServiceBusSenderAsyncClient senderAsyncA; + final ServiceBusSenderClient senderSyncB; + + // Initialize sender + senderAsyncA = builder.sender().queueName(queueA).buildAsyncClient(); + senderSyncB = builder.sender().queueName(queueB).buildClient(); + + Consumer processMessage = (context) -> { + receivedMessages.incrementAndGet(); + messagesPending.incrementAndGet(); + ServiceBusReceivedMessage myMessage = context.getMessage(); + System.out.printf("Processing message. MessageId: %s, Sequence #: %s. Contents: %s %n", myMessage.getMessageId(), + myMessage.getSequenceNumber(), myMessage.getBody()); + if (receivedMessages.get() == 1) { + + //Start a transaction + ServiceBusTransactionContext transactionId = senderSyncB.createTransaction(); + context.complete(new CompleteOptions().setTransactionContext(transactionId)); + senderSyncB.sendMessage(new ServiceBusMessage(CONTENTS_BYTES).setMessageId(messageId).setSessionId(sessionId), transactionId); + senderSyncB.commitTransaction(transactionId); + transactionComplete.set(true); + countdownLatch.countDown(); + logger.verbose("Transaction committed."); + } + }; + + Consumer processError = context -> { + System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'. Error Source: '%s' %n", + context.getFullyQualifiedNamespace(), context.getEntityPath(), context.getErrorSource()); + Assertions.fail("Failed processing of message.", context.getException()); + + if (!(context.getException() instanceof ServiceBusException)) { + System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); + } + }; + + final ServiceBusProcessorClient processorA; + // Initialize processor client + if (isSessionEnabled) { + processorA = builder.sessionProcessor().disableAutoComplete().queueName(queueA) + .processMessage(processMessage).processError(processError) + .buildProcessorClient(); + } else { + processorA = builder.processor().disableAutoComplete().queueName(queueA) + .processMessage(processMessage).processError(processError) + .buildProcessorClient(); + } + + // Send messages + StepVerifier.create(senderAsyncA.sendMessages(messages)).verifyComplete(); + // Create an instance of the processor through the ServiceBusClientBuilder + + // Act + System.out.println("Starting the processor"); + processorA.start(); + + // Assert + System.out.println("Listening for 10 seconds..."); + if (countdownLatch.await(10, TimeUnit.SECONDS)) { + System.out.println("Completed processing successfully."); + Assertions.assertTrue(transactionComplete.get()); + } else { + System.out.println("Closing processor."); + Assertions.fail("Failed to process message."); + } + + processorA.close(); + + // Verify that message is received by queue B + if (!isSessionEnabled) { + setSenderAndReceiver(entityType, sendQueueBIndex, false); + StepVerifier.create(receiver.receiveMessages().take(1)) + .assertNext(receivedMessage -> { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + messagesPending.decrementAndGet(); + }).verifyComplete(); + } + } + + /** + * Use case: Test cross entity transaction using processor client and sender. + * 1. Read messages from entity A. + * 2. complete the messages from entity A and write to entity B. + * 2. commit the transaction. + */ + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + void crossEntitySubscriptionTransaction(boolean isSessionEnabled) throws InterruptedException { + + // Arrange + final boolean useCredentials = false; + final MessagingEntityType entityType = MessagingEntityType.SUBSCRIPTION; + final int receiveQueueAIndex = TestUtils.USE_CASE_TXN_1; + final int sendQueueBIndex = TestUtils.USE_CASE_TXN_2; + final String topicA = getTopicName(receiveQueueAIndex); + final String topicB = getTopicName(sendQueueBIndex); + final AtomicBoolean transactionComplete = new AtomicBoolean(); + + final CountDownLatch countdownLatch = new CountDownLatch(1); + final AtomicInteger receivedMessages = new AtomicInteger(); + + final String messageId = UUID.randomUUID().toString(); + ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS_BYTES, messageId); + message.setSessionId(sessionId); + final List messages = Arrays.asList(message); + + ServiceBusClientBuilder builder = getBuilder(useCredentials).enableCrossEntityTransactions(); + + // Initialize sender + final ServiceBusSenderAsyncClient senderAsyncA = builder.sender().topicName(topicA).buildAsyncClient(); + final ServiceBusSenderClient senderSyncB = builder.sender().topicName(topicB).buildClient(); + + Consumer processMessage = (context) -> { + receivedMessages.incrementAndGet(); + messagesPending.incrementAndGet(); + ServiceBusReceivedMessage myMessage = context.getMessage(); + System.out.printf("Processing message. MessageId: %s, Sequence #: %s. Contents: %s %n", myMessage.getMessageId(), + myMessage.getSequenceNumber(), myMessage.getBody()); + if (receivedMessages.get() == 1) { + + //Start a transaction + ServiceBusTransactionContext transactionId = senderSyncB.createTransaction(); + context.complete(new CompleteOptions().setTransactionContext(transactionId)); + senderSyncB.sendMessage(new ServiceBusMessage(CONTENTS_BYTES).setMessageId(messageId).setSessionId(sessionId), transactionId); + senderSyncB.commitTransaction(transactionId); + transactionComplete.set(true); + countdownLatch.countDown(); + logger.verbose("Transaction committed."); + } + }; + + Consumer processError = context -> { + System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'. Error Source: '%s' %n", + context.getFullyQualifiedNamespace(), context.getEntityPath(), context.getErrorSource()); + Assertions.fail("Failed processing of message.", context.getException()); + + if (!(context.getException() instanceof ServiceBusException)) { + System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); + } + }; + + final ServiceBusProcessorClient processorA; + // Initialize processor client + if (isSessionEnabled) { + processorA = builder.sessionProcessor().disableAutoComplete().topicName(topicA).subscriptionName("session-subscription") + .processMessage(processMessage).processError(processError) + .buildProcessorClient(); + } else { + processorA = builder.processor().disableAutoComplete().topicName(topicA).subscriptionName("subscription") + .processMessage(processMessage).processError(processError) + .buildProcessorClient(); + } + + // Send messages + StepVerifier.create(senderAsyncA.sendMessages(messages)).verifyComplete(); + // Create an instance of the processor through the ServiceBusClientBuilder + + // Act + System.out.println("Starting the processor"); + processorA.start(); + + // Assert + System.out.println("Listening for 10 seconds..."); + if (countdownLatch.await(10, TimeUnit.SECONDS)) { + System.out.println("Completed processing successfully."); + Assertions.assertTrue(transactionComplete.get()); + } else { + System.out.println("Closing processor."); + Assertions.fail("Failed to process message."); + } + + processorA.close(); + + // Verify that message is received by queue B + if (!isSessionEnabled) { + setSenderAndReceiver(entityType, sendQueueBIndex, false); + StepVerifier.create(receiver.receiveMessages().take(1)) + .assertNext(receivedMessage -> { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + messagesPending.decrementAndGet(); + }).verifyComplete(); + } + } + + /** + * Use case: Test cross entity transaction using receiver and senders. + * 1. Read messages from entity A. + * 2. complete the messages from entity A and write to entity B. + * 2. commit the transaction. + */ + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + void crossEntityQueueTransactionWithReceiverSenderTest(boolean isSessionEnabled) throws InterruptedException { + + // Arrange + final boolean useCredentials = false; + final MessagingEntityType entityType = MessagingEntityType.QUEUE; + final int receiveQueueAIndex = TestUtils.USE_CASE_TXN_1; + final int sendQueueBIndex = TestUtils.USE_CASE_TXN_2; + final String queueA = isSessionEnabled ? getSessionQueueName(receiveQueueAIndex) : getQueueName(receiveQueueAIndex); + final String queueB = isSessionEnabled ? getSessionQueueName(sendQueueBIndex) : getQueueName(sendQueueBIndex); + final AtomicBoolean transactionComplete = new AtomicBoolean(); + + final CountDownLatch countdownLatch = new CountDownLatch(1); + + final String messageId = UUID.randomUUID().toString(); + ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS_BYTES, messageId); + message.setSessionId(sessionId); + final List messages = Arrays.asList(message); + + ServiceBusClientBuilder builder = getBuilder(useCredentials).enableCrossEntityTransactions(); + + // Initialize sender + final ServiceBusSenderAsyncClient senderAsyncA = builder.sender().queueName(queueA).buildAsyncClient(); + final ServiceBusSenderClient senderSyncB = builder.sender().queueName(queueB).buildClient(); + + // Send messages + StepVerifier.create(senderAsyncA.sendMessages(messages)).verifyComplete(); + + final ServiceBusReceiverAsyncClient receiverA; + + if (isSessionEnabled) { + receiverA = builder.sessionReceiver().disableAutoComplete().queueName(queueA) + .buildAsyncClient().acceptNextSession().block(); + } else { + receiverA = builder.receiver().disableAutoComplete().queueName(queueA) + .buildAsyncClient(); + } + + receiverA.receiveMessages().flatMap(receivedMessage -> { + //Start a transaction + logger.verbose("Received message sequence number {}. Creating transaction", receivedMessage.getSequenceNumber()); + ServiceBusTransactionContext transactionId = senderSyncB.createTransaction(); + receiverA.complete(receivedMessage, new CompleteOptions().setTransactionContext(transactionId)).block(); + senderSyncB.sendMessage(new ServiceBusMessage(CONTENTS_BYTES).setMessageId(messageId).setSessionId(sessionId), transactionId); + senderSyncB.commitTransaction(transactionId); + transactionComplete.set(true); + countdownLatch.countDown(); + logger.verbose("Transaction committed."); + return Mono.just(receivedMessage); + }).subscribe(); + + // Act + System.out.println("Listening for 10 seconds..."); + if (countdownLatch.await(10, TimeUnit.SECONDS)) { + System.out.println("Completed message processing successfully."); + Assertions.assertTrue(transactionComplete.get()); + } else { + System.out.println("Some error."); + Assertions.fail("Failed to process message."); + } + + // Assert + // Verify that message is received by entity B + if (!isSessionEnabled) { + setSenderAndReceiver(entityType, sendQueueBIndex, false); + StepVerifier.create(receiver.receiveMessages().take(1)) + .assertNext(receivedMessage -> { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + messagesPending.decrementAndGet(); + }).verifyComplete(); + } + } + + /** + * Use case: Test cross entity transaction using receiver and senders. + * 1. Read messages from entity A. + * 2. complete the messages from entity A and write to entity B. + * 2. commit the transaction. + */ + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + void crossEntitySubscriptionTransactionWithReceiverSenderTest(boolean isSessionEnabled) throws InterruptedException { + + // Arrange + final boolean useCredentials = false; + final MessagingEntityType entityType = MessagingEntityType.SUBSCRIPTION; + final int receiveQueueAIndex = TestUtils.USE_CASE_TXN_1; + final int sendQueueBIndex = TestUtils.USE_CASE_TXN_2; + final String topicA = getTopicName(receiveQueueAIndex); + final String topicB = getTopicName(sendQueueBIndex); + final AtomicBoolean transactionComplete = new AtomicBoolean(); + + final CountDownLatch countdownLatch = new CountDownLatch(1); + + final String messageId = UUID.randomUUID().toString(); + ServiceBusMessage message = TestUtils.getServiceBusMessage(CONTENTS_BYTES, messageId); + message.setSessionId(sessionId); + final List messages = Arrays.asList(message); + + ServiceBusClientBuilder builder = getBuilder(useCredentials).enableCrossEntityTransactions(); + + // Initialize sender + final ServiceBusSenderAsyncClient senderAsyncA = builder.sender().topicName(topicA).buildAsyncClient(); + final ServiceBusSenderClient senderSyncB = builder.sender().topicName(topicB).buildClient(); + + + // Send messages + StepVerifier.create(senderAsyncA.sendMessages(messages)).verifyComplete(); + + final ServiceBusReceiverAsyncClient receiverA; + + if (isSessionEnabled) { + receiverA = builder.sessionReceiver().disableAutoComplete().topicName(topicA).subscriptionName("session-subscription") + .buildAsyncClient().acceptNextSession().block(); + } else { + receiverA = builder.receiver().disableAutoComplete().topicName(topicA).subscriptionName("subscription") + .buildAsyncClient(); + } + + receiverA.receiveMessages().flatMap(receivedMessage -> { + //Start a transaction + logger.verbose("Received message sequence number {}. Creating transaction", receivedMessage.getSequenceNumber()); + ServiceBusTransactionContext transactionId = senderSyncB.createTransaction(); + receiverA.complete(receivedMessage, new CompleteOptions().setTransactionContext(transactionId)).block(); + senderSyncB.sendMessage(new ServiceBusMessage(CONTENTS_BYTES).setMessageId(messageId).setSessionId(sessionId), transactionId); + senderSyncB.commitTransaction(transactionId); + transactionComplete.set(true); + countdownLatch.countDown(); + logger.verbose("Transaction committed."); + return Mono.just(receivedMessage); + }).subscribe(); + + // Act + System.out.println("Listening for 10 seconds..."); + if (countdownLatch.await(10, TimeUnit.SECONDS)) { + System.out.println("Completed message processing successfully."); + Assertions.assertTrue(transactionComplete.get()); + } else { + System.out.println("Some error."); + Assertions.fail("Failed to process message."); + } + + // Assert + // Verify that message is received by entity B + if (!isSessionEnabled) { + setSenderAndReceiver(entityType, sendQueueBIndex, false); + StepVerifier.create(receiver.receiveMessages().take(1)) + .assertNext(receivedMessage -> { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + messagesPending.decrementAndGet(); + }).verifyComplete(); + } + } + + /** + * Sets the sender and receiver. If session is enabled, then a single-named session receiver is created. + */ + private void setSenderAndReceiver(MessagingEntityType entityType, int entityIndex, boolean useCredentials) { + final boolean isSessionAware = false; + final boolean sharedConnection = true; + + this.sender = getSenderBuilder(useCredentials, entityType, entityIndex, isSessionAware, sharedConnection) + .buildAsyncClient(); + this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, sharedConnection) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .disableAutoComplete() + .buildAsyncClient(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java index 5f6b95fa0b06..4fd2bc94f012 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java @@ -153,8 +153,8 @@ void viaQueueMessageSendTest() { // Arrange final boolean useCredentials = false; final Duration shortTimeout = Duration.ofSeconds(15); - final int viaIntermediateEntity = TestUtils.USE_CASE_SEND_VIA_QUEUE_1; - final int destinationEntity = TestUtils.USE_CASE_SEND_VIA_QUEUE_2; + final int viaIntermediateEntity = TestUtils.USE_CASE_TXN_1; + final int destinationEntity = TestUtils.USE_CASE_TXN_2; final boolean shareConnection = true; final MessagingEntityType entityType = MessagingEntityType.QUEUE; final boolean isSessionEnabled = false; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java index 7b50b329c83e..3adcaf743b49 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java @@ -56,8 +56,8 @@ public class TestUtils { static final int USE_CASE_PEEK_RECEIVE_AND_DEFER = 10; static final int USE_CASE_PEEK_TRANSACTION_SENDRECEIVE_AND_COMPLETE = 11; static final int USE_CASE_SINGLE_SESSION = 12; - static final int USE_CASE_SEND_VIA_QUEUE_1 = 13; - static final int USE_CASE_SEND_VIA_QUEUE_2 = 14; + static final int USE_CASE_TXN_1 = 13; + static final int USE_CASE_TXN_2 = 14; static final int USE_CASE_SEND_VIA_TOPIC_1 = 15; static final int USE_CASE_SEND_VIA_TOPIC_2 = 16; static final int USE_CASE_VALIDATE_AMQP_PROPERTIES = 17; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java index 41985101d532..507471b15bf8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java @@ -194,7 +194,8 @@ void setup(TestInfo testInfo) { when(reactorProvider.getReactorDispatcher()).thenReturn(dispatcher); serviceBusReactorSession = new ServiceBusReactorSession(session, handler, SESSION_NAME, reactorProvider, - handlerProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, retryOptions); + handlerProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, retryOptions, + new ServiceBusCreateSessionOptions(false)); } @AfterEach @@ -263,7 +264,7 @@ void createSenderLink() throws IOException { // Act serviceBusReactorSession.createProducer(ENTITY_PATH, ENTITY_PATH, retryOptions.getTryTimeout(), - retryPolicy, null) + retryPolicy) .subscribe(); // Assert @@ -278,4 +279,43 @@ void createSenderLink() throws IOException { verify(senderViaEntity, never()).setProperties(anyMap()); } + + /** + * Test for create Sender Link. + */ + @Test + void createCoordinatorLink() throws IOException { + // Arrange + final String transactionLinkName = "coordinator"; + final Sender coordinatorSenderEntity = mock(Sender.class); + doNothing().when(coordinatorSenderEntity).setSource(any(Source.class)); + doNothing().when(coordinatorSenderEntity).setSenderSettleMode(SenderSettleMode.UNSETTLED); + doNothing().when(coordinatorSenderEntity).setTarget(any(Target.class)); + doNothing().when(coordinatorSenderEntity).setTarget(any(Target.class)); + when(coordinatorSenderEntity.attachments()).thenReturn(record); + when(session.sender(transactionLinkName)).thenReturn(coordinatorSenderEntity); + + final ServiceBusReactorSession serviceBusReactorSession = new ServiceBusReactorSession(session, handler, + SESSION_NAME, reactorProvider, handlerProvider, cbsNodeSupplier, tokenManagerProvider, messageSerializer, + retryOptions, new ServiceBusCreateSessionOptions(true)); + + when(handlerProvider.createSendLinkHandler(CONNECTION_ID, HOSTNAME, transactionLinkName, transactionLinkName)) + .thenReturn(sendEntityLinkHandler); + + // Act + serviceBusReactorSession.getOrCreateTransactionCoordinator() + .subscribe(); + + // Assert + verify(tokenManagerEntity, never()).authorize(); + + verify(dispatcher).invoke(dispatcherCaptor.capture()); + List invocations = dispatcherCaptor.getAllValues(); + + // Apply the invocation. + invocations.get(0).run(); + + verify(coordinatorSenderEntity).open(); + verify(session).sender(transactionLinkName); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index e68d66e9a771..3e1037e164c3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -119,7 +119,7 @@ void constructor() { * Verifies that we can get a new AMQP receive link and fetch a few messages. */ @Test - void createNewLink() { + void createNewLink() throws InterruptedException { // Arrange ServiceBusReceiveLinkProcessor processor = Flux.create(sink -> sink.next(link1)) .subscribeWith(linkProcessor); @@ -581,6 +581,7 @@ void receivesFromFirstLink() { final Integer creditValue = value.get(); assertEquals(0, creditValue); + verify(link1, times(3)).addCredits(eq(PREFETCH)); } /** diff --git a/sdk/servicebus/pom.xml b/sdk/servicebus/pom.xml index 76f46d207ec2..e53e0b572450 100644 --- a/sdk/servicebus/pom.xml +++ b/sdk/servicebus/pom.xml @@ -18,7 +18,7 @@ com.azure azure-messaging-servicebus - 7.1.0 + 7.2.0-beta.1 diff --git a/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/pom.xml b/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/pom.xml index 65dd29f072e6..00ed9a63123d 100644 --- a/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/pom.xml +++ b/sdk/spring/azure-spring-boot-samples/azure-spring-boot-sample-servicebus/pom.xml @@ -31,7 +31,7 @@ com.azure azure-messaging-servicebus - 7.1.0 + 7.2.0-beta.1 From f9ec2ef3f27f32e0a861b5cd82cb49105754ff26 Mon Sep 17 00:00:00 2001 From: Hemant Tanwar Date: Fri, 26 Mar 2021 09:03:18 -0700 Subject: [PATCH 2/5] unit test case, fix checkstyle error (#20157) Co-authored-by: hemanttanwar --- .../core/amqp/implementation/ReactorSessionTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index 19819b29810f..4487a25521da 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -242,9 +242,9 @@ void createProducerAgainAfterException() { @Test void getOrCreateTransactionCoordinator() { // Arrange - final String TRANSACTION_LINK_NAME = "coordinator"; - final String linkName = TRANSACTION_LINK_NAME; - final String entityPath = TRANSACTION_LINK_NAME; + final String transactionLinkName = "coordinator"; + final String linkName = transactionLinkName; + final String entityPath = transactionLinkName; final TokenManager tokenManager = mock(TokenManager.class); final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); @@ -264,7 +264,7 @@ void getOrCreateTransactionCoordinator() { .assertNext(Assertions::assertNotNull) .verifyComplete(); - verify(session).sender(TRANSACTION_LINK_NAME); + verify(session).sender(transactionLinkName); verify(sender).setTarget(any(Coordinator.class)); verify(session).open(); From c733eb5fa0e0ab2dde21f9ab3f2ac75faf85a4e6 Mon Sep 17 00:00:00 2001 From: Hemant Tanwar Date: Fri, 26 Mar 2021 14:44:01 -0700 Subject: [PATCH 3/5] updated change log (#20171) Co-authored-by: hemanttanwar --- sdk/core/azure-core-amqp/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 985141767003..a949aa561022 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,6 +1,6 @@ # Release History -## 2.1.0-beta.1 (Unreleased) +## 2.1.0-beta.1 (2021-03-26) ### New Features - Exposes 'AmqpTransactionCoordinator' via AmqpSession. - Added API in interface 'AmqpSession.getOrCreateTransactionCoordinator()'. From 2f35aecd1d81e99ae76cf5fa0bff6686231429e7 Mon Sep 17 00:00:00 2001 From: hemanttanwar Date: Tue, 6 Apr 2021 11:28:09 -0700 Subject: [PATCH 4/5] Fix version txt --- eng/jacoco-test-coverage/pom.xml | 2 +- eng/versioning/version_client.txt | 5 ++--- sdk/core/azure-core-amqp-experimental/pom.xml | 2 +- sdk/core/azure-core-amqp/README.md | 2 +- sdk/core/azure-core-amqp/pom.xml | 2 +- sdk/core/pom.xml | 2 +- sdk/eventhubs/azure-messaging-eventhubs/pom.xml | 2 +- sdk/servicebus/azure-messaging-servicebus/pom.xml | 2 +- .../implementation/ServiceBusReceiveLinkProcessorTest.java | 4 ---- 9 files changed, 9 insertions(+), 14 deletions(-) diff --git a/eng/jacoco-test-coverage/pom.xml b/eng/jacoco-test-coverage/pom.xml index 1eb217464559..ed53f3963831 100644 --- a/eng/jacoco-test-coverage/pom.xml +++ b/eng/jacoco-test-coverage/pom.xml @@ -99,7 +99,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.1.0-beta.2 com.azure diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 572606d5c24d..3f837ef34874 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -54,7 +54,7 @@ com.azure:azure-communication-identity;1.0.0-beta.6;1.0.0 com.azure:azure-communication-phonenumbers;1.0.0-beta.6;1.0.0-beta.7 com.azure:azure-containers-containerregistry;1.0.0.beta.1;1.0.0-beta.1 com.azure:azure-core;1.15.0;1.16.0-beta.1 -com.azure:azure-core-amqp;2.0.3;2.1.0-beta.1 +com.azure:azure-core-amqp;2.0.3;2.1.0-beta.2 com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1 com.azure:azure-core-experimental;1.0.0-beta.12;1.0.0-beta.13 com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1 @@ -230,7 +230,7 @@ com.azure.resourcemanager:azure-resourcemanager-delegatednetwork;1.0.0-beta.1;1. # note: The unreleased dependencies will not be manipulated with the automatic PR creation code. # In the pom, the version update tag after the version should name the unreleased package and the dependency version: # - +unreleased_com.azure:azure-core-amqp;2.1.0-beta.2 # Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current # version and set the version to the released beta. Released beta dependencies are only valid # for dependency versions. These entries are specifically for when we've released a beta for @@ -239,4 +239,3 @@ com.azure.resourcemanager:azure-resourcemanager-delegatednetwork;1.0.0-beta.1;1. # beta_:;dependency-version # note: Released beta versions will not be manipulated with the automatic PR creation code. beta_com.azure:azure-security-keyvault-keys;4.3.0-beta.1 -beta_com.azure:azure-core-amqp;2.1.0-beta.1 diff --git a/sdk/core/azure-core-amqp-experimental/pom.xml b/sdk/core/azure-core-amqp-experimental/pom.xml index 6561ede7b03a..dd80afcdba3c 100644 --- a/sdk/core/azure-core-amqp-experimental/pom.xml +++ b/sdk/core/azure-core-amqp-experimental/pom.xml @@ -58,7 +58,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.1.0-beta.2 diff --git a/sdk/core/azure-core-amqp/README.md b/sdk/core/azure-core-amqp/README.md index 6c595d480f60..fa2ac97335a6 100644 --- a/sdk/core/azure-core-amqp/README.md +++ b/sdk/core/azure-core-amqp/README.md @@ -11,7 +11,7 @@ own AMQP client library that abstracts from the underlying transport library's i ### Include the package -[//]: # ({x-version-update-start;beta_com.azure:azure-core-amqp;dependency}) +[//]: # ({x-version-update-start;com.azure:azure-core-amqp;current}) ```xml com.azure diff --git a/sdk/core/azure-core-amqp/pom.xml b/sdk/core/azure-core-amqp/pom.xml index 06eb7ac5b523..1a6318c51ae1 100644 --- a/sdk/core/azure-core-amqp/pom.xml +++ b/sdk/core/azure-core-amqp/pom.xml @@ -14,7 +14,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.1.0-beta.2 jar Microsoft Azure Java Core AMQP Library diff --git a/sdk/core/pom.xml b/sdk/core/pom.xml index 02895e00faed..6a8068b241ac 100644 --- a/sdk/core/pom.xml +++ b/sdk/core/pom.xml @@ -37,7 +37,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.1.0-beta.2 com.azure diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml index 62e4eef96981..6a096c25c1d2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml @@ -42,7 +42,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.1.0-beta.2 diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index ae3a7c3b9899..cbd9c4db430b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -47,7 +47,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.1.0-beta.2 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 25fb01ca9773..9945debdad0b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -594,15 +594,11 @@ void receivesFromFirstLink() throws InterruptedException { final Integer creditValue = value.get(); assertEquals(0, creditValue); -<<<<<<< HEAD - verify(link1, times(3)).addCredits(eq(PREFETCH)); -======= // Add credit for each time 'onNext' is called, plus once when publisher is subscribed. final boolean awaited = countDownLatch.await(5, TimeUnit.SECONDS); Assertions.assertTrue(awaited); ->>>>>>> feature/servicebus-support-amqp-data-types-17614 } /** From 76f43f67433e6b047079bc1f7adf783aea0a5843 Mon Sep 17 00:00:00 2001 From: hemanttanwar Date: Wed, 7 Apr 2021 11:00:01 -0700 Subject: [PATCH 5/5] Adding missing entry in changelog --- sdk/core/azure-core-amqp/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index a949aa561022..348c561a9d59 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,5 +1,7 @@ # Release History +## 2.1.0-beta.2 (unreleased) + ## 2.1.0-beta.1 (2021-03-26) ### New Features - Exposes 'AmqpTransactionCoordinator' via AmqpSession.