Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify/rename error class given to IoT hub service client error processors #3255

Merged
merged 6 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions e2e/LongHaul/service/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,9 @@ Task<AcknowledgementType> OnC2dMessageAck(FeedbackBatch feedbackMessages)
return Task.FromResult(AcknowledgementType.Complete);
}

async Task OnC2dError(ErrorContext errorContext)
async Task OnC2dError(MessageFeedbackProcessorError error)
{
Exception exToLog = errorContext.IotHubServiceException == null
? errorContext.IOException
: errorContext.IotHubServiceException;
_logger.Trace($"Error processing C2D message.\n{exToLog}");
_logger.Trace($"Error processing C2D message.\n{error.Exception}");

await s_serviceClient.MessageFeedback.OpenAsync(ct).ConfigureAwait(false);
}
Expand Down Expand Up @@ -205,12 +202,9 @@ async Task<AcknowledgementType> FileUploadNotificationCallback(FileUploadNotific
return AcknowledgementType.Complete;
}

async Task FileUploadNotificationErrors(ErrorContext errorContext)
async Task FileUploadNotificationErrors(FileUploadNotificationProcessorError error)
{
Exception exToLog = errorContext.IotHubServiceException == null
? errorContext.IOException
: errorContext.IotHubServiceException;
_logger.Trace($"Error processing FileUploadNotification.\n{exToLog}");
_logger.Trace($"Error processing FileUploadNotification.\n{error.Exception}");

_logger.Trace("Attempting reconnect for FileUploadNotifications...");
await s_serviceClient.FileUploadNotifications.OpenAsync(ct).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public async Task FileUploadNotification_CloseGracefully_DoesNotExecuteConnectio
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
Func<FileUploadNotificationProcessorError, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public async Task MessageFeedbackReceiver_CloseGracefully_DoesNotExecuteConnecti
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
Func<MessageFeedbackProcessorError, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
Expand Down
2 changes: 1 addition & 1 deletion e2e/Tests/iothub/service/MessagingClientE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public async Task MessagingClient_CloseGracefully_DoesNotExecuteConnectionLoss()
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
Func<MessagesClientError, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
Expand Down
4 changes: 2 additions & 2 deletions iothub/service/src/Amqp/AmqpClientHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ internal static IotHubServiceException ToIotHubClientContract(Error error, Excep
return retException;
}

internal static ErrorContext GetErrorContextFromException(AmqpException exception)
internal static IotHubServiceException GetIotHubExceptionFromAmqpException(AmqpException exception)
{
return new ErrorContext(new IotHubServiceException(exception.Error.ToString(), exception));
return new IotHubServiceException(exception.Error.ToString(), exception);
}
}
}
26 changes: 9 additions & 17 deletions iothub/service/src/Feedback/MessageFeedbackProcessorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ internal MessageFeedbackProcessorClient(
///
/// //...
///
/// public void OnConnectionLost(ErrorContext errorContext)
/// public async Task OnConnectionLost(FeedbackMessagingError error)
/// {
/// Console.WriteLine("Feedback message processor connection lost")
/// Console.WriteLine("Feedback message processor connection lost. Error: {error.Exception.Message}")
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.MessageFeedbackProcessor.OpenAsync();
Expand All @@ -113,7 +113,7 @@ internal MessageFeedbackProcessorClient(
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }
public Func<MessageFeedbackProcessorError, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection and start receiving acknowledgements for messages sent.
Expand Down Expand Up @@ -249,14 +249,7 @@ private async void OnFeedbackMessageReceivedAsync(AmqpMessage amqpMessage)

try
{
if (ex is IotHubServiceException hubEx)
{
ErrorProcessor?.Invoke(new ErrorContext(hubEx));
}
else if (ex is IOException ioEx)
{
ErrorProcessor?.Invoke(new ErrorContext(ioEx));
}
ErrorProcessor?.Invoke(new MessageFeedbackProcessorError(ex));

await _amqpConnection.AbandonMessageAsync(amqpMessage.DeliveryTag).ConfigureAwait(false);
}
Expand All @@ -276,18 +269,17 @@ private void OnConnectionClosed(object sender, EventArgs e)
{
if (((AmqpObject)sender).TerminalException is AmqpException exception)
{
ErrorContext errorContext = AmqpClientHelper.GetErrorContextFromException(exception);
ErrorProcessor?.Invoke(errorContext);
Exception exceptionToLog = errorContext.IotHubServiceException;
IotHubServiceException mappedException = AmqpClientHelper.GetIotHubExceptionFromAmqpException(exception);
ErrorProcessor?.Invoke(new MessageFeedbackProcessorError(mappedException));

if (Logging.IsEnabled)
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {exceptionToLog}", nameof(OnConnectionClosed));
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {mappedException}", nameof(OnConnectionClosed));
}
else
{
var defaultException = new IotHubServiceException("AMQP connection was lost.", ((AmqpObject)sender).TerminalException);
var errorContext = new ErrorContext(defaultException);
ErrorProcessor?.Invoke(errorContext);
var error = new MessageFeedbackProcessorError(defaultException);
ErrorProcessor?.Invoke(error);

if (Logging.IsEnabled)
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {defaultException}", nameof(OnConnectionClosed));
Expand Down
29 changes: 29 additions & 0 deletions iothub/service/src/Feedback/MessageFeedbackProcessorError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.IO;

namespace Microsoft.Azure.Devices
{
/// <summary>
/// The context provided to the error processor for a connection loss event or other failure
/// when using the <see cref="MessageFeedbackProcessorClient"/>.
/// </summary>
public class MessageFeedbackProcessorError
{
internal MessageFeedbackProcessorError(Exception exception)
{
Exception = exception;
}

/// <summary>
/// The exception that caused the error processor execute.
/// </summary>
/// <remarks>
/// This exception is usually either of type <see cref="IOException"/> or
/// <see cref="IotHubServiceException"/>.
/// </remarks>
public Exception Exception { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ internal FileUploadNotificationProcessorClient(
///
/// //...
///
/// public async Task OnConnectionLostAsync(ErrorContext errorContext)
/// public async Task OnConnectionLostAsync(FileUploadNotificationError error)
/// {
/// Console.WriteLine("File upload notification processor connection lost");
/// Console.WriteLine("File upload notification processor connection lost. Error: {error.Exception.Message}");
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.FileUploadNotificationProcessor.OpenAsync();
Expand All @@ -113,7 +113,7 @@ internal FileUploadNotificationProcessorClient(
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }
public Func<FileUploadNotificationProcessorError, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection and start receiving file upload notifications.
Expand Down Expand Up @@ -239,14 +239,7 @@ private async void OnNotificationMessageReceivedAsync(AmqpMessage amqpMessage)
{
try
{
if (ex is IotHubServiceException hubEx)
{
await ErrorProcessor.Invoke(new ErrorContext(hubEx)).ConfigureAwait(false);
}
else if (ex is IOException ioEx)
{
await ErrorProcessor.Invoke(new ErrorContext(ioEx)).ConfigureAwait(false);
}
await ErrorProcessor.Invoke(new FileUploadNotificationProcessorError(ex)).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why await here and not for C2D?

}
catch (Exception ex3)
{
Expand Down Expand Up @@ -280,20 +273,19 @@ private void OnConnectionClosed(object sender, EventArgs e)
{
if (((AmqpObject)sender).TerminalException is AmqpException exception)
{
ErrorContext errorContext = AmqpClientHelper.GetErrorContextFromException(exception);
ErrorProcessor?.Invoke(errorContext);
IotHubServiceException mappedException = AmqpClientHelper.GetIotHubExceptionFromAmqpException(exception);
ErrorProcessor?.Invoke(new FileUploadNotificationProcessorError(mappedException));

if (Logging.IsEnabled)
{
Exception exceptionToLog = errorContext.IotHubServiceException;
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {exceptionToLog}", nameof(OnConnectionClosed));
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {mappedException}", nameof(OnConnectionClosed));
}
}
else
{
var defaultException = new IotHubServiceException("AMQP connection was lost", ((AmqpObject)sender).TerminalException);
var errorContext = new ErrorContext(defaultException);
ErrorProcessor?.Invoke(errorContext);
var defaultException = new IOException("AMQP connection was lost", ((AmqpObject)sender).TerminalException);
var error = new FileUploadNotificationProcessorError(defaultException);
ErrorProcessor?.Invoke(error);

if (Logging.IsEnabled)
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {defaultException}", nameof(OnConnectionClosed));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.IO;

namespace Microsoft.Azure.Devices
{
/// <summary>
/// The context provided to the error processor for a connection loss event or other failure
/// when using the <see cref="FileUploadNotificationProcessorClient"/>.
/// </summary>
public class FileUploadNotificationProcessorError
{
internal FileUploadNotificationProcessorError(Exception exception)
{
Exception = exception;
}

/// <summary>
/// The exception that caused the error processor execute.
/// </summary>
/// <remarks>
/// This exception is usually either of type <see cref="IOException"/> or
/// <see cref="IotHubServiceException"/>.
/// </remarks>
public Exception Exception { get; }
}
}
20 changes: 10 additions & 10 deletions iothub/service/src/Messaging/MessagesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Globalization;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
Expand Down Expand Up @@ -85,9 +86,9 @@ internal MessagesClient(
///
/// //...
///
/// public void OnConnectionLost(ErrorContext errorContext)
/// public async Task OnConnectionLost(MessagingError error)
/// {
/// Console.WriteLine("Messaging client connection lost");
/// Console.WriteLine($"Messaging client connection lost. Error: {error.Exception.Message}")
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.Messaging.OpenAsync();
Expand All @@ -99,7 +100,7 @@ internal MessagesClient(
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }
public Func<MessagesClientError, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection. Must be done before any cloud-to-device messages can be sent.
Expand Down Expand Up @@ -391,17 +392,16 @@ private void OnConnectionClosed(object sender, EventArgs e)
{
if (((AmqpObject)sender).TerminalException is AmqpException exception)
{
ErrorContext errorContext = AmqpClientHelper.GetErrorContextFromException(exception);
ErrorProcessor?.Invoke(errorContext);
Exception exceptionToLog = errorContext.IotHubServiceException;
IotHubServiceException mappedException = AmqpClientHelper.GetIotHubExceptionFromAmqpException(exception);
ErrorProcessor?.Invoke(new MessagesClientError(mappedException));
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
if (Logging.IsEnabled)
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {exceptionToLog}", nameof(OnConnectionClosed));
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {mappedException}", nameof(OnConnectionClosed));
}
else
{
var defaultException = new IotHubServiceException("AMQP connection was lost", ((AmqpObject)sender).TerminalException);
var errorContext = new ErrorContext(defaultException);
ErrorProcessor?.Invoke(errorContext);
var defaultException = new IOException("AMQP connection was lost", ((AmqpObject)sender).TerminalException);
var error = new MessagesClientError(defaultException);
ErrorProcessor?.Invoke(error);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
if (Logging.IsEnabled)
Logging.Error(this, $"{nameof(sender)}.{nameof(OnConnectionClosed)} threw an exception: {defaultException}", nameof(OnConnectionClosed));
}
Expand Down
29 changes: 29 additions & 0 deletions iothub/service/src/Messaging/MessagesClientError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.IO;

namespace Microsoft.Azure.Devices
{
/// <summary>
/// The context provided to the error processor for a connection loss event or other failure
/// when using the <see cref="MessagesClient"/>.
/// </summary>
public class MessagesClientError
{
internal MessagesClientError(Exception exception)
{
Exception = exception;
}

/// <summary>
/// The exception that caused the error processor execute.
/// </summary>
/// <remarks>
/// This exception is usually either of type <see cref="IOException"/> or
/// <see cref="IotHubServiceException"/>.
/// </remarks>
public Exception Exception { get; }
}
}
60 changes: 0 additions & 60 deletions iothub/service/src/Messaging/Models/ErrorContext.cs

This file was deleted.

Loading