Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2 - bug fix] Queue MQTT C2D messages if callback not set #3336

Merged
merged 6 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions iothub/device/src/Pipeline/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ namespace Microsoft.Azure.Devices.Client.Transport
internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable
{
private const int ProtocolGatewayPort = 8883;
private const int MessageQueueSize = 10;

private const string DeviceToCloudMessagesTopicFormat = "devices/{0}/messages/events/";
private const string ModuleToCloudMessagesTopicFormat = "devices/{0}/modules/{1}/messages/events/";
Expand Down Expand Up @@ -106,6 +105,7 @@ internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable

private bool _isSubscribedToTwinResponses;
private bool _isDeviceReceiveMessageCallbackSet;
private int _messageQueueSize;

private readonly string _deviceId;
private readonly string _moduleId;
Expand Down Expand Up @@ -185,6 +185,7 @@ internal MqttTransportHandler(PipelineContext context, IDelegatingHandler nextHa

_hostName = context.IotHubConnectionCredentials.HostName;
_connectionCredentials = context.IotHubConnectionCredentials;
_messageQueueSize = _mqttTransportSettings.MessageQueueSize;

if (_mqttTransportSettings.Protocol == IotHubClientTransportProtocol.Tcp)
{
Expand Down Expand Up @@ -1101,6 +1102,8 @@ private async Task HandleReceivedMessageAsync(MqttApplicationMessageReceivedEven

if (topic.StartsWith(_deviceBoundMessagesTopic, StringComparison.InvariantCulture))
{
// If C2D message callback is not set, messages are added to a queue to be processed later.
// However, the messages are still being ack'ed right away.
await HandleReceivedCloudToDeviceMessageAsync(ProcessC2DMessage(receivedEventArgs)).ConfigureAwait(false);
await receivedEventArgs.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments here explaining that messages are being added to a queue for later processing, and are being ack'ed immediately

}
Expand Down Expand Up @@ -1144,13 +1147,13 @@ private async Task HandleReceivedCloudToDeviceMessageAsync(IncomingMessage recei
else if(!_isDeviceReceiveMessageCallbackSet)
{
_messageQueue.Enqueue(receivedCloudToDeviceMessage);
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
while(_messageQueue.Count > MessageQueueSize)
while(_messageQueue.Count > _messageQueueSize)
{
_messageQueue.TryDequeue(out _);
if (Logging.IsEnabled)
{
Logging.Info(this, $"Queue size of {MessageQueueSize} for C2D messages has been reached, removing oldest queued C2D message. " +
$"To avoid losing further messages, set SetIncomingMessageCallbackAsync() to process the messages.");
Logging.Info(this, $"Queue size of {_messageQueueSize} for C2D messages has been reached, removing oldest queued C2D message. " +
$"To avoid losing further messages, set SetIncomingMessageCallbackAsync() to process the messages or increase message queue size in IotHubClientMqttSettings.");
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions iothub/device/src/TransportSettings/IotHubClientMqttSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ public IotHubClientMqttSettings(IotHubClientTransportProtocol transportProtocol
/// </remarks>
public RemoteCertificateValidationCallback RemoteCertificateValidationCallback { get; set; }

/// <summary>
/// C2D message queue size when callback has not been set.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <remarks>
/// If C2D message callback is not set, messages from service are received and stored in a queue until a callback is set.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
/// If the number of messages sent is greater than queue size, older messages are removed as latest messages are added.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
public int MessageQueueSize { get; set; } = 10;
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Used by Edge runtime to specify an authentication chain for Edge-to-Edge connections.
/// </summary>
Expand All @@ -116,6 +125,7 @@ internal override IotHubClientTransportSettings Clone()
WillMessage = WillMessage,
RemoteCertificateValidationCallback = RemoteCertificateValidationCallback,
AuthenticationChain = AuthenticationChain,
MessageQueueSize = MessageQueueSize,
};
}
}
Expand Down