Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service Bus - Merging previously approved -cross transaction feature - into feature branch - preparation for April release. #20356

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.1.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
5 changes: 2 additions & 3 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ com.azure:azure-communication-identity;1.0.0-beta.6;1.0.0
com.azure:azure-communication-phonenumbers;1.0.0-beta.6;1.0.0-beta.7
com.azure:azure-containers-containerregistry;1.0.0.beta.1;1.0.0-beta.1
com.azure:azure-core;1.15.0;1.16.0-beta.1
com.azure:azure-core-amqp;2.0.3;2.1.0-beta.1
com.azure:azure-core-amqp;2.0.3;2.1.0-beta.2
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.12;1.0.0-beta.13
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
Expand Down Expand Up @@ -230,8 +230,7 @@ com.azure.resourcemanager:azure-resourcemanager-delegatednetwork;1.0.0-beta.1;1.
# note: The unreleased dependencies will not be manipulated with the automatic PR creation code.
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
unreleased_com.azure:azure-core-amqp;2.1.0-beta.1

unreleased_com.azure:azure-core-amqp;2.1.0-beta.2
# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
# for dependency versions. These entries are specifically for when we've released a beta for
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.1.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>

<dependency>
Expand Down
7 changes: 6 additions & 1 deletion sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Release History

## 2.1.0-beta.1 (Unreleased)
## 2.1.0-beta.2 (unreleased)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing changelog info, if you't=re preparing for release.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for previously approved PR to merge into this branch for April. But for April Release, I will have separate release PR in few days.


## 2.1.0-beta.1 (2021-03-26)
### New Features
- Exposes 'AmqpTransactionCoordinator' via AmqpSession.
- Added API in interface 'AmqpSession.getOrCreateTransactionCoordinator()'.

## 2.0.3 (2021-03-09)

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ own AMQP client library that abstracts from the underlying transport library's i
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.0.3</version>
<version>2.1.0-beta.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.1.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,22 @@ public interface AmqpSession extends Disposable {
* @return A completable mono.
*/
Mono<Void> rollbackTransaction(AmqpTransaction transaction);

/**
* Creates the {@link AmqpTransactionCoordinator} on the {@link AmqpSession} which is used to create/commit or
* rollback the transaction. A transaction can span over one or more message broker entities. The interface
* {@link AmqpSession} provides default implementation for back-word compatibility but it throws
* {@link RuntimeException} to warn what that an implementing class must override and provide implementation of this
* API. Azure SDK already provides implementation for this API.
*
* @return newly created {@link AmqpTransactionCoordinator}.
* @throws UnsupportedOperationException Indicting implementation not found error. Azure SDK should provide
* implementation of this API but if runtime is not able to find it in its classpath or version mismatch can cause
* this exception.
*
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#section-coordination">Transaction Coordination</a>
*/
default Mono<AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
return Mono.error(new UnsupportedOperationException("Implementation not found error."));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

/**
* Provides an API to manage AMQP transaction on message broker. A transaction is used where one or more operation in
* messaging broker is part of one unit of work. In general a transaction involve with many operations on one message
* broker entity.
*<p>
* Distributed Transactions: A distributed transaction where operations spans over different message broker entities.
*
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#choice-txn-capability-distributed-transactions">Distributed Transactions</a>
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#section-transactions">Transactions</a>
*/
public interface AmqpTransactionCoordinator {

/**
* Completes the transaction. All the work in this transaction will either rollback or committed as one unit of
* work.
* @param transaction that needs to be completed on message broker.
* @param isCommit this flag indicates that the work associated with this transaction should commit or rollback.
* @return a completable {@link Mono}.
*/
Mono<Void> discharge(AmqpTransaction transaction, boolean isCommit);

/**
* Creates the transaction in message broker. Successful completion of this API indicates that a transaction
* identifier has successfully been created on the message broker. Once a transaction has been created, it must be
* completed by using {@link AmqpTransactionCoordinator#discharge(AmqpTransaction, boolean)} API.
* @return the created transaction id represented by {@link AmqpTransaction}.
*/
Mono<AmqpTransaction> declare();
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ void dispose(ErrorCondition errorCondition) {
}

subscriptions.dispose();
tokenManager.close();
if (tokenManager != null) {
tokenManager.close();
}

if (sender.getLocalState() == EndpointState.CLOSED) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
Expand Down Expand Up @@ -170,26 +171,26 @@ public Duration getOperationTimeout() {
*/
@Override
public Mono<AmqpTransaction> createTransaction() {
return createTransactionCoordinator()
.flatMap(coordinator -> coordinator.createTransaction());
return getOrCreateTransactionCoordinator()
.flatMap(coordinator -> coordinator.declare());
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Void> commitTransaction(AmqpTransaction transaction) {
return createTransactionCoordinator()
.flatMap(coordinator -> coordinator.completeTransaction(transaction, true));
return getOrCreateTransactionCoordinator()
.flatMap(coordinator -> coordinator.discharge(transaction, true));
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Void> rollbackTransaction(AmqpTransaction transaction) {
return createTransactionCoordinator()
.flatMap(coordinator -> coordinator.completeTransaction(transaction, false));
return getOrCreateTransactionCoordinator()
.flatMap(coordinator -> coordinator.discharge(transaction, false));
}

/**
Expand Down Expand Up @@ -219,10 +220,10 @@ public boolean removeLink(String linkName) {
}

/**
*
* @return {@link Mono} of {@link TransactionCoordinator}
* {@inheritDoc}
*/
private Mono<TransactionCoordinator> createTransactionCoordinator() {
@Override
public Mono<AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
if (isDisposed()) {
return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
"Cannot create coordinator send link '%s' from a closed session.", TRANSACTION_LINK_NAME))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
Expand All @@ -21,7 +22,7 @@
/**
* Encapsulates transaction functions.
*/
final class TransactionCoordinator {
final class TransactionCoordinator implements AmqpTransactionCoordinator {

private final ClientLogger logger = new ClientLogger(TransactionCoordinator.class);

Expand All @@ -42,7 +43,8 @@ final class TransactionCoordinator {
*
* @return a completable {@link Mono} which represent {@link DeliveryState}.
*/
Mono<Void> completeTransaction(AmqpTransaction transaction, boolean isCommit) {
@Override
public Mono<Void> discharge(AmqpTransaction transaction, boolean isCommit) {
final Message message = Proton.message();
Discharge discharge = new Discharge();
discharge.setFail(!isCommit);
Expand Down Expand Up @@ -74,7 +76,8 @@ Mono<Void> completeTransaction(AmqpTransaction transaction, boolean isCommit) {
*
* @return a completable {@link Mono} which represent {@link DeliveryState}.
*/
Mono<AmqpTransaction> createTransaction() {
@Override
public Mono<AmqpTransaction> declare() {
final Message message = Proton.message();
Declare declare = new Declare();
message.setBody(new AmqpValue(declare));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.FixedAmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
Expand All @@ -40,6 +42,7 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -238,6 +241,47 @@ void createProducerAgainAfterException() {
sendLinkHandler.onLinkRemoteClose(closeSendEvent);
}

/**
* Verifies that we can create coordinator.
*/
@Test
void getOrCreateTransactionCoordinator() {
// Arrange
final String transactionLinkName = "coordinator";
final String linkName = transactionLinkName;
final String entityPath = transactionLinkName;

final TokenManager tokenManager = mock(TokenManager.class);
final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath);

when(session.sender(linkName)).thenReturn(sender);
when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager);
when(tokenManager.authorize()).thenReturn(Mono.just(1000L));
when(tokenManager.getAuthorizationResults())
.thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED)));
when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath))
.thenReturn(sendLinkHandler);

StepVerifier.create(
reactorSession.getOrCreateTransactionCoordinator())
.then(() -> handler.onSessionRemoteOpen(event))
.thenAwait(Duration.ofSeconds(2))
.assertNext(Assertions::assertNotNull)
.verifyComplete();

verify(session).sender(transactionLinkName);
verify(sender).setTarget(any(Coordinator.class));
verify(session).open();

final AmqpTransactionCoordinator coordinator1 = reactorSession.getOrCreateTransactionCoordinator()
.block(TIMEOUT);

final AmqpTransactionCoordinator coordinator2 = reactorSession.getOrCreateTransactionCoordinator()
.block(TIMEOUT);

assertSame(coordinator1, coordinator2);
}

@Test
void createConsumer() {
// Arrange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testCompleteTransactionRejected(boolean isCommit) {

doReturn(Mono.just(outcome)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());

StepVerifier.create(transactionCoordinator.completeTransaction(transaction, isCommit))
StepVerifier.create(transactionCoordinator.discharge(transaction, isCommit))
.verifyError(IllegalArgumentException.class);

verify(sendLink, times(1)).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());
Expand All @@ -86,7 +86,7 @@ public void testCompleteTransaction(boolean isCommit) {

doReturn(Mono.just(outcome)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());

StepVerifier.create(transactionCoordinator.completeTransaction(transaction, isCommit))
StepVerifier.create(transactionCoordinator.discharge(transaction, isCommit))
.verifyComplete();

verify(sendLink, times(1)).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());
Expand All @@ -100,7 +100,7 @@ public void testCreateTransactionRejected() {

doReturn(Mono.just(outcome)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());

StepVerifier.create(transactionCoordinator.createTransaction())
StepVerifier.create(transactionCoordinator.declare())
.verifyError(IllegalArgumentException.class);

verify(sendLink, times(1)).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());
Expand All @@ -116,7 +116,7 @@ public void testCreateTransaction() {

doReturn(Mono.just(transactionState)).when(sendLink).send(any(byte[].class), anyInt(), eq(DeliveryImpl.DEFAULT_MESSAGE_FORMAT), isNull());

StepVerifier.create(transactionCoordinator.createTransaction())
StepVerifier.create(transactionCoordinator.declare())
.assertNext(actual -> {
Assertions.assertNotNull(actual);
Assertions.assertArrayEquals(transactionId, actual.getTransactionId().array());
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.1.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
<version>2.1.0-beta.2</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>

<!-- Test dependencies -->
Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-messaging-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ To quickly create the needed Service Bus resources in Azure and to receive a con

### Include the package

[//]: # ({x-version-update-start;com.azure:azure-messaging-servicebus;current})
[//]: # ({x-version-update-start;beta_com.azure:azure-messaging-servicebus;dependency})
```xml
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.1.0</version>
<version>7.2.0-beta.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down Expand Up @@ -86,7 +86,7 @@ platform. First, add the package:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.2.3</version>
<version>1.2.4</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-messaging-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</scm>

<properties>
<jacoco.min.linecoverage>0.18</jacoco.min.linecoverage>
<jacoco.min.linecoverage>0.20</jacoco.min.linecoverage>
<jacoco.min.branchcoverage>0.09</jacoco.min.branchcoverage>
</properties>

Expand All @@ -47,7 +47,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
<version>2.1.0-beta.2</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Loading