Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TelemetryMessage should be able to take bytes and not serialize them #3242

Merged
merged 5 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions e2e/Tests/helpers/AmqpConnectionStatusChange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ public class AmqpConnectionStatusChange

public AmqpConnectionStatusChange(string deviceId)
{
ConnectionStatusChangeCount = 0;
_deviceId = deviceId;
}

public int ConnectionStatusChangeCount { get; set; }
public int ConnectionStatusChangeCount { get; private set; }

public void ConnectionStatusChangeHandler(ConnectionStatusInfo connectionStatusInfo)
{
Expand Down
4 changes: 2 additions & 2 deletions e2e/Tests/helpers/TestDeviceCallbackHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ await _deviceClient.SetIncomingMessageCallbackAsync((IncomingMessage receivedMes
receivedMessageProperties.Value.Should().Be(expectedMessageProperties.Value, $"The value of \"property1\" did not match for device {_testDeviceId}");
}

VerboseTestLogger.WriteLine($"{nameof(SetIncomingMessageCallbackHandlerAndCompleteMessageAsync)}: DeviceClient completed message with Id: {receivedMessage.MessageId}.");
VerboseTestLogger.WriteLine($"{nameof(SetIncomingMessageCallbackHandlerAndCompleteMessageAsync)}: Device {_testDeviceId} completed message with Id: {receivedMessage.MessageId}.");
return Task.FromResult(MessageAcknowledgement.Complete);
}
catch (Exception ex)
{
VerboseTestLogger.WriteLine($"{nameof(SetIncomingMessageCallbackHandlerAndCompleteMessageAsync)}: Error during DeviceClient receive message callback: {ex}.");
VerboseTestLogger.WriteLine($"{nameof(SetIncomingMessageCallbackHandlerAndCompleteMessageAsync)}: Error during device {_testDeviceId} receive message callback: {ex}.");
_receiveMessageExceptionDispatch = ExceptionDispatchInfo.Capture(ex);

return Task.FromResult(MessageAcknowledgement.Abandon);
Expand Down
8 changes: 5 additions & 3 deletions e2e/Tests/helpers/templates/FaultInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal static async Task ActivateFaultInjectionAsync(

await deviceClient.SendTelemetryAsync(faultInjectionMessage, linkedCts.Token).ConfigureAwait(false);
}
catch (Exception ex) when (ex is IotHubClientException hubEx && hubEx.IsTransient)
catch (IotHubClientException ex) when (ex.IsTransient || ex.ErrorCode == IotHubClientErrorCode.ConnectionForcefullyClosedOnFaultInjection)
{
VerboseTestLogger.WriteLine($"{nameof(ActivateFaultInjectionAsync)}: {ex}");

Expand Down Expand Up @@ -205,9 +205,11 @@ void OnConnectionStatusChanged(ConnectionStatusInfo connectionStatusInfo)
}
finally
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

if (cleanupOperation != null)
{
await cleanupOperation(ct).ConfigureAwait(false);
await cleanupOperation(cts.Token).ConfigureAwait(false);
}

if (!FaultShouldDisconnect(faultType))
Expand All @@ -218,7 +220,7 @@ void OnConnectionStatusChanged(ConnectionStatusInfo connectionStatusInfo)
if (timeToFinishFaultInjection > TimeSpan.Zero)
{
VerboseTestLogger.WriteLine($"{nameof(FaultInjection)}: Waiting {timeToFinishFaultInjection} to ensure that FaultInjection duration passed.");
await Task.Delay(timeToFinishFaultInjection, ct).ConfigureAwait(false);
await Task.Delay(timeToFinishFaultInjection, cts.Token).ConfigureAwait(false);
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions e2e/Tests/helpers/templates/FaultInjectionPoolingOverAmqp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ public static async Task TestFaultInjectionPoolAmqpAsync(
int countBeforeFaultInjection = amqpConnectionStatuses.First().ConnectionStatusChangeCount;

// Inject the fault into device 0
VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolingOverAmqp)}: {testDevices.First().Id} Requesting fault injection type={faultType} reason={reason}, delay={faultDelay}, duration={faultDuration}");
int deviceIndexToFault = 0;
VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolingOverAmqp)}: {testDevices[deviceIndexToFault].Id} Requesting fault injection type={faultType} reason={reason}, delay={faultDelay}, duration={faultDuration}");
faultInjectionDuration.Start();
TelemetryMessage faultInjectionMessage = FaultInjection.ComposeErrorInjectionProperties(faultType, reason, faultDelay, faultDuration);
await testDevices.First().DeviceClient.SendTelemetryAsync(faultInjectionMessage, ct).ConfigureAwait(false);
await FaultInjection.ActivateFaultInjectionAsync(amqpTransportSettings, faultType, reason, faultDelay, faultDuration, testDevices[deviceIndexToFault].DeviceClient, ct).ConfigureAwait(false);

VerboseTestLogger.WriteLine($"{nameof(FaultInjection)}: Waiting for fault injection to be active: {faultDelay} seconds.");
await Task.Delay(faultDelay, ct).ConfigureAwait(false);
Expand All @@ -92,13 +92,13 @@ public static async Task TestFaultInjectionPoolAmqpAsync(
if (FaultInjection.FaultShouldDisconnect(faultType))
{
VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolingOverAmqp)}: Confirming fault injection has been actived.");
// Check that service issued the fault to the faulting device [device 0]
// Check that service issued the fault to the faulting device
bool isFaulted = false;

var connectionChangeWaitDuration = Stopwatch.StartNew();
while (connectionChangeWaitDuration.Elapsed < FaultInjection.LatencyTimeBuffer)
{
if (amqpConnectionStatuses.First().ConnectionStatusChangeCount > countBeforeFaultInjection)
if (amqpConnectionStatuses[deviceIndexToFault].ConnectionStatusChangeCount > countBeforeFaultInjection)
{
isFaulted = true;
break;
Expand Down Expand Up @@ -147,7 +147,7 @@ public static async Task TestFaultInjectionPoolAmqpAsync(
// Perform the test operation for all devices
for (int i = 0; i < devicesCount; i++)
{
VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolingOverAmqp)}: Performing test operation for device {i}.");
VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolingOverAmqp)}: Performing test operation for device {testDevices[i].Id}.");
operations.Add(testOperation(testDevices[i], testDeviceCallbackHandlers[i], ct));
}
await Task.WhenAll(operations).ConfigureAwait(false);
Expand All @@ -169,7 +169,8 @@ public static async Task TestFaultInjectionPoolAmqpAsync(
}
finally
{
await cleanupOperation(testDevices, testDeviceCallbackHandlers, ct).ConfigureAwait(false);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await cleanupOperation(testDevices, testDeviceCallbackHandlers, cts.Token).ConfigureAwait(false);

testDeviceCallbackHandlers.ForEach(x => x.Dispose());
await Task.WhenAll(testDevices.Select(x => x.DisposeAsync().AsTask())).ConfigureAwait(false);
Expand All @@ -182,7 +183,7 @@ public static async Task TestFaultInjectionPoolAmqpAsync(
if (timeToFinishFaultInjection > TimeSpan.Zero)
{
VerboseTestLogger.WriteLine($"{nameof(FaultInjection)}: Waiting {timeToFinishFaultInjection} to ensure that FaultInjection duration passed.");
await Task.Delay(timeToFinishFaultInjection, ct).ConfigureAwait(false);
await Task.Delay(timeToFinishFaultInjection, cts.Token).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Microsoft.Azure.Devices.E2ETests
{
public partial class FaultInjectionPoolAmqpTests
{
private readonly string MessageReceive_DevicePrefix = $"{nameof(FaultInjectionPoolAmqpTests)}.MessagaeReceive";
private readonly string IncomingMessage_DevicePrefix = $"{nameof(FaultInjectionPoolAmqpTests)}.IncomingMessaage";

[DataTestMethod]
[TestCategory("LongRunning")]
Expand Down Expand Up @@ -102,7 +102,9 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t
testDeviceCallbackHandler.ExpectedOutgoingMessage = msg;

await serviceClient.Messages.SendAsync(testDevice.Id, msg, ct).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"C2D message sent for {testDevice.Id}");
await testDeviceCallbackHandler.WaitForIncomingMessageCallbackAsync(ct).ConfigureAwait(false);
VerboseTestLogger.WriteLine($"C2D message received for {testDevice.Id}");
}

async Task CleanupOperationAsync(List<TestDevice> _, List<TestDeviceCallbackHandler> testDeviceCallbackHandlers, CancellationToken ct)
Expand All @@ -112,7 +114,7 @@ async Task CleanupOperationAsync(List<TestDevice> _, List<TestDeviceCallbackHand

await FaultInjectionPoolingOverAmqp
.TestFaultInjectionPoolAmqpAsync(
MessageReceive_DevicePrefix,
IncomingMessage_DevicePrefix,
transportSettings,
null,
poolSize,
Expand Down
3 changes: 2 additions & 1 deletion e2e/Tests/iothub/device/MethodE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ public static async Task ServiceSendMethodAndVerifyResponseAsync<T>(
.InvokeAsync(deviceId, directMethodRequest, ct)
.ConfigureAwait(false);
}
else {
else
{
response = await serviceClient.DirectMethods
.InvokeAsync(deviceId, moduleId, directMethodRequest, ct)
.ConfigureAwait(false);
Expand Down
3 changes: 1 addition & 2 deletions e2e/Tests/iothub/device/NoRetryE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public async Task FaultInjection_NoRetry_NoRecovery_OpenAsync()
void ConnectionStatusChangeHandler(ConnectionStatusInfo connectionStatusInfo)
{
connectionStatusChange.TryGetValue(connectionStatusInfo.Status, out int count);
count++;
connectionStatusChange[connectionStatusInfo.Status] = count;
connectionStatusChange[connectionStatusInfo.Status] = ++count;
}
deviceClient.ConnectionStatusChangeCallback = ConnectionStatusChangeHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
{
if (feedback.Records.Any(x => x.OriginalMessageId == message.MessageId))
{
if (!feedbackMessageReceived.Task.IsCompleted)
{
feedbackMessageReceived.TrySetResult(true);
}
feedbackMessageReceived.TrySetResult(true);
return AcknowledgementType.Complete;
}

Expand All @@ -83,13 +80,9 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
// Wait for the device to receive the message.
await c2dMessageReceived.WaitAsync(cts.Token).ConfigureAwait(false);

c2dMessageReceived.Task.IsCompleted.Should().BeTrue("Timed out waiting for C2D message to be received by device");

// Wait for the service to receive the feedback message.
using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(200));
await feedbackMessageReceived.WaitAsync(cts2.Token).ConfigureAwait(false);

feedbackMessageReceived.Task.IsCompleted.Should().BeTrue("service client never received c2d feedback message even though the device received the message");
}
catch (Exception ex)
{
Expand Down
58 changes: 38 additions & 20 deletions iothub/device/src/Exceptions/ClientExceptionHandlingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,49 +35,49 @@ internal static IDictionary<HttpStatusCode, Func<HttpResponseMessage, Task<Excep
HttpStatusCode.BadRequest,
async (response) => await GenerateIotHubClientExceptionAsync(response).ConfigureAwait(false)
},
{
{
HttpStatusCode.Unauthorized,
async (response) =>
new IotHubClientException(
await GetExceptionMessageAsync(response).ConfigureAwait(false),
IotHubClientErrorCode.Unauthorized)
},
{
{
HttpStatusCode.Forbidden,
async (response) =>
new IotHubClientException(
await GetExceptionMessageAsync(response).ConfigureAwait(false),
IotHubClientErrorCode.QuotaExceeded)
},
{
{
HttpStatusCode.PreconditionFailed,
async (response) =>
new IotHubClientException(
await GetExceptionMessageAsync(response).ConfigureAwait(false),
IotHubClientErrorCode.DeviceMessageLockLost)
},
{
{
HttpStatusCode.RequestEntityTooLarge,
async (response) =>
new IotHubClientException(
await GetExceptionMessageAsync(response).ConfigureAwait(false),
IotHubClientErrorCode.MessageTooLarge)
},
{
{
HttpStatusCode.InternalServerError,
async (response) =>
new IotHubClientException(
await GetExceptionMessageAsync(response).ConfigureAwait(false),
IotHubClientErrorCode.ServerError)
},
{
{
HttpStatusCode.ServiceUnavailable,
async (response) =>
new IotHubClientException(
await GetExceptionMessageAsync(response).ConfigureAwait(false),
IotHubClientErrorCode.ServerBusy)
},
{
{
(HttpStatusCode)429,
async (response) =>
new IotHubClientException(
Expand All @@ -96,12 +96,24 @@ internal static Task<string> GetExceptionMessageAsync(HttpResponseMessage respon
internal static async Task<Tuple<string, IotHubClientErrorCode>> GetErrorCodeAndTrackingIdAsync(HttpResponseMessage response)
{
string responseBody = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
ErrorPayload result = GetErrorCodeAndTrackingId(responseBody);
return Tuple.Create(result.TrackingId, result.IotHubClientErrorCode);
}

internal static ErrorPayload GetErrorCodeAndTrackingId(string responseBody)
{
ErrorPayload responseMessage = null;

try
{
IotHubExceptionResult result = JsonConvert.DeserializeObject<IotHubExceptionResult>(responseBody);
responseMessage = JsonConvert.DeserializeObject<ErrorPayload>(result.Message);
// Sometimes we get the full error payload at the root
responseMessage = JsonConvert.DeserializeObject<ErrorPayload>(responseBody);
if (responseMessage.ErrorCode == null
&& responseMessage.TrackingId == null)
{
// And sometimes it is a nested object with a single 'Message' property at the root
responseMessage = JsonConvert.DeserializeObject<ErrorPayload>(responseMessage.Message);
}
}
catch (JsonException ex)
{
Expand All @@ -113,27 +125,33 @@ internal static async Task<Tuple<string, IotHubClientErrorCode>> GetErrorCodeAnd

if (responseMessage != null)
{
string trackingId = string.Empty;
if (responseMessage.TrackingId != null)
if (Enum.TryParse<IotHubClientErrorCode>(responseMessage.ErrorCode?.ToString(), out IotHubClientErrorCode errorCode))
{
trackingId = responseMessage.TrackingId;
responseMessage.IotHubClientErrorCode = errorCode;
}

if (responseMessage.ErrorCode != null)
else if (int.TryParse(responseMessage.ErrorCode?.ToString(), NumberStyles.Any, CultureInfo.InvariantCulture, out int errorCodeInt))
{
if (int.TryParse(responseMessage.ErrorCode, NumberStyles.Any, CultureInfo.InvariantCulture, out int errorCodeInt))
{
return Tuple.Create(trackingId, (IotHubClientErrorCode)errorCodeInt);
}
responseMessage.IotHubClientErrorCode = (IotHubClientErrorCode)errorCodeInt;
}
else
{
responseMessage.IotHubClientErrorCode = IotHubClientErrorCode.Unknown;
}

return responseMessage;
}

if (Logging.IsEnabled)
Logging.Error(
nameof(GetErrorCodeAndTrackingIdAsync),
$"Failed to derive any error code from the response message: {responseBody}");
$"Failed to derive any error code from the response message: '{responseBody}'");

return Tuple.Create(string.Empty, IotHubClientErrorCode.Unknown);
return new ErrorPayload
{
IotHubClientErrorCode = IotHubClientErrorCode.Unknown,
Message = responseBody,
TrackingId = "",
};
}

private static string CreateMessageWhenDeviceNotFound(string deviceId)
Expand Down
5 changes: 4 additions & 1 deletion iothub/device/src/Exceptions/ErrorPayload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Microsoft.Azure.Devices.Client
internal sealed class ErrorPayload
{
[JsonProperty("errorCode")]
internal string ErrorCode { get; set; }
internal dynamic ErrorCode { get; set; }

[JsonProperty("trackingId")]
internal string TrackingId { get; set; }
Expand All @@ -21,5 +21,8 @@ internal sealed class ErrorPayload

[JsonProperty("timestampUtc")]
internal string OccurredOnUtc { get; set; }

[JsonIgnore]
internal IotHubClientErrorCode IotHubClientErrorCode { get; set; }
}
}
14 changes: 14 additions & 0 deletions iothub/device/src/Exceptions/IotHubClientErrorCode.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.ComponentModel;

namespace Microsoft.Azure.Devices.Client
{
/// <summary>
Expand Down Expand Up @@ -151,5 +153,17 @@ public enum IotHubClientErrorCode
/// is invalid.
/// </summary>
IotHubFormatError = 400006,

/// <summary>
/// Used for fault injection, end-to-end testing in the SDK only.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
ConnectionForcefullyClosedOnFaultInjection = 400029,

/// <summary>
/// Used for fault injection, end-to-end testing in the SDK only.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
ConnectionRejectedOnFaultInjection = 400030,
}
}
Loading