From 911923977c7ce279e2c73740d7160b3104231a3c Mon Sep 17 00:00:00 2001 From: Azad Abbasi Date: Tue, 22 Dec 2020 00:06:31 +0000 Subject: [PATCH] Fix(warnings) : CA2213 - CA2215 (#1710) --- .../Transport/HttpBufferedStream.cs | 1 + .../Transport/Amqp/AmqpTransportHandler.cs | 3 +- .../Mqtt/ReadOnlyByteBufferStream.cs | 46 ++++------ .../src/Transport/RetryDelegatingHandler.cs | 15 +++- iothub/service/src/AmqpServiceClient.cs | 1 + iothub/service/src/HttpRegistryManager.cs | 1 + iothub/service/src/JobClient/HttpJobClient.cs | 1 + .../src/ProvisioningTransportHandlerAmqp.cs | 85 +++++++++---------- .../src/ProvisioningTransportHandlerHttp.cs | 10 +-- 9 files changed, 84 insertions(+), 79 deletions(-) diff --git a/iothub/device/src/ModernDotNet/HsmAuthentication/Transport/HttpBufferedStream.cs b/iothub/device/src/ModernDotNet/HsmAuthentication/Transport/HttpBufferedStream.cs index eac7f44500..e00e57696d 100644 --- a/iothub/device/src/ModernDotNet/HsmAuthentication/Transport/HttpBufferedStream.cs +++ b/iothub/device/src/ModernDotNet/HsmAuthentication/Transport/HttpBufferedStream.cs @@ -112,6 +112,7 @@ public override long Position protected override void Dispose(bool disposing) { + base.Dispose(disposing); _innerStream.Dispose(); } } diff --git a/iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs b/iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs index 8212d1e199..aa322c6cc6 100644 --- a/iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs +++ b/iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs @@ -547,6 +547,8 @@ protected override void Dispose(bool disposing) { lock (_lock) { + base.Dispose(disposing); + if (_disposed) { return; @@ -557,7 +559,6 @@ protected override void Dispose(bool disposing) if (disposing) { _closed = true; - OnTransportClosedGracefully(); AmqpUnitManager.GetInstance().RemoveAmqpUnit(_amqpUnit); _disposed = true; } diff --git a/iothub/device/src/Transport/Mqtt/ReadOnlyByteBufferStream.cs b/iothub/device/src/Transport/Mqtt/ReadOnlyByteBufferStream.cs index 4145bdc6bc..d1c3aab8f7 100644 --- a/iothub/device/src/Transport/Mqtt/ReadOnlyByteBufferStream.cs +++ b/iothub/device/src/Transport/Mqtt/ReadOnlyByteBufferStream.cs @@ -10,45 +10,33 @@ namespace Microsoft.Azure.Devices.Client.Transport.Mqtt { internal sealed class ReadOnlyByteBufferStream : Stream { - private readonly IByteBuffer buffer; - private bool releaseReferenceOnClosure; + private readonly IByteBuffer _buffer; + private bool _releaseReferenceOnClosure; public ReadOnlyByteBufferStream(IByteBuffer buffer, bool releaseReferenceOnClosure) { - this.buffer = buffer; - this.releaseReferenceOnClosure = releaseReferenceOnClosure; + _buffer = buffer; + _releaseReferenceOnClosure = releaseReferenceOnClosure; } - public override bool CanRead - { - get { return true; } - } + public override bool CanRead => true; - public override bool CanSeek - { - get { return false; } - } + public override bool CanSeek => false; - public override bool CanWrite - { - get { return false; } - } + public override bool CanWrite => false; - public override long Length - { - get { throw new NotSupportedException(); } - } + public override long Length => throw new NotSupportedException(); public override long Position { - get { throw new NotSupportedException(); } - set { throw new NotSupportedException(); } + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); } public override int Read(byte[] output, int offset, int count) { - int read = Math.Min(count - offset, this.buffer.ReadableBytes); - this.buffer.ReadBytes(output, offset, read); + int read = Math.Min(count - offset, _buffer.ReadableBytes); + _ = _buffer.ReadBytes(output, offset, read); return read; } @@ -58,16 +46,18 @@ public override void Flush() protected override void Dispose(bool disposing) { - if (this.releaseReferenceOnClosure) + base.Dispose(disposing); + + if (_releaseReferenceOnClosure) { - this.releaseReferenceOnClosure = false; + _releaseReferenceOnClosure = false; if (disposing) { - this.buffer.Release(); + _ = _buffer.Release(); } else { - ReferenceCountUtil.SafeRelease(this.buffer); + ReferenceCountUtil.SafeRelease(_buffer); } } } diff --git a/iothub/device/src/Transport/RetryDelegatingHandler.cs b/iothub/device/src/Transport/RetryDelegatingHandler.cs index ed277b98a7..716770fe7e 100644 --- a/iothub/device/src/Transport/RetryDelegatingHandler.cs +++ b/iothub/device/src/Transport/RetryDelegatingHandler.cs @@ -20,7 +20,7 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler private RetryPolicy _internalRetryPolicy; - private readonly SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1); + private SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1); private bool _openCalled; private bool _opened; private bool _methodsEnabled; @@ -625,13 +625,13 @@ public override async Task CloseAsync(CancellationToken cancellationToken) _handleDisconnectCts.Cancel(); await base.CloseAsync(cancellationToken).ConfigureAwait(false); - Dispose(true); } finally { Logging.Exit(this, cancellationToken, nameof(CloseAsync)); _handlerSemaphore.Release(); + Dispose(true); } } @@ -640,6 +640,14 @@ public override async Task CloseAsync(CancellationToken cancellationToken) /// private async Task EnsureOpenedAsync(CancellationToken cancellationToken) { + // If this object has already been disposed, we will throw an exception indicating that. + // This is the entry point for interacting with the client and this safety check should be done here. + // The current behavior does not support open->close->open + if (_disposed) + { + throw new ObjectDisposedException(nameof(RetryDelegatingHandler)); + } + if (Volatile.Read(ref _opened)) { return; @@ -958,6 +966,9 @@ protected override void Dispose(bool disposing) { _handleDisconnectCts?.Cancel(); _handleDisconnectCts?.Dispose(); + + _handlerSemaphore?.Dispose(); + _handlerSemaphore = null; } } } diff --git a/iothub/service/src/AmqpServiceClient.cs b/iothub/service/src/AmqpServiceClient.cs index 814c9db248..b22017be5d 100644 --- a/iothub/service/src/AmqpServiceClient.cs +++ b/iothub/service/src/AmqpServiceClient.cs @@ -389,6 +389,7 @@ private Task CreateSendingLinkAsync(TimeSpan timeout) /// protected override void Dispose(bool disposing) { + base.Dispose(disposing); if (disposing) { _faultTolerantSendingLink.Dispose(); diff --git a/iothub/service/src/HttpRegistryManager.cs b/iothub/service/src/HttpRegistryManager.cs index 81050f01f5..bef9961e76 100644 --- a/iothub/service/src/HttpRegistryManager.cs +++ b/iothub/service/src/HttpRegistryManager.cs @@ -1120,6 +1120,7 @@ public override IQuery CreateQuery(string sqlQueryString, int? pageSize) protected override void Dispose(bool disposing) { + base.Dispose(disposing); if (disposing && _httpClientHelper != null) { _httpClientHelper.Dispose(); diff --git a/iothub/service/src/JobClient/HttpJobClient.cs b/iothub/service/src/JobClient/HttpJobClient.cs index 6ee8c118d3..7a74dea26d 100644 --- a/iothub/service/src/JobClient/HttpJobClient.cs +++ b/iothub/service/src/JobClient/HttpJobClient.cs @@ -56,6 +56,7 @@ public override Task CloseAsync() protected override void Dispose(bool disposing) { + base.Dispose(disposing); if (disposing) { if (_httpClientHelper != null) diff --git a/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs b/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs index ceec9a38b6..740d4265a2 100644 --- a/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs +++ b/provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs @@ -131,10 +131,8 @@ await CreateLinksAsync( cancellationToken.ThrowIfCancellationRequested(); await Task.Delay( - operation.RetryAfter ?? - RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval)).ConfigureAwait(false); - - cancellationToken.ThrowIfCancellationRequested(); + operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval), + cancellationToken).ConfigureAwait(false); try { @@ -143,9 +141,9 @@ await Task.Delay( operationId, correlationId).ConfigureAwait(false); } - catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp && e.IsTransient) + catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient) { - operation.RetryAfter = ((ProvisioningErrorDetailsAmqp)e.ErrorDetails).RetryAfter; + operation.RetryAfter = amqp.RetryAfter; } attempts++; @@ -183,17 +181,17 @@ await Task.Delay( private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo) { - var amqpDeviceSession = connection.CreateSession(); + AmqpClientSession amqpDeviceSession = connection.CreateSession(); await amqpDeviceSession.OpenAsync(s_timeoutConstant).ConfigureAwait(false); - var amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint); + AmqpClientLink amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint); amqpReceivingLink.AddClientVersion(productInfo); amqpReceivingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion); await amqpReceivingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false); - var amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint); + AmqpClientLink amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint); amqpSendingLink.AddClientVersion(productInfo); amqpSendingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion); @@ -235,15 +233,13 @@ private async Task RegisterDeviceAsync( AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant).ConfigureAwait(false); client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); - using (var streamReader = new StreamReader(amqpResponse.BodyStream)) - { - string jsonResponse = await streamReader - .ReadToEndAsync() - .ConfigureAwait(false); - RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); - status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); - return status; - } + using var streamReader = new StreamReader(amqpResponse.BodyStream); + string jsonResponse = await streamReader + .ReadToEndAsync() + .ConfigureAwait(false); + RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); + status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); + return status; } finally { @@ -256,31 +252,34 @@ private async Task OperationStatusLookupAsync( string operationId, string correlationId) { - using (var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus })) - { - amqpMessage.Properties.CorrelationId = correlationId; - amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationType] = - DeviceOperations.GetOperationStatus; - amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationId] = operationId; - var outcome = await client.AmqpSession.SendingLink - .SendMessageAsync( - amqpMessage, - new ArraySegment(Guid.NewGuid().ToByteArray()), - s_timeoutConstant) - .ConfigureAwait(false); - ValidateOutcome(outcome); - AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant) - .ConfigureAwait(false); - client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); + using var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus }); - using (var streamReader = new StreamReader(amqpResponse.BodyStream)) - { - string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false); - RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); - status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); - return status; - } - } + amqpMessage.Properties.CorrelationId = correlationId; + amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationType] = + DeviceOperations.GetOperationStatus; + amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationId] = operationId; + + Outcome outcome = await client.AmqpSession.SendingLink + .SendMessageAsync( + amqpMessage, + new ArraySegment(Guid.NewGuid().ToByteArray()), + s_timeoutConstant) + .ConfigureAwait(false); + + ValidateOutcome(outcome); + + AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant) + .ConfigureAwait(false); + + client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse); + + using var streamReader = new StreamReader(amqpResponse.BodyStream); + + string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false); + RegistrationOperationStatus status = JsonConvert.DeserializeObject(jsonResponse); + status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval); + + return status; } private static DeviceRegistrationResult ConvertToProvisioningRegistrationResult( @@ -310,7 +309,7 @@ private void ValidateOutcome(Outcome outcome) { try { - var errorDetails = JsonConvert.DeserializeObject(rejected.Error.Description); + ProvisioningErrorDetailsAmqp errorDetails = JsonConvert.DeserializeObject(rejected.Error.Description); int statusCode = errorDetails.ErrorCode / 1000; bool isTransient = statusCode >= (int)HttpStatusCode.InternalServerError || statusCode == 429; if (isTransient) diff --git a/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs b/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs index 001a39237d..59b3af6723 100644 --- a/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs +++ b/provisioning/transport/http/src/ProvisioningTransportHandlerHttp.cs @@ -117,7 +117,8 @@ public async override Task RegisterAsync( .RegisterDeviceAsync( registrationId, message.IdScope, - deviceRegistration) + deviceRegistration, + cancellationToken: cancellationToken) .ConfigureAwait(false); int attempts = 0; @@ -147,18 +148,17 @@ public async override Task RegisterAsync( } await Task - .Delay(serviceRecommendedDelay ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingIntervalMilliseconds)) + .Delay(serviceRecommendedDelay ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingIntervalMilliseconds), cancellationToken) .ConfigureAwait(false); - cancellationToken.ThrowIfCancellationRequested(); - try { operation = await client .RuntimeRegistration.OperationStatusLookupAsync( registrationId, operationId, - message.IdScope) + message.IdScope, + cancellationToken: cancellationToken) .ConfigureAwait(false); } catch (HttpOperationException ex)