Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utilize retry policy when retrying to send SAS token in CBS auth in AMQP #3074

Merged
merged 13 commits into from
Jan 24, 2023
12 changes: 11 additions & 1 deletion iothub/device/src/IotHubBaseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ internal IotHubBaseClient(
DesiredPropertyUpdateCallback = OnDesiredStatePatchReceived,
ConnectionStatusChangeHandler = OnConnectionStatusChanged,
MessageEventCallback = OnMessageReceivedAsync,
RetryPolicy = _clientOptions.RetryPolicy ?? new IotHubClientNoRetry(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set RetryPolicy with the default exponential backoff policy unless user set it to null which is IotHubClientNoRetry()

};

InnerHandler = pipelineBuilder.Build(PipelineContext, _clientOptions.RetryPolicy);
InnerHandler = pipelineBuilder.Build(PipelineContext);

if (Logging.IsEnabled)
Logging.Exit(this, _clientOptions.TransportSettings, nameof(IotHubBaseClient) + "_ctor");
Expand Down Expand Up @@ -116,6 +117,10 @@ public async Task OpenAsync(CancellationToken cancellationToken = default)
cancellationToken.ThrowIfCancellationRequested();

await InnerHandler.OpenAsync(cancellationToken).ConfigureAwait(false);
if (_clientOptions.TransportSettings is IotHubClientAmqpSettings)
{
InnerHandler.SetSasTokenRefreshesOn();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sets SAS token refreshes on only for AMQP

}

/// <summary>
Expand Down Expand Up @@ -403,6 +408,11 @@ public async Task CloseAsync(CancellationToken cancellationToken = default)
cancellationToken.ThrowIfCancellationRequested();

await InnerHandler.CloseAsync(cancellationToken).ConfigureAwait(false);

if (_clientOptions.TransportSettings is IotHubClientAmqpSettings)
{
await InnerHandler.StopSasTokenLoopAsync().ConfigureAwait(false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop SAS token refresh loop only for AMQP

}
}

/// <inheritdoc/>
Expand Down
6 changes: 1 addition & 5 deletions iothub/device/src/Pipeline/ClientPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ClientPipelineBuilder With(ContinuationFactory<IDelegatingHandler> handle
return this;
}

public IDelegatingHandler Build(PipelineContext context, IIotHubClientRetryPolicy retryPolicy)
public IDelegatingHandler Build(PipelineContext context)
{
if (_pipeline.Count == 0)
{
Expand All @@ -34,10 +34,6 @@ public IDelegatingHandler Build(PipelineContext context, IIotHubClientRetryPolic
{
ContinuationFactory<IDelegatingHandler> currentFactory = _pipeline[i];
currentHandler = currentFactory(context, nextHandler);
if (currentHandler is RetryDelegatingHandler retryHandler)
{
retryHandler.SetRetryPolicy(retryPolicy ?? new IotHubClientNoRetry());
}
currentHandler.ContinuationFactory = nextFactory;

nextHandler = currentHandler;
Expand Down
23 changes: 23 additions & 0 deletions iothub/device/src/Pipeline/DefaultDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,29 @@ public virtual Task<long> UpdateReportedPropertiesAsync(ReportedProperties repor
return NextHandler?.UpdateReportedPropertiesAsync(reportedProperties, cancellationToken) ?? Task.FromResult(0L);
}

public virtual Task<DateTime> RefreshSasTokenAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return NextHandler?.RefreshSasTokenAsync(cancellationToken) ?? Task.FromResult(DateTime.UtcNow);
}

public virtual DateTime GetSasTokenRefreshesOn()
{
ThrowIfDisposed();
return NextHandler?.GetSasTokenRefreshesOn() ?? DateTime.UtcNow;
}

public virtual void SetSasTokenRefreshesOn()
{
ThrowIfDisposed();
}

public virtual Task StopSasTokenLoopAsync()
{
ThrowIfDisposed();
return NextHandler?.StopSasTokenLoopAsync() ?? Task.CompletedTask;
}

public virtual bool IsUsable => NextHandler?.IsUsable ?? true;

public virtual void Dispose()
Expand Down
5 changes: 5 additions & 0 deletions iothub/device/src/Pipeline/ErrorDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public override Task<TwinProperties> GetTwinAsync(CancellationToken cancellation
return ExecuteWithErrorHandlingAsync(() => base.GetTwinAsync(cancellationToken));
}

public override Task<DateTime> RefreshSasTokenAsync(CancellationToken cancellationToken)
{
return ExecuteWithErrorHandlingAsync(() => base.RefreshSasTokenAsync(cancellationToken));
}

public override Task<long> UpdateReportedPropertiesAsync(ReportedProperties reportedProperties, CancellationToken cancellationToken)
{
return ExecuteWithErrorHandlingAsync(() => base.UpdateReportedPropertiesAsync(reportedProperties, cancellationToken));
Expand Down
9 changes: 9 additions & 0 deletions iothub/device/src/Pipeline/IDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ internal interface IDelegatingHandler : IContinuationProvider<IDelegatingHandler

Task SendMethodResponseAsync(DirectMethodResponse methodResponse, CancellationToken cancellationToken);

Task<DateTime> RefreshSasTokenAsync(CancellationToken cancellationToken);

DateTime GetSasTokenRefreshesOn();

void SetSasTokenRefreshesOn();

Task StopSasTokenLoopAsync();

// Twin.
Task<TwinProperties> GetTwinAsync(CancellationToken cancellationToken);

Expand All @@ -43,5 +51,6 @@ internal interface IDelegatingHandler : IContinuationProvider<IDelegatingHandler
Task EnableTwinPatchAsync(CancellationToken cancellationToken);

Task DisableTwinPatchAsync(CancellationToken cancellationToken);

}
}
2 changes: 2 additions & 0 deletions iothub/device/src/Pipeline/PipelineContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ internal class PipelineContext
internal Func<DirectMethodRequest, Task> MethodCallback { get; set; }

internal Func<IncomingMessage, Task<MessageAcknowledgement>> MessageEventCallback { get; set; }

internal IIotHubClientRetryPolicy RetryPolicy { get; set; }
}
}
151 changes: 149 additions & 2 deletions iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler

private readonly Action<ConnectionStatusInfo> _onConnectionStatusChanged;

private CancellationTokenSource _loopCancellationTokenSource;
private Task _refreshLoop;

internal RetryDelegatingHandler(PipelineContext context, IDelegatingHandler innerHandler)
: base(context, innerHandler)
{
_retryPolicy = new IotHubClientExponentialBackoffRetryPolicy(RetryMaxCount, TimeSpan.FromMinutes(2));
_retryPolicy = context.RetryPolicy;
_internalRetryHandler = new RetryHandler(_retryPolicy);

_onConnectionStatusChanged = context.ConnectionStatusChangeHandler;

if (Logging.IsEnabled)
Logging.Associate(this, _internalRetryHandler, nameof(SetRetryPolicy));
Logging.Associate(this, _internalRetryHandler, nameof(RetryDelegatingHandler));
}

internal virtual void SetRetryPolicy(IIotHubClientRetryPolicy retryPolicy)
Expand Down Expand Up @@ -470,6 +473,149 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
}
}

public override async Task<DateTime> RefreshSasTokenAsync(CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(RefreshSasTokenAsync));

try
{
return await _internalRetryHandler
.RunWithRetryAsync(
async () =>
{
await VerifyIsOpenAsync(cancellationToken).ConfigureAwait(false);
return await base.RefreshSasTokenAsync(cancellationToken).ConfigureAwait(false);
},
cancellationToken)
.ConfigureAwait(false);
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(RefreshSasTokenAsync));
}
}

public override void SetSasTokenRefreshesOn()
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(SetSasTokenRefreshesOn));

if (_refreshLoop == null)
{
if (_loopCancellationTokenSource != null)
{
if (Logging.IsEnabled)
Logging.Info(this, "_loopCancellationTokenSource was already initialized, which was unexpected. Canceling and disposing the previous instance.", nameof(SetSasTokenRefreshesOn));

try
{
_loopCancellationTokenSource.Cancel();
}
catch (ObjectDisposedException)
{
}
_loopCancellationTokenSource.Dispose();
}
_loopCancellationTokenSource = new CancellationTokenSource();

DateTime refreshesOn = GetSasTokenRefreshesOn();
if (refreshesOn < DateTime.MaxValue)
{
StartSasTokenLoop(refreshesOn, _loopCancellationTokenSource.Token);
}
}

if (Logging.IsEnabled)
Logging.Exit(this, nameof(SetSasTokenRefreshesOn));
}

public override async Task StopSasTokenLoopAsync()
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopSasTokenLoopAsync));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try
{
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopSasTokenLoopAsync));
if (Logging.IsEnabled)
Logging.Enter(this, nameof(StopSasTokenLoopAsync));
try
{

try
{
_loopCancellationTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
if (Logging.IsEnabled)
Logging.Error(this, "The cancellation token source has already been canceled and disposed", nameof(StopSasTokenLoopAsync));
}

// Await the completion of _refreshLoop.
// This will ensure that when StopLoopAsync has been exited then no more token refresh attempts are in-progress.
await _refreshLoop.ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Caught exception when stopping token refresh loop: {ex}");
schoims marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, nameof(StopSasTokenLoopAsync));
}
}

private void StartSasTokenLoop(DateTime refreshesOn, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Logging.Enter(this, refreshesOn, nameof(StartSasTokenLoop));

// This task runs in the background and is unmonitored.
// When this refresher is disposed it signals this task to be cancelled.
_refreshLoop = RefreshSasTokenLoopAsync(refreshesOn, cancellationToken);

if (Logging.IsEnabled)
Logging.Exit(this, refreshesOn, nameof(StartSasTokenLoop));
}

private async Task RefreshSasTokenLoopAsync(DateTime refreshesOn, CancellationToken cancellationToken)
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, refreshesOn, nameof(RefreshSasTokenLoopAsync));

schoims marked this conversation as resolved.
Show resolved Hide resolved
TimeSpan waitTime = refreshesOn - DateTime.UtcNow;

while (!cancellationToken.IsCancellationRequested)
{
if (Logging.IsEnabled)
Logging.Info(this, refreshesOn, $"Before {nameof(RefreshSasTokenLoopAsync)} with wait time {waitTime}.");

if (waitTime > TimeSpan.Zero)
{
if (Logging.IsEnabled)
Logging.Info(this, refreshesOn, $"Token refreshes after {waitTime} {nameof(RefreshSasTokenLoopAsync)}.");

await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
}

refreshesOn = await RefreshSasTokenAsync(cancellationToken).ConfigureAwait(false);

if (Logging.IsEnabled)
Logging.Info(this, refreshesOn, "Token has been refreshed.");

waitTime = refreshesOn - DateTime.UtcNow;
schoims marked this conversation as resolved.
Show resolved Hide resolved
}
}
// OperationCanceledException can be thrown when the connection is closing or the cancellationToken is signaled
catch (OperationCanceledException) { return; }
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, refreshesOn, nameof(RefreshSasTokenLoopAsync));
}
}

private async Task VerifyIsOpenAsync(CancellationToken cancellationToken)
{
await _handlerSemaphore.WaitAsync(cancellationToken);
Expand Down Expand Up @@ -702,6 +848,7 @@ protected private override void Dispose(bool disposing)
{
_handleDisconnectCts?.Cancel();
_handleDisconnectCts?.Dispose();
_loopCancellationTokenSource?.Dispose();
if (_handlerSemaphore != null && _handlerSemaphore.CurrentCount == 0)
{
_handlerSemaphore.Release();
Expand Down
Loading