Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Await completion of refresh loop (#3021)
Browse files Browse the repository at this point in the history
abhipsaMisra committed Dec 19, 2022
1 parent 5fc6353 commit d89970a
Showing 5 changed files with 102 additions and 55 deletions.
36 changes: 25 additions & 11 deletions iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
// then log the exception and continue.
// This task runs on an unmonitored thread so there is no point throwing these exceptions.
if (Logging.IsEnabled)
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed {ex}");
Logging.Error(this, refreshesOn, $"{_amqpIotCbsTokenProvider} refresh token failed: {ex}");
}
finally
{
@@ -162,23 +162,37 @@ private async Task RefreshLoopAsync(DateTime refreshesOn, CancellationToken canc
}
}

public void StopLoop()
public async Task StopLoopAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoop));

try
{
_refresherCancellationTokenSource?.Cancel();
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopLoopAsync));

try
{
_refresherCancellationTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoopAsync));
}

// Await the completion of _refreshLoop.
// This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress.
await _refreshLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}");
}
catch (ObjectDisposedException)
finally
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopLoop));
Logging.Exit(this, nameof(StopLoopAsync));
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(StopLoop));
}

public void Dispose()
27 changes: 16 additions & 11 deletions iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ private void OnConnectionClosed(object o, EventArgs args)

if (_amqpIoTConnection != null && ReferenceEquals(_amqpIoTConnection, o))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
HashSet<AmqpUnit> amqpUnits;
lock (_unitsLock)
{
@@ -92,19 +92,20 @@ private void OnConnectionClosed(object o, EventArgs args)
}
}

public void Shutdown()
public async Task ShutdownAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, _amqpIotConnection, nameof(ShutdownAsync));

if (_amqpAuthenticationRefresher != null)
{
Logging.Enter(this, _amqpIoTConnection, $"{nameof(Shutdown)}");
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

_amqpAuthenticationRefresher?.StopLoop();
_amqpIoTConnection?.SafeClose();
_amqpIotConnection?.SafeClose();

if (Logging.IsEnabled)
{
Logging.Exit(this, _amqpIoTConnection, $"{nameof(Shutdown)}");
}
Logging.Exit(this, _amqpIotConnection, nameof(ShutdownAsync));
}

public void Dispose()
@@ -232,8 +233,12 @@ public async Task<AmqpIoTConnection> EnsureConnectionAsync(TimeSpan timeout)
}
catch (Exception ex) when (!ex.IsFatal())
{
amqpAuthenticationRefresher?.StopLoop();
amqpIoTConnection?.SafeClose();
if (amqpAuthenticationRefresher != null)
{
await amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

amqpIotConnection?.SafeClose();
throw;
}
finally
@@ -261,7 +266,7 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit)
if (_amqpUnits.Count == 0)
{
// TODO #887: handle gracefulDisconnect
Shutdown();
_ = ShutdownAsync().ConfigureAwait(false);
}
}
if (Logging.IsEnabled)
90 changes: 59 additions & 31 deletions iothub/device/src/Transport/Amqp/AmqpUnit.cs
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@ internal async Task<AmqpIoTSession> EnsureSessionIsOpenAsync(TimeSpan timeout)
}
catch (Exception)
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
throw;
}
finally
@@ -168,11 +168,16 @@ public async Task CloseAsync(TimeSpan timeout)
{
try
{
await _amqpIoTSession.CloseAsync(timeout).ConfigureAwait(false);
await _amqpIotSession.CloseAsync(cancellationToken).ConfigureAwait(false);

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}
}
finally
{
Cleanup();
await CleanupAsync().ConfigureAwait(false);
}
}
}
@@ -185,18 +190,25 @@ public async Task CloseAsync(TimeSpan timeout)
}
}

private void Cleanup()
private async Task CleanupAsync()
{
Logging.Enter(this, nameof(Cleanup));
if (Logging.IsEnabled)
Logging.Enter(this, nameof(CleanupAsync));

_amqpIotSession?.SafeClose();

if (_amqpAuthenticationRefresher != null)
{
await _amqpAuthenticationRefresher.StopLoopAsync().ConfigureAwait(false);
}

_amqpIoTSession?.SafeClose();
_amqpAuthenticationRefresher?.StopLoop();
if (!_deviceIdentity.IsPooling())
if (!_deviceIdentity.IsPooling() && _amqpConnectionHolder != null)
{
_amqpConnectionHolder?.Shutdown();
await _amqpConnectionHolder.ShutdownAsync().ConfigureAwait(false);
}

Logging.Exit(this, nameof(Cleanup));
if (Logging.IsEnabled)
Logging.Exit(this, nameof(CleanupAsync));
}

#endregion Open-Close
@@ -804,7 +816,7 @@ public void OnConnectionDisconnected()
{
Logging.Enter(this, nameof(OnConnectionDisconnected));

_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);
_onUnitDisconnected();

Logging.Exit(this, nameof(OnConnectionDisconnected));
@@ -816,7 +828,9 @@ private void OnSessionDisconnected(object o, EventArgs args)

if (ReferenceEquals(o, _amqpIoTSession))
{
_amqpAuthenticationRefresher?.StopLoop();
_ = _amqpAuthenticationRefresher?.StopLoopAsync().ConfigureAwait(false);

// calls TransportHandler.OnTransportDisconnected() which sets the transport layer up to retry
_onUnitDisconnected();
}
Logging.Exit(this, o, nameof(OnSessionDisconnected));
@@ -834,31 +848,45 @@ public void Dispose()

private void Dispose(bool disposing)
{
if (_disposed)
try
{
return;
}
if (Logging.IsEnabled)
{
Logging.Enter(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}");
}

if (!_disposed)
{
if (disposing)
{
if (!_deviceIdentity.IsPooling())
{
_amqpConnectionHolder?.Dispose();
}

_disposed = true;
// For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself,
// so it needs to be explicitly disposed.
_amqpAuthenticationRefresher?.Dispose();

if (disposing)
{
Logging.Enter(this, disposing, nameof(Dispose));
_sessionSemaphore?.Dispose();
_messageReceivingLinkSemaphore?.Dispose();
_messageReceivingCallbackSemaphore?.Dispose();
_eventReceivingLinkSemaphore?.Dispose();
_methodLinkSemaphore?.Dispose();
_twinLinksSemaphore?.Dispose();

Cleanup();
if (!_deviceIdentity.IsPooling())
{
_amqpConnectionHolder?.Dispose();
Logging.Exit(this, disposing, nameof(Dispose));
}
}

_sessionSemaphore?.Dispose();
_messageReceivingLinkSemaphore?.Dispose();
_messageReceivingCallbackSemaphore?.Dispose();
_eventReceivingLinkSemaphore?.Dispose();
_methodLinkSemaphore?.Dispose();
_twinLinksSemaphore?.Dispose();

Logging.Exit(this, disposing, nameof(Dispose));
_disposed = true;
}
finally
{
if (Logging.IsEnabled)
{
Logging.Exit(this, $"Device pooling={_deviceIdentity?.IsPooling()}; disposed={_disposed}; disposing={disposing}", $"{nameof(AmqpUnit)}.{nameof(Dispose)}");
}
}
}

Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ namespace Microsoft.Azure.Devices.Client.Transport.Amqp
internal interface IAmqpAuthenticationRefresher : IDisposable
{
Task InitLoopAsync(TimeSpan timeout);
void StopLoop();
void StartLoop(DateTime refreshOn, CancellationToken cancellationToken);
Task StopLoopAsync();
}
}
2 changes: 1 addition & 1 deletion iothub/device/src/Transport/Amqp/IAmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
@@ -12,6 +12,6 @@ internal interface IAmqpConnectionHolder : IDisposable
Task<AmqpIoTSession> OpenSessionAsync(DeviceIdentity deviceIdentity, TimeSpan timeout);
Task<AmqpIoTConnection> EnsureConnectionAsync(TimeSpan timeout);
Task<IAmqpAuthenticationRefresher> CreateRefresherAsync(DeviceIdentity deviceIdentity, TimeSpan timeout);
void Shutdown();
Task ShutdownAsync();
}
}

0 comments on commit d89970a

Please sign in to comment.