Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2 - bug fix] Queue MQTT C2D messages if callback not set #3336

Merged
merged 6 commits into from
Jun 13, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cr changes, added e2e test
tmahmood-microsoft committed Jun 8, 2023
commit b48fef381afecd4cb2c72559d7bde3e778fb0935
40 changes: 40 additions & 0 deletions e2e/Tests/iothub/device/IncomingMessageCallbackE2eTests.cs
Original file line number Diff line number Diff line change
@@ -80,6 +80,46 @@ public async Task DeviceReceiveMessageAfterOpenCloseOpen_Mqtt()
await ReceiveMessageAfterOpenCloseOpenAsync(new IotHubClientMqttSettings(), ct).ConfigureAwait(false);
}

[TestMethod]
public async Task DeviceReceiveMessageSetCallbackAfterReconnect_Mqtt()
{
// Setting up one cancellation token for the complete test flow
using var cts = new CancellationTokenSource(s_testTimeout);
CancellationToken ct = cts.Token;

await ReceiveMessageSetCallbackAfterReconnect(new IotHubClientMqttSettings(), ct).ConfigureAwait(false);
}

private static async Task ReceiveMessageSetCallbackAfterReconnect(IotHubClientTransportSettings transportSettings, CancellationToken ct)
{
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(s_devicePrefix, ct: ct).ConfigureAwait(false);
IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(transportSettings));
await testDevice.OpenWithRetryAsync(ct).ConfigureAwait(false);
using var deviceHandler = new TestDeviceCallbackHandler(deviceClient, testDevice.Id);

IotHubServiceClient serviceClient = TestDevice.ServiceClient;

await serviceClient.Messages.OpenAsync(ct).ConfigureAwait(false);

await deviceHandler.SetIncomingMessageCallbackHandlerAndCompleteMessageAsync<string>(ct).ConfigureAwait(false);
await deviceClient.CloseAsync(ct).ConfigureAwait(false);


// Now, send a message to the device from the service.
OutgoingMessage firstMsg = OutgoingMessageHelper.ComposeTestMessage(out string _, out string _);
deviceHandler.ExpectedOutgoingMessage = firstMsg;
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
await serviceClient.Messages.SendAsync(testDevice.Id, firstMsg, ct).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Sent C2D message from service, messageId={firstMsg.MessageId} - to be received on callback");

// re-open the client under test.
await deviceClient.OpenAsync(ct).ConfigureAwait(false);
Thread.Sleep(20000);
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved
await deviceHandler.SetIncomingMessageCallbackHandlerAndCompleteMessageAsync<string>(ct).ConfigureAwait(false);

// Now, set a callback on the device client to receive C2D messages.
await deviceHandler.WaitForIncomingMessageCallbackAsync(ct).ConfigureAwait(false);
}

private static async Task ReceiveMessageUsingCallbackAndUnsubscribeAsync(IotHubClientTransportSettings transportSettings, CancellationToken ct)
{
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(s_devicePrefix, ct: ct).ConfigureAwait(false);
2 changes: 1 addition & 1 deletion iothub/device/src/Pipeline/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ internal MqttTransportHandler(PipelineContext context, IDelegatingHandler nextHa

_hostName = context.IotHubConnectionCredentials.HostName;
_connectionCredentials = context.IotHubConnectionCredentials;
_messageQueueSize = _mqttTransportSettings.MessageQueueSize;
_messageQueueSize = _mqttTransportSettings.IncomingMessageQueueSize;

if (_mqttTransportSettings.Protocol == IotHubClientTransportProtocol.Tcp)
{
Original file line number Diff line number Diff line change
@@ -96,13 +96,14 @@ public IotHubClientMqttSettings(IotHubClientTransportProtocol transportProtocol
public RemoteCertificateValidationCallback RemoteCertificateValidationCallback { get; set; }

/// <summary>
/// C2D message queue size when callback has not been set.
/// Cloud-to-device message queue size for storing offline messages when the client connects with <see cref="CleanSession" /> set to <c>false</c>.
/// </summary>
/// <remarks>
/// If C2D message callback is not set, messages from service are received and stored in a queue until a callback is set.
/// If the number of messages sent is greater than queue size, older messages are removed as latest messages are added.
/// If incoming message callback is not set, messages from service are received and stored in a queue until a callback is set.
/// If the number of messages sent is greater than queue size, older messages are removed as newer messages are added.
/// Setting this to a low value can cause offline undelivered messages to get lost.
/// </remarks>
public int MessageQueueSize { get; set; } = 10;
public int IncomingMessageQueueSize { get; set; } = 10;
tmahmood-microsoft marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Used by Edge runtime to specify an authentication chain for Edge-to-Edge connections.
@@ -125,7 +126,7 @@ internal override IotHubClientTransportSettings Clone()
WillMessage = WillMessage,
RemoteCertificateValidationCallback = RemoteCertificateValidationCallback,
AuthenticationChain = AuthenticationChain,
MessageQueueSize = MessageQueueSize,
IncomingMessageQueueSize = IncomingMessageQueueSize,
};
}
}