Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unsubscribe from connection loss events when closing service client AMQP connection #3237

Merged
merged 5 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e/LongHaul/service/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public async Task ReceiveMessageFeedbacksAsync(CancellationToken ct)
// It is important to note that receiver only gets feedback messages when the device is actively running and acting on messages.
_logger.Trace("Starting to listen to cloud-to-device feedback messages...", TraceSeverity.Verbose);

AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
Task<AcknowledgementType> OnC2dMessageAck(FeedbackBatch feedbackMessages)
{
foreach (FeedbackRecord feedbackRecord in feedbackMessages.Records)
{
Expand All @@ -146,7 +146,7 @@ AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
_totalFeedbackMessagesReceivedCount += feedbackMessages.Records.Count();
_logger.Metric(TotalFeedbackMessagesReceivedCount, _totalFeedbackMessagesReceivedCount);

return AcknowledgementType.Complete;
return Task.FromResult(AcknowledgementType.Complete);
}

s_serviceClient.MessageFeedback.MessageFeedbackProcessor = OnC2dMessageAck;
Expand Down
61 changes: 24 additions & 37 deletions e2e/Tests/iothub/service/FileUploadNotificationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,48 +119,35 @@ async Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotif
}

[TestMethod]
[DataRow(IotHubTransportProtocol.Tcp)]
[DataRow(IotHubTransportProtocol.WebSocket)]
public async Task FileUploadNotification_ErrorProcessor_ReceivesNotifications(IotHubTransportProtocol protocol)
public async Task FileUploadNotification_CloseGracefully_DoesNotExecuteConnectionLoss()
{
var options = new IotHubServiceClientOptions
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
{
Protocol = protocol
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
// tested here are so quick that it should be safe to ignore that possibility.
connectionLossEventExecuted = true;
return Task.CompletedTask;
};
sender.FileUploadNotifications.ErrorProcessor = OnConnectionLost;

using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);

try
{
var errorProcessorNotified = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
serviceClient.FileUploadNotifications.FileUploadNotificationProcessor = (_) => Task.FromResult(_defaultAcknowledgementType);
serviceClient.FileUploadNotifications.ErrorProcessor = (errorContext) =>
{
VerboseTestLogger.WriteLine("Error processor fired.");
errorProcessorNotified.TrySetResult(true);
return Task.CompletedTask;
};

VerboseTestLogger.WriteLine("Opening client...");
await serviceClient.FileUploadNotifications.OpenAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client opened.");

VerboseTestLogger.WriteLine("Client closing...");
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
VerboseTestLogger.WriteLine("Client closed.");

// The open file upload notification processor should be able to receive more than one
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
await errorProcessorNotified.WaitAsync(cts.Token).ConfigureAwait(false);
errorProcessorNotified.Task.IsCompleted.Should().BeTrue();
}
finally
{
serviceClient.FileUploadNotifications.ErrorProcessor = null;
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
Task<AcknowledgementType> OnFileUploadNotificationReceivedAsync(FileUploadNotification fileUploadNotification)
{
// No file upload notifications belong to this test, so abandon any that it may receive
return Task.FromResult(AcknowledgementType.Abandon);
}
sender.FileUploadNotifications.FileUploadNotificationProcessor = OnFileUploadNotificationReceivedAsync;
await sender.FileUploadNotifications.OpenAsync().ConfigureAwait(false);

// act
await sender.FileUploadNotifications.CloseAsync().ConfigureAwait(false);

// assert
connectionLossEventExecuted.Should().BeFalse(
"One or more connection lost events were reported by the error processor unexpectedly");
}

private async Task UploadFile(string fileName, CancellationToken ct)
Expand Down
38 changes: 36 additions & 2 deletions e2e/Tests/iothub/service/MessageFeedbackReceiverE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
if (feedback.Records.Any(x => x.OriginalMessageId == message.MessageId))
{
feedbackMessageReceived.TrySetResult(true);
return AcknowledgementType.Complete;
return Task.FromResult(AcknowledgementType.Complete);
}

// Same hub as other tests, so we don't want to complete messages that aren't meant for us.
return AcknowledgementType.Abandon;
return Task.FromResult(AcknowledgementType.Abandon);
};
await serviceClient.MessageFeedback.OpenAsync().ConfigureAwait(false);

Expand Down Expand Up @@ -101,5 +101,39 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
}
}
}

[TestMethod]
public async Task MessageFeedbackReceiver_CloseGracefully_DoesNotExecuteConnectionLoss()
{
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
// tested here are so quick that it should be safe to ignore that possibility.
connectionLossEventExecuted = true;
return Task.CompletedTask;
};
sender.MessageFeedback.ErrorProcessor = OnConnectionLost;

Task<AcknowledgementType> OnFeedbackMessageReceivedAsync(FeedbackBatch feedbackBatch)
{
// No feedback messages belong to this test, so abandon any that it may receive
return Task.FromResult(AcknowledgementType.Abandon);
}
sender.MessageFeedback.MessageFeedbackProcessor = OnFeedbackMessageReceivedAsync;

await sender.MessageFeedback.OpenAsync().ConfigureAwait(false);

// act
await sender.MessageFeedback.CloseAsync().ConfigureAwait(false);

// assert
Assert.IsFalse(
connectionLossEventExecuted,
"One or more connection lost events were reported by the error processor unexpectedly");
}
}
}
26 changes: 26 additions & 0 deletions e2e/Tests/iothub/service/MessagingClientE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,5 +417,31 @@ await deviceClient
// assert
actualPayloadString.Should().Be(payload);
}

[TestMethod]
public async Task MessagingClient_CloseGracefully_DoesNotExecuteConnectionLoss()
{
// arrange
using var sender = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString);
bool connectionLossEventExecuted = false;
Func<ErrorContext, Task> OnConnectionLost = delegate
{
// There is a small chance that this test's connection is interrupted by an actual
// network failure (when this callback should be executed), but the operations
// tested here are so quick that it should be safe to ignore that possibility.
connectionLossEventExecuted = true;
return Task.CompletedTask;
};
sender.Messages.ErrorProcessor = OnConnectionLost;

await sender.Messages.OpenAsync().ConfigureAwait(false);

// act
await sender.Messages.CloseAsync().ConfigureAwait(false);

// assert
connectionLossEventExecuted.Should().BeFalse(
"One or more connection lost events were reported by the error processor unexpectedly");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private async Task ReceiveMessageFeedbacksAsync(CancellationToken token)
// It is important to note that receiver only gets feedback messages when the device is actively running and acting on messages.
_logger.LogInformation("Starting to listen to feedback messages");

AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
Task<AcknowledgementType> OnC2dMessageAck(FeedbackBatch feedbackMessages)
{
AcknowledgementType ackType = AcknowledgementType.Abandon;

Expand All @@ -72,7 +72,7 @@ AcknowledgementType OnC2dMessageAck(FeedbackBatch feedbackMessages)
_logger.LogInformation($"\tDevice {feedbackRecord.DeviceId} acted on message: {feedbackRecord.OriginalMessageId} with status: {feedbackRecord.StatusCode}");
}

return ackType;
return Task.FromResult(ackType);
}

s_serviceClient.MessageFeedback.MessageFeedbackProcessor = OnC2dMessageAck;
Expand Down
2 changes: 2 additions & 0 deletions iothub/service/src/Amqp/AmqpConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ internal virtual async Task CloseAsync(CancellationToken cancellationToken)

try
{
_connection.Closed -= _connectionLossHandler;

_cbsSession?.Close(); // not async because the cbs link type only has a sync close API

if (_workerSession != null)
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/Amqp/AmqpReceivingLinkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ internal async Task CloseAsync(CancellationToken cancellationToken)
{
if (_receivingLink != null)
{
_receivingLink.Closed -= _connectionLossHandler;
await _receivingLink.CloseAsync(cancellationToken);
}
}
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/Amqp/AmqpSendingLinkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public async Task CloseAsync(CancellationToken cancellationToken)
{
if (_sendingLink != null)
{
_sendingLink.Closed -= _connectionLossHandler;
await _sendingLink.CloseAsync(cancellationToken).ConfigureAwait(false);
}
}
Expand Down
2 changes: 2 additions & 0 deletions iothub/service/src/Amqp/AmqpSessionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ internal async Task CloseAsync(CancellationToken cancellationToken)

try
{
_session.Closed -= _connectionLossHandler;

if (_sendingLinkHandler != null)
{
await _sendingLinkHandler.CloseAsync(cancellationToken).ConfigureAwait(false);
Expand Down
15 changes: 11 additions & 4 deletions iothub/service/src/Feedback/MessageFeedbackProcessorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal MessageFeedbackProcessorClient(
/// }
/// </code>
/// </example>
public Func<FeedbackBatch, AcknowledgementType> MessageFeedbackProcessor { get; set; }
public Func<FeedbackBatch, Task<AcknowledgementType>> MessageFeedbackProcessor { get; set; }

/// <summary>
/// The callback to be executed when the connection is lost.
Expand All @@ -101,12 +101,19 @@ internal MessageFeedbackProcessorClient(
///
/// public void OnConnectionLost(ErrorContext errorContext)
/// {
/// // Add reconnection logic as needed
/// Console.WriteLine("Feedback message processor connection lost")
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.MessageFeedbackProcessor.OpenAsync();
/// }
/// </code>
/// </example>
public Action<ErrorContext> ErrorProcessor { get; set; }
/// <remarks>
/// This callback will not receive events once <see cref="CloseAsync(CancellationToken)"/> is called.
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection and start receiving acknowledgements for messages sent.
Expand Down Expand Up @@ -223,7 +230,7 @@ private async void OnFeedbackMessageReceivedAsync(AmqpMessage amqpMessage)
amqpMessage.Properties.UserId.Count)
};

AcknowledgementType ack = MessageFeedbackProcessor.Invoke(feedbackBatch);
AcknowledgementType ack = await MessageFeedbackProcessor.Invoke(feedbackBatch);
if (ack == AcknowledgementType.Complete)
{
await _amqpConnection.CompleteMessageAsync(amqpMessage.DeliveryTag).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ internal FileUploadNotificationProcessorClient(
/// }
/// </code>
/// </example>
/// <remarks>
/// This callback will not receive events once <see cref="CloseAsync(CancellationToken)"/> is called.
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }

/// <summary>
Expand Down
13 changes: 10 additions & 3 deletions iothub/service/src/Messaging/MessagesClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,19 @@ internal MessagesClient(
///
/// public void OnConnectionLost(ErrorContext errorContext)
/// {
/// // Add reconnection logic as needed
/// Console.WriteLine("Messaging client connection lost")
/// Console.WriteLine("Messaging client connection lost");
timtay-microsoft marked this conversation as resolved.
Show resolved Hide resolved
///
/// // Add reconnection logic as needed, for example:
/// await serviceClient.Messaging.OpenAsync();
/// }
/// </code>
/// </example>
public Action<ErrorContext> ErrorProcessor { get; set; }
/// <remarks>
/// This callback will not receive events once <see cref="CloseAsync(CancellationToken)"/> is called.
/// This callback will start receiving events again once <see cref="OpenAsync(CancellationToken)"/> is called.
/// This callback will persist across any number of open/close/open calls, so it does not need to be set before each open call.
/// </remarks>
public Func<ErrorContext, Task> ErrorProcessor { get; set; }

/// <summary>
/// Open the connection. Must be done before any cloud-to-device messages can be sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public async Task MessageFeedbackProcessorClient_OpenAsync_Ok()
s_retryHandler,
mockAmqpConnectionHandler.Object);

AcknowledgementType messageFeedbackProcessor(FeedbackBatch FeedbackBatch) => AcknowledgementType.Complete;
Task<AcknowledgementType> messageFeedbackProcessor(FeedbackBatch FeedbackBatch) => Task.FromResult(AcknowledgementType.Complete);

messageFeedbackProcessorClient.MessageFeedbackProcessor = messageFeedbackProcessor;

Expand Down