Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amqp link recovery fix #611

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions iothub/device/src/Transport/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,27 +252,27 @@ public override async Task RecoverConnections(object link, ConnectionType connec

// disconnected link belongs to the current sets
if (((connectionType == ConnectionType.AmqpMethodSending) &&
((link as SendingAmqpLink).Name == methodSendingLinkName)) ||
(this.methodSendingLinkName == null || (link as SendingAmqpLink).Name == methodSendingLinkName)) ||
((connectionType == ConnectionType.AmqpMethodReceiving) &&
((link as ReceivingAmqpLink).Name == methodReceivingLinkName)))
(this.methodReceivingLinkName == null || (link as ReceivingAmqpLink).Name == methodReceivingLinkName)))
{
methodSendingLinkName = null;
methodReceivingLinkName = null;
needMethodRecovery = true;
}

if (((connectionType == ConnectionType.AmqpTwinSending) &&
((link as SendingAmqpLink).Name == twinSendingLinkName)) ||
(this.twinSendingLinkName == null || (link as SendingAmqpLink).Name == this.twinSendingLinkName)) ||
((connectionType == ConnectionType.AmqpTwinReceiving) &&
((link as ReceivingAmqpLink).Name == twinReceivingLinkName)))
(this.twinReceivingLinkName == null || (link as ReceivingAmqpLink).Name == this.twinReceivingLinkName)))
{
twinSendingLinkName = null;
twinReceivingLinkName = null;
needTwinRecovery = true;
}

if (connectionType == ConnectionType.AmqpMessaging &&
(link as ReceivingAmqpLink).Name == eventReceivingLinkName)
(this.eventReceivingLinkName == null || (link as ReceivingAmqpLink).Name == this.eventReceivingLinkName))
{
eventReceivingLinkName = null;
needEventReceivingLinkRecovery = true;
Expand Down
15 changes: 10 additions & 5 deletions iothub/device/src/Transport/ErrorDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public override Task DisableEventReceiveAsync(CancellationToken cancellationToke

public override Task RecoverConnections(object o, ConnectionType connectionType, CancellationToken cancellationToken)
{
return this.ExecuteWithErrorHandlingAsync(() => base.RecoverConnections(o, connectionType, cancellationToken), true, cancellationToken);
bool reset = connectionType == ConnectionType.MqttConnection;
return this.ExecuteWithErrorHandlingAsync(() => base.RecoverConnections(o, connectionType, cancellationToken), true, cancellationToken, reset);
}

public override Task EnableTwinPatchAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -180,12 +181,12 @@ public override Task SendMethodResponseAsync(MethodResponseInternal methodRespon
return this.ExecuteWithErrorHandlingAsync(() => base.SendMethodResponseAsync(methodResponse, cancellationToken), true, cancellationToken);
}

Task ExecuteWithErrorHandlingAsync(Func<Task> asyncOperation, bool ensureOpen, CancellationToken cancellationToken)
Task ExecuteWithErrorHandlingAsync(Func<Task> asyncOperation, bool ensureOpen, CancellationToken cancellationToken, bool reset = true)
{
return ExecuteWithErrorHandlingAsync(async () => { await asyncOperation().ConfigureAwait(false); return 0; }, ensureOpen, cancellationToken);
return ExecuteWithErrorHandlingAsync(async () => { await asyncOperation().ConfigureAwait(false); return 0; }, ensureOpen, cancellationToken, reset);
}

async Task<T> ExecuteWithErrorHandlingAsync<T>(Func<Task<T>> asyncOperation, bool ensureOpen, CancellationToken cancellationToken)
async Task<T> ExecuteWithErrorHandlingAsync<T>(Func<Task<T>> asyncOperation, bool ensureOpen, CancellationToken cancellationToken, bool reset = true)
{
try
{
Expand Down Expand Up @@ -219,7 +220,11 @@ async Task<T> ExecuteWithErrorHandlingAsync<T>(Func<Task<T>> asyncOperation, boo
throw new IotHubClientTransientException("Transient error occurred, please retry.", ex);
}

await this.Reset(openCompletionBeforeOperationStarted, handlerBeforeOperationStarted).ConfigureAwait(false);
if (reset)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @tameraw @abhipsaMisra

We need to investigate all other cases for this as it appears to be an old design-level bug: InternalClient (Device/Module) has a circular dependency through some of the callbacks.
Instead of having the state machine spread out between Error, Retry and InternalClient callbacks we should push the logic into the correct handler.

{
await this.Reset(openCompletionBeforeOperationStarted, handlerBeforeOperationStarted)
.ConfigureAwait(false);
}

if (Logging.IsEnabled) Logging.Error(this, $"Transient exception caught; IsTransportHandlerStillUsable=false : {ex}");

Expand Down