From cee30500cab41748728a90db7f188f2d6ea4b142 Mon Sep 17 00:00:00 2001
From: Abhipsa Misra <abhipsa.misra@microsoft.com>
Date: Wed, 9 Feb 2022 09:13:57 -0800
Subject: [PATCH] fix(iot-device): Update QoS settings to AtLeastOnce for mqtt
 layer subscriptions

---
 .../Transport/Mqtt/MqttTransportHandler.cs    | 36 ++++++++++---------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
index 44e892f632..e6fb1756e3 100644
--- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
+++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
@@ -33,7 +33,9 @@
 using Newtonsoft.Json;
 
 #if NET5_0
+
 using TaskCompletionSource = System.Threading.Tasks.TaskCompletionSource;
+
 #else
 using TaskCompletionSource = Microsoft.Azure.Devices.Shared.TaskCompletionSource;
 #endif
@@ -107,7 +109,8 @@ internal sealed class MqttTransportHandler : TransportHandler, IMqttIotHubEventH
         private readonly Func<IPAddress[], int, Task<IChannel>> _channelFactory;
         private readonly Queue<string> _completionQueue;
         private readonly MqttIotHubAdapterFactory _mqttIotHubAdapterFactory;
-        private readonly QualityOfService _qos;
+        private readonly QualityOfService _qosSendPacketToService;
+        private readonly QualityOfService _qosReceivePacketFromService;
         private readonly bool _retainMessagesAcrossSessions;
         private readonly object _syncRoot = new object();
         private readonly RetryPolicy _closeRetryPolicy;
@@ -168,7 +171,8 @@ internal MqttTransportHandler(
             _deviceboundMessageFilter = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicFilter, iotHubConnectionString.DeviceId);
             _deviceboundMessagePrefix = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicPrefix, iotHubConnectionString.DeviceId);
 
-            _qos = settings.PublishToServerQoS;
+            _qosSendPacketToService = settings.PublishToServerQoS;
+            _qosReceivePacketFromService = settings.ReceivingQoS;
 
             // If the CleanSession flag is set to false, C2D messages will be retained across device sessions, i.e. the device
             // will receive the C2D messages that were sent to it while it was disconnected.
@@ -294,7 +298,7 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT
                     }
 
                     await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
-                    return ProcessMessage();
+                    return ProcessC2dMessage();
                 }
                 finally
                 {
@@ -333,7 +337,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
                 using var cts = new CancellationTokenSource(timeout);
 
                 await WaitUntilC2dMessageArrivesAsync(cts.Token).ConfigureAwait(false);
-                return ProcessMessage();
+                return ProcessC2dMessage();
             }
             finally
             {
@@ -342,20 +346,20 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
             }
         }
 
-        private Message ProcessMessage()
+        private Message ProcessC2dMessage()
         {
             Message message = null;
 
             try
             {
                 if (Logging.IsEnabled)
-                    Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessMessage));
+                    Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessC2dMessage));
 
                 lock (_syncRoot)
                 {
                     if (_messageQueue.TryDequeue(out message))
                     {
-                        if (_qos == QualityOfService.AtLeastOnce)
+                        if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce)
                         {
                             _completionQueue.Enqueue(message.LockToken);
                         }
@@ -369,7 +373,7 @@ private Message ProcessMessage()
             finally
             {
                 if (Logging.IsEnabled)
-                    Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessMessage));
+                    Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessC2dMessage));
             }
         }
 
@@ -393,7 +397,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can
             cancellationToken.ThrowIfCancellationRequested();
             EnsureValidState();
 
-            if (_qos == QualityOfService.AtMostOnce)
+            if (_qosReceivePacketFromService == QualityOfService.AtMostOnce)
             {
                 throw new IotHubException("Complete is not allowed for QoS 0.", isTransient: false);
             }
@@ -560,7 +564,7 @@ private async Task HandleIncomingMessagesAsync()
             if (Logging.IsEnabled)
                 Logging.Enter(this, "Process C2D message via callback", nameof(HandleIncomingMessagesAsync));
 
-            Message message = ProcessMessage();
+            Message message = ProcessC2dMessage();
 
             // We are intentionally not awaiting _deviceMessageReceivedListener callback.
             // This is a user-supplied callback that isn't required to be awaited by us. We can simply invoke it and continue.
@@ -647,7 +651,7 @@ private async Task HandleIncomingEventMessageAsync(Message message)
                 // Add the endpoint as a SystemProperty
                 message.SystemProperties.Add(MessageSystemPropertyNames.InputName, inputName);
 
-                if (_qos == QualityOfService.AtLeastOnce)
+                if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce)
                 {
                     lock (_syncRoot)
                     {
@@ -828,7 +832,7 @@ public override async Task EnableMethodsAsync(CancellationToken cancellationToke
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_001:  `EnableMethodsAsync` shall subscribe using the '$iothub/methods/POST/' topic filter.
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_002:  `EnableMethodsAsync` shall wait for a SUBACK for the subscription request.
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_003:  `EnableMethodsAsync` shall return failure if the subscription request fails.
-            await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true);
+            await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
         }
 
         public override async Task DisableMethodsAsync(CancellationToken cancellationToken)
@@ -850,7 +854,7 @@ public override async Task EnableEventReceiveAsync(bool isAnEdgeModule, Cancella
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_33_021:  `EnableEventReceiveAsync` shall subscribe using the 'devices/{0}/modules/{1}/' topic filter.
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_33_022:  `EnableEventReceiveAsync` shall wait for a SUBACK for the subscription request.
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_33_023:  `EnableEventReceiveAsync` shall return failure if the subscription request fails.
-            await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qos))).ConfigureAwait(true);
+            await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
         }
 
         public override async Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
@@ -892,7 +896,7 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_010: `EnableTwinPatchAsync` shall subscribe using the '$iothub/twin/PATCH/properties/desired/#' topic filter.
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_011: `EnableTwinPatchAsync` shall wait for a SUBACK on the subscription request.
             // Codes_SRS_CSHARP_MQTT_TRANSPORT_18_012: `EnableTwinPatchAsync` shall return failure if the subscription request fails.
-            await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true);
+            await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
 
             if (Logging.IsEnabled)
                 Logging.Exit(this, cancellationToken, nameof(EnableTwinPatchAsync));
@@ -1081,7 +1085,7 @@ private async Task SubscribeCloudToDeviceMessagesAsync()
             if (TryStateTransition(TransportState.Open, TransportState.Subscribing))
             {
                 await _channel
-                    .WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, QualityOfService.AtLeastOnce)))
+                    .WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, _qosReceivePacketFromService)))
                     .ConfigureAwait(true);
 
                 if (TryStateTransition(TransportState.Subscribing, TransportState.Receiving)
@@ -1100,7 +1104,7 @@ private Task SubscribeTwinResponsesAsync()
                     0,
                     new SubscriptionRequest(
                         TwinResponseTopicFilter,
-                        QualityOfService.AtMostOnce)));
+                        _qosReceivePacketFromService)));
         }
 
         private bool ParseResponseTopic(string topicName, out string rid, out int status)