Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass byte[] through as payload for C2D for binary payloads #3229

Merged
merged 4 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions e2e/Tests/helpers/TaskCompletionSourceHelper.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.Devices.E2ETests.helpers
namespace Microsoft.Azure.Devices.E2ETests
{
public class TaskCompletionSourceHelper
/// <summary>
/// Modern .NET supports waiting on the TaskCompletionSource with a cancellation token, but older ones
/// do not. We can bind that task with a call to Task.Delay to get the same effect, though.
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
internal static class TaskCompletionSourceHelper
{
/// <summary>
/// Gets the result of the provided task completion source or throws OperationCancelledException if the provided
/// cancellation token is cancelled beforehand.
/// </summary>
/// <typeparam name="T">The type of the result of the task completion source.</typeparam>
/// <param name="taskCompletionSource">The task completion source to asynchronously wait for the result of.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The result of the provided task completion source if it completes before the provided cancellation token is cancelled.</returns>
/// <exception cref="OperationCanceledException">If the cancellation token is cancelled before the provided task completion source finishes.</exception>
public static async Task<T> GetTaskCompletionSourceResultAsync<T>(TaskCompletionSource<T> taskCompletionSource, CancellationToken cancellationToken)
internal static async Task<T> WaitAsync<T>(this TaskCompletionSource<T> taskCompletionSource, CancellationToken ct)
{
// Note that Task.Delay(-1, cancellationToken) effectively waits until the cancellation token is cancelled. The -1 value
// just means that the task is allowed to run indefinitely.
Task finishedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(-1, cancellationToken)).ConfigureAwait(false);
#if NET6_0_OR_GREATER
return await taskCompletionSource.Task.WaitAsync(ct).ConfigureAwait(false);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
#else
await Task
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
.WhenAny(
taskCompletionSource.Task,
Task.Delay(-1, ct))
.ConfigureAwait(false);

// If the finished task is not the cancellation token
if (finishedTask is Task<T>)
{
return await ((Task<T>)finishedTask).ConfigureAwait(false);
}

// Otherwise throw operation cancelled exception since the cancellation token was cancelled before the task finished.
throw new OperationCanceledException();
ct.ThrowIfCancellationRequested();
return await taskCompletionSource.Task.ConfigureAwait(false);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t

// D2C Operation
VerboseTestLogger.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Operation 1: Send D2C for device={testDevice.Id}");
TelemetryMessage message = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage message = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
Task sendD2cMessage = testDevice.DeviceClient.SendTelemetryAsync(message);
clientOperations.Add(sendD2cMessage);

Expand All @@ -105,7 +105,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t
OutgoingMessage msg = msgSent.Item1;
string payload = msgSent.Item2;

Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
Task verifyDeviceClientReceivesMessage = IncomingMessageCallbackE2eTests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
clientOperations.Add(verifyDeviceClientReceivesMessage);

// Invoke direct methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static async Task SendMessageTestAsync(IotHubClientTransportSettings tra
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(s_devicePrefix, TestDeviceType.X509).ConfigureAwait(false);
IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(transportSetting));
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);
TelemetryMessage message = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage message = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
await deviceClient.SendTelemetryAsync(message).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Methods
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MethodE2ECustomPayloadTests : E2EMsTestBase
public class DirectMethodE2eCustomPayloadTests : E2EMsTestBase
{
private static readonly DirectMethodRequestPayload _customTypeRequest = new() { DesiredState = "on" };
private static readonly DirectMethodResponsePayload _customTypeResponse = new() { CurrentState = "off" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async Task TestInitAsync(IotHubDeviceClient deviceClient, TestDevice testDevice,

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice, TestDeviceCallbackHandler _)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string payload, out string p1Value);
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string payload, out string p1Value);

VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}.{testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MessageReceiveE2EPoolAmqpTests : E2EMsTestBase
public class IncomingMessageCallbackE2ePoolAmqpTests : E2EMsTestBase
{
private readonly string DevicePrefix = $"{nameof(MessageReceiveE2EPoolAmqpTests)}_";
private readonly string DevicePrefix = $"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}_";

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
Expand Down Expand Up @@ -132,11 +132,11 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
VerboseTestLogger.WriteLine($"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);

Tuple<OutgoingMessage, string> msgSent = messagesSent[testDevice.Id];
await MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
await IncomingMessageCallbackE2eTests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
}

await PoolingOverAmqp
Expand Down Expand Up @@ -176,7 +176,7 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler testDeviceCallbackHandler)
{
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
VerboseTestLogger.WriteLine($"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await testDeviceCallbackHandler.WaitForReceiveMessageCallbackAsync(cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.E2ETests.helpers;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.Azure.Devices.E2ETests.Helpers.Templates;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -20,9 +20,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
[TestCategory("LongRunning")]
public class MessageReceiveE2ETests : E2EMsTestBase
public class IncomingMessageCallbackE2eTests : E2EMsTestBase
{
private static readonly string s_devicePrefix = $"{nameof(MessageReceiveE2ETests)}_";
private static readonly string s_devicePrefix = $"{nameof(IncomingMessageCallbackE2eTests)}_";

private static readonly TimeSpan s_oneSecond = TimeSpan.FromSeconds(1);
private static readonly TimeSpan s_fiveSeconds = TimeSpan.FromSeconds(5);
Expand Down Expand Up @@ -88,16 +88,14 @@ public static async Task VerifyReceivedC2dMessageAsync(IotHubDeviceClient dc, st

using var cts = new CancellationTokenSource(s_oneMinute);
var c2dMessageReceived = new TaskCompletionSource<IncomingMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
Func<IncomingMessage, Task<MessageAcknowledgement>> OnC2DMessageReceived = (message) =>
Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
{
c2dMessageReceived.TrySetResult(message);
return Task.FromResult(MessageAcknowledgement.Complete);
};
}
await dc.SetIncomingMessageCallbackAsync(OnC2DMessageReceived).ConfigureAwait(false);

IncomingMessage receivedMessage = await TaskCompletionSourceHelper
.GetTaskCompletionSourceResultAsync(c2dMessageReceived, cts.Token)
.ConfigureAwait(false);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
IncomingMessage receivedMessage = await c2dMessageReceived.WaitAsync(cts.Token).ConfigureAwait(false);

receivedMessage.MessageId.Should().Be(message.MessageId, "Received message Id is not what was sent by service");
receivedMessage.UserId.Should().Be(message.UserId, "Received user Id is not what was sent by service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("FaultInjection")]
[TestCategory("IoTHub-Client")]
public partial class MessageReceiveFaultInjectionTests : E2EMsTestBase
public partial class IncomingMessageCallbackFaultInjectionTests : E2EMsTestBase
{
private readonly string DevicePrefix = $"{nameof(MessageReceiveFaultInjectionTests)}_";
private readonly string DevicePrefix = $"{nameof(IncomingMessageCallbackFaultInjectionTests)}_";

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async Task InitAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
using var cts = new CancellationTokenSource(operationTimeout);
await deviceClient.SendTelemetryAsync(testMessage, cts.Token).ConfigureAwait(false);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public partial class TelemetryE2ETests : E2EMsTestBase
public partial class TelemetryMessageE2eTests : E2EMsTestBase
{
private const int MessageBatchCount = 5;

Expand All @@ -28,7 +28,7 @@ public partial class TelemetryE2ETests : E2EMsTestBase
// the message size is less than 1 MB.
private const int OverlyExceedAllowedMessageSizeInBytes = 3000 * 1024;

private readonly string _idPrefix = $"{nameof(TelemetryE2ETests)}_";
private readonly string _idPrefix = $"{nameof(TelemetryMessageE2eTests)}_";
private static readonly string s_proxyServerAddress = TestConfiguration.IotHub.ProxyServerAddress;

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MessageSendE2EPoolAmqpTests : E2EMsTestBase
public class TelemetryMessageSendE2ePoolAmqpTests : E2EMsTestBase
{
private readonly string _devicePrefix = $"{nameof(MessageSendE2EPoolAmqpTests)}_";
private readonly string _devicePrefix = $"{nameof(TelemetryMessageSendE2ePoolAmqpTests)}_";

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
Expand Down Expand Up @@ -53,8 +53,8 @@ async Task InitAsync(TestDevice testDevice, TestDeviceCallbackHandler c)

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string payload, out string p1Value);
VerboseTestLogger.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string payload, out string p1Value);
VerboseTestLogger.WriteLine($"{nameof(TelemetryMessageSendE2ePoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
await testDevice.DeviceClient.SendTelemetryAsync(testMessage).ConfigureAwait(false);
}

Expand Down
21 changes: 10 additions & 11 deletions e2e/Tests/iothub/service/FileUploadNotificationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ public class FileUploadNotificationE2ETest : E2EMsTestBase
[DataRow(IotHubTransportProtocol.WebSocket, 1, true)]
public async Task FileUploadNotification_FileUploadNotificationProcessor_ReceivesNotifications(IotHubTransportProtocol protocol, int filesToUpload, bool shouldReconnect)
{
// arrange

var options = new IotHubServiceClientOptions
{
Protocol = protocol
};

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);
using StorageContainer storage = await StorageContainer.GetInstanceAsync("fileupload", false).ConfigureAwait(false);
using var fileNotification = new SemaphoreSlim(1, 1);
Expand Down Expand Up @@ -94,22 +96,23 @@ async Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotif
await serviceClient.FileUploadNotifications.OpenAsync(cts.Token).ConfigureAwait(false);
}

// act
for (int i = 0; i < filesToUpload; ++i)
{
string fileName = $"TestPayload-{Guid.NewGuid()}.txt";
files.Add(fileName, false);
await UploadFile(fileName, cts.Token).ConfigureAwait(false);
}

await Task
.WhenAny(
allFilesFound.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Waiting on file upload notification...");
await allFilesFound.WaitAsync(cts.Token).ConfigureAwait(false);

// assert
allFilesFound.Task.IsCompleted.Should().BeTrue();
}
finally
{
VerboseTestLogger.WriteLine($"Cleanup: closing client...");
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -149,11 +152,7 @@ public async Task FileUploadNotification_ErrorProcessor_ReceivesNotifications(Io
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
await Task
.WhenAny(
errorProcessorNotified.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
await errorProcessorNotified.WaitAsync(cts.Token).ConfigureAwait(false);
errorProcessorNotified.Task.IsCompleted.Should().BeTrue();
}
finally
Expand Down
16 changes: 4 additions & 12 deletions e2e/Tests/iothub/service/MessageFeedbackReceiverE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,14 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
await serviceClient.Messages.SendAsync(testDevice.Device.Id, message).ConfigureAwait(false);

// Wait for the device to receive the message.
await Task
.WhenAny(
Task.Delay(TimeSpan.FromSeconds(20)),
c2dMessageReceived.Task)
.ConfigureAwait(false);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await c2dMessageReceived.WaitAsync(cts.Token).ConfigureAwait(false);

c2dMessageReceived.Task.IsCompleted.Should().BeTrue("Timed out waiting for C2D message to be received by device");

// Wait for the service to receive the feedback message.
await Task
.WhenAny(
// Wait for up to 200 seconds for the feedback message as the service may not send messages
// until they can batch others, even up to a minute later.
Task.Delay(TimeSpan.FromSeconds(200)),
feedbackMessageReceived.Task)
.ConfigureAwait(false);
using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(200));
await feedbackMessageReceived.WaitAsync(cts2.Token).ConfigureAwait(false);

feedbackMessageReceived.Task.IsCompleted.Should().BeTrue("service client never received c2d feedback message even though the device received the message");
}
Expand Down
Loading