From cee30500cab41748728a90db7f188f2d6ea4b142 Mon Sep 17 00:00:00 2001 From: Abhipsa Misra <abhipsa.misra@microsoft.com> Date: Wed, 9 Feb 2022 09:13:57 -0800 Subject: [PATCH] fix(iot-device): Update QoS settings to AtLeastOnce for mqtt layer subscriptions --- .../Transport/Mqtt/MqttTransportHandler.cs | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs index 44e892f632..e6fb1756e3 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs @@ -33,7 +33,9 @@ using Newtonsoft.Json; #if NET5_0 + using TaskCompletionSource = System.Threading.Tasks.TaskCompletionSource; + #else using TaskCompletionSource = Microsoft.Azure.Devices.Shared.TaskCompletionSource; #endif @@ -107,7 +109,8 @@ internal sealed class MqttTransportHandler : TransportHandler, IMqttIotHubEventH private readonly Func<IPAddress[], int, Task<IChannel>> _channelFactory; private readonly Queue<string> _completionQueue; private readonly MqttIotHubAdapterFactory _mqttIotHubAdapterFactory; - private readonly QualityOfService _qos; + private readonly QualityOfService _qosSendPacketToService; + private readonly QualityOfService _qosReceivePacketFromService; private readonly bool _retainMessagesAcrossSessions; private readonly object _syncRoot = new object(); private readonly RetryPolicy _closeRetryPolicy; @@ -168,7 +171,8 @@ internal MqttTransportHandler( _deviceboundMessageFilter = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicFilter, iotHubConnectionString.DeviceId); _deviceboundMessagePrefix = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicPrefix, iotHubConnectionString.DeviceId); - _qos = settings.PublishToServerQoS; + _qosSendPacketToService = settings.PublishToServerQoS; + _qosReceivePacketFromService = settings.ReceivingQoS; // If the CleanSession flag is set to false, C2D messages will be retained across device sessions, i.e. the device // will receive the C2D messages that were sent to it while it was disconnected. @@ -294,7 +298,7 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT } await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false); - return ProcessMessage(); + return ProcessC2dMessage(); } finally { @@ -333,7 +337,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper) using var cts = new CancellationTokenSource(timeout); await WaitUntilC2dMessageArrivesAsync(cts.Token).ConfigureAwait(false); - return ProcessMessage(); + return ProcessC2dMessage(); } finally { @@ -342,20 +346,20 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper) } } - private Message ProcessMessage() + private Message ProcessC2dMessage() { Message message = null; try { if (Logging.IsEnabled) - Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessMessage)); + Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessC2dMessage)); lock (_syncRoot) { if (_messageQueue.TryDequeue(out message)) { - if (_qos == QualityOfService.AtLeastOnce) + if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce) { _completionQueue.Enqueue(message.LockToken); } @@ -369,7 +373,7 @@ private Message ProcessMessage() finally { if (Logging.IsEnabled) - Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessMessage)); + Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessC2dMessage)); } } @@ -393,7 +397,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can cancellationToken.ThrowIfCancellationRequested(); EnsureValidState(); - if (_qos == QualityOfService.AtMostOnce) + if (_qosReceivePacketFromService == QualityOfService.AtMostOnce) { throw new IotHubException("Complete is not allowed for QoS 0.", isTransient: false); } @@ -560,7 +564,7 @@ private async Task HandleIncomingMessagesAsync() if (Logging.IsEnabled) Logging.Enter(this, "Process C2D message via callback", nameof(HandleIncomingMessagesAsync)); - Message message = ProcessMessage(); + Message message = ProcessC2dMessage(); // We are intentionally not awaiting _deviceMessageReceivedListener callback. // This is a user-supplied callback that isn't required to be awaited by us. We can simply invoke it and continue. @@ -647,7 +651,7 @@ private async Task HandleIncomingEventMessageAsync(Message message) // Add the endpoint as a SystemProperty message.SystemProperties.Add(MessageSystemPropertyNames.InputName, inputName); - if (_qos == QualityOfService.AtLeastOnce) + if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce) { lock (_syncRoot) { @@ -828,7 +832,7 @@ public override async Task EnableMethodsAsync(CancellationToken cancellationToke // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_001: `EnableMethodsAsync` shall subscribe using the '$iothub/methods/POST/' topic filter. // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_002: `EnableMethodsAsync` shall wait for a SUBACK for the subscription request. // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_003: `EnableMethodsAsync` shall return failure if the subscription request fails. - await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true); + await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true); } public override async Task DisableMethodsAsync(CancellationToken cancellationToken) @@ -850,7 +854,7 @@ public override async Task EnableEventReceiveAsync(bool isAnEdgeModule, Cancella // Codes_SRS_CSHARP_MQTT_TRANSPORT_33_021: `EnableEventReceiveAsync` shall subscribe using the 'devices/{0}/modules/{1}/' topic filter. // Codes_SRS_CSHARP_MQTT_TRANSPORT_33_022: `EnableEventReceiveAsync` shall wait for a SUBACK for the subscription request. // Codes_SRS_CSHARP_MQTT_TRANSPORT_33_023: `EnableEventReceiveAsync` shall return failure if the subscription request fails. - await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qos))).ConfigureAwait(true); + await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qosReceivePacketFromService))).ConfigureAwait(true); } public override async Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken) @@ -892,7 +896,7 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_010: `EnableTwinPatchAsync` shall subscribe using the '$iothub/twin/PATCH/properties/desired/#' topic filter. // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_011: `EnableTwinPatchAsync` shall wait for a SUBACK on the subscription request. // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_012: `EnableTwinPatchAsync` shall return failure if the subscription request fails. - await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true); + await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true); if (Logging.IsEnabled) Logging.Exit(this, cancellationToken, nameof(EnableTwinPatchAsync)); @@ -1081,7 +1085,7 @@ private async Task SubscribeCloudToDeviceMessagesAsync() if (TryStateTransition(TransportState.Open, TransportState.Subscribing)) { await _channel - .WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, QualityOfService.AtLeastOnce))) + .WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, _qosReceivePacketFromService))) .ConfigureAwait(true); if (TryStateTransition(TransportState.Subscribing, TransportState.Receiving) @@ -1100,7 +1104,7 @@ private Task SubscribeTwinResponsesAsync() 0, new SubscriptionRequest( TwinResponseTopicFilter, - QualityOfService.AtMostOnce))); + _qosReceivePacketFromService))); } private bool ParseResponseTopic(string topicName, out string rid, out int status)