Skip to content

Commit

Permalink
For c2d, take payload as object, similar to other operations
Browse files Browse the repository at this point in the history
  • Loading branch information
David R. Williamson committed Mar 13, 2023
1 parent f7bec7d commit ec34f3d
Show file tree
Hide file tree
Showing 27 changed files with 147 additions and 142 deletions.
69 changes: 35 additions & 34 deletions e2e/test/helpers/TestDeviceCallbackHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal sealed class TestDeviceCallbackHandler : IDisposable

private readonly SemaphoreSlim _receivedMessageCallbackSemaphore = new(0, 1);
private ExceptionDispatchInfo _receiveMessageExceptionDispatch;
private Message _expectedMessageSentByService;
private OutgoingMessage _expectedMessageSentByService;

internal TestDeviceCallbackHandler(IotHubDeviceClient deviceClient, TestDevice testDevice)
{
Expand All @@ -39,7 +39,7 @@ internal string ExpectedTwinPropertyValue
set => Volatile.Write(ref _expectedTwinPropertyValue, value);
}

internal Message ExpectedMessageSentByService
internal OutgoingMessage ExpectedMessageSentByService
{
get => Volatile.Read(ref _expectedMessageSentByService);
set => Volatile.Write(ref _expectedMessageSentByService, value);
Expand Down Expand Up @@ -129,10 +129,41 @@ internal async Task WaitForTwinCallbackAsync(CancellationToken ct)
_twinExceptionDispatch?.Throw();
}

internal async Task SetMessageReceiveCallbackHandlerAsync()
internal async Task SetMessageReceiveCallbackHandlerAsync<T>()
{
await _deviceClient.OpenAsync().ConfigureAwait(false);
await _deviceClient.SetIncomingMessageCallbackAsync(OnC2dMessageReceivedAsync).ConfigureAwait(false);
await _deviceClient.SetIncomingMessageCallbackAsync((IncomingMessage message) =>
{
VerboseTestLogger.WriteLine($"{nameof(SetMessageReceiveCallbackHandlerAsync)}: DeviceClient {_testDevice.Id} received message with Id: {message.MessageId}.");

try
{
if (ExpectedMessageSentByService != null)
{
message.MessageId.Should().Be(ExpectedMessageSentByService.MessageId, "Received message Id should match what was sent by service");
message.UserId.Should().Be(ExpectedMessageSentByService.UserId, "Received user Id should match what was sent by service");
message.TryGetPayload(out T payload).Should().BeTrue();

ExpectedMessageSentByService.Payload.Should().BeOfType<T>();
var expectedPayload = (T)ExpectedMessageSentByService.Payload;
payload.Should().Be(expectedPayload);
}

VerboseTestLogger.WriteLine($"{nameof(SetMessageReceiveCallbackHandlerAsync)}: DeviceClient completed message with Id: {message.MessageId}.");
return Task.FromResult(MessageAcknowledgement.Complete);
}
catch (Exception ex)
{
VerboseTestLogger.WriteLine($"{nameof(SetMessageReceiveCallbackHandlerAsync)}: Error during DeviceClient receive message callback: {ex}.");
_receiveMessageExceptionDispatch = ExceptionDispatchInfo.Capture(ex);
return Task.FromResult(MessageAcknowledgement.Abandon);
}
finally
{
// Always notify that we got the callback.
_receivedMessageCallbackSemaphore.Release();
}
}).ConfigureAwait(false);
}

internal async Task UnsetMessageReceiveCallbackHandlerAsync()
Expand All @@ -146,35 +177,5 @@ internal async Task WaitForReceiveMessageCallbackAsync(CancellationToken ct)
await _receivedMessageCallbackSemaphore.WaitAsync(ct).ConfigureAwait(false);
_receiveMessageExceptionDispatch?.Throw();
}

private Task<MessageAcknowledgement> OnC2dMessageReceivedAsync(IncomingMessage message)
{
VerboseTestLogger.WriteLine($"{nameof(SetMessageReceiveCallbackHandlerAsync)}: DeviceClient {_testDevice.Id} received message with Id: {message.MessageId}.");

try
{
if (ExpectedMessageSentByService != null)
{
message.MessageId.Should().Be(ExpectedMessageSentByService.MessageId, "Received message Id should match what was sent by service");
message.UserId.Should().Be(ExpectedMessageSentByService.UserId, "Received user Id should match what was sent by service");
message.TryGetPayload(out string payload).Should().BeTrue();
Encoding.UTF8.GetBytes(payload).Should().BeEquivalentTo(ExpectedMessageSentByService.Payload);
}

VerboseTestLogger.WriteLine($"{nameof(SetMessageReceiveCallbackHandlerAsync)}: DeviceClient completed message with Id: {message.MessageId}.");
return Task.FromResult(MessageAcknowledgement.Complete);
}
catch (Exception ex)
{
VerboseTestLogger.WriteLine($"{nameof(SetMessageReceiveCallbackHandlerAsync)}: Error during DeviceClient receive message callback: {ex}.");
_receiveMessageExceptionDispatch = ExceptionDispatchInfo.Capture(ex);
return Task.FromResult(MessageAcknowledgement.Abandon);
}
finally
{
// Always notify that we got the callback.
_receivedMessageCallbackSemaphore.Release();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private async Task DeviceCombinedClientOperationsAsync(
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);

// Message payload and properties for C2D operation
var messagesSent = new Dictionary<string, Tuple<Message, string>>();
var messagesSent = new Dictionary<string, Tuple<OutgoingMessage, string>>();

// Twin properties
var twinPropertyMap = new Dictionary<string, List<string>>();
Expand All @@ -65,7 +65,7 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _

// Send C2D Message
VerboseTestLogger.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Send C2D for device={testDevice.Id}");
Message msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string p1Value);
OutgoingMessage msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string p1Value);
messagesSent.Add(testDevice.Id, Tuple.Create(msg, payload));
await serviceClient.Messages.OpenAsync().ConfigureAwait(false);
Task sendC2dMessage = serviceClient.Messages.SendAsync(testDevice.Id, msg);
Expand Down Expand Up @@ -100,8 +100,8 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _

// C2D Operation
VerboseTestLogger.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Operation 2: Receive C2D for device={testDevice.Id}");
Tuple<Message, string> msgSent = messagesSent[testDevice.Id];
Message msg = msgSent.Item1;
Tuple<OutgoingMessage, string> msgSent = messagesSent[testDevice.Id];
OutgoingMessage msg = msgSent.Item1;
string payload = msgSent.Item2;

Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ async Task InitOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDe
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await deviceClient.OpenAsync(cts.Token).ConfigureAwait(false);

await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync<string>().ConfigureAwait(false);
}

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice, TestDeviceCallbackHandler testDeviceCallbackHandler)
{
Message msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string p1Value);
OutgoingMessage msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string p1Value);
testDeviceCallbackHandler.ExpectedMessageSentByService = msg;
await serviceClient.Messages.SendAsync(testDevice.Id, msg).ConfigureAwait(false);

Expand Down
10 changes: 5 additions & 5 deletions e2e/test/iothub/device/MessageReceiveE2EPoolAmqpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ private async Task ReceiveMessage_PoolOverAmqpAsync(
int devicesCount,
ConnectionStringAuthScope authScope = ConnectionStringAuthScope.Device)
{
var messagesSent = new Dictionary<string, Tuple<Message, string>>();
var messagesSent = new Dictionary<string, Tuple<OutgoingMessage, string>>();

// Initialize the service client

async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
Message msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string _);
OutgoingMessage msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string _);
messagesSent.Add(testDevice.Id, Tuple.Create(msg, payload));

await TestDevice.ServiceClient.Messages.OpenAsync().ConfigureAwait(false);
Expand All @@ -135,7 +135,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);

Tuple<Message, string> msgSent = messagesSent[testDevice.Id];
Tuple<OutgoingMessage, string> msgSent = messagesSent[testDevice.Id];
await MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
}

Expand Down Expand Up @@ -166,10 +166,10 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t
{
await serviceClient.Messages.OpenAsync().ConfigureAwait(false);

Message msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string _, out string _);
OutgoingMessage msg = MessageReceiveE2ETests.ComposeC2dTestMessage(out string _, out string _);

await testDevice.OpenWithRetryAsync().ConfigureAwait(false);
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync<string>().ConfigureAwait(false);
testDeviceCallbackHandler.ExpectedMessageSentByService = msg;

await serviceClient.Messages.SendAsync(testDevice.Id, msg).ConfigureAwait(false);
Expand Down
24 changes: 12 additions & 12 deletions e2e/test/iothub/device/MessageReceiveE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ public async Task DeviceReceiveMessageAfterOpenCloseOpen_Mqtt()
await ReceiveMessageAfterOpenCloseOpenAsync(TestDeviceType.Sasl, new IotHubClientMqttSettings()).ConfigureAwait(false);
}

public static Message ComposeC2dTestMessage(out string payload, out string p1Value)
public static OutgoingMessage ComposeC2dTestMessage(out string payload, out string p1Value)
{
payload = Guid.NewGuid().ToString();
string messageId = Guid.NewGuid().ToString();
p1Value = Guid.NewGuid().ToString();
string userId = Guid.NewGuid().ToString();

VerboseTestLogger.WriteLine($"{nameof(ComposeC2dTestMessage)}: messageId='{messageId}' userId='{userId}' payload='{payload}' p1Value='{p1Value}'");
var message = new Message(Encoding.UTF8.GetBytes(payload))
var message = new OutgoingMessage(payload)
{
MessageId = messageId,
UserId = userId,
Expand All @@ -91,7 +91,7 @@ public static Message ComposeC2dTestMessage(out string payload, out string p1Val
return message;
}

public static async Task VerifyReceivedC2dMessageAsync(IotHubDeviceClient dc, string deviceId, Message message, string payload)
public static async Task VerifyReceivedC2dMessageAsync(IotHubDeviceClient dc, string deviceId, OutgoingMessage message, string payload)
{
string receivedMessageDestination = $"/devices/{deviceId}/messages/deviceBound";

Expand Down Expand Up @@ -148,15 +148,15 @@ private static async Task ReceiveMessageUsingCallbackAndUnsubscribeAsync(TestDev
using var deviceHandler = new TestDeviceCallbackHandler(deviceClient, testDevice);
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);

var serviceClient = TestDevice.ServiceClient;
IotHubServiceClient serviceClient = TestDevice.ServiceClient;

await serviceClient.Messages.OpenAsync().ConfigureAwait(false);

// Now, set a callback on the device client to receive C2D messages.
await deviceHandler.SetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);
await deviceHandler.SetMessageReceiveCallbackHandlerAsync<string>().ConfigureAwait(false);

// Now, send a message to the device from the service.
Message firstMsg = ComposeC2dTestMessage(out string _, out string _);
OutgoingMessage firstMsg = ComposeC2dTestMessage(out string _, out string _);
deviceHandler.ExpectedMessageSentByService = firstMsg;
await serviceClient.Messages.SendAsync(testDevice.Id, firstMsg).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Sent C2D message from service, messageId={firstMsg.MessageId} - to be received on callback");
Expand All @@ -169,9 +169,9 @@ private static async Task ReceiveMessageUsingCallbackAndUnsubscribeAsync(TestDev
await deviceHandler.UnsetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);

// Send a message to the device from the service.
Message secondMsg = ComposeC2dTestMessage(out string _, out string _);
OutgoingMessage secondMsg = ComposeC2dTestMessage(out string _, out string _);
await serviceClient.Messages.SendAsync(testDevice.Id, secondMsg).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Sent C2D message from service, messageId={secondMsg.MessageId} - to be received on polling ReceiveAsync");
VerboseTestLogger.WriteLine($"Sent C2D message from service, messageId={secondMsg.MessageId} - which should not be received.");

try
{
Expand All @@ -193,15 +193,15 @@ private static async Task ReceiveMessageAfterOpenCloseOpenAsync(TestDeviceType t
await deviceClient.CloseAsync().ConfigureAwait(false);
await deviceClient.OpenAsync().ConfigureAwait(false);

var serviceClient = TestDevice.ServiceClient;
IotHubServiceClient serviceClient = TestDevice.ServiceClient;

await serviceClient.Messages.OpenAsync().ConfigureAwait(false);

// Now, set a callback on the device client to receive C2D messages.
await deviceHandler.SetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);
await deviceHandler.SetMessageReceiveCallbackHandlerAsync<string>().ConfigureAwait(false);

// Now, send a message to the device from the service.
Message testMessage = ComposeC2dTestMessage(out string _, out string _);
OutgoingMessage testMessage = ComposeC2dTestMessage(out string _, out string _);
deviceHandler.ExpectedMessageSentByService = testMessage;
await serviceClient.Messages.SendAsync(testDevice.Id, testMessage).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Sent C2D message from service, messageId={testMessage.MessageId} - to be received on callback");
Expand Down Expand Up @@ -234,7 +234,7 @@ void ConnectionStatusChangeHandler(ConnectionStatusInfo connectionStatusInfo)
using var testDeviceCallbackHandler = new TestDeviceCallbackHandler(deviceClient, testDevice);

// Subscribe to receive C2D messages over the callback.
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync<string>().ConfigureAwait(false);

// This will make the client unsubscribe from the mqtt c2d topic/close the amqp c2d link. Neither event
// should close the connection as a whole, though.
Expand Down
4 changes: 2 additions & 2 deletions e2e/test/iothub/device/MessageReceiveFaultInjectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,12 @@ async Task InitOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDe
await deviceClient.OpenAsync(cts.Token).ConfigureAwait(false);

testDeviceCallbackHandler = new TestDeviceCallbackHandler(deviceClient, testDevice);
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync().ConfigureAwait(false);
await testDeviceCallbackHandler.SetMessageReceiveCallbackHandlerAsync<string>().ConfigureAwait(false);
}

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)
{
Message message = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string p1Value);
OutgoingMessage message = MessageReceiveE2ETests.ComposeC2dTestMessage(out string payload, out string p1Value);

testDeviceCallbackHandler.ExpectedMessageSentByService = message;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static async Task TestServiceClientInvalidServiceCertificateAsync(IotHub
Protocol = protocol,
};
using var service = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionStringInvalidServiceCertificate, options);
var testMessage = new Message();
var testMessage = new OutgoingMessage();
await service.Messages.OpenAsync().ConfigureAwait(false);
await service.Messages.SendAsync("testDevice1", testMessage).ConfigureAwait(false);
await service.Messages.CloseAsync().ConfigureAwait(false);
Expand Down
6 changes: 3 additions & 3 deletions e2e/test/iothub/service/IoTHubServiceProxyE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async Task ServiceClient_SendSingleMessage_WithProxy()
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(s_devicePrefix).ConfigureAwait(false);
await using var deviceClient = new IotHubDeviceClient(testDevice.ConnectionString);
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);
(Message testMessage, string messageId, string payload, string p1Value) = ComposeTelemetryMessage();
(OutgoingMessage testMessage, string messageId, string payload, string p1Value) = ComposeTelemetryMessage();
await serviceClient.Messages.OpenAsync().ConfigureAwait(false);
await serviceClient.Messages.SendAsync(testDevice.Id, testMessage).ConfigureAwait(false);

Expand Down Expand Up @@ -111,14 +111,14 @@ public async Task JobClient_ScheduleAndRunTwinJob_WithProxy()
}
}

private (Message message, string messageId, string payload, string p1Value) ComposeTelemetryMessage()
private (OutgoingMessage message, string messageId, string payload, string p1Value) ComposeTelemetryMessage()
{
string messageId = Guid.NewGuid().ToString();
string payload = Guid.NewGuid().ToString();
string p1Value = Guid.NewGuid().ToString();

VerboseTestLogger.WriteLine($"{nameof(ComposeTelemetryMessage)}: messageId='{messageId}' payload='{payload}' p1Value='{p1Value}'");
var message = new Message(Encoding.UTF8.GetBytes(payload))
var message = new OutgoingMessage(Encoding.UTF8.GetBytes(payload))
{
MessageId = messageId,
Properties = { ["property1"] = p1Value }
Expand Down
Loading

0 comments on commit ec34f3d

Please sign in to comment.