Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core-amqp may 2021 beta release for Servicebus beta #21393

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
3 changes: 2 additions & 1 deletion eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ com.azure:azure-communication-identity;1.0.0;1.1.0-beta.1
com.azure:azure-communication-phonenumbers;1.0.1;1.1.0-beta.1
com.azure:azure-containers-containerregistry;1.0.0-beta.1;1.0.0-beta.2
com.azure:azure-core;1.16.0;1.17.0-beta.1
com.azure:azure-core-amqp;2.0.5;2.1.0-beta.1
com.azure:azure-core-amqp;2.0.5;2.2.0-beta.2
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.13;1.0.0-beta.14
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
Expand Down Expand Up @@ -302,3 +302,4 @@ com.azure.resourcemanager:azure-resourcemanager-videoanalyzer;1.0.0-beta.1;1.0.0
# beta_<groupId>:<artifactId>;dependency-version
# note: Released beta versions will not be manipulated with the automatic PR creation code.
beta_com.azure:azure-security-keyvault-keys;4.3.0-beta.6
beta_com.azure:azure-core-amqp;2.2.0-beta.2
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>

<dependency>
Expand Down
9 changes: 9 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release History

## 2.2.0-beta.2 (2021-05-08)

### Dependency Updates
- Upgraded `azure-core` dependency to `1.16.0`.

## 2.0.5 (2021-05-07)

### Dependency Updates
Expand All @@ -14,6 +19,10 @@
### Dependency Updates
- Upgraded `azure-core` dependency to `1.15.0`.

## 2.1.0-beta.1 (2021-03-26)
### New Features
- Exposes 'AmqpTransactionCoordinator' via AmqpSession.

## 2.0.4 (2021-04-12)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ own AMQP client library that abstracts from the underlying transport library's i
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.0.5</version>
<version>2.2.0-beta.2</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0-beta.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.17.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core;current} -->
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,22 @@ public interface AmqpSession extends Disposable {
* @return A completable mono.
*/
Mono<Void> rollbackTransaction(AmqpTransaction transaction);

/**
* Creates the {@link AmqpTransactionCoordinator} on the {@link AmqpSession} which is used to create/commit or
* rollback the transaction. A transaction can span over one or more message broker entities. The interface
* {@link AmqpSession} provides default implementation for back-word compatibility but it throws
* {@link RuntimeException} to warn what that an implementing class must override and provide implementation of this
* API. Azure SDK already provides implementation for this API.
*
* @return newly created {@link AmqpTransactionCoordinator}.
* @throws UnsupportedOperationException Indicting implementation not found error. Azure SDK should provide
* implementation of this API but if runtime is not able to find it in its classpath or version mismatch can cause
* this exception.
*
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#section-coordination">Transaction Coordination</a>
*/
default Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
return Mono.error(new UnsupportedOperationException("Implementation not found error."));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp;

import reactor.core.publisher.Mono;

/**
* Provides an API to manage AMQP transaction on message broker. A transaction is used where one or more operation in
* messaging broker is part of one unit of work. In general a transaction involve with many operations on one message
* broker entity.
*<p>
* Distributed Transactions: A distributed transaction where operations spans over different message broker entities.
*
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#choice-txn-capability-distributed-transactions">Distributed Transactions</a>
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#section-transactions">Transactions</a>
*/
public interface AmqpTransactionCoordinator {

/**
* Completes the transaction. All the work in this transaction will either rollback or committed as one unit of
* work.
* @param transaction that needs to be completed on message broker.
* @param isCommit this flag indicates that the work associated with this transaction should commit or rollback.
* @return a completable {@link Mono}.
*/
Mono<Void> discharge(AmqpTransaction transaction, boolean isCommit);

/**
* Creates the transaction in message broker. Successful completion of this API indicates that a transaction
* identifier has successfully been created on the message broker. Once a transaction has been created, it must be
* completed by using {@link AmqpTransactionCoordinator#discharge(AmqpTransaction, boolean)} API.
* @return the created transaction id represented by {@link AmqpTransaction}.
*/
Mono<AmqpTransaction> declare();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
Expand Down Expand Up @@ -177,26 +178,26 @@ public Duration getOperationTimeout() {
*/
@Override
public Mono<AmqpTransaction> createTransaction() {
return createTransactionCoordinator()
.flatMap(coordinator -> coordinator.createTransaction());
return getOrCreateTransactionCoordinator()
.flatMap(coordinator -> coordinator.declare());
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Void> commitTransaction(AmqpTransaction transaction) {
return createTransactionCoordinator()
.flatMap(coordinator -> coordinator.completeTransaction(transaction, true));
return getOrCreateTransactionCoordinator()
.flatMap(coordinator -> coordinator.discharge(transaction, true));
}

/**
* {@inheritDoc}
*/
@Override
public Mono<Void> rollbackTransaction(AmqpTransaction transaction) {
return createTransactionCoordinator()
.flatMap(coordinator -> coordinator.completeTransaction(transaction, false));
return getOrCreateTransactionCoordinator()
.flatMap(coordinator -> coordinator.discharge(transaction, false));
}

/**
Expand Down Expand Up @@ -264,7 +265,8 @@ Mono<Void> dispose(String message, ErrorCondition errorCondition, boolean dispos
/**
* @return {@link Mono} of {@link TransactionCoordinator}
*/
private Mono<TransactionCoordinator> createTransactionCoordinator() {
@Override
public Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
if (isDisposed()) {
return Mono.error(logger.logExceptionAsError(new IllegalStateException(String.format(
"connectionId[%s] sessionName[%s] Cannot create coordinator send link '%s' from a closed session.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
Expand All @@ -21,7 +22,7 @@
/**
* Encapsulates transaction functions.
*/
final class TransactionCoordinator {
final class TransactionCoordinator implements AmqpTransactionCoordinator {

private final ClientLogger logger = new ClientLogger(TransactionCoordinator.class);

Expand All @@ -42,7 +43,8 @@ final class TransactionCoordinator {
*
* @return a completable {@link Mono} which represent {@link DeliveryState}.
*/
Mono<Void> completeTransaction(AmqpTransaction transaction, boolean isCommit) {
@Override
public Mono<Void> discharge(AmqpTransaction transaction, boolean isCommit) {
final Message message = Proton.message();
Discharge discharge = new Discharge();
discharge.setFail(!isCommit);
Expand Down Expand Up @@ -74,7 +76,8 @@ Mono<Void> completeTransaction(AmqpTransaction transaction, boolean isCommit) {
*
* @return a completable {@link Mono} which represent {@link DeliveryState}.
*/
Mono<AmqpTransaction> createTransaction() {
@Override
public Mono<AmqpTransaction> declare() {
final Message message = Proton.message();
Declare declare = new Declare();
message.setBody(new AmqpValue(declare));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
* This class encapsulates the body of a message. The {@link AmqpMessageBodyType} map to an AMQP specification message
* body types. Current implementation only support {@link AmqpMessageBodyType#DATA DATA} AMQP data type. Track this
* <a href="https://github.com/Azure/azure-sdk-for-java/issues/17614" target="_blank">issue</a> to find out support for
* other AMQP types.
* body types. Current implementation support {@link AmqpMessageBodyType#DATA DATA} AMQP data type.
*
* <p><b>Client should test for {@link AmqpMessageBodyType} before calling corresponding get method. Get methods not
* corresponding to the type of the body throws exception.</b></p>
Expand All @@ -23,6 +22,11 @@
* {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType}
*
* @see AmqpMessageBodyType
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-primitive-type-definitions" target="_blank">
* Amqp primitive data type.</a>
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format" target="_blank">
* Amqp message format.</a>
*
*/
public final class AmqpMessageBody {
private final ClientLogger logger = new ClientLogger(AmqpMessageBody.class);
Expand All @@ -32,6 +36,8 @@ public final class AmqpMessageBody {
// This the priority here to store payload as `byte[] data` and
private byte[] data;
private List<byte[]> dataList;
private Object value;
private List<Object> sequence;

private AmqpMessageBody() {
// private constructor so no one outside can create instance of this except classes im this package.
Expand All @@ -54,6 +60,47 @@ public static AmqpMessageBody fromData(byte[] data) {
return body;
}

/**
* Creates an instance of {@link AmqpMessageBody} with the given {@link List sequence}. It supports only one
* {@code sequence} at present.
*
* @param sequence used to create an instance of {@link AmqpMessageBody}. A sequence can be {@link List} of
* {@link Object objects}. The {@link Object object} can be any of the AMQP supported primitive data type.
*
* @return newly created instance of {@link AmqpMessageBody}.
*
* @throws NullPointerException if {@code sequence} is null.
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-primitive-type-definitions" target="_blank">
* Amqp primitive data type.</a>
*/
public static AmqpMessageBody fromSequence(List<Object> sequence) {
Objects.requireNonNull(sequence, "'sequence' cannot be null.");
AmqpMessageBody body = new AmqpMessageBody();
body.bodyType = AmqpMessageBodyType.SEQUENCE;
body.sequence = sequence;
return body;
}

/**
* Creates an instance of {@link AmqpMessageBody} with the given {@link Object value}. A value can be any of the
* AMQP supported primitive data type.
*
* @param value used to create an instance of {@link AmqpMessageBody}.
*
* @return newly created instance of {@link AmqpMessageBody}.
*
* @throws NullPointerException if {@code value} is null.
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-primitive-type-definitions" target="_blank">
* Amqp primitive data type.</a>
*/
public static AmqpMessageBody fromValue(Object value) {
Objects.requireNonNull(value, "'value' cannot be null.");
AmqpMessageBody body = new AmqpMessageBody();
body.bodyType = AmqpMessageBodyType.VALUE;
body.value = value;
return body;
}

/**
* Gets the {@link AmqpMessageBodyType} of the message.
* <p><strong>How to check for {@link AmqpMessageBodyType}</strong></p>
Expand All @@ -80,9 +127,7 @@ public AmqpMessageBodyType getBodyType() {
public IterableStream<byte[]> getData() {
if (bodyType != AmqpMessageBodyType.DATA) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"This method can only be called for AMQP Data body type at present. Track this issue, "
+ "https://github.com/Azure/azure-sdk-for-java/issues/17614 for other body type support in "
+ "future."));
"This method can only be called if AMQP Message body type is 'DATA'."));
}
if (dataList == null) {
dataList = Collections.singletonList(data);
Expand All @@ -107,10 +152,62 @@ public IterableStream<byte[]> getData() {
public byte[] getFirstData() {
if (bodyType != AmqpMessageBodyType.DATA) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"This method can only be called for AMQP Data body type at present. Track this issue, "
+ "https://github.com/Azure/azure-sdk-for-java/issues/17614 for other body type support in "
+ "future."));
String.format(Locale.US, "This method can be called if AMQP Message body type is 'DATA'. "
+ "The actual type is [%s].", bodyType)));
}
return data;
}

/**
* Gets the unmodifiable AMQP Sequence set on this {@link AmqpMessageBody}. It support only one {@code sequence} at
* present.
*
* <p><b>Client should test for {@link AmqpMessageBodyType} before calling corresponding get method. Get methods not
* corresponding to the type of the body throws exception.</b></p>
*
* <p><strong>How to check for {@link AmqpMessageBodyType}</strong></p>
* {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType}
* @return sequence of this {@link AmqpMessageBody} instance.
*
* @throws IllegalArgumentException If {@link AmqpMessageBodyType} is not
* {@link AmqpMessageBodyType#SEQUENCE SEQUENCE}.
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-primitive-type-definitions" target="_blank">
* Amqp primitive data type.</a>
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format" target="_blank">
* Amqp message format.</a>
*/
public List<Object> getSequence() {
if (bodyType != AmqpMessageBodyType.SEQUENCE) {
throw logger.logExceptionAsError(new IllegalArgumentException(
String.format(Locale.US, "This method can be called if AMQP Message body type is 'SEQUENCE'. "
+ "The actual type is [%s].", bodyType)));
}

return Collections.unmodifiableList(sequence);
}

/**
* Gets the AMQP value set on this {@link AmqpMessageBody} instance. It can be any of the primitive AMQP data type.
*
* <p><b>Client should test for {@link AmqpMessageBodyType} before calling corresponding get method. The 'Get'
* methods not corresponding to the type of the body throws exception.</b></p>
*
* <p><strong>How to check for {@link AmqpMessageBodyType}</strong></p>
* {@codesnippet com.azure.core.amqp.models.AmqpBodyType.checkBodyType}
* @return value of this {@link AmqpMessageBody} instance.
*
* @throws IllegalArgumentException If {@link AmqpMessageBodyType} is not {@link AmqpMessageBodyType#VALUE VALUE}.
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#section-primitive-type-definitions" target="_blank">
* Amqp primitive data type.</a>
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format" target="_blank">
* Amqp message format.</a>
*/
public Object getValue() {
if (bodyType != AmqpMessageBodyType.VALUE) {
throw logger.logExceptionAsError(new IllegalArgumentException(
String.format(Locale.US, "This method can be called if AMQP Message body type is 'VALUE'. "
+ "The actual type is [%s].", bodyType)));
}
return value;
}
}
Loading