Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing CA2000 #1699

Merged
merged 5 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion iothub/device/src/Edge/CustomCertificateValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ private static bool ValidateCertificate(X509Certificate2 trustedCertificate, X50
chain.ChainPolicy.ExtraStore.Add(trustedCertificate);
chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;
#if !NET451
if (!chain.Build(new X509Certificate2(certificate)))
using var cert = new X509Certificate2(certificate);
if (!chain.Build(cert))
{
Debug.WriteLine("Unable to build the chain using the expected root certificate.");
return false;
Expand Down
80 changes: 52 additions & 28 deletions provisioning/transport/amqp/src/AmqpClientConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.Devices.Shared;
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Security;
using System.Net.WebSockets;
Expand All @@ -17,11 +16,13 @@

namespace Microsoft.Azure.Devices.Provisioning.Client.Transport
{
internal class AmqpClientConnection
internal class AmqpClientConnection : IDisposable
{
private readonly AmqpSettings _amqpSettings;
private readonly Uri _uri;

private bool _isDisposed;

internal AmqpClientConnection(Uri uri, AmqpSettings amqpSettings)
{
_uri = uri;
Expand All @@ -42,12 +43,12 @@ internal AmqpClientConnection(Uri uri, AmqpSettings amqpSettings)

public AmqpClientSession AmqpSession { get; private set; }

public bool IsConnectionClosed => _isConnectionClosed;

private bool _isConnectionClosed;
public bool IsConnectionClosed { get; private set; }

private TaskCompletionSource<TransportBase> _tcs;

private TransportBase _transport;

private ProtocolHeader _sentHeader;

public async Task OpenAsync(TimeSpan timeout, bool useWebSocket, X509Certificate2 clientCert, IWebProxy proxy, RemoteCertificateValidationCallback remoteCerificateValidationCallback)
Expand All @@ -57,7 +58,7 @@ public async Task OpenAsync(TimeSpan timeout, bool useWebSocket, X509Certificate
Logging.Enter(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}");
}

var hostName = _uri.Host;
string hostName = _uri.Host;

var tcpSettings = new TcpTransportSettings { Host = hostName, Port = _uri.Port != -1 ? _uri.Port : AmqpConstants.DefaultSecurePort };
TransportSettings = new TlsTransportSettings(tcpSettings)
Expand All @@ -67,11 +68,9 @@ public async Task OpenAsync(TimeSpan timeout, bool useWebSocket, X509Certificate
CertificateValidationCallback = remoteCerificateValidationCallback,
};

TransportBase transport;

if (useWebSocket)
{
transport = await CreateClientWebSocketTransportAsync(timeout, proxy).ConfigureAwait(false);
_transport = await CreateClientWebSocketTransportAsync(timeout, proxy).ConfigureAwait(false);
SaslTransportProvider provider = _amqpSettings.GetTransportProvider<SaslTransportProvider>();
if (provider != null)
{
Expand All @@ -81,46 +80,46 @@ public async Task OpenAsync(TimeSpan timeout, bool useWebSocket, X509Certificate
}

_sentHeader = new ProtocolHeader(provider.ProtocolId, provider.DefaultVersion);
ByteBuffer buffer = new ByteBuffer(new byte[AmqpConstants.ProtocolHeaderSize]);
using var buffer = new ByteBuffer(new byte[AmqpConstants.ProtocolHeaderSize]);
_sentHeader.Encode(buffer);

_tcs = new TaskCompletionSource<TransportBase>();

var args = new TransportAsyncCallbackArgs();
args.SetBuffer(buffer.Buffer, buffer.Offset, buffer.Length);
args.CompletedCallback = OnWriteHeaderComplete;
args.Transport = transport;
bool operationPending = transport.WriteAsync(args);
args.Transport = _transport;
bool operationPending = _transport.WriteAsync(args);

if (Logging.IsEnabled)
{
Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}: Sent Protocol Header: {_sentHeader.ToString()} operationPending: {operationPending} completedSynchronously: {args.CompletedSynchronously}");
Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}: Sent Protocol Header: {_sentHeader} operationPending: {operationPending} completedSynchronously: {args.CompletedSynchronously}");
}

if (!operationPending)
{
args.CompletedCallback(args);
}

transport = await _tcs.Task.ConfigureAwait(false);
await transport.OpenAsync(timeout).ConfigureAwait(false);
_transport = await _tcs.Task.ConfigureAwait(false);
await _transport.OpenAsync(timeout).ConfigureAwait(false);
}
}
else
{
var tcpInitiator = new AmqpTransportInitiator(_amqpSettings, TransportSettings);
transport = await tcpInitiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
_transport = await tcpInitiator.ConnectTaskAsync(timeout).ConfigureAwait(false);
}

AmqpConnection = new AmqpConnection(transport, _amqpSettings, AmqpConnectionSettings);
AmqpConnection = new AmqpConnection(_transport, _amqpSettings, AmqpConnectionSettings);
AmqpConnection.Closed += OnConnectionClosed;
await AmqpConnection.OpenAsync(timeout).ConfigureAwait(false);
_isConnectionClosed = false;
IsConnectionClosed = false;
}

public async Task CloseAsync(TimeSpan timeout)
{
var connection = AmqpConnection;
AmqpConnection connection = AmqpConnection;
if (connection != null)
{
await connection.CloseAsync(timeout).ConfigureAwait(false);
Expand All @@ -129,7 +128,7 @@ public async Task CloseAsync(TimeSpan timeout)

public void Close()
{
var connection = AmqpConnection;
AmqpConnection connection = AmqpConnection;
if (connection != null)
{
connection.Close();
Expand All @@ -145,18 +144,18 @@ public AmqpClientSession CreateSession()

private void OnConnectionClosed(object o, EventArgs args)
{
_isConnectionClosed = true;
IsConnectionClosed = true;
}

private async Task<TransportBase> CreateClientWebSocketTransportAsync(TimeSpan timeout, IWebProxy proxy)
{
UriBuilder webSocketUriBuilder = new UriBuilder
var webSocketUriBuilder = new UriBuilder
{
Scheme = WebSocketConstants.Scheme,
Host = _uri.Host,
Port = _uri.Port
};
var websocket = await CreateClientWebSocketAsync(webSocketUriBuilder.Uri, timeout, proxy).ConfigureAwait(false);
ClientWebSocket websocket = await CreateClientWebSocketAsync(webSocketUriBuilder.Uri, timeout, proxy).ConfigureAwait(false);
return new ClientWebSocketTransport(
websocket,
null,
Expand Down Expand Up @@ -209,7 +208,6 @@ private async Task<ClientWebSocket> CreateClientWebSocketAsync(Uri websocketUri,
}
}
#endif

using (var cancellationTokenSource = new CancellationTokenSource(timeout))
{
await websocket.ConnectAsync(websocketUri, cancellationTokenSource.Token).ConfigureAwait(false);
Expand Down Expand Up @@ -257,12 +255,14 @@ private void OnReadHeaderComplete(TransportAsyncCallbackArgs args)

try
{
ProtocolHeader receivedHeader = new ProtocolHeader();
receivedHeader.Decode(new ByteBuffer(args.Buffer, args.Offset, args.Count));
var receivedHeader = new ProtocolHeader();

using var byteBuffer = new ByteBuffer(args.Buffer, args.Offset, args.Count);
receivedHeader.Decode(byteBuffer);

if (Logging.IsEnabled)
{
Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OnReadHeaderComplete)}: Received Protocol Header: {receivedHeader.ToString()}");
Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OnReadHeaderComplete)}: Received Protocol Header: {receivedHeader}");
}

if (!receivedHeader.Equals(_sentHeader))
Expand All @@ -271,7 +271,7 @@ private void OnReadHeaderComplete(TransportAsyncCallbackArgs args)
}

SaslTransportProvider provider = _amqpSettings.GetTransportProvider<SaslTransportProvider>();
var transport = provider.CreateTransport(args.Transport, true);
TransportBase transport = provider.CreateTransport(args.Transport, true);
if (Logging.IsEnabled)
{
Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OnReadHeaderComplete)}: Created SaslTransportHandler ");
Expand Down Expand Up @@ -305,5 +305,29 @@ private void CompleteOnException(TransportAsyncCallbackArgs args)
_tcs.TrySetException(args.Exception);
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

public virtual void Dispose(bool disposing)
{
if (_isDisposed)
{
return;
}

if (disposing)
{
if (_transport is IDisposable disposable)
{
disposable.Dispose();
}
}

_isDisposed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ public override async Task<DeviceRegistrationResult> RegisterAsync(

cancellationToken.ThrowIfCancellationRequested();

AmqpClientConnection connection = null;

try
{
AmqpAuthStrategy authStrategy;
Expand Down Expand Up @@ -107,7 +105,7 @@ public override async Task<DeviceRegistrationResult> RegisterAsync(
string registrationId = message.Security.GetRegistrationID();
string linkEndpoint = $"{message.IdScope}/registrations/{registrationId}";

connection = authStrategy.CreateConnection(builder.Uri, message.IdScope);
using AmqpClientConnection connection = authStrategy.CreateConnection(builder.Uri, message.IdScope);
await authStrategy.OpenConnectionAsync(connection, s_timeoutConstant, useWebSocket, Proxy, RemoteCertificateValidationCallback).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,7 @@ public async override Task<DeviceRegistrationResult> RegisterAsync(

Logging.Associate(authStrategy, this);

var httpClientHandler = new HttpClientHandler
{
// Cannot specify a specific protocol here, as desired due to an error:
azabbasi marked this conversation as resolved.
Show resolved Hide resolved
// ProvisioningDeviceClient_ValidRegistrationId_AmqpWithProxy_SymmetricKey_RegisterOk_GroupEnrollment failing for me with System.PlatformNotSupportedException: Operation is not supported on this platform.
// When revisiting TLS12 work for DPS, we should figure out why. Perhaps the service needs to support it.

//SslProtocols = TlsVersions.Preferred,
};
using var httpClientHandler = new HttpClientHandler();

if (Proxy != DefaultWebProxySettings.Instance)
{
Expand All @@ -101,7 +94,7 @@ public async override Task<DeviceRegistrationResult> RegisterAsync(
Port = Port,
};

DeviceProvisioningServiceRuntimeClient client = authStrategy.CreateClient(builder.Uri, httpClientHandler);
using DeviceProvisioningServiceRuntimeClient client = authStrategy.CreateClient(builder.Uri, httpClientHandler);
client.HttpClient.DefaultRequestHeaders.Add("User-Agent", message.ProductInfo);
Logging.Info(this, $"Uri: {builder.Uri}; User-Agent: {message.ProductInfo}");

Expand Down