From d89970a385c107441f560502531ab7182d753681 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 10:55:06 -0800 Subject: [PATCH] Await completion of refresh loop (#3021) --- .../Amqp/AmqpAuthenticationRefresher.cs | 36 +++++--- .../Transport/Amqp/AmqpConnectionHolder.cs | 27 +++--- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 90 ++++++++++++------- .../Amqp/IAmqpAuthenticationRefresher.cs | 2 +- .../Transport/Amqp/IAmqpConnectionHolder.cs | 2 +- 5 files changed, 102 insertions(+), 55 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index f6cdbf39a6..3af0c9776f 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -149,7 +149,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc // then log the exception and continue. // This task runs on an unmonitored thread so there is no point throwing these exceptions. if (Logging.IsEnabled) - Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}"); + Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}"); } finally { @@ -162,23 +162,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc } } - public void StopLoop() + public async Task StopLoopAsync() { - if (Logging.IsEnabled) - Logging.Enter(this, nameof(StopLoop)); - try { - _refresherCancellationTokenSource?.Cancel(); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(StopLoopAsync)); + + try + { + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync)); + } + + // Await the completion of _refreshLoop. + // This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. + await _refreshLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); } - catch (ObjectDisposedException) + finally { if (Logging.IsEnabled) - Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop)); + Logging.Exit(this, nameof(StopLoopAsync)); } - - if (Logging.IsEnabled) - Logging.Exit(this, nameof(StopLoop)); } public void Dispose() diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index f3d0eaea15..31f16c8bac 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -75,7 +75,7 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIoTConnection != null && ReferenceEquals(_amqpIoTConnection, o)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); HashSet amqpUnits; lock (_unitsLock) { @@ -92,19 +92,20 @@ private void OnConnectionClosed(object o, EventArgs args) } } - public void Shutdown() + public async Task ShutdownAsync() { if (Logging.IsEnabled) + Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync)); + + if (_amqpAuthenticationRefresher != null) { - Logging.Enter(this, _amqpIoTConnection, $"{nameof(Shutdown)}"); + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); } - _amqpAuthenticationRefresher?.StopLoop(); - _amqpIoTConnection?.SafeClose(); + _amqpIotConnection?.SafeClose(); + if (Logging.IsEnabled) - { - Logging.Exit(this, _amqpIoTConnection, $"{nameof(Shutdown)}"); - } + Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -232,8 +233,12 @@ public async Task EnsureConnectionAsync(TimeSpan timeout) } catch (Exception ex) when (!ex.IsFatal()) { - amqpAuthenticationRefresher?.StopLoop(); - amqpIoTConnection?.SafeClose(); + if (amqpAuthenticationRefresher != null) + { + await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + + amqpIotConnection?.SafeClose(); throw; } finally @@ -261,7 +266,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) if (_amqpUnits.Count == 0) { // TODO #887: handle gracefulDisconnect - Shutdown(); + _ = ShutdownAsync().ConfigureAwait(false); } } if (Logging.IsEnabled) diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 4b08a12f94..9584e79195 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -139,7 +139,7 @@ internal async Task EnsureSessionIsOpenAsync(TimeSpan timeout) } catch (Exception) { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); throw; } finally @@ -168,11 +168,16 @@ public async Task CloseAsync(TimeSpan timeout) { try { - await _amqpIoTSession.CloseAsync(timeout).ConfigureAwait(false); + await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } } finally { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); } } } @@ -185,18 +190,25 @@ public async Task CloseAsync(TimeSpan timeout) } } - private void Cleanup() + private async Task CleanupAsync() { - Logging.Enter(this, nameof(Cleanup)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CleanupAsync)); + + _amqpIotSession?.SafeClose(); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } - _amqpIoTSession?.SafeClose(); - _amqpAuthenticationRefresher?.StopLoop(); - if (!_deviceIdentity.IsPooling()) + if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null) { - _amqpConnectionHolder?.Shutdown(); + await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false); } - Logging.Exit(this, nameof(Cleanup)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(CleanupAsync)); } #endregion Open-Close @@ -804,7 +816,7 @@ public void OnConnectionDisconnected() { Logging.Enter(this, nameof(OnConnectionDisconnected)); - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); Logging.Exit(this, nameof(OnConnectionDisconnected)); @@ -816,7 +828,9 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIoTSession)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); + + // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); } Logging.Exit(this, o, nameof(OnSessionDisconnected)); @@ -834,31 +848,45 @@ public void Dispose() private void Dispose(bool disposing) { - if (_disposed) + try { - return; - } + if (Logging.IsEnabled) + { + Logging.Enter(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}"); + } + + if (!_disposed) + { + if (disposing) + { + if (!_deviceIdentity.IsPooling()) + { + _amqpConnectionHolder?.Dispose(); + } - _disposed = true; + // For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself, + // so it needs to be explicitly disposed. + _amqpAuthenticationRefresher?.Dispose(); - if (disposing) - { - Logging.Enter(this, disposing, nameof(Dispose)); + _sessionSemaphore?.Dispose(); + _messageReceivingLinkSemaphore?.Dispose(); + _messageReceivingCallbackSemaphore?.Dispose(); + _eventReceivingLinkSemaphore?.Dispose(); + _methodLinkSemaphore?.Dispose(); + _twinLinksSemaphore?.Dispose(); - Cleanup(); - if (!_deviceIdentity.IsPooling()) - { - _amqpConnectionHolder?.Dispose(); + Logging.Exit(this, disposing, nameof(Dispose)); + } } - _sessionSemaphore?.Dispose(); - _messageReceivingLinkSemaphore?.Dispose(); - _messageReceivingCallbackSemaphore?.Dispose(); - _eventReceivingLinkSemaphore?.Dispose(); - _methodLinkSemaphore?.Dispose(); - _twinLinksSemaphore?.Dispose(); - - Logging.Exit(this, disposing, nameof(Dispose)); + _disposed = true; + } + finally + { + if (Logging.IsEnabled) + { + Logging.Exit(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}"); + } } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs index 3f7d4b6bc6..9f3c4432a6 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp internal interface IAmqpAuthenticationRefresher : IDisposable { Task InitLoopAsync(TimeSpan timeout); - void StopLoop(); void StartLoop(DateTime refreshOn, CancellationToken cancellationToken); + Task StopLoopAsync(); } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs index a3130f8044..35f4af6b6f 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs @@ -12,6 +12,6 @@ internal interface IAmqpConnectionHolder : IDisposable Task OpenSessionAsync(DeviceIdentity deviceIdentity, TimeSpan timeout); Task EnsureConnectionAsync(TimeSpan timeout); Task CreateRefresherAsync(DeviceIdentity deviceIdentity, TimeSpan timeout); - void Shutdown(); + Task ShutdownAsync(); } }