-
Notifications
You must be signed in to change notification settings - Fork 492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Utilize retry policy when retrying to send SAS token in CBS auth in AMQP #3074
Changes from 12 commits
6e6c1c5
6bb4d8c
89acbe0
7cd6b08
5d57df8
05ab518
21a63f7
fa3ef04
c68fec4
9144246
265c0dc
863994b
5ebe6f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,9 +62,10 @@ internal IotHubBaseClient( | |
DesiredPropertyUpdateCallback = OnDesiredStatePatchReceived, | ||
ConnectionStatusChangeHandler = OnConnectionStatusChanged, | ||
MessageEventCallback = OnMessageReceivedAsync, | ||
RetryPolicy = _clientOptions.RetryPolicy ?? new IotHubClientNoRetry(), | ||
}; | ||
|
||
InnerHandler = pipelineBuilder.Build(PipelineContext, _clientOptions.RetryPolicy); | ||
InnerHandler = pipelineBuilder.Build(PipelineContext); | ||
|
||
if (Logging.IsEnabled) | ||
Logging.Exit(this, _clientOptions.TransportSettings, nameof(IotHubBaseClient) + "_ctor"); | ||
|
@@ -116,6 +117,10 @@ public async Task OpenAsync(CancellationToken cancellationToken = default) | |
cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
await InnerHandler.OpenAsync(cancellationToken).ConfigureAwait(false); | ||
if (_clientOptions.TransportSettings is IotHubClientAmqpSettings) | ||
{ | ||
InnerHandler.SetSasTokenRefreshesOn(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sets SAS token refreshes on only for AMQP |
||
} | ||
|
||
/// <summary> | ||
|
@@ -403,6 +408,11 @@ public async Task CloseAsync(CancellationToken cancellationToken = default) | |
cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
await InnerHandler.CloseAsync(cancellationToken).ConfigureAwait(false); | ||
|
||
if (_clientOptions.TransportSettings is IotHubClientAmqpSettings) | ||
{ | ||
await InnerHandler.StopSasTokenLoopAsync().ConfigureAwait(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop SAS token refresh loop only for AMQP |
||
} | ||
} | ||
|
||
/// <inheritdoc/> | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -29,16 +29,19 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler | |||||||||||||||||||
|
||||||||||||||||||||
private readonly Action<ConnectionStatusInfo> _onConnectionStatusChanged; | ||||||||||||||||||||
|
||||||||||||||||||||
private CancellationTokenSource _loopCancellationTokenSource; | ||||||||||||||||||||
private Task _refreshLoop; | ||||||||||||||||||||
|
||||||||||||||||||||
internal RetryDelegatingHandler(PipelineContext context, IDelegatingHandler innerHandler) | ||||||||||||||||||||
: base(context, innerHandler) | ||||||||||||||||||||
{ | ||||||||||||||||||||
_retryPolicy = new IotHubClientExponentialBackoffRetryPolicy(RetryMaxCount, TimeSpan.FromMinutes(2)); | ||||||||||||||||||||
_retryPolicy = context.RetryPolicy; | ||||||||||||||||||||
_internalRetryHandler = new RetryHandler(_retryPolicy); | ||||||||||||||||||||
|
||||||||||||||||||||
_onConnectionStatusChanged = context.ConnectionStatusChangeHandler; | ||||||||||||||||||||
|
||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Associate(this, _internalRetryHandler, nameof(SetRetryPolicy)); | ||||||||||||||||||||
Logging.Associate(this, _internalRetryHandler, nameof(RetryDelegatingHandler)); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
internal virtual void SetRetryPolicy(IIotHubClientRetryPolicy retryPolicy) | ||||||||||||||||||||
|
@@ -470,6 +473,149 @@ public override async Task CloseAsync(CancellationToken cancellationToken) | |||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
public override async Task<DateTime> RefreshSasTokenAsync(CancellationToken cancellationToken) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Enter(this, cancellationToken, nameof(RefreshSasTokenAsync)); | ||||||||||||||||||||
|
||||||||||||||||||||
try | ||||||||||||||||||||
{ | ||||||||||||||||||||
return await _internalRetryHandler | ||||||||||||||||||||
.RunWithRetryAsync( | ||||||||||||||||||||
async () => | ||||||||||||||||||||
{ | ||||||||||||||||||||
await VerifyIsOpenAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||
return await base.RefreshSasTokenAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||
}, | ||||||||||||||||||||
cancellationToken) | ||||||||||||||||||||
.ConfigureAwait(false); | ||||||||||||||||||||
} | ||||||||||||||||||||
finally | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Exit(this, cancellationToken, nameof(RefreshSasTokenAsync)); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
public override void SetSasTokenRefreshesOn() | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Enter(this, nameof(SetSasTokenRefreshesOn)); | ||||||||||||||||||||
|
||||||||||||||||||||
if (_refreshLoop == null) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (_loopCancellationTokenSource != null) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Info(this, "_loopCancellationTokenSource was already initialized, which was unexpected. Canceling and disposing the previous instance.", nameof(SetSasTokenRefreshesOn)); | ||||||||||||||||||||
|
||||||||||||||||||||
try | ||||||||||||||||||||
{ | ||||||||||||||||||||
_loopCancellationTokenSource.Cancel(); | ||||||||||||||||||||
} | ||||||||||||||||||||
catch (ObjectDisposedException) | ||||||||||||||||||||
{ | ||||||||||||||||||||
} | ||||||||||||||||||||
_loopCancellationTokenSource.Dispose(); | ||||||||||||||||||||
} | ||||||||||||||||||||
_loopCancellationTokenSource = new CancellationTokenSource(); | ||||||||||||||||||||
|
||||||||||||||||||||
DateTime refreshesOn = GetSasTokenRefreshesOn(); | ||||||||||||||||||||
if (refreshesOn < DateTime.MaxValue) | ||||||||||||||||||||
{ | ||||||||||||||||||||
StartSasTokenLoop(refreshesOn, _loopCancellationTokenSource.Token); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Exit(this, nameof(SetSasTokenRefreshesOn)); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
public override async Task StopSasTokenLoopAsync() | ||||||||||||||||||||
{ | ||||||||||||||||||||
try | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Enter(this, nameof(StopSasTokenLoopAsync)); | ||||||||||||||||||||
|
||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
try | ||||||||||||||||||||
{ | ||||||||||||||||||||
_loopCancellationTokenSource?.Cancel(); | ||||||||||||||||||||
} | ||||||||||||||||||||
catch (ObjectDisposedException) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopSasTokenLoopAsync)); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Await the completion of _refreshLoop. | ||||||||||||||||||||
// This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress. | ||||||||||||||||||||
await _refreshLoop.ConfigureAwait(false); | ||||||||||||||||||||
} | ||||||||||||||||||||
catch (Exception ex) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}"); | ||||||||||||||||||||
schoims marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
} | ||||||||||||||||||||
finally | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Exit(this, nameof(StopSasTokenLoopAsync)); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
private void StartSasTokenLoop(DateTime refreshesOn, CancellationToken cancellationToken) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Enter(this, refreshesOn, nameof(StartSasTokenLoop)); | ||||||||||||||||||||
|
||||||||||||||||||||
// This task runs in the background and is unmonitored. | ||||||||||||||||||||
// When this refresher is disposed it signals this task to be cancelled. | ||||||||||||||||||||
_refreshLoop = RefreshSasTokenLoopAsync(refreshesOn, cancellationToken); | ||||||||||||||||||||
|
||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Exit(this, refreshesOn, nameof(StartSasTokenLoop)); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
private async Task RefreshSasTokenLoopAsync(DateTime refreshesOn, CancellationToken cancellationToken) | ||||||||||||||||||||
{ | ||||||||||||||||||||
try | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Enter(this, refreshesOn, nameof(RefreshSasTokenLoopAsync)); | ||||||||||||||||||||
|
||||||||||||||||||||
schoims marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
TimeSpan waitTime = refreshesOn - DateTime.UtcNow; | ||||||||||||||||||||
|
||||||||||||||||||||
while (!cancellationToken.IsCancellationRequested) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Info(this, refreshesOn, $"Before {nameof(RefreshSasTokenLoopAsync)} with wait time {waitTime}."); | ||||||||||||||||||||
|
||||||||||||||||||||
if (waitTime > TimeSpan.Zero) | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Info(this, refreshesOn, $"Token refreshes after {waitTime} {nameof(RefreshSasTokenLoopAsync)}."); | ||||||||||||||||||||
|
||||||||||||||||||||
await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
refreshesOn = await RefreshSasTokenAsync(cancellationToken).ConfigureAwait(false); | ||||||||||||||||||||
|
||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Info(this, refreshesOn, "Token has been refreshed."); | ||||||||||||||||||||
|
||||||||||||||||||||
waitTime = refreshesOn - DateTime.UtcNow; | ||||||||||||||||||||
schoims marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
// OperationCanceledException can be thrown when the connection is closing or the cancellationToken is signaled | ||||||||||||||||||||
catch (OperationCanceledException) { return; } | ||||||||||||||||||||
finally | ||||||||||||||||||||
{ | ||||||||||||||||||||
if (Logging.IsEnabled) | ||||||||||||||||||||
Logging.Exit(this, refreshesOn, nameof(RefreshSasTokenLoopAsync)); | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
private async Task VerifyIsOpenAsync(CancellationToken cancellationToken) | ||||||||||||||||||||
{ | ||||||||||||||||||||
await _handlerSemaphore.WaitAsync(cancellationToken); | ||||||||||||||||||||
|
@@ -702,6 +848,7 @@ protected private override void Dispose(bool disposing) | |||||||||||||||||||
{ | ||||||||||||||||||||
_handleDisconnectCts?.Cancel(); | ||||||||||||||||||||
_handleDisconnectCts?.Dispose(); | ||||||||||||||||||||
_loopCancellationTokenSource?.Dispose(); | ||||||||||||||||||||
if (_handlerSemaphore != null && _handlerSemaphore.CurrentCount == 0) | ||||||||||||||||||||
{ | ||||||||||||||||||||
_handlerSemaphore.Release(); | ||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set RetryPolicy with the default exponential backoff policy unless user set it to null which is IotHubClientNoRetry()