Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel twin ops on disconnect #3287

Merged
merged 29 commits into from
Apr 20, 2023
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5ebb0fe
Cancel pending twin operations on disconnect
Apr 19, 2023
d831d48
Add/update logging
Apr 19, 2023
9ea0ec6
Reduce concurrent dictionaries to one per protocol
Apr 19, 2023
8fce7e6
Fix twin patch response
Apr 19, 2023
19a8e0a
Unify twin update and logging in long haul
Apr 19, 2023
9dd4909
Improve error handling in long haul
Apr 19, 2023
2eee027
Fix old task cleanup
Apr 20, 2023
9b86af4
Add retry to long haul init
Apr 20, 2023
a6d0f39
merge
Apr 20, 2023
0a6014b
Rearrange methods
Apr 20, 2023
287a8fa
Fix method name
Apr 20, 2023
1593b03
PR feedback from Bryce
Apr 20, 2023
eb8b7b3
Cancel pending twin operations on disconnect
Apr 19, 2023
99b9b7d
Add/update logging
Apr 19, 2023
586e53d
Reduce concurrent dictionaries to one per protocol
Apr 19, 2023
b8d858f
Fix twin patch response
Apr 19, 2023
4f68177
Unify twin update and logging in long haul
Apr 19, 2023
348e82b
Improve error handling in long haul
Apr 19, 2023
ed93221
Fix old task cleanup
Apr 20, 2023
3b2654e
Add retry to long haul init
Apr 20, 2023
c74dc83
merge
Apr 20, 2023
365968c
Rearrange methods
Apr 20, 2023
0eabbb1
Fix method name
Apr 20, 2023
fcd79d3
PR feedback from Bryce
Apr 20, 2023
af27f20
PR feedback
Apr 20, 2023
0fb999d
adding cancellation toke for InitializeAsync in module client
tmahmood-microsoft Apr 20, 2023
d418081
Merge branch 'drwill/cancel-twinops-on-disconnect' of https://github.…
tmahmood-microsoft Apr 20, 2023
f45d3fc
Fix MQTT twin response
Apr 20, 2023
04fa965
Add unit tests for payload conventions
Apr 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improve error handling in long haul
David R. Williamson committed Apr 20, 2023
commit 348e82b65e72f10b98ecc884b89a29a6fc1a0dc7
1 change: 1 addition & 0 deletions e2e/LongHaul/device/Program.cs
Original file line number Diff line number Diff line change
@@ -110,6 +110,7 @@ await Task
.ConfigureAwait(false);
}
catch (OperationCanceledException) { } // user signalled an exit
catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
brycewang-microsoft marked this conversation as resolved.
Show resolved Hide resolved
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
92 changes: 47 additions & 45 deletions e2e/LongHaul/service/IotHub.cs
Original file line number Diff line number Diff line change
@@ -68,63 +68,65 @@ public async Task MonitorConnectedDevicesAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
AsyncPageable<ClientTwin> allDevices = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, connectionState FROM devices where is_defined(properties.reported.runId)",
ct);
try
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
{
AsyncPageable<ClientTwin> allDevices = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, connectionState, lastActivityTime FROM devices where is_defined(properties.reported.runId)",
ct);

AsyncPageable<ClientTwin> allModules = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
ct);
AsyncPageable<ClientTwin> allModules = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
ct);

await foreach (ClientTwin device in allDevices)
await foreach (ClientTwin device in allDevices)
{
string deviceId = device.DeviceId;

if (s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
// Signal cancellation to all tasks on the particular device.
source.Cancel();
// Dispose the cancellation token source.
source.Dispose();
// Remove the correlated device operations and cancellation token source of the particular device from the dictionary.
s_onlineDeviceOperations.TryRemove(deviceId, out _);
}
else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Connected)
{
// For each online device, initiate a new cancellation token source.
// Once the device goes offline, cancel all operations on this device.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;
if (s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
// Signal cancellation to all tasks on the particular device.
source.Cancel();
// Dispose the cancellation token source.
source.Dispose();
// Remove the correlated device operations and cancellation token source of the particular device from the dictionary.
s_onlineDeviceOperations.TryRemove(deviceId, out _);
}
else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Connected)
{
// For each online device, initiate a new cancellation token source.
// Once the device goes offline, cancel all operations on this device.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;

async Task Operations()
{
var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, _logger.Clone());
_logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);

Logger loggerPerDevice = _logger.Clone();
loggerPerDevice.LoggerContext.Add(DeviceId, deviceId);
var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, loggerPerDevice);
_logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);

try
{
await Task
.WhenAll(
deviceOperations.InvokeDirectMethodAsync(loggerPerDevice.Clone(), token),
deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), loggerPerDevice.Clone(), token),
deviceOperations.SendC2dMessagesAsync(loggerPerDevice.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
{
_logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);

try
{
await Task
.WhenAll(
deviceOperations.InvokeDirectMethodAsync(loggerPerDevice.Clone(), token),
deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), loggerPerDevice.Clone(), token),
deviceOperations.SendC2dMessagesAsync(loggerPerDevice.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
{
_logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
}
}
}

// Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
var operationsTuple = new Tuple<Task, CancellationTokenSource>(Operations(), source);