Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP | Draft]: Drop the support for AMQP v1 stack #43735

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ io.clientcore:http-stress;1.0.0-beta.1;1.0.0-beta.1

unreleased_com.azure:azure-monitor-opentelemetry-exporter;1.0.0-beta.31
unreleased_com.azure:azure-monitor-opentelemetry-autoconfigure;1.0.0-beta.1
unreleased_com.azure:azure-core-amqp;2.10.0-beta.1

# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.9.12</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
<version>2.10.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>

<!-- Test dependencies -->
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-messaging-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.9.12</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
<version>2.10.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

// API used only in v1 stack
@SuppressWarnings("try")
Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0).handle((serviceBusMessageContext, sink) -> {
Expand All @@ -980,13 +981,17 @@ Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
* {@link Flux#take(Duration)}).</li>
* <li>An {@link AmqpException} occurs that causes the receive link to stop.</li>
* </ul>
* <p>
* API used only in v1 stack
* </p>
*
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
*/
Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
return receiveMessagesWithContext(1);
}

// API used only in v1 stack
private Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
final Flux<ServiceBusMessageContext> messageFlux = sessionManager != null
? sessionManager.receive()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static com.azure.messaging.servicebus.TestUtils.getSessionSubscriptionBaseName;
import static com.azure.messaging.servicebus.TestUtils.getSubscriptionBaseName;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

Expand Down Expand Up @@ -444,6 +445,16 @@ protected void assertMessageEquals(ServiceBusReceivedMessage message, String mes
}
}

protected void assertMessageEquals(String sessionId, String messageId, String contents,
ServiceBusReceivedMessage message) {
assertNotNull(message, "'message' should not be null.");
if (!CoreUtils.isNullOrEmpty(sessionId)) {
assertEquals(sessionId, message.getSessionId());
}
assertEquals(messageId, message.getMessageId());
assertEquals(contents, message.getBody().toString());
}

protected final Configuration v1OrV2(boolean isV2) {
final TestConfigurationSource configSource = new TestConfigurationSource();
if (isV2) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static com.azure.messaging.servicebus.TestUtils.USE_CASE_PEEK_BATCH_MESSAGES;
import static com.azure.messaging.servicebus.TestUtils.USE_CASE_PEEK_MESSAGE;
import static com.azure.messaging.servicebus.TestUtils.USE_CASE_RECEIVE_AND_COMPLETE;
import static com.azure.messaging.servicebus.TestUtils.getServiceBusMessage;
import static com.azure.messaging.servicebus.TestUtils.getServiceBusMessages;
import static com.azure.messaging.servicebus.TestUtils.getSessionSubscriptionBaseName;
import static com.azure.messaging.servicebus.TestUtils.getSubscriptionBaseName;
Expand Down Expand Up @@ -1441,6 +1442,44 @@ void autoComplete(MessagingEntityType entityType) {
}
}

@ParameterizedTest
@MethodSource("com.azure.messaging.servicebus.IntegrationTestBase#messagingEntityProvider")
void receivesSingleSessionMessages(MessagingEntityType entityType) {
// Arrange
final int entityIndex = TestUtils.USE_CASE_SINGLE_SESSION;
final String messageId = "sessionMessageId";
final String contents = "Some-contents";
final int numberToSend = 5;

setSender(entityType, entityIndex, true);
final Disposable subscription = Flux.interval(Duration.ofMillis(500)).take(numberToSend).flatMap(index -> {
final ServiceBusMessage message = getServiceBusMessage(contents, messageId).setSessionId(sessionId);
return sender.sendMessage(message).thenReturn(index);
})
.subscribe(number -> logger.info("sessionId[{}] sent[{}] Message sent.", sessionId, number),
error -> logger.error("sessionId[{}] Error encountered.", sessionId, error),
() -> logger.info("sessionId[{}] Finished sending.", sessionId));
toClose(subscription);
setReceiver(entityType, entityIndex, true);

// Act & Assert
StepVerifier
.create(receiver.receiveMessages()
.concatMap(receivedMessage -> receiver.complete(receivedMessage).thenReturn(receivedMessage)))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.assertNext(serviceBusReceivedMessage -> assertMessageEquals(sessionId, messageId, contents,
serviceBusReceivedMessage))
.thenCancel()
.verify(Duration.ofMinutes(2));
}

/**
* Asserts the length and values with in the map.
*/
Expand Down

This file was deleted.

Loading
Loading