diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md index b9c8892f42a5a..a6c95a939462c 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Querying runtime data and other management operations will now correctly guards against the race condition where an AMQP link is in the process of closing as the operation attempts to use it. These errors will now properly be classified as retriable as they are for producer and consumer operations. + ### Other Changes ## 5.11.5 (2024-07-31) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs index 75cdfefd8f29f..759822e9ffa43 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs @@ -200,7 +200,7 @@ protected AmqpClient(string host, clientOptions.CertificateValidationCallback); ManagementLink = new FaultTolerantAmqpObject( - linkTimeout => ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None), + linkTimeout => CreateManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None), link => { link.Session?.SafeClose(); @@ -545,6 +545,35 @@ public override async Task CloseAsync(CancellationToken cancellationToken) } } + /// + /// Creates the AMQP link to be used for management operations and ensures + /// that any corresponding state has been updated based on the link configuration. + /// + /// + /// The timeout to apply to management operations using the link.. + /// The timeout to apply for creating the link. + /// The cancellation token to consider when creating the link. + /// + /// The AMQP link to use for management operations. + /// + private async Task CreateManagementLinkAsync(TimeSpan operationTimeout, + TimeSpan linkTimeout, + CancellationToken cancellationToken) + { + var link = default(RequestResponseAmqpLink); + + try + { + link = await ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception ex) + { + ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw(); + } + + return link; + } + /// /// Acquires an access token for authorization with the Event Hubs service. ///