From b512edac4b8c5ab2e2e2ff6fd08083256144ede5 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Thu, 10 Oct 2024 12:42:55 -0700 Subject: [PATCH] [Event Hubs] Resilient management link creation The focus of these changes is to correctly handle the scenario where the management AMQP link is in the process of closing (such as for an idle timeout) as a new operation attempts to use it. This error should be detected as a special case and retried as it is with producer and consumer operations. --- .../Azure.Messaging.EventHubs/CHANGELOG.md | 2 ++ .../src/Amqp/AmqpClient.cs | 31 ++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md index b9c8892f42a5a..a6c95a939462c 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Querying runtime data and other management operations will now correctly guards against the race condition where an AMQP link is in the process of closing as the operation attempts to use it. These errors will now properly be classified as retriable as they are for producer and consumer operations. + ### Other Changes ## 5.11.5 (2024-07-31) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs index 75cdfefd8f29f..759822e9ffa43 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs @@ -200,7 +200,7 @@ protected AmqpClient(string host, clientOptions.CertificateValidationCallback); ManagementLink = new FaultTolerantAmqpObject( - linkTimeout => ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None), + linkTimeout => CreateManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None), link => { link.Session?.SafeClose(); @@ -545,6 +545,35 @@ public override async Task CloseAsync(CancellationToken cancellationToken) } } + /// + /// Creates the AMQP link to be used for management operations and ensures + /// that any corresponding state has been updated based on the link configuration. + /// + /// + /// The timeout to apply to management operations using the link.. + /// The timeout to apply for creating the link. + /// The cancellation token to consider when creating the link. + /// + /// The AMQP link to use for management operations. + /// + private async Task CreateManagementLinkAsync(TimeSpan operationTimeout, + TimeSpan linkTimeout, + CancellationToken cancellationToken) + { + var link = default(RequestResponseAmqpLink); + + try + { + link = await ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception ex) + { + ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw(); + } + + return link; + } + /// /// Acquires an access token for authorization with the Event Hubs service. ///