Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update QoS settings to AtLeastOnce for mqtt layer subscriptions #2302

Merged
merged 1 commit into from
Feb 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
using Newtonsoft.Json;

#if NET5_0

using TaskCompletionSource = System.Threading.Tasks.TaskCompletionSource;

#else
using TaskCompletionSource = Microsoft.Azure.Devices.Shared.TaskCompletionSource;
#endif
Expand Down Expand Up @@ -107,7 +109,8 @@ internal sealed class MqttTransportHandler : TransportHandler, IMqttIotHubEventH
private readonly Func<IPAddress[], int, Task<IChannel>> _channelFactory;
private readonly Queue<string> _completionQueue;
private readonly MqttIotHubAdapterFactory _mqttIotHubAdapterFactory;
private readonly QualityOfService _qos;
private readonly QualityOfService _qosSendPacketToService;
private readonly QualityOfService _qosReceivePacketFromService;
private readonly bool _retainMessagesAcrossSessions;
private readonly object _syncRoot = new object();
private readonly RetryPolicy _closeRetryPolicy;
Expand Down Expand Up @@ -168,7 +171,8 @@ internal MqttTransportHandler(
_deviceboundMessageFilter = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicFilter, iotHubConnectionString.DeviceId);
_deviceboundMessagePrefix = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicPrefix, iotHubConnectionString.DeviceId);

_qos = settings.PublishToServerQoS;
_qosSendPacketToService = settings.PublishToServerQoS;
_qosReceivePacketFromService = settings.ReceivingQoS;
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved

// If the CleanSession flag is set to false, C2D messages will be retained across device sessions, i.e. the device
// will receive the C2D messages that were sent to it while it was disconnected.
Expand Down Expand Up @@ -294,7 +298,7 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT
}

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessMessage();
return ProcessC2dMessage();
}
finally
{
Expand Down Expand Up @@ -333,7 +337,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
using var cts = new CancellationTokenSource(timeout);

await WaitUntilC2dMessageArrivesAsync(cts.Token).ConfigureAwait(false);
return ProcessMessage();
return ProcessC2dMessage();
}
finally
{
Expand All @@ -342,20 +346,20 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
}
}

private Message ProcessMessage()
private Message ProcessC2dMessage()
{
Message message = null;

try
{
if (Logging.IsEnabled)
Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessMessage));
Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessC2dMessage));

lock (_syncRoot)
{
if (_messageQueue.TryDequeue(out message))
{
if (_qos == QualityOfService.AtLeastOnce)
if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce)
{
_completionQueue.Enqueue(message.LockToken);
}
Expand All @@ -369,7 +373,7 @@ private Message ProcessMessage()
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessMessage));
Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessC2dMessage));
}
}

Expand All @@ -393,7 +397,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can
cancellationToken.ThrowIfCancellationRequested();
EnsureValidState();

if (_qos == QualityOfService.AtMostOnce)
if (_qosReceivePacketFromService == QualityOfService.AtMostOnce)
{
throw new IotHubException("Complete is not allowed for QoS 0.", isTransient: false);
}
Expand Down Expand Up @@ -560,7 +564,7 @@ private async Task HandleIncomingMessagesAsync()
if (Logging.IsEnabled)
Logging.Enter(this, "Process C2D message via callback", nameof(HandleIncomingMessagesAsync));

Message message = ProcessMessage();
Message message = ProcessC2dMessage();

// We are intentionally not awaiting _deviceMessageReceivedListener callback.
// This is a user-supplied callback that isn't required to be awaited by us. We can simply invoke it and continue.
Expand Down Expand Up @@ -647,7 +651,7 @@ private async Task HandleIncomingEventMessageAsync(Message message)
// Add the endpoint as a SystemProperty
message.SystemProperties.Add(MessageSystemPropertyNames.InputName, inputName);

if (_qos == QualityOfService.AtLeastOnce)
if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce)
{
lock (_syncRoot)
{
Expand Down Expand Up @@ -828,7 +832,7 @@ public override async Task EnableMethodsAsync(CancellationToken cancellationToke
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_001: `EnableMethodsAsync` shall subscribe using the '$iothub/methods/POST/' topic filter.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_002: `EnableMethodsAsync` shall wait for a SUBACK for the subscription request.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_003: `EnableMethodsAsync` shall return failure if the subscription request fails.
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true);
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
}

public override async Task DisableMethodsAsync(CancellationToken cancellationToken)
Expand All @@ -850,7 +854,7 @@ public override async Task EnableEventReceiveAsync(bool isAnEdgeModule, Cancella
// Codes_SRS_CSHARP_MQTT_TRANSPORT_33_021: `EnableEventReceiveAsync` shall subscribe using the 'devices/{0}/modules/{1}/' topic filter.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_33_022: `EnableEventReceiveAsync` shall wait for a SUBACK for the subscription request.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_33_023: `EnableEventReceiveAsync` shall return failure if the subscription request fails.
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qos))).ConfigureAwait(true);
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
}

public override async Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
Expand Down Expand Up @@ -892,7 +896,7 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_010: `EnableTwinPatchAsync` shall subscribe using the '$iothub/twin/PATCH/properties/desired/#' topic filter.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_011: `EnableTwinPatchAsync` shall wait for a SUBACK on the subscription request.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_012: `EnableTwinPatchAsync` shall return failure if the subscription request fails.
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true);
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true);

if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(EnableTwinPatchAsync));
Expand Down Expand Up @@ -1081,7 +1085,7 @@ private async Task SubscribeCloudToDeviceMessagesAsync()
if (TryStateTransition(TransportState.Open, TransportState.Subscribing))
{
await _channel
.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, QualityOfService.AtLeastOnce)))
.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, _qosReceivePacketFromService)))
.ConfigureAwait(true);

if (TryStateTransition(TransportState.Subscribing, TransportState.Receiving)
Expand All @@ -1100,7 +1104,7 @@ private Task SubscribeTwinResponsesAsync()
0,
new SubscriptionRequest(
TwinResponseTopicFilter,
QualityOfService.AtMostOnce)));
_qosReceivePacketFromService)));
}

private bool ParseResponseTopic(string topicName, out string rid, out int status)
Expand Down