Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel twin ops on disconnect #3287

Merged
merged 29 commits into from
Apr 20, 2023
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5ebb0fe
Cancel pending twin operations on disconnect
Apr 19, 2023
d831d48
Add/update logging
Apr 19, 2023
9ea0ec6
Reduce concurrent dictionaries to one per protocol
Apr 19, 2023
8fce7e6
Fix twin patch response
Apr 19, 2023
19a8e0a
Unify twin update and logging in long haul
Apr 19, 2023
9dd4909
Improve error handling in long haul
Apr 19, 2023
2eee027
Fix old task cleanup
Apr 20, 2023
9b86af4
Add retry to long haul init
Apr 20, 2023
a6d0f39
merge
Apr 20, 2023
0a6014b
Rearrange methods
Apr 20, 2023
287a8fa
Fix method name
Apr 20, 2023
1593b03
PR feedback from Bryce
Apr 20, 2023
eb8b7b3
Cancel pending twin operations on disconnect
Apr 19, 2023
99b9b7d
Add/update logging
Apr 19, 2023
586e53d
Reduce concurrent dictionaries to one per protocol
Apr 19, 2023
b8d858f
Fix twin patch response
Apr 19, 2023
4f68177
Unify twin update and logging in long haul
Apr 19, 2023
348e82b
Improve error handling in long haul
Apr 19, 2023
ed93221
Fix old task cleanup
Apr 20, 2023
3b2654e
Add retry to long haul init
Apr 20, 2023
c74dc83
merge
Apr 20, 2023
365968c
Rearrange methods
Apr 20, 2023
0eabbb1
Fix method name
Apr 20, 2023
fcd79d3
PR feedback from Bryce
Apr 20, 2023
af27f20
PR feedback
Apr 20, 2023
0fb999d
Adding cancellation token for InitializeAsync in module client
tmahmood-microsoft Apr 20, 2023
d418081
Merge branch 'drwill/cancel-twinops-on-disconnect' of https://github.…
tmahmood-microsoft Apr 20, 2023
f45d3fc
Fix MQTT twin response
Apr 20, 2023
04fa965
Add unit tests for payload conventions
Apr 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reduce concurrent dictionaries to one per protocol
  • Loading branch information
David R. Williamson committed Apr 20, 2023
commit 586e53d633d1736dbc24f4baaa79ddc2bf8f4eab
29 changes: 12 additions & 17 deletions iothub/device/src/Pipeline/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
@@ -22,8 +22,7 @@ internal class AmqpTransportHandler : TransportHandlerBase
protected AmqpUnit _amqpUnit;
private readonly Action<DesiredProperties> _onDesiredStatePatchListener;
private readonly object _lock = new();
private readonly ConcurrentDictionary<string, TaskCompletionSource<AmqpMessage>> _twinResponseCompletions = new();
private readonly ConcurrentDictionary<string, DateTimeOffset> _twinResponseTimeouts = new();
private readonly ConcurrentDictionary<string, PendingAmqpTwinOperation> _pendingTwinOperations = new();
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved

// Timer to check if any expired twin operations exist. The timer fires once per s_twinResponseTimeout interval.
private readonly Timer _twinTimeoutTimer;
@@ -32,7 +31,7 @@ internal class AmqpTransportHandler : TransportHandlerBase

internal IotHubConnectionCredentials _connectionCredentials;

private static readonly TimeSpan s_twinResponseTimeout = TimeSpan.FromMinutes(60);
private static readonly TimeSpan s_twinResponseTimeout = TimeSpan.FromMinutes(2);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
private bool _closed;

static AmqpTransportHandler()
@@ -145,7 +144,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
{
await _amqpUnit.OpenAsync(cancellationToken).ConfigureAwait(false);

// The timer would invoke callback in an hour and every hour thereafter.
// The timer would invoke callback in the specified time and that duration thereafter.
_twinTimeoutTimer.Change(s_twinResponseTimeout, s_twinResponseTimeout);
}
finally
@@ -449,8 +448,7 @@ private async Task<AmqpMessage> RoundTripTwinMessageAsync(
{
cancellationToken.ThrowIfCancellationRequested();
var taskCompletionSource = new TaskCompletionSource<AmqpMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
_twinResponseCompletions[correlationId] = taskCompletionSource;
_twinResponseTimeouts[correlationId] = DateTimeOffset.UtcNow;
_pendingTwinOperations[correlationId] = new PendingAmqpTwinOperation(taskCompletionSource);

await _amqpUnit.SendTwinMessageAsync(amqpTwinMessageType, correlationId, reportedProperties, cancellationToken).ConfigureAwait(false);

@@ -467,8 +465,7 @@ private async Task<AmqpMessage> RoundTripTwinMessageAsync(
}
finally
{
_twinResponseCompletions.TryRemove(correlationId, out _);
_twinResponseTimeouts.TryRemove(correlationId, out _);
_pendingTwinOperations.TryRemove(correlationId, out _);

if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(RoundTripTwinMessageAsync));
@@ -499,15 +496,15 @@ private void TwinMessageListener(AmqpMessage responseFromService, string correla
Logging.Info(this, $"Received a response for operation with correlation Id {correlationId}.", nameof(TwinMessageListener));

// For Get and Patch, complete the task.
if (_twinResponseCompletions.TryRemove(correlationId, out TaskCompletionSource<AmqpMessage> task))
if (_pendingTwinOperations.TryRemove(correlationId, out PendingAmqpTwinOperation pendingOperation))
{
if (ex == default)
{
task.TrySetResult(responseFromService);
pendingOperation.CompletionTask.TrySetResult(responseFromService);
}
else
{
task.TrySetException(ex);
pendingOperation.CompletionTask.TrySetException(ex);
}
}
else
@@ -536,19 +533,17 @@ private void RemoveOldOperations(object state)
if (Logging.IsEnabled)
Logging.Info(this, $"Removing operations older than {maxAge}", nameof(RemoveOldOperations));

_ = _twinResponseTimeouts
.Where(x => DateTimeOffset.UtcNow - x.Value > s_twinResponseTimeout)
_ = _pendingTwinOperations
.Where(x => DateTimeOffset.UtcNow - x.Value.RequestSentOnUtc > s_twinResponseTimeout)
.Select(x =>
{
if (_twinResponseCompletions.TryRemove(x.Key, out TaskCompletionSource<AmqpMessage> twinResponse))
if (_pendingTwinOperations.TryRemove(x.Key, out PendingAmqpTwinOperation pendingOperation))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Removing twin response for {x.Key}", nameof(RemoveOldOperations));

twinResponse.TrySetException(new IotHubClientException("Did not receive twin response from service.", IotHubClientErrorCode.NetworkErrors));
pendingOperation.CompletionTask.TrySetException(new IotHubClientException("Did not receive twin response from service.", IotHubClientErrorCode.NetworkErrors));
}

_twinResponseTimeouts.TryRemove(x.Key, out DateTimeOffset _);
return true;
});
}
77 changes: 29 additions & 48 deletions iothub/device/src/Pipeline/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
@@ -97,17 +97,14 @@ internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable
private readonly Action<DesiredProperties> _onDesiredStatePatchListener;
private readonly Func<IncomingMessage, Task<MessageAcknowledgement>> _messageReceivedListener;

private readonly ConcurrentDictionary<string, TaskCompletionSource<GetTwinResponse>> _getTwinResponseCompletions = new();
private readonly ConcurrentDictionary<string, TaskCompletionSource<PatchTwinResponse>> _reportedPropertyUpdateResponseCompletions = new();

private readonly ConcurrentDictionary<string, DateTimeOffset> _twinResponseTimeouts = new();

private bool _isSubscribedToTwinResponses;
private readonly ConcurrentDictionary<string, PendingMqttTwinOperation> _pendingTwinOperations = new();

// Timer to check if any expired twin operations exist. The timer fires once per s_twinResponseTimeout interval.
private readonly Timer _twinTimeoutTimer;

private static TimeSpan s_twinResponseTimeout = TimeSpan.FromMinutes(60);
private static TimeSpan s_twinResponseTimeout = TimeSpan.FromMinutes(2);

private bool _isSubscribedToTwinResponses;

private readonly string _deviceId;
private readonly string _moduleId;
@@ -315,7 +312,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
_mqttClient.DisconnectedAsync += HandleDisconnectionAsync;
_mqttClient.ApplicationMessageReceivedAsync += HandleReceivedMessageAsync;

// The timer would invoke callback in an hour and every hour thereafter.
// The timer would invoke callback in the specified time and that duration thereafter.
_twinTimeoutTimer.Change(s_twinResponseTimeout, s_twinResponseTimeout);
}
catch (MqttConnectingFailedException ex)
@@ -707,8 +704,7 @@ public override async Task<TwinProperties> GetTwinAsync(CancellationToken cancel
// Note the request as "in progress" before actually sending it so that no matter how quickly the service
// responds, this layer can correlate the request.
var taskCompletionSource = new TaskCompletionSource<GetTwinResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
_getTwinResponseCompletions[requestId] = taskCompletionSource;
_twinResponseTimeouts[requestId] = DateTimeOffset.UtcNow;
_pendingTwinOperations[requestId] = new PendingMqttTwinOperation(taskCompletionSource);

MqttClientPublishResult result = await _mqttClient.PublishAsync(mqttMessage, cancellationToken).ConfigureAwait(false);

@@ -776,8 +772,7 @@ public override async Task<TwinProperties> GetTwinAsync(CancellationToken cancel
finally
{
// No matter what, remove the requestId from this dictionary since no thread will be waiting for it anymore
_getTwinResponseCompletions.TryRemove(requestId, out _);
_twinResponseTimeouts.TryRemove(requestId, out _);
_pendingTwinOperations.TryRemove(requestId, out _);
}
}
finally
@@ -816,8 +811,7 @@ public override async Task<long> UpdateReportedPropertiesAsync(ReportedPropertie
// Note the request as "in progress" before actually sending it so that no matter how quickly the service
// responds, this layer can correlate the request.
var taskCompletionSource = new TaskCompletionSource<PatchTwinResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
_reportedPropertyUpdateResponseCompletions[requestId] = taskCompletionSource;
_twinResponseTimeouts[requestId] = DateTimeOffset.UtcNow;
_pendingTwinOperations[requestId] = new PendingMqttTwinOperation(taskCompletionSource);

MqttClientPublishResult result = await _mqttClient.PublishAsync(mqttMessage, cancellationToken).ConfigureAwait(false);

@@ -885,8 +879,7 @@ public override async Task<long> UpdateReportedPropertiesAsync(ReportedPropertie
finally
{
// No matter what, remove the requestId from this dictionary since no thread will be waiting for it anymore
_reportedPropertyUpdateResponseCompletions.TryRemove(requestId, out TaskCompletionSource<PatchTwinResponse> _);
_twinResponseTimeouts.TryRemove(requestId, out DateTimeOffset _);
_pendingTwinOperations.TryRemove(requestId, out _);
}
}
finally
@@ -1185,7 +1178,7 @@ private void HandleTwinResponse(MqttApplicationMessageReceivedEventArgs received
{
byte[] payloadBytes = receivedEventArgs.ApplicationMessage.Payload ?? Array.Empty<byte>();

if (_getTwinResponseCompletions.TryRemove(receivedRequestId, out TaskCompletionSource<GetTwinResponse> getTwinCompletion))
if (_pendingTwinOperations.TryRemove(receivedRequestId, out PendingMqttTwinOperation getTwinOperation))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Received response to get twin request with request id {receivedRequestId}.", nameof(HandleTwinResponse));
@@ -1221,7 +1214,7 @@ private void HandleTwinResponse(MqttApplicationMessageReceivedEventArgs received
ErrorResponseMessage = errorResponse,
};

getTwinCompletion.TrySetResult(getTwinResponse);
getTwinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
}
else
{
@@ -1232,34 +1225,32 @@ private void HandleTwinResponse(MqttApplicationMessageReceivedEventArgs received
// For this reason, we use NewtonSoft Json serializer for this deserialization.
TwinDocument clientTwinProperties = DefaultPayloadConvention.Instance.GetObject<TwinDocument>(payloadBytes);

var twinDesiredProperties = new DesiredProperties(clientTwinProperties.Desired)
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
{
PayloadConvention = _payloadConvention,
};

var twinReportedProperties = new ReportedProperties(clientTwinProperties.Reported, true)
{
PayloadConvention = _payloadConvention,
};

var getTwinResponse = new GetTwinResponse
{
Status = status,
Twin = new TwinProperties(twinDesiredProperties, twinReportedProperties),
Twin = new TwinProperties(
new DesiredProperties(clientTwinProperties.Desired)
{
PayloadConvention = _payloadConvention,
},
new ReportedProperties(clientTwinProperties.Reported, true)
{
PayloadConvention = _payloadConvention,
}),
};

getTwinCompletion.TrySetResult(getTwinResponse);
getTwinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
}
catch (JsonReaderException ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse Twin JSON. Message body: '{Encoding.UTF8.GetString(payloadBytes)}'. Exception: {ex}.", nameof(HandleTwinResponse));

getTwinCompletion.TrySetException(ex);
getTwinOperation.TwinResponseTask.TrySetException(ex);
}
}
}
else if (_reportedPropertyUpdateResponseCompletions.TryRemove(receivedRequestId, out TaskCompletionSource<PatchTwinResponse> patchTwinCompletion))
else if (_pendingTwinOperations.TryRemove(receivedRequestId, out PendingMqttTwinOperation pendingPatchOperation))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Received response to patch twin request with request id {receivedRequestId}.", nameof(HandleTwinResponse));
@@ -1294,7 +1285,7 @@ private void HandleTwinResponse(MqttApplicationMessageReceivedEventArgs received
ErrorResponseMessage = errorResponse,
};

patchTwinCompletion.TrySetResult(patchTwinResponse);
pendingPatchOperation.TwinPatchTask.TrySetResult(patchTwinResponse);
}
else if (Logging.IsEnabled)
{
@@ -1370,28 +1361,18 @@ private void RemoveOldOperations(object state)
Logging.Info(this, $"Removing operations older than {maxAge}", nameof(RemoveOldOperations));

const string exceptionMessage = "Did not receive twin response from service.";
_ = _twinResponseTimeouts
.Where(x => DateTimeOffset.UtcNow - x.Value > maxAge)
_ = _pendingTwinOperations
.Where(x => DateTimeOffset.UtcNow - x.Value.RequestSentOnUtc > maxAge)
.Select(x =>
{
if (_getTwinResponseCompletions.TryRemove(x.Key, out TaskCompletionSource<GetTwinResponse> twinResponse))
if (_pendingTwinOperations.TryRemove(x.Key, out PendingMqttTwinOperation pendingOperation))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Removing twin response for {x.Key}", nameof(RemoveOldOperations));

twinResponse.TrySetException(new IotHubClientException(exceptionMessage, IotHubClientErrorCode.NetworkErrors));
pendingOperation.TwinResponseTask?.TrySetException(new IotHubClientException(exceptionMessage, IotHubClientErrorCode.NetworkErrors));
pendingOperation.TwinPatchTask?.TrySetException(new IotHubClientException(exceptionMessage, IotHubClientErrorCode.NetworkErrors));
}

if (_reportedPropertyUpdateResponseCompletions.TryRemove(x.Key, out TaskCompletionSource<PatchTwinResponse> reportedPropertyUpdateResponse))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Removing twin response for {x.Key}", nameof(RemoveOldOperations));

twinResponse.TrySetException(new IotHubClientException(exceptionMessage, IotHubClientErrorCode.NetworkErrors));
}

_twinResponseTimeouts.TryRemove(x.Key, out DateTimeOffset _);

return true;
});
}
33 changes: 33 additions & 0 deletions iothub/device/src/Transport/Amqp/PendingAmqpTwinOperation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;

namespace Microsoft.Azure.Devices.Client.Transport.Amqp
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Holds the task completion source of a pending twin operation together with the date/time at which that
/// operation was initiated.
/// </summary>
/// <remarks>
/// Because PubSub behavior has us send a request on one sending link and receive a response later on another
/// receiving link, we need a way to identify the request, when it was sent, and the task completion source to
/// complete the waiting user's task.
/// </remarks>
internal class PendingAmqpTwinOperation
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Creates a pending operation record that wraps the supplied task completion source.
/// </summary>
/// <param name="completionTask">The task completion source to be signaled when the operation finishes.</param>
public PendingAmqpTwinOperation(TaskCompletionSource<AmqpMessage> completionTask)
{
CompletionTask = completionTask;
}

/// <summary>
/// The pending task to be signaled when complete.
/// </summary>
public TaskCompletionSource<AmqpMessage> CompletionTask { get; }

/// <summary>
/// When the request was sent, so we know when to time out older operations.
/// Defaults to the UTC time at which this instance was constructed.
/// </summary>
public DateTimeOffset RequestSentOnUtc { get; set; } = DateTimeOffset.UtcNow;
}
}
Loading