Skip to content

Commit

Permalink
Fix(warnings) : CA2213 - CA2215 (#1710)
Browse files Browse the repository at this point in the history
  • Loading branch information
azabbasi authored Dec 22, 2020
1 parent 345244c commit 9119239
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public override long Position

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_innerStream.Dispose();
}
}
Expand Down
3 changes: 2 additions & 1 deletion iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,8 @@ protected override void Dispose(bool disposing)
{
lock (_lock)
{
base.Dispose(disposing);

if (_disposed)
{
return;
Expand All @@ -557,7 +559,6 @@ protected override void Dispose(bool disposing)
if (disposing)
{
_closed = true;
OnTransportClosedGracefully();
AmqpUnitManager.GetInstance().RemoveAmqpUnit(_amqpUnit);
_disposed = true;
}
Expand Down
46 changes: 18 additions & 28 deletions iothub/device/src/Transport/Mqtt/ReadOnlyByteBufferStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,33 @@ namespace Microsoft.Azure.Devices.Client.Transport.Mqtt
{
internal sealed class ReadOnlyByteBufferStream : Stream
{
private readonly IByteBuffer buffer;
private bool releaseReferenceOnClosure;
private readonly IByteBuffer _buffer;
private bool _releaseReferenceOnClosure;

public ReadOnlyByteBufferStream(IByteBuffer buffer, bool releaseReferenceOnClosure)
{
this.buffer = buffer;
this.releaseReferenceOnClosure = releaseReferenceOnClosure;
_buffer = buffer;
_releaseReferenceOnClosure = releaseReferenceOnClosure;
}

public override bool CanRead
{
get { return true; }
}
public override bool CanRead => true;

public override bool CanSeek
{
get { return false; }
}
public override bool CanSeek => false;

public override bool CanWrite
{
get { return false; }
}
public override bool CanWrite => false;

public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Length => throw new NotSupportedException();

public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

public override int Read(byte[] output, int offset, int count)
{
int read = Math.Min(count - offset, this.buffer.ReadableBytes);
this.buffer.ReadBytes(output, offset, read);
int read = Math.Min(count - offset, _buffer.ReadableBytes);
_ = _buffer.ReadBytes(output, offset, read);
return read;
}

Expand All @@ -58,16 +46,18 @@ public override void Flush()

protected override void Dispose(bool disposing)
{
if (this.releaseReferenceOnClosure)
base.Dispose(disposing);

if (_releaseReferenceOnClosure)
{
this.releaseReferenceOnClosure = false;
_releaseReferenceOnClosure = false;
if (disposing)
{
this.buffer.Release();
_ = _buffer.Release();
}
else
{
ReferenceCountUtil.SafeRelease(this.buffer);
ReferenceCountUtil.SafeRelease(_buffer);
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions iothub/device/src/Transport/RetryDelegatingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal class RetryDelegatingHandler : DefaultDelegatingHandler

private RetryPolicy _internalRetryPolicy;

private readonly SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1);
private SemaphoreSlim _handlerSemaphore = new SemaphoreSlim(1, 1);
private bool _openCalled;
private bool _opened;
private bool _methodsEnabled;
Expand Down Expand Up @@ -625,13 +625,13 @@ public override async Task CloseAsync(CancellationToken cancellationToken)

_handleDisconnectCts.Cancel();
await base.CloseAsync(cancellationToken).ConfigureAwait(false);
Dispose(true);
}
finally
{
Logging.Exit(this, cancellationToken, nameof(CloseAsync));

_handlerSemaphore.Release();
Dispose(true);
}
}

Expand All @@ -640,6 +640,14 @@ public override async Task CloseAsync(CancellationToken cancellationToken)
/// </summary>
private async Task EnsureOpenedAsync(CancellationToken cancellationToken)
{
// If this object has already been disposed, we will throw an exception indicating that.
// This is the entry point for interacting with the client and this safety check should be done here.
// The current behavior does not support open->close->open
if (_disposed)
{
throw new ObjectDisposedException(nameof(RetryDelegatingHandler));
}

if (Volatile.Read(ref _opened))
{
return;
Expand Down Expand Up @@ -958,6 +966,9 @@ protected override void Dispose(bool disposing)
{
_handleDisconnectCts?.Cancel();
_handleDisconnectCts?.Dispose();

_handlerSemaphore?.Dispose();
_handlerSemaphore = null;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/AmqpServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ private Task<SendingAmqpLink> CreateSendingLinkAsync(TimeSpan timeout)
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
_faultTolerantSendingLink.Dispose();
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/HttpRegistryManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ public override IQuery CreateQuery(string sqlQueryString, int? pageSize)

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing && _httpClientHelper != null)
{
_httpClientHelper.Dispose();
Expand Down
1 change: 1 addition & 0 deletions iothub/service/src/JobClient/HttpJobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public override Task CloseAsync()

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
if (_httpClientHelper != null)
Expand Down
85 changes: 42 additions & 43 deletions provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,8 @@ await CreateLinksAsync(
cancellationToken.ThrowIfCancellationRequested();

await Task.Delay(
operation.RetryAfter ??
RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval)).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();
operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingInterval),
cancellationToken).ConfigureAwait(false);

try
{
Expand All @@ -143,9 +141,9 @@ await Task.Delay(
operationId,
correlationId).ConfigureAwait(false);
}
catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp && e.IsTransient)
catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient)
{
operation.RetryAfter = ((ProvisioningErrorDetailsAmqp)e.ErrorDetails).RetryAfter;
operation.RetryAfter = amqp.RetryAfter;
}

attempts++;
Expand Down Expand Up @@ -183,17 +181,17 @@ await Task.Delay(

private static async Task CreateLinksAsync(AmqpClientConnection connection, string linkEndpoint, string productInfo)
{
var amqpDeviceSession = connection.CreateSession();
AmqpClientSession amqpDeviceSession = connection.CreateSession();
await amqpDeviceSession.OpenAsync(s_timeoutConstant).ConfigureAwait(false);

var amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint);
AmqpClientLink amqpReceivingLink = amqpDeviceSession.CreateReceivingLink(linkEndpoint);

amqpReceivingLink.AddClientVersion(productInfo);
amqpReceivingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion);

await amqpReceivingLink.OpenAsync(s_timeoutConstant).ConfigureAwait(false);

var amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint);
AmqpClientLink amqpSendingLink = amqpDeviceSession.CreateSendingLink(linkEndpoint);

amqpSendingLink.AddClientVersion(productInfo);
amqpSendingLink.AddApiVersion(ClientApiVersionHelper.ApiVersion);
Expand Down Expand Up @@ -235,15 +233,13 @@ private async Task<RegistrationOperationStatus> RegisterDeviceAsync(
AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant).ConfigureAwait(false);
client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse);

using (var streamReader = new StreamReader(amqpResponse.BodyStream))
{
string jsonResponse = await streamReader
.ReadToEndAsync()
.ConfigureAwait(false);
RegistrationOperationStatus status = JsonConvert.DeserializeObject<RegistrationOperationStatus>(jsonResponse);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval);
return status;
}
using var streamReader = new StreamReader(amqpResponse.BodyStream);
string jsonResponse = await streamReader
.ReadToEndAsync()
.ConfigureAwait(false);
RegistrationOperationStatus status = JsonConvert.DeserializeObject<RegistrationOperationStatus>(jsonResponse);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval);
return status;
}
finally
{
Expand All @@ -256,31 +252,34 @@ private async Task<RegistrationOperationStatus> OperationStatusLookupAsync(
string operationId,
string correlationId)
{
using (var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus }))
{
amqpMessage.Properties.CorrelationId = correlationId;
amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationType] =
DeviceOperations.GetOperationStatus;
amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationId] = operationId;
var outcome = await client.AmqpSession.SendingLink
.SendMessageAsync(
amqpMessage,
new ArraySegment<byte>(Guid.NewGuid().ToByteArray()),
s_timeoutConstant)
.ConfigureAwait(false);
ValidateOutcome(outcome);
AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant)
.ConfigureAwait(false);
client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse);
using var amqpMessage = AmqpMessage.Create(new AmqpValue { Value = DeviceOperations.GetOperationStatus });

using (var streamReader = new StreamReader(amqpResponse.BodyStream))
{
string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false);
RegistrationOperationStatus status = JsonConvert.DeserializeObject<RegistrationOperationStatus>(jsonResponse);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval);
return status;
}
}
amqpMessage.Properties.CorrelationId = correlationId;
amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationType] =
DeviceOperations.GetOperationStatus;
amqpMessage.ApplicationProperties.Map[MessageApplicationPropertyNames.OperationId] = operationId;

Outcome outcome = await client.AmqpSession.SendingLink
.SendMessageAsync(
amqpMessage,
new ArraySegment<byte>(Guid.NewGuid().ToByteArray()),
s_timeoutConstant)
.ConfigureAwait(false);

ValidateOutcome(outcome);

AmqpMessage amqpResponse = await client.AmqpSession.ReceivingLink.ReceiveMessageAsync(s_timeoutConstant)
.ConfigureAwait(false);

client.AmqpSession.ReceivingLink.AcceptMessage(amqpResponse);

using var streamReader = new StreamReader(amqpResponse.BodyStream);

string jsonResponse = await streamReader.ReadToEndAsync().ConfigureAwait(false);
RegistrationOperationStatus status = JsonConvert.DeserializeObject<RegistrationOperationStatus>(jsonResponse);
status.RetryAfter = ProvisioningErrorDetailsAmqp.GetRetryAfterFromApplicationProperties(amqpResponse, s_defaultOperationPoolingInterval);

return status;
}

private static DeviceRegistrationResult ConvertToProvisioningRegistrationResult(
Expand Down Expand Up @@ -310,7 +309,7 @@ private void ValidateOutcome(Outcome outcome)
{
try
{
var errorDetails = JsonConvert.DeserializeObject<ProvisioningErrorDetailsAmqp>(rejected.Error.Description);
ProvisioningErrorDetailsAmqp errorDetails = JsonConvert.DeserializeObject<ProvisioningErrorDetailsAmqp>(rejected.Error.Description);
int statusCode = errorDetails.ErrorCode / 1000;
bool isTransient = statusCode >= (int)HttpStatusCode.InternalServerError || statusCode == 429;
if (isTransient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public async override Task<DeviceRegistrationResult> RegisterAsync(
.RegisterDeviceAsync(
registrationId,
message.IdScope,
deviceRegistration)
deviceRegistration,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

int attempts = 0;
Expand Down Expand Up @@ -147,18 +148,17 @@ public async override Task<DeviceRegistrationResult> RegisterAsync(
}

await Task
.Delay(serviceRecommendedDelay ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingIntervalMilliseconds))
.Delay(serviceRecommendedDelay ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPoolingIntervalMilliseconds), cancellationToken)
.ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested();

try
{
operation = await client
.RuntimeRegistration.OperationStatusLookupAsync(
registrationId,
operationId,
message.IdScope)
message.IdScope,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
catch (HttpOperationException ex)
Expand Down

0 comments on commit 9119239

Please sign in to comment.