Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2 - bug fix] Queue MQTT C2D messages if callback not set #3336

Merged
merged 6 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions iothub/device/src/IotHubBaseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ public async Task SetIncomingMessageCallbackAsync(
finally
{
_receiveMessageSemaphore.Release();
if (messageCallback != null)
{
await InnerHandler.EnsurePendingMessagesAreDeliveredAsync(cancellationToken).ConfigureAwait(false);
}

if (Logging.IsEnabled)
Logging.Exit(this, messageCallback, nameof(SetIncomingMessageCallbackAsync));
Expand Down Expand Up @@ -780,18 +784,7 @@ internal async Task<MessageAcknowledgement> OnMessageReceivedAsync(IncomingMessa

try
{
Func<IncomingMessage, Task<MessageAcknowledgement>> callback = _receiveMessageCallback;

if (callback != null)
{
return await callback.Invoke(message).ConfigureAwait(false);
}

// The SDK should only receive messages when the user sets a listener, so this should never happen.
if (Logging.IsEnabled)
Logging.Error(this, $"Received a message when no listener was set. Abandoning message with message Id: {message.MessageId}.", nameof(OnMessageReceivedAsync));

return MessageAcknowledgement.Abandon;
return await _receiveMessageCallback.Invoke(message).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
7 changes: 7 additions & 0 deletions iothub/device/src/Pipeline/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ public override async Task EnableReceiveMessageAsync(CancellationToken cancellat
}
}

// This method is added to ensure that over MQTT devices can receive messages that were sent when it was disconnected.
// This behavior is available by default over AMQP, so no additional implementation is required here.
public override Task EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public override async Task DisableReceiveMessageAsync(CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Expand Down
8 changes: 8 additions & 0 deletions iothub/device/src/Pipeline/DefaultDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ public virtual Task EnableReceiveMessageAsync(CancellationToken cancellationToke
return NextHandler?.EnableReceiveMessageAsync(cancellationToken) ?? Task.CompletedTask;
}

// This is to ensure that if device connects over MQTT with CleanSession flag set to false,
// then any message sent while the device was disconnected is delivered on the callback.
public virtual Task EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return NextHandler?.EnsurePendingMessagesAreDeliveredAsync(cancellationToken);
}

public virtual Task DisableReceiveMessageAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
Expand Down
7 changes: 7 additions & 0 deletions iothub/device/src/Pipeline/ExceptionRemappingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public override Task EnableReceiveMessageAsync(CancellationToken cancellationTok
return ExecuteWithExceptionRemappingAsync(() => base.EnableReceiveMessageAsync(cancellationToken));
}

// This is to ensure that if device connects over MQTT with CleanSession flag set to false,
// then any message sent while the device was disconnected is delivered on the callback.
public override Task EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken)
{
return ExecuteWithExceptionRemappingAsync(() => base.EnsurePendingMessagesAreDeliveredAsync(cancellationToken));
}

public override Task DisableReceiveMessageAsync(CancellationToken cancellationToken)
{
return ExecuteWithExceptionRemappingAsync(() => base.DisableReceiveMessageAsync(cancellationToken));
Expand Down
4 changes: 4 additions & 0 deletions iothub/device/src/Pipeline/IDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ internal interface IDelegatingHandler : IContinuationProvider<IDelegatingHandler

Task EnableReceiveMessageAsync(CancellationToken cancellationToken);

// This is to ensure that if device connects over MQTT with CleanSession flag set to false,
// then any message sent while the device was disconnected is delivered on the callback.
Task EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken);

Task DisableReceiveMessageAsync(CancellationToken cancellationToken);

// Methods.
Expand Down
79 changes: 57 additions & 22 deletions iothub/device/src/Pipeline/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable
private const string ModuleToCloudMessagesTopicFormat = "devices/{0}/modules/{1}/messages/events/";
private readonly string _deviceToCloudMessagesTopic;
private readonly string _moduleToCloudMessagesTopic;
private readonly ConcurrentQueue<IncomingMessage> _messageQueue;

// Topic names for receiving cloud-to-device messages.

Expand Down Expand Up @@ -103,6 +104,8 @@ internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable
private readonly Timer _twinTimeoutTimer;

private bool _isSubscribedToTwinResponses;
private bool _isDeviceReceiveMessageCallbackSet;
private int _messageQueueSize;

private readonly string _deviceId;
private readonly string _moduleId;
Expand Down Expand Up @@ -178,9 +181,11 @@ internal MqttTransportHandler(PipelineContext context, IDelegatingHandler nextHa
var mqttFactory = new MqttFactory(new MqttLogger());
_mqttClient = mqttFactory.CreateMqttClient();
_mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
_messageQueue = new ConcurrentQueue<IncomingMessage>();

_hostName = context.IotHubConnectionCredentials.HostName;
_connectionCredentials = context.IotHubConnectionCredentials;
_messageQueueSize = _mqttTransportSettings.MessageQueueSize;

if (_mqttTransportSettings.Protocol == IotHubClientTransportProtocol.Tcp)
{
Expand Down Expand Up @@ -579,6 +584,7 @@ public override async Task EnableReceiveMessageAsync(CancellationToken cancellat
await SubscribeAsync(_moduleEventMessageTopic, cancellationToken).ConfigureAwait(false);
}
}
_isDeviceReceiveMessageCallbackSet = true;
}
catch (Exception ex) when (ex is not IotHubClientException && ex is not OperationCanceledException)
{
Expand All @@ -594,6 +600,26 @@ public override async Task EnableReceiveMessageAsync(CancellationToken cancellat
}
}

public override async Task EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

// If the device connects with a CleanSession flag set to false, we will need to deliver the messages
// that were sent before the client had subscribed to the C2D message receive topic.
if (!_mqttTransportSettings.CleanSession)
{
IncomingMessage message = null;
// Received C2D messages are enqueued into _messageQueue.
while (!_messageQueue.IsEmpty)
{
if(_messageQueue.TryDequeue(out message))
{
await HandleReceivedCloudToDeviceMessageAsync(message).ConfigureAwait(false);
}
}
}
}

public override async Task DisableReceiveMessageAsync(CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Expand All @@ -616,6 +642,7 @@ public override async Task DisableReceiveMessageAsync(CancellationToken cancella
await UnsubscribeAsync(_moduleEventMessageTopic, cancellationToken).ConfigureAwait(false);
}
}
_isDeviceReceiveMessageCallbackSet = false;
}
catch (Exception ex) when (ex is not IotHubClientException && ex is not OperationCanceledException)
{
Expand Down Expand Up @@ -1054,14 +1081,31 @@ private Task HandleDisconnectionAsync(MqttClientDisconnectedEventArgs disconnect
return Task.CompletedTask;
}

private IncomingMessage ProcessC2DMessage(MqttApplicationMessageReceivedEventArgs receivedEventArgs)
{
byte[] payload = receivedEventArgs.ApplicationMessage.Payload;

var receivedCloudToDeviceMessage = new IncomingMessage(payload)
{
PayloadConvention = _payloadConvention,
};

PopulateMessagePropertiesFromMqttMessage(receivedCloudToDeviceMessage, receivedEventArgs.ApplicationMessage);

return receivedCloudToDeviceMessage;
}

private async Task HandleReceivedMessageAsync(MqttApplicationMessageReceivedEventArgs receivedEventArgs)
{
receivedEventArgs.AutoAcknowledge = false;
string topic = receivedEventArgs.ApplicationMessage.Topic;

if (topic.StartsWith(_deviceBoundMessagesTopic, StringComparison.InvariantCulture))
{
await HandleReceivedCloudToDeviceMessageAsync(receivedEventArgs).ConfigureAwait(false);
// If C2D message callback is not set, messages are added to a queue to be processed later.
// However, the messages are still being ack'ed right away.
await HandleReceivedCloudToDeviceMessageAsync(ProcessC2DMessage(receivedEventArgs)).ConfigureAwait(false);
await receivedEventArgs.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments here explaining that messages are being added to a queue for later processing, and are being ack'ed immediately

}
else if (topic.StartsWith(TwinDesiredPropertiesPatchTopic, StringComparison.InvariantCulture))
{
Expand All @@ -1088,18 +1132,9 @@ private async Task HandleReceivedMessageAsync(MqttApplicationMessageReceivedEven
}
}

private async Task HandleReceivedCloudToDeviceMessageAsync(MqttApplicationMessageReceivedEventArgs receivedEventArgs)
private async Task HandleReceivedCloudToDeviceMessageAsync(IncomingMessage receivedCloudToDeviceMessage)
{
byte[] payload = receivedEventArgs.ApplicationMessage.Payload;

var receivedCloudToDeviceMessage = new IncomingMessage(payload)
{
PayloadConvention = _payloadConvention,
};

PopulateMessagePropertiesFromMqttMessage(receivedCloudToDeviceMessage, receivedEventArgs.ApplicationMessage);

if (_messageReceivedListener != null)
if (_messageReceivedListener != null && _isDeviceReceiveMessageCallbackSet)
{
MessageAcknowledgement acknowledgementType = await _messageReceivedListener.Invoke(receivedCloudToDeviceMessage);

Expand All @@ -1108,18 +1143,18 @@ private async Task HandleReceivedCloudToDeviceMessageAsync(MqttApplicationMessag
if (Logging.IsEnabled)
Logging.Error(this, "Cannot 'reject' or 'abandon' a received message over MQTT. Message will be acknowledged as 'complete' instead.");
}

// Note that MQTT does not support Abandon or Reject, so always Complete by acknowledging the message like this.
try
{
await receivedEventArgs.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
}
else if(!_isDeviceReceiveMessageCallbackSet)
{
_messageQueue.Enqueue(receivedCloudToDeviceMessage);
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
while(_messageQueue.Count > _messageQueueSize)
{
// This likely happened because the connection was lost. The service will re-send this message so the user
// can acknowledge it on the new connection.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
_messageQueue.TryDequeue(out _);
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to send the acknowledgement for a received cloud to device message {ex}"); ;
{
Logging.Info(this, $"Queue size of {_messageQueueSize} for C2D messages has been reached, removing oldest queued C2D message. " +
$"To avoid losing further messages, set SetIncomingMessageCallbackAsync() to process the messages or increase message queue size in IotHubClientMqttSettings.");
}
}
}
else if (Logging.IsEnabled)
Expand Down
37 changes: 37 additions & 0 deletions iothub/device/src/Pipeline/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,43 @@ await _internalRetryHandler
}
}

// This is to ensure that if device connects over MQTT with CleanSession flag set to false,
// then any message sent while the device was disconnected is delivered on the callback.
public override async Task EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken)
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, cancellationToken, nameof(EnsurePendingMessagesAreDeliveredAsync));

await _internalRetryHandler
.RunWithRetryAsync(
async () =>
{
// Wait to acquire the _cloudToDeviceMessageSubscriptionSemaphore. This ensures that concurrently invoked API calls are invoked in a thread-safe manner.
await _cloudToDeviceMessageSubscriptionSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

try
{
// Ensure that a callback for receiving messages has been previously set.
Debug.Assert(_deviceReceiveMessageEnabled);
await base.EnsurePendingMessagesAreDeliveredAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_cloudToDeviceMessageSubscriptionSemaphore?.Release();
}
},
cancellationToken)
.ConfigureAwait(false);
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(EnsurePendingMessagesAreDeliveredAsync));
}
}

public override async Task DisableReceiveMessageAsync(CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Expand Down
10 changes: 10 additions & 0 deletions iothub/device/src/TransportSettings/IotHubClientMqttSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ public IotHubClientMqttSettings(IotHubClientTransportProtocol transportProtocol
/// </remarks>
public RemoteCertificateValidationCallback RemoteCertificateValidationCallback { get; set; }

/// <summary>
/// C2D message queue size when callback has not been set.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <remarks>
/// If C2D message callback is not set, messages from service are received and stored in a queue until a callback is set.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
/// If the number of messages sent is greater than queue size, older messages are removed as latest messages are added.
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
public int MessageQueueSize { get; set; } = 10;
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Used by Edge runtime to specify an authentication chain for Edge-to-Edge connections.
/// </summary>
Expand All @@ -116,6 +125,7 @@ internal override IotHubClientTransportSettings Clone()
WillMessage = WillMessage,
RemoteCertificateValidationCallback = RemoteCertificateValidationCallback,
AuthenticationChain = AuthenticationChain,
MessageQueueSize = MessageQueueSize,
};
}
}
Expand Down