From 5fc6353f6aa7b8ca2ced0a7af9c0b6223d2c6ffc Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Tue, 8 Nov 2022 14:13:26 -0800 Subject: [PATCH 1/5] Cancel amqp token refresher as a part of transport cleanup (#2935) --- .../Amqp/AmqpAuthenticationRefresher.cs | 89 +++++++++++++------ 1 file changed, 64 insertions(+), 25 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 6528970d0d..f6cdbf39a6 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -22,6 +22,7 @@ internal class AmqpAuthenticationRefresher : IAmqpAuthenticationRefresher, IDisp private TimeSpan _operationTimeout; private Task _refreshLoop; private bool _disposed; + private CancellationTokenSource _refresherCancellationTokenSource; internal AmqpAuthenticationRefresher(DeviceIdentity deviceIdentity, AmqpIoTCbsLink amqpCbsLink) { @@ -64,9 +65,28 @@ public async Task InitLoopAsync(TimeSpan timeout) timeout) .ConfigureAwait(false); + // This cancellation token source is disposed when the authentication refresher is disposed + // or if this code block is executed more than once per instance of AmqpAuthenticationRefresher (not expected). + + if (_refresherCancellationTokenSource != null) + { + if (Logging.IsEnabled) + Logging.Info(this, "_refresherCancellationTokenSource was already initialized, whhich was unexpected. Canceling and disposing the previous instance.", nameof(InitLoopAsync)); + + try + { + _refresherCancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + } + _refresherCancellationTokenSource.Dispose(); + } + _refresherCancellationTokenSource = new CancellationTokenSource(); + if (refreshOn < DateTime.MaxValue) { - StartLoop(refreshOn, newToken); + StartLoop(refreshOn, _refresherCancellationTokenSource.Token); } if (Logging.IsEnabled) @@ -82,6 +102,8 @@ public void StartLoop(DateTime refreshOn, CancellationToken cancellationToken) Logging.Enter(this, refreshOn, $"{nameof(StartLoop)}"); } + // This task runs in the background and is unmonitored. + // When this refresher is disposed it signals this task to be cancelled. _refreshLoop = RefreshLoopAsync(refreshOn, cancellationToken); if (Logging.IsEnabled) { @@ -97,12 +119,13 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc while (!cancellationToken.IsCancellationRequested) { if (Logging.IsEnabled) - { - Logging.Info(this, refreshesOn, $"Before {nameof(RefreshLoopAsync)}"); - } + Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} before {nameof(RefreshLoopAsync)}"); if (waitTime > TimeSpan.Zero) { + if (Logging.IsEnabled) + Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}."); + await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false); } @@ -120,19 +143,18 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc _operationTimeout) .ConfigureAwait(false); } - catch (IotHubCommunicationException ex) + catch (Exception ex) when (ex is IotHubCommunicationException || ex is OperationCanceledException) { + // In case the token refresh is not successful either due to a communication exception or cancellation token cancellation + // then log the exception and continue. + // This task runs on an unmonitored thread so there is no point throwing these exceptions. if (Logging.IsEnabled) - { - Logging.Info(this, refreshesOn, $"Refresh token failed {ex}"); - } + Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}"); } finally { if (Logging.IsEnabled) - { - Logging.Info(this, refreshesOn, $"After {nameof(RefreshLoopAsync)}"); - } + Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} after {nameof(RefreshLoopAsync)}"); } waitTime = refreshesOn - DateTime.UtcNow; @@ -143,11 +165,20 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc public void StopLoop() { if (Logging.IsEnabled) + Logging.Enter(this, nameof(StopLoop)); + + try + { + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) { - Logging.Info(this, $"{nameof(StopLoop)}"); + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop)); } - _cancellationTokenSource?.Cancel(); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(StopLoop)); } public void Dispose() @@ -158,23 +189,31 @@ public void Dispose() private void Dispose(bool disposing) { - if (_disposed) + try { - return; - } + if (Logging.IsEnabled) + { + Logging.Enter(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); + } - if (Logging.IsEnabled) - { - Logging.Info(this, disposing, $"{nameof(Dispose)}"); - } + if (!_disposed) + { + if (disposing) + { + _refresherCancellationTokenSource?.Dispose(); + _amqpIotCbsTokenProvider?.Dispose(); + } - if (disposing) + _disposed = true; + } + } + finally { - StopLoop(); - _cancellationTokenSource.Dispose(); + if (Logging.IsEnabled) + { + Logging.Exit(this, $"Disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpAuthenticationRefresher)}.{nameof(Dispose)}"); + } } - - _disposed = true; } } } From d89970a385c107441f560502531ab7182d753681 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 10:55:06 -0800 Subject: [PATCH 2/5] Await completion of refresh loop (#3021) --- .../Amqp/AmqpAuthenticationRefresher.cs | 36 +++++--- .../Transport/Amqp/AmqpConnectionHolder.cs | 27 +++--- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 90 ++++++++++++------- .../Amqp/IAmqpAuthenticationRefresher.cs | 2 +- .../Transport/Amqp/IAmqpConnectionHolder.cs | 2 +- 5 files changed, 102 insertions(+), 55 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index f6cdbf39a6..3af0c9776f 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -149,7 +149,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc // then log the exception and continue. // This task runs on an unmonitored thread so there is no point throwing these exceptions. if (Logging.IsEnabled) - Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}"); + Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}"); } finally { @@ -162,23 +162,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc } } - public void StopLoop() + public async Task StopLoopAsync() { - if (Logging.IsEnabled) - Logging.Enter(this, nameof(StopLoop)); - try { - _refresherCancellationTokenSource?.Cancel(); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(StopLoopAsync)); + + try + { + _refresherCancellationTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + if (Logging.IsEnabled) + Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync)); + } + + // Await the completion of _refreshLoop. + // This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. + await _refreshLoop.ConfigureAwait(false); + } + catch (Exception ex) + { + if (Logging.IsEnabled) + Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); } - catch (ObjectDisposedException) + finally { if (Logging.IsEnabled) - Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop)); + Logging.Exit(this, nameof(StopLoopAsync)); } - - if (Logging.IsEnabled) - Logging.Exit(this, nameof(StopLoop)); } public void Dispose() diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index f3d0eaea15..31f16c8bac 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -75,7 +75,7 @@ private void OnConnectionClosed(object o, EventArgs args) if (_amqpIoTConnection != null && ReferenceEquals(_amqpIoTConnection, o)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); HashSet amqpUnits; lock (_unitsLock) { @@ -92,19 +92,20 @@ private void OnConnectionClosed(object o, EventArgs args) } } - public void Shutdown() + public async Task ShutdownAsync() { if (Logging.IsEnabled) + Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync)); + + if (_amqpAuthenticationRefresher != null) { - Logging.Enter(this, _amqpIoTConnection, $"{nameof(Shutdown)}"); + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); } - _amqpAuthenticationRefresher?.StopLoop(); - _amqpIoTConnection?.SafeClose(); + _amqpIotConnection?.SafeClose(); + if (Logging.IsEnabled) - { - Logging.Exit(this, _amqpIoTConnection, $"{nameof(Shutdown)}"); - } + Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -232,8 +233,12 @@ public async Task EnsureConnectionAsync(TimeSpan timeout) } catch (Exception ex) when (!ex.IsFatal()) { - amqpAuthenticationRefresher?.StopLoop(); - amqpIoTConnection?.SafeClose(); + if (amqpAuthenticationRefresher != null) + { + await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } + + amqpIotConnection?.SafeClose(); throw; } finally @@ -261,7 +266,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) if (_amqpUnits.Count == 0) { // TODO #887: handle gracefulDisconnect - Shutdown(); + _ = ShutdownAsync().ConfigureAwait(false); } } if (Logging.IsEnabled) diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 4b08a12f94..9584e79195 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -139,7 +139,7 @@ internal async Task EnsureSessionIsOpenAsync(TimeSpan timeout) } catch (Exception) { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); throw; } finally @@ -168,11 +168,16 @@ public async Task CloseAsync(TimeSpan timeout) { try { - await _amqpIoTSession.CloseAsync(timeout).ConfigureAwait(false); + await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } } finally { - Cleanup(); + await CleanupAsync().ConfigureAwait(false); } } } @@ -185,18 +190,25 @@ public async Task CloseAsync(TimeSpan timeout) } } - private void Cleanup() + private async Task CleanupAsync() { - Logging.Enter(this, nameof(Cleanup)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CleanupAsync)); + + _amqpIotSession?.SafeClose(); + + if (_amqpAuthenticationRefresher != null) + { + await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); + } - _amqpIoTSession?.SafeClose(); - _amqpAuthenticationRefresher?.StopLoop(); - if (!_deviceIdentity.IsPooling()) + if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null) { - _amqpConnectionHolder?.Shutdown(); + await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false); } - Logging.Exit(this, nameof(Cleanup)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(CleanupAsync)); } #endregion Open-Close @@ -804,7 +816,7 @@ public void OnConnectionDisconnected() { Logging.Enter(this, nameof(OnConnectionDisconnected)); - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); _onUnitDisconnected(); Logging.Exit(this, nameof(OnConnectionDisconnected)); @@ -816,7 +828,9 @@ private void OnSessionDisconnected(object o, EventArgs args) if (ReferenceEquals(o, _amqpIoTSession)) { - _amqpAuthenticationRefresher?.StopLoop(); + _ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false); + + // calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry _onUnitDisconnected(); } Logging.Exit(this, o, nameof(OnSessionDisconnected)); @@ -834,31 +848,45 @@ public void Dispose() private void Dispose(bool disposing) { - if (_disposed) + try { - return; - } + if (Logging.IsEnabled) + { + Logging.Enter(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}"); + } + + if (!_disposed) + { + if (disposing) + { + if (!_deviceIdentity.IsPooling()) + { + _amqpConnectionHolder?.Dispose(); + } - _disposed = true; + // For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself, + // so it needs to be explicitly disposed. + _amqpAuthenticationRefresher?.Dispose(); - if (disposing) - { - Logging.Enter(this, disposing, nameof(Dispose)); + _sessionSemaphore?.Dispose(); + _messageReceivingLinkSemaphore?.Dispose(); + _messageReceivingCallbackSemaphore?.Dispose(); + _eventReceivingLinkSemaphore?.Dispose(); + _methodLinkSemaphore?.Dispose(); + _twinLinksSemaphore?.Dispose(); - Cleanup(); - if (!_deviceIdentity.IsPooling()) - { - _amqpConnectionHolder?.Dispose(); + Logging.Exit(this, disposing, nameof(Dispose)); + } } - _sessionSemaphore?.Dispose(); - _messageReceivingLinkSemaphore?.Dispose(); - _messageReceivingCallbackSemaphore?.Dispose(); - _eventReceivingLinkSemaphore?.Dispose(); - _methodLinkSemaphore?.Dispose(); - _twinLinksSemaphore?.Dispose(); - - Logging.Exit(this, disposing, nameof(Dispose)); + _disposed = true; + } + finally + { + if (Logging.IsEnabled) + { + Logging.Exit(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}"); + } } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs index 3f7d4b6bc6..9f3c4432a6 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpAuthenticationRefresher.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp internal interface IAmqpAuthenticationRefresher : IDisposable { Task InitLoopAsync(TimeSpan timeout); - void StopLoop(); void StartLoop(DateTime refreshOn, CancellationToken cancellationToken); + Task StopLoopAsync(); } } diff --git a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs index a3130f8044..35f4af6b6f 100644 --- a/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs @@ -12,6 +12,6 @@ internal interface IAmqpConnectionHolder : IDisposable Task OpenSessionAsync(DeviceIdentity deviceIdentity, TimeSpan timeout); Task EnsureConnectionAsync(TimeSpan timeout); Task CreateRefresherAsync(DeviceIdentity deviceIdentity, TimeSpan timeout); - void Shutdown(); + Task ShutdownAsync(); } } From b5e76a83d258ddc4c82d6c55a18c93e7b29d1a02 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 11:20:48 -0800 Subject: [PATCH 3/5] updates --- .../Amqp/AmqpAuthenticationRefresher.cs | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 3af0c9776f..9b8306e503 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -13,13 +13,12 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp { internal class AmqpAuthenticationRefresher : IAmqpAuthenticationRefresher, IDisposable { - private static readonly string[] AccessRightsStringArray = AccessRightsHelper.AccessRightsToStringArray(AccessRights.DeviceConnect); + private static readonly string[] s_accessRightsStringArray = AccessRightsHelper.AccessRightsToStringArray(AccessRights.DeviceConnect); private readonly AmqpIoTCbsLink _amqpIoTCbsLink; private readonly IotHubConnectionString _connectionString; private readonly AmqpIoTCbsTokenProvider _amqpIoTCbsTokenProvider; private readonly string _audience; - private CancellationTokenSource _cancellationTokenSource; - private TimeSpan _operationTimeout; + private readonly TimeSpan _operationTimeout; private Task _refreshLoop; private bool _disposed; private CancellationTokenSource _refresherCancellationTokenSource; @@ -50,18 +49,13 @@ public async Task InitLoopAsync(TimeSpan timeout) Logging.Enter(this, timeout, $"{nameof(InitLoopAsync)}"); } - CancellationTokenSource oldTokenSource = _cancellationTokenSource; - _cancellationTokenSource = new CancellationTokenSource(); - CancellationToken newToken = _cancellationTokenSource.Token; - oldTokenSource?.Cancel(); - DateTime refreshOn = await _amqpIoTCbsLink .SendTokenAsync( _amqpIoTCbsTokenProvider, _connectionString.AmqpEndpoint, _audience, _audience, - AccessRightsStringArray, + s_accessRightsStringArray, timeout) .ConfigureAwait(false); @@ -119,12 +113,12 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc while (!cancellationToken.IsCancellationRequested) { if (Logging.IsEnabled) - Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} before {nameof(RefreshLoopAsync)}"); + Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} before {nameof(RefreshLoopAsync)}"); if (waitTime > TimeSpan.Zero) { if (Logging.IsEnabled) - Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}."); + Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} waiting {waitTime} {nameof(RefreshLoopAsync)}."); await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false); } @@ -139,7 +133,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc _connectionString.AmqpEndpoint, _audience, _audience, - AccessRightsStringArray, + s_accessRightsStringArray, _operationTimeout) .ConfigureAwait(false); } @@ -149,12 +143,12 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc // then log the exception and continue. // This task runs on an unmonitored thread so there is no point throwing these exceptions. if (Logging.IsEnabled) - Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}"); + Logging.Error(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} refresh token failed: {ex}"); } finally { if (Logging.IsEnabled) - Logging.Info(this, refreshesOn, $"{_amqpIotCbsTokenProvider} after {nameof(RefreshLoopAsync)}"); + Logging.Info(this, refreshesOn, $"{_amqpIoTCbsTokenProvider} after {nameof(RefreshLoopAsync)}"); } waitTime = refreshesOn - DateTime.UtcNow; @@ -215,7 +209,6 @@ private void Dispose(bool disposing) if (disposing) { _refresherCancellationTokenSource?.Dispose(); - _amqpIotCbsTokenProvider?.Dispose(); } _disposed = true; From ded222e0da10b9a8e62f5c63760af763064c0595 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 12:28:03 -0800 Subject: [PATCH 4/5] fix --- iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs | 8 ++++---- iothub/device/src/Transport/Amqp/AmqpUnit.cs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 31f16c8bac..b10e96bc98 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -95,17 +95,17 @@ private void OnConnectionClosed(object o, EventArgs args) public async Task ShutdownAsync() { if (Logging.IsEnabled) - Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync)); + Logging.Enter(this, _amqpIoTConnection, nameof(ShutdownAsync)); if (_amqpAuthenticationRefresher != null) { await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); } - _amqpIotConnection?.SafeClose(); + _amqpIoTConnection?.SafeClose(); if (Logging.IsEnabled) - Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync)); + Logging.Exit(this, _amqpIoTConnection, nameof(ShutdownAsync)); } public void Dispose() @@ -238,7 +238,7 @@ public async Task EnsureConnectionAsync(TimeSpan timeout) await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false); } - amqpIotConnection?.SafeClose(); + amqpIoTConnection?.SafeClose(); throw; } finally diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 9584e79195..5ee391f9c3 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -168,7 +168,7 @@ public async Task CloseAsync(TimeSpan timeout) { try { - await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false); + await _amqpIoTSession.CloseAsync(timeout).ConfigureAwait(false); if (_amqpAuthenticationRefresher != null) { @@ -195,7 +195,7 @@ private async Task CleanupAsync() if (Logging.IsEnabled) Logging.Enter(this, nameof(CleanupAsync)); - _amqpIotSession?.SafeClose(); + _amqpIoTSession?.SafeClose(); if (_amqpAuthenticationRefresher != null) { From c696e0753a47f628abffedaae622b18660a40d55 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra Date: Mon, 19 Dec 2022 12:48:52 -0800 Subject: [PATCH 5/5] yaml nstall netcoreapp 2.1.x --- vsts/vsts.yaml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vsts/vsts.yaml b/vsts/vsts.yaml index 0493cbd463..092663b121 100644 --- a/vsts/vsts.yaml +++ b/vsts/vsts.yaml @@ -485,10 +485,13 @@ jobs: inputs: script: 'choco install dotnet4.5.1' - - task: CmdLine@2 - displayName: 'Install .NET Core 2.1' + - task: UseDotNet@2 + displayName: 'Use .NET Core SDK 2.1' inputs: - script: 'choco install dotnetcore-2.1-sdk' + packageType: sdk + version: 2.1.x + performMultiLevelLookup: true + installationPath: $(Agent.ToolsDirectory)/dotnet - task: PublishTestResults@2 displayName: "Publish Test Results **/*.trx"