Skip to content

Commit

Permalink
Cancel twin ops on disconnect (#3287)
Browse files Browse the repository at this point in the history
* Cancel pending twin operations on disconnect

* Add/update logging

* Reduce concurrent dictionaries to one per protocol

* Fix twin patch response

* Unify twin update and logging in long haul

* Improve error handling in long haul

* Fix old task cleanup

* Add retry to long haul init

* merge

* Rearrange methods

* Fix method name

* PR feedback from Bryce

* Cancel pending twin operations on disconnect

* Add/update logging

* Reduce concurrent dictionaries to one per protocol

* Fix twin patch response

* Unify twin update and logging in long haul

* Improve error handling in long haul

* Fix old task cleanup

* Add retry to long haul init

* merge

* Rearrange methods

* Fix method name

* PR feedback from Bryce

* PR feedback

* adding cancellation token for InitializeAsync in module client

* Fix MQTT twin response

* Add unit tests for payload conventions

---------

Co-authored-by: tmahmood-microsoft <[email protected]>
  • Loading branch information
drwill-ms and tmahmood-microsoft authored Apr 20, 2023
1 parent 734d871 commit 51a206f
Show file tree
Hide file tree
Showing 18 changed files with 669 additions and 308 deletions.
106 changes: 67 additions & 39 deletions e2e/LongHaul/device/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal sealed class IotHub : IAsyncDisposable
private ConnectionStatusChangeReason _disconnectedReason;
private RecommendedAction _disconnectedRecommendedAction;
private volatile IotHubDeviceClient _deviceClient;
private CancellationToken _ct;

private static readonly TimeSpan s_messageLoopSleepTime = TimeSpan.FromSeconds(3);
private static readonly TimeSpan s_deviceTwinUpdateInterval = TimeSpan.FromSeconds(10);
Expand Down Expand Up @@ -66,8 +67,13 @@ public IotHub(Logger logger, Parameters parameters)
/// <summary>
/// Initializes the connection to IoT Hub.
/// </summary>
public async Task InitializeAsync()
public async Task InitializeAsync(CancellationToken? ct = null)
{
if (ct != null)
{
_ct = ct.Value;
}

await _lifetimeControl.WaitAsync().ConfigureAwait(false);

try
Expand Down Expand Up @@ -112,7 +118,7 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
var pendingMessages = new List<TelemetryMessage>(maxBulkMessages);
logger.LoggerContext.Add(OperationName, LoggingConstants.TelemetryMessage);
var sw = new Stopwatch();

while (!ct.IsCancellationRequested)
{
logger.Metric(MessageBacklog, _messagesToSend.Count);
Expand Down Expand Up @@ -176,38 +182,6 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
}
}

public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken ct)
{
logger.LoggerContext.Add(OperationName, ReportTwinProperties);
var sw = new Stopwatch();

while (!ct.IsCancellationRequested)
{
try
{
var reported = new ReportedProperties
{
{ "TotalTelemetryMessagesSent", _totalTelemetryMessagesSent },
};

logger.Trace($"Updating reported properties.", TraceSeverity.Information);
sw.Restart();
await _deviceClient.UpdateReportedPropertiesAsync(reported, ct).ConfigureAwait(false);
sw.Stop();

++_totalTwinUpdatesReported;
logger.Metric(TotalTwinUpdatesReported, _totalTwinUpdatesReported);
logger.Metric(ReportedTwinUpdateOperationSeconds, sw.Elapsed.TotalSeconds);
}
catch (Exception ex)
{
logger.Trace($"Exception when reporting properties when connected is {IsConnected}\n{ex}");
}

await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
}
}

public void AddTelemetry(
TelemetryBase telemetryObject,
IDictionary<string, string> extraProperties = null)
Expand Down Expand Up @@ -244,7 +218,26 @@ public void AddTelemetry(
_messagesToSend.Enqueue(iotMessage);
}

public async Task SetPropertiesAsync(string keyName, object properties, Logger logger, CancellationToken ct)
/// <summary>
/// Repeatedly reports the running count of sent telemetry messages as the twin reported
/// property "totalTelemetryMessagesSent" until cancellation is requested.
/// </summary>
/// <param name="logger">Logger that receives the operation context and the twin update metrics.</param>
/// <param name="ct">Token that ends the loop; it is also passed to the property update and to the delay between updates.</param>
public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken ct)
{
    var updateTimer = new Stopwatch();
    logger.LoggerContext.Add(OperationName, ReportTwinProperties);

    while (!ct.IsCancellationRequested)
    {
        // Time the entire SetPropertiesAsync call, not just the underlying client operation.
        updateTimer.Restart();
        await SetPropertiesAsync(
                "totalTelemetryMessagesSent",
                _totalTelemetryMessagesSent,
                logger,
                ct)
            .ConfigureAwait(false);
        updateTimer.Stop();

        _totalTwinUpdatesReported++;
        logger.Metric(TotalTwinUpdatesReported, _totalTwinUpdatesReported);
        logger.Metric(ReportedTwinUpdateOperationSeconds, updateTimer.Elapsed.TotalSeconds);

        // Wait out the update interval before the next report.
        await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
    }
}

public async Task SetPropertiesAsync<T>(string keyName, T properties, Logger logger, CancellationToken ct)
{
Debug.Assert(_deviceClient != null);
Debug.Assert(properties != null);
Expand All @@ -257,11 +250,11 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
{
try
{
await _deviceClient
long version = await _deviceClient
.UpdateReportedPropertiesAsync(reportedProperties, ct)
.ConfigureAwait(false);

logger.Trace($"Set the reported property with name [{keyName}] in device twin.", TraceSeverity.Information);
logger.Trace($"Set the reported property with key {keyName} and value {properties} in device twin; observed version {version}.", TraceSeverity.Information);
break;
}
catch (Exception ex)
Expand All @@ -270,6 +263,28 @@ await _deviceClient
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}

while (!ct.IsCancellationRequested)
{
try
{
TwinProperties twin = await _deviceClient.GetTwinPropertiesAsync(ct).ConfigureAwait(false);
if (!twin.Reported.TryGetValue<T>(keyName, out T actualValue))
{
logger.Trace($"Couldn't find the reported property {keyName} in the device twin.", TraceSeverity.Warning);
}
else if (!actualValue.Equals(properties))
{
logger.Trace($"Couldn't validate value for {keyName} was set to {properties}, found {actualValue}.");
}
break;
}
catch (Exception ex)
{
logger.Trace($"Exception getting twin\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}
}

public async Task UploadFilesAsync(Logger logger, CancellationToken ct)
Expand Down Expand Up @@ -410,7 +425,19 @@ private async void ConnectionStatusChangesHandlerAsync(ConnectionStatusInfo conn
{
case RecommendedAction.OpenConnection:
_logger.Trace($"Following recommended action of reinitializing the client.", TraceSeverity.Information);
await InitializeAsync().ConfigureAwait(false);
while (!_ct.IsCancellationRequested)
{
try
{
await InitializeAsync().ConfigureAwait(false);
break;
}
catch (Exception ex)
{
_logger.Trace($"Exception re-initializing client\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, _ct).ConfigureAwait(false);
}
}
break;

case RecommendedAction.PerformNormally:
Expand Down Expand Up @@ -479,7 +506,8 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti

if (reported.Any())
{
await _deviceClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
long version = await _deviceClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
_logger.Trace($"Updated {reported.Count()} properties and observed new version {version}.", TraceSeverity.Information);

_totalDesiredPropertiesHandled += reported.Count();
_logger.Metric(TotalDesiredPropertiesHandled, _totalDesiredPropertiesHandled);
Expand Down
3 changes: 2 additions & 1 deletion e2e/LongHaul/device/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private static async Task Main(string[] args)
{
try
{
await iotHub.InitializeAsync().ConfigureAwait(false);
await iotHub.InitializeAsync(cts.Token).ConfigureAwait(false);
break;
}
catch (Exception ex)
Expand Down Expand Up @@ -110,6 +110,7 @@ await Task
.ConfigureAwait(false);
}
catch (OperationCanceledException) { } // user signalled an exit
catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
Expand Down
5 changes: 5 additions & 0 deletions e2e/LongHaul/device/SystemHealthTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Net.NetworkInformation;
using System.Text.Json.Serialization;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.LongHaul.Device
{
Expand All @@ -23,15 +24,19 @@ public SystemHealthTelemetry(int port)
TcpPortFilter.Add(port);
}

[JsonProperty("processCpuUsagePercent")]
[JsonPropertyName("processCpuUsagePercent")]
public double ProcessCpuUsagePercent { get; set; } = UpdateCpuUsage();

[JsonProperty("totalAssignedMemoryBytes")]
[JsonPropertyName("totalAssignedMemoryBytes")]
public long TotalAssignedMemoryBytes { get; set; } = s_currentProcess.WorkingSet64;

[JsonProperty("totalGCBytes")]
[JsonPropertyName("totalGCBytes")]
public long TotalGCBytes { get; set; } = GC.GetTotalMemory(false);

[JsonProperty("activeTcpConnections")]
[JsonPropertyName("activeTcpConnections")]
public long ActiveTcpConnections { get; set; } = UpdateTcpConnections();

Expand Down
17 changes: 17 additions & 0 deletions e2e/LongHaul/device/SystemProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,35 @@

using System.Runtime.InteropServices;
using System.Text.Json.Serialization;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.LongHaul.Device
{
/// <summary>
/// Describes the system the app is running on. Each property defaults to the value
/// reported by <see cref="RuntimeInformation"/> and is annotated for both the
/// Newtonsoft.Json and System.Text.Json serializers.
/// </summary>
internal class SystemProperties
{
    /// <summary>
    /// The OS architecture; defaults to <see cref="RuntimeInformation.OSArchitecture"/> as a string.
    /// </summary>
    [JsonProperty("systemArchitecture")]
    [JsonPropertyName("systemArchitecture")]
    public string SystemArchitecture { get; set; } = RuntimeInformation.OSArchitecture.ToString();

    /// <summary>
    /// The OS description; defaults to <see cref="RuntimeInformation.OSDescription"/>.
    /// </summary>
    [JsonProperty("osVersion")]
    [JsonPropertyName("osVersion")]
    public string OsVersion { get; set; } = RuntimeInformation.OSDescription;

    /// <summary>
    /// The .NET runtime description; defaults to <see cref="RuntimeInformation.FrameworkDescription"/>.
    /// </summary>
    [JsonProperty("frameworkDescription")]
    [JsonPropertyName("frameworkDescription")]
    public string FrameworkDescription { get; set; } = RuntimeInformation.FrameworkDescription;

    /// <summary>
    /// Value equality: two instances are equal when all three properties match.
    /// </summary>
    /// <param name="obj">The object to compare against.</param>
    /// <returns>True if <paramref name="obj"/> is a <see cref="SystemProperties"/> with the same property values.</returns>
    public override bool Equals(object obj)
    {
        return obj is SystemProperties other
            && SystemArchitecture == other.SystemArchitecture
            && OsVersion == other.OsVersion
            && FrameworkDescription == other.FrameworkDescription;
    }

    /// <summary>
    /// Computes a hash code consistent with <see cref="Equals(object)"/>.
    /// </summary>
    /// <returns>A hash code derived from the three property values.</returns>
    public override int GetHashCode()
    {
        // The properties have public setters, so any of them may be null; Equals tolerates
        // that, so hashing must as well. The tuple hash treats null as 0 and mixes the
        // components in order, unlike a plain XOR, which cancels out when two properties
        // hold the same string.
        return (SystemArchitecture, OsVersion, FrameworkDescription).GetHashCode();
    }
}
}
2 changes: 2 additions & 0 deletions e2e/LongHaul/device/TelemetryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Text.Json.Serialization;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.LongHaul.Device
{
Expand All @@ -11,6 +12,7 @@ internal abstract class TelemetryBase
/// <summary>
/// The date/time the event occurred, in UTC.
/// </summary>
[JsonProperty("eventDateTimeUtc")]
[JsonPropertyName("eventDateTimeUtc")]
public DateTime? EventDateTimeUtc { get; set; } = DateTime.UtcNow;
}
Expand Down
54 changes: 30 additions & 24 deletions e2e/LongHaul/module/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal sealed class IotHub : IAsyncDisposable
private readonly string _moduleConnectionString;
private readonly IotHubClientOptions _clientOptions;
private readonly Logger _logger;
private CancellationToken _ct;

private SemaphoreSlim _lifetimeControl = new(1, 1);

Expand Down Expand Up @@ -56,8 +57,12 @@ public IotHub(Logger logger, Parameters parameters)
/// <summary>
/// Initializes the connection to IoT Hub.
/// </summary>
public async Task InitializeAsync()
public async Task InitializeAsync(CancellationToken? ct = null)
{
if (ct != null)
{
_ct = ct.Value;
}
await _lifetimeControl.WaitAsync().ConfigureAwait(false);
var sw = new Stopwatch();

Expand Down Expand Up @@ -175,25 +180,13 @@ public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken
var sw = new Stopwatch();
while (!ct.IsCancellationRequested)
{
try
{
var reported = new ReportedProperties
{
{ "TotalTelemetryMessagesSent", _totalTelemetryMessagesToModuleSent },
};

sw.Restart();
await _moduleClient.UpdateReportedPropertiesAsync(reported, ct).ConfigureAwait(false);
sw.Stop();
sw.Restart();
await SetPropertiesAsync("totalTelemetryMessagesSent", _totalTelemetryMessagesToModuleSent, logger, ct).ConfigureAwait(false);
sw.Stop();

++_totalTwinUpdatesToModuleReported;
logger.Metric(TotalTwinUpdatesToModuleReported, _totalTwinUpdatesToModuleReported);
logger.Metric(ReportedTwinUpdateToModuleOperationSeconds, sw.Elapsed.TotalSeconds);
}
catch (Exception ex)
{
logger.Trace($"Exception when reporting properties when connected is {IsConnected}\n{ex}");
}
++_totalTwinUpdatesToModuleReported;
logger.Metric(TotalTwinUpdatesToModuleReported, _totalTwinUpdatesToModuleReported);
logger.Metric(ReportedTwinUpdateToModuleOperationSeconds, sw.Elapsed.TotalSeconds);

await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
}
Expand Down Expand Up @@ -249,11 +242,11 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
{
try
{
await _moduleClient
long version = await _moduleClient
.UpdateReportedPropertiesAsync(reportedProperties, ct)
.ConfigureAwait(false);
.ConfigureAwait(false);

logger.Trace($"Set the reported property with name [{keyName}] in device twin.", TraceSeverity.Information);
logger.Trace($"Set the reported property with key {keyName} and value {properties} in device twin; observed version {version}.", TraceSeverity.Information);
break;
}
catch (Exception ex)
Expand Down Expand Up @@ -333,7 +326,19 @@ private async void ConnectionStatusChangesHandlerAsync(ConnectionStatusInfo conn
{
case RecommendedAction.OpenConnection:
_logger.Trace($"Following recommended action of reinitializing the client.", TraceSeverity.Information);
await InitializeAsync().ConfigureAwait(false);
while (!_ct.IsCancellationRequested)
{
try
{
await InitializeAsync().ConfigureAwait(false);
break;
}
catch (Exception ex)
{
_logger.Trace($"Exception re-initializing client\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, _ct).ConfigureAwait(false);
}
}
break;

case RecommendedAction.PerformNormally:
Expand Down Expand Up @@ -403,7 +408,8 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti

if (reported.Any())
{
await _moduleClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
long version = await _moduleClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
_logger.Trace($"Updated {reported.Count()} properties and observed new version {version}.", TraceSeverity.Information);

_totalDesiredPropertiesToModuleHandled += reported.Count();
_logger.Metric(TotalDesiredPropertiesToModuleHandled, _totalDesiredPropertiesToModuleHandled);
Expand Down
3 changes: 2 additions & 1 deletion e2e/LongHaul/module/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static async Task Main(string[] args)
{
try
{
await iotHub.InitializeAsync().ConfigureAwait(false);
await iotHub.InitializeAsync(cts.Token).ConfigureAwait(false);
break;
}
catch (Exception ex)
Expand All @@ -96,6 +96,7 @@ await Task
.ConfigureAwait(false);
}
catch (TaskCanceledException) { } // user signalled an exit
catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
Expand Down
Loading

0 comments on commit 51a206f

Please sign in to comment.