diff --git a/doc/devguide.md b/doc/devguide.md index bf66d30086..9ea8998538 100644 --- a/doc/devguide.md +++ b/doc/devguide.md @@ -9,6 +9,11 @@ Before starting development on the Azure IoT SDK for C# you will need to install ### Design We are following the [Azure SDK design specification for C#](https://azuresdkspecs.z5.web.core.windows.net/DotNetSpec.html). To preserve backward compatibility, existing code will not change to follow these rules. +#### Using [`IDisposable`](https://docs.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-5.0#implementing-idisposable) types: +- If the sdk implements `class A` that owns an `IDisposable` resource `X`, then `A` should also be `IDisposable`. In that case, resource `X` can be safely disposed when `class A` is disposed. +- The sdk should dispose any `IDisposable` resource that it creates. +- The sdk should not dispose an `IDisposable` resource that is supplied by the calling application. This is because the caller might want to reuse the resource elsewhere in the application. The responsibility of disposal of caller-supplied `IDisposable` resource is on the caller. An example of such a resource would be an `X509Certificate` instance that is used for authenticating our clients. + ### Code style Please read and apply our [coding style](coding-style.md) when proposing PRs against the Azure repository. When changing existing files, please apply changes to the entire file. Otherwise, maintain the same style. diff --git a/e2e/test/Helpers/AmqpConnectionStatusChange.cs b/e2e/test/Helpers/AmqpConnectionStatusChange.cs new file mode 100644 index 0000000000..60c5e7a0b9 --- /dev/null +++ b/e2e/test/Helpers/AmqpConnectionStatusChange.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using Microsoft.Azure.Devices.Client; + +namespace Microsoft.Azure.Devices.E2ETests.Helpers +{ + public class AmqpConnectionStatusChange + { + private readonly string _deviceId; + private readonly MsTestLogger _logger; + + public AmqpConnectionStatusChange(string deviceId, MsTestLogger logger) + { + LastConnectionStatus = null; + LastConnectionStatusChangeReason = null; + ConnectionStatusChangeCount = 0; + _deviceId = deviceId; + _logger = logger; + } + + public int ConnectionStatusChangeCount { get; set; } + + public ConnectionStatus? LastConnectionStatus { get; set; } + + public ConnectionStatusChangeReason? LastConnectionStatusChangeReason { get; set; } + + public void ConnectionStatusChangesHandler(ConnectionStatus status, ConnectionStatusChangeReason reason) + { + ConnectionStatusChangeCount++; + LastConnectionStatus = status; + LastConnectionStatusChangeReason = reason; + _logger.Trace($"{nameof(AmqpConnectionStatusChange)}.{nameof(ConnectionStatusChangesHandler)}: {_deviceId}: status={status} statusChangeReason={reason} count={ConnectionStatusChangeCount}"); + } + } +} diff --git a/e2e/test/Helpers/templates/FaultInjectionPoolingOverAmqp.cs b/e2e/test/Helpers/templates/FaultInjectionPoolingOverAmqp.cs index d1dbb5fce2..2691508e85 100644 --- a/e2e/test/Helpers/templates/FaultInjectionPoolingOverAmqp.cs +++ b/e2e/test/Helpers/templates/FaultInjectionPoolingOverAmqp.cs @@ -217,34 +217,5 @@ public static async Task TestFaultInjectionPoolAmqpAsync( } } } - - public class AmqpConnectionStatusChange - { - private readonly string _deviceId; - private readonly MsTestLogger _logger; - - public AmqpConnectionStatusChange(string deviceId, MsTestLogger logger) - { - LastConnectionStatus = null; - LastConnectionStatusChangeReason = null; - ConnectionStatusChangeCount = 0; - _deviceId = deviceId; - _logger = logger; - } - - public void ConnectionStatusChangesHandler(ConnectionStatus status, ConnectionStatusChangeReason reason) - { - ConnectionStatusChangeCount++; - LastConnectionStatus = status; - LastConnectionStatusChangeReason = reason; - _logger.Trace($"{nameof(FaultInjectionPoolingOverAmqp)}.{nameof(ConnectionStatusChangesHandler)}: {_deviceId}: status={status} statusChangeReason={reason} count={ConnectionStatusChangeCount}"); - } - - public int ConnectionStatusChangeCount { get; set; } - - public ConnectionStatus? LastConnectionStatus { get; set; } - - public ConnectionStatusChangeReason? LastConnectionStatusChangeReason { get; set; } - } } } diff --git a/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs b/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs new file mode 100644 index 0000000000..a686f9b321 --- /dev/null +++ b/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs @@ -0,0 +1,255 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Net; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Azure.Devices.Client; +using Microsoft.Azure.Devices.E2ETests.Helpers; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Microsoft.Azure.Devices.E2ETests +{ + [TestClass] + [TestCategory("E2E")] + [TestCategory("IoTHub")] + public class AuthenticationWithTokenRefreshDisposalTests : E2EMsTestBase + { + public static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(10); + private readonly string _devicePrefix = $"E2E_{nameof(AuthenticationWithTokenRefreshDisposalTests)}_"; + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_SingleDevicePerConnection_Amqp() + { + await ReuseAuthenticationMethod_SingleDevice(Client.TransportType.Amqp_Tcp_Only).ConfigureAwait(false); + } + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_SingleDevicePerConnection_AmqpWs() + { + await ReuseAuthenticationMethod_SingleDevice(Client.TransportType.Amqp_WebSocket_Only).ConfigureAwait(false); + } + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_SingleDevicePerConnection_Mqtt() + { + await ReuseAuthenticationMethod_SingleDevice(Client.TransportType.Mqtt_Tcp_Only).ConfigureAwait(false); + } + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_SingleDevicePerConnection_MqttWs() + { + await ReuseAuthenticationMethod_SingleDevice(Client.TransportType.Mqtt_WebSocket_Only).ConfigureAwait(false); + } + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_SingleDevicePerConnection_Http() + { + await ReuseAuthenticationMethod_SingleDevice(Client.TransportType.Http1).ConfigureAwait(false); + } + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_MuxedDevicesPerConnection_Amqp() + { + await ReuseAuthenticationMethod_MuxedDevices(Client.TransportType.Amqp_Tcp_Only, 2); + } + + [LoggedTestMethod] + public async Task DeviceSak_ReusableAuthenticationMethod_MuxedDevicesPerConnection_AmqpWs() + { + await ReuseAuthenticationMethod_MuxedDevices(Client.TransportType.Amqp_WebSocket_Only, 2); + } + + private async Task ReuseAuthenticationMethod_SingleDevice(Client.TransportType transport) + { + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix).ConfigureAwait(false); + var authenticationMethod = new DeviceAuthenticationSasToken(testDevice.ConnectionString); + + // Create an instance of the device client, send a test message and then close and dispose it. + DeviceClient deviceClient = DeviceClient.Create(testDevice.IoTHubHostName, authenticationMethod, transport); + await deviceClient.SendEventAsync(new Client.Message()).ConfigureAwait(false); + await deviceClient.CloseAsync(); + deviceClient.Dispose(); + Logger.Trace("Test with instance 1 completed"); + + // Perform the same steps again, reusing the previously created authentication method instance to ensure + // that the sdk did not dispose the user supplied authentication method instance. + DeviceClient deviceClient2 = DeviceClient.Create(testDevice.IoTHubHostName, authenticationMethod, transport); + await deviceClient2.SendEventAsync(new Client.Message()).ConfigureAwait(false); + await deviceClient2.CloseAsync(); + deviceClient2.Dispose(); + Logger.Trace("Test with instance 2 completed, reused the previously created authentication method instance for the device client."); + + authenticationMethod.Dispose(); + } + + private async Task ReuseAuthenticationMethod_MuxedDevices(Client.TransportType transport, int devicesCount) + { + IList testDevices = new List(); + IList deviceClients = new List(); + IList authenticationMethods = new List(); + IList amqpConnectionStatuses = new List(); + + // Set up amqp transport settings to multiplex all device sessions over the same amqp connection. + var amqpTransportSettings = new AmqpTransportSettings(transport) + { + AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings + { + Pooling = true, + MaxPoolSize = 1, + }, + }; + + for (int i = 0; i < devicesCount; i++) + { + TestDevice testDevice = await TestDevice.GetTestDeviceAsync(Logger, _devicePrefix).ConfigureAwait(false); + var authenticationMethod = new DeviceAuthenticationSasToken(testDevice.ConnectionString); + + testDevices.Add(testDevice); + authenticationMethods.Add(authenticationMethod); + } + + // Initialize the client instances, set the connection status change handler and open the connection. + for (int i = 0; i < devicesCount; i++) + { + DeviceClient deviceClient = DeviceClient.Create(testDevices[i].IoTHubHostName, authenticationMethods[i], new ITransportSettings[] { amqpTransportSettings }); + + var amqpConnectionStatusChange = new AmqpConnectionStatusChange(testDevices[i].Id, Logger); + deviceClient.SetConnectionStatusChangesHandler(amqpConnectionStatusChange.ConnectionStatusChangesHandler); + amqpConnectionStatuses.Add(amqpConnectionStatusChange); + + await deviceClient.OpenAsync().ConfigureAwait(false); + deviceClients.Add(deviceClient); + } + + // Close and dispose client instance 1. + // The closed client should report a status of "disabled" while the rest of them should be connected. + // This is to ensure that disposal on one multiplexed device doesn't cause cascading failures + // in the rest of the devices on the same tcp connection. + + await deviceClients[0].CloseAsync().ConfigureAwait(false); + deviceClients[0].Dispose(); + + amqpConnectionStatuses[0].LastConnectionStatus.Should().Be(ConnectionStatus.Disabled); + + Logger.Trace($"{nameof(ReuseAuthenticationMethod_MuxedDevices)}: Confirming the rest of the multiplexed devices are online and operational."); + + bool notRecovered = true; + var sw = Stopwatch.StartNew(); + while (notRecovered && sw.Elapsed < MaxWaitTime) + { + notRecovered = false; + for (int i = 1; i < devicesCount; i++) + { + if (amqpConnectionStatuses[i].LastConnectionStatus != ConnectionStatus.Connected) + { + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + notRecovered = true; + break; + } + } + } + + notRecovered.Should().BeFalse(); + + // Send a message through the rest of the multiplexed client instances. + for (int i = 1; i < devicesCount; i++) + { + await deviceClients[i].SendEventAsync(new Client.Message()).ConfigureAwait(false); + Logger.Trace($"Test with client {i} completed."); + } + + // Close and dispose all of the client instances. + for (int i = 1; i < devicesCount; i++) + { + await deviceClients[i].CloseAsync().ConfigureAwait(false); + deviceClients[i].Dispose(); + } + + deviceClients.Clear(); + amqpConnectionStatuses.Clear(); + + // Initialize the client instances by reusing the created authentication methods and open the connection. + for (int i = 0; i < devicesCount; i++) + { + DeviceClient deviceClient = DeviceClient.Create(testDevices[i].IoTHubHostName, authenticationMethods[i], new ITransportSettings[] { amqpTransportSettings }); + + var amqpConnectionStatusChange = new AmqpConnectionStatusChange(testDevices[i].Id, Logger); + deviceClient.SetConnectionStatusChangesHandler(amqpConnectionStatusChange.ConnectionStatusChangesHandler); + amqpConnectionStatuses.Add(amqpConnectionStatusChange); + + await deviceClient.OpenAsync().ConfigureAwait(false); + deviceClients.Add(deviceClient); + } + + // Ensure that all clients are connected successfully, and the close and dispose the instances. + for (int i = 0; i < devicesCount; i++) + { + amqpConnectionStatuses[i].LastConnectionStatus.Should().Be(ConnectionStatus.Connected); + + await deviceClients[i].CloseAsync(); + deviceClients[i].Dispose(); + + amqpConnectionStatuses[i].LastConnectionStatus.Should().Be(ConnectionStatus.Disabled); + } + } + + private class DeviceAuthenticationSasToken : DeviceAuthenticationWithTokenRefresh + { + private const string SasTokenTargetFormat = "{0}/devices/{1}"; + private readonly IotHubConnectionStringBuilder _connectionStringBuilder; + + public DeviceAuthenticationSasToken( + string connectionString) + : base(GetDeviceIdFromConnectionString(connectionString)) + { + if (connectionString == null) + { + throw new ArgumentNullException(nameof(connectionString)); + } + + _connectionStringBuilder = IotHubConnectionStringBuilder.Create(connectionString); + } + + protected override Task SafeCreateNewToken(string iotHub, int suggestedTimeToLive) + { + var builder = new SharedAccessSignatureBuilder + { + Key = _connectionStringBuilder.SharedAccessKey, + TimeToLive = TimeSpan.FromSeconds(suggestedTimeToLive), + }; + + if (_connectionStringBuilder.SharedAccessKeyName == null) + { + builder.Target = string.Format( + CultureInfo.InvariantCulture, + SasTokenTargetFormat, + iotHub, + WebUtility.UrlEncode(DeviceId)); + } + else + { + builder.KeyName = _connectionStringBuilder.SharedAccessKeyName; + builder.Target = _connectionStringBuilder.HostName; + } + + return Task.FromResult(builder.ToSignature()); + } + + private static string GetDeviceIdFromConnectionString(string connectionString) + { + if (connectionString == null) + { + throw new ArgumentNullException(nameof(connectionString)); + } + + var builder = IotHubConnectionStringBuilder.Create(connectionString); + return builder.DeviceId; + } + } + } +} diff --git a/e2e/test/iothub/SasCredentialAuthenticationTests.cs b/e2e/test/iothub/SasCredentialAuthenticationTests.cs index 56eb80e9a1..09881b57bb 100644 --- a/e2e/test/iothub/SasCredentialAuthenticationTests.cs +++ b/e2e/test/iothub/SasCredentialAuthenticationTests.cs @@ -21,7 +21,7 @@ using ClientOptions = Microsoft.Azure.Devices.Client.ClientOptions; -namespace Microsoft.Azure.Devices.E2ETests.iothub +namespace Microsoft.Azure.Devices.E2ETests.Iothub.Service { /// /// Tests to ensure authentication using SAS credential succeeds in all the clients. diff --git a/e2e/test/iothub/TokenCredentialAuthenticationTests.cs b/e2e/test/iothub/TokenCredentialAuthenticationTests.cs index 20cd4c1fb4..22d4448106 100644 --- a/e2e/test/iothub/TokenCredentialAuthenticationTests.cs +++ b/e2e/test/iothub/TokenCredentialAuthenticationTests.cs @@ -21,7 +21,7 @@ using ClientOptions = Microsoft.Azure.Devices.Client.ClientOptions; -namespace Microsoft.Azure.Devices.E2ETests.iothub +namespace Microsoft.Azure.Devices.E2ETests.Iothub.Service { /// /// Tests to ensure authentication using Azure active directory succeeds in all the clients. diff --git a/iothub/device/src/AuthenticationWithTokenRefresh.cs b/iothub/device/src/AuthenticationWithTokenRefresh.cs index 1ec77bdcc6..4bd6585646 100644 --- a/iothub/device/src/AuthenticationWithTokenRefresh.cs +++ b/iothub/device/src/AuthenticationWithTokenRefresh.cs @@ -65,6 +65,10 @@ public AuthenticationWithTokenRefresh( /// public bool IsExpiring => (ExpiresOn - DateTime.UtcNow).TotalSeconds <= _bufferSeconds; + // This internal property is used by the sdk to determine if the instance was created by the sdk, + // and thus, if it should be disposed by the sdk. + internal bool InstanceCreatedBySdk { get; set; } + /// /// Gets a snapshot of the security token associated with the device. This call is thread-safe. /// diff --git a/iothub/device/src/Edge/EdgeModuleClientFactory.cs b/iothub/device/src/Edge/EdgeModuleClientFactory.cs index d4451e360d..07fa16be6c 100644 --- a/iothub/device/src/Edge/EdgeModuleClientFactory.cs +++ b/iothub/device/src/Edge/EdgeModuleClientFactory.cs @@ -98,7 +98,11 @@ private async Task CreateInternalClientFromEnvironmentAsync() int sasTokenRenewalBuffer = _options?.SasTokenRenewalBuffer ?? default; #pragma warning disable CA2000 // Dispose objects before losing scope - IDisposable ModuleAuthenticationWithHsm is disposed when the client is disposed. - var authMethod = new ModuleAuthenticationWithHsm(signatureProvider, deviceId, moduleId, generationId, sasTokenTimeToLive, sasTokenRenewalBuffer); + var authMethod = new ModuleAuthenticationWithHsm(signatureProvider, deviceId, moduleId, generationId, sasTokenTimeToLive, sasTokenRenewalBuffer) + { + // Since the sdk creates the instance of disposable ModuleAuthenticationWithHsm, the sdk needs to dispose it once the client is diposed. + InstanceCreatedBySdk = true, + }; #pragma warning restore CA2000 // Dispose objects before losing scope - IDisposable ModuleAuthenticationWithHsm is disposed when the client is disposed. Debug.WriteLine("EdgeModuleClientFactory setupTrustBundle from service"); diff --git a/iothub/device/src/InternalClient.cs b/iothub/device/src/InternalClient.cs index 5f4794c9c9..25a420da3c 100644 --- a/iothub/device/src/InternalClient.cs +++ b/iothub/device/src/InternalClient.cs @@ -1896,7 +1896,6 @@ public void Dispose() _fileUploadHttpTransportHandler?.Dispose(); _deviceReceiveMessageSemaphore?.Dispose(); _twinDesiredPropertySemaphore?.Dispose(); - IotHubConnectionString?.TokenRefresher?.Dispose(); } internal bool IsE2EDiagnosticSupportedProtocol() diff --git a/iothub/device/src/IotHubConnectionString.cs b/iothub/device/src/IotHubConnectionString.cs index 9bf87c27f8..e52e01d243 100644 --- a/iothub/device/src/IotHubConnectionString.cs +++ b/iothub/device/src/IotHubConnectionString.cs @@ -25,7 +25,6 @@ public IotHubConnectionString(IotHubConnectionStringBuilder builder) : builder.GatewayHostName; SharedAccessKeyName = builder.SharedAccessKeyName; SharedAccessKey = builder.SharedAccessKey; - SharedAccessSignature = builder.SharedAccessSignature; IotHubName = builder.IotHubName; DeviceId = builder.DeviceId; ModuleId = builder.ModuleId; @@ -53,7 +52,12 @@ public IotHubConnectionString(IotHubConnectionStringBuilder builder) { if (ModuleId.IsNullOrWhiteSpace()) { - TokenRefresher = new DeviceAuthenticationWithSakRefresh(DeviceId, this, builder.SasTokenTimeToLive, builder.SasTokenRenewalBuffer); + TokenRefresher = new DeviceAuthenticationWithSakRefresh(DeviceId, this, builder.SasTokenTimeToLive, builder.SasTokenRenewalBuffer) + { + // Since the sdk creates the instance of disposable DeviceAuthenticationWithSakRefresh, the sdk needs to dispose it once the client is diposed. + InstanceCreatedBySdk = true, + }; + if (Logging.IsEnabled) { Logging.Info(this, $"{nameof(IAuthenticationMethod)} is {nameof(DeviceAuthenticationWithSakRefresh)}: {Logging.IdOf(TokenRefresher)}"); @@ -61,7 +65,12 @@ public IotHubConnectionString(IotHubConnectionStringBuilder builder) } else { - TokenRefresher = new ModuleAuthenticationWithSakRefresh(DeviceId, ModuleId, this, builder.SasTokenTimeToLive, builder.SasTokenRenewalBuffer); + TokenRefresher = new ModuleAuthenticationWithSakRefresh(DeviceId, ModuleId, this, builder.SasTokenTimeToLive, builder.SasTokenRenewalBuffer) + { + // Since the sdk creates the instance of disposable ModuleAuthenticationWithSakRefresh, the sdk needs to dispose it once the client is diposed. + InstanceCreatedBySdk = true, + }; + if (Logging.IsEnabled) { Logging.Info(this, $"{nameof(IAuthenticationMethod)} is {nameof(ModuleAuthenticationWithSakRefresh)}: {Logging.IdOf(TokenRefresher)}"); @@ -75,6 +84,17 @@ public IotHubConnectionString(IotHubConnectionStringBuilder builder) Debug.Assert(TokenRefresher != null); } + // SharedAccessSignature should be set only if it is non-null and the authentication method of the device client is + // not of type AuthenticationWithTokenRefresh. + // Setting the sas value for an AuthenticationWithTokenRefresh authentication type will result in tokens not being renewed. + // This flow can be hit if the same authentication method is always used to initialize the client; + // as in, on disposal and reinitialization. This is because the value of the sas token computed is stored within the authentication method, + // and on reinitialization the client is incorrectly identified as a fixed-sas-token-initialized client, + // instead of being identified as a sas-token-refresh-enabled-client. + else if (!string.IsNullOrWhiteSpace(builder.SharedAccessSignature)) + { + SharedAccessSignature = builder.SharedAccessSignature; + } } public string IotHubName { get; private set; } diff --git a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs index 6c844c036b..46c7e9102b 100644 --- a/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs +++ b/iothub/device/src/Transport/Amqp/AmqpAuthenticationRefresher.cs @@ -173,7 +173,10 @@ private void Dispose(bool disposing) if (disposing) { StopLoop(); - _cancellationTokenSource.Dispose(); + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + + _amqpIotCbsTokenProvider?.Dispose(); } _disposed = true; diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 78922bb8b0..c82660aed5 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -206,19 +206,17 @@ public async Task EnsureConnectionAsync(TimeSpan timeout) // Create AmqpConnection amqpIotConnection = await _amqpIotConnector.OpenConnectionAsync(timeout).ConfigureAwait(false); - if (_deviceIdentity.AuthenticationModel != AuthenticationModel.X509) + if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped) { - if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped) + if (Logging.IsEnabled) { - if (Logging.IsEnabled) - { - Logging.Info(this, "Creating connection width AmqpAuthenticationRefresher", nameof(EnsureConnectionAsync)); - } - - amqpAuthenticationRefresher = new AmqpAuthenticationRefresher(_deviceIdentity, amqpIotConnection.GetCbsLink()); - await amqpAuthenticationRefresher.InitLoopAsync(timeout).ConfigureAwait(false); + Logging.Info(this, "Creating connection wide AmqpAuthenticationRefresher", nameof(EnsureConnectionAsync)); } + + amqpAuthenticationRefresher = new AmqpAuthenticationRefresher(_deviceIdentity, amqpIotConnection.GetCbsLink()); + await amqpAuthenticationRefresher.InitLoopAsync(timeout).ConfigureAwait(false); } + _amqpIotConnection = amqpIotConnection; _amqpAuthenticationRefresher = amqpAuthenticationRefresher; _amqpIotConnection.Closed += OnConnectionClosed; @@ -271,5 +269,10 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) Logging.Exit(this, amqpUnit, nameof(RemoveAmqpUnit)); } } + + internal DeviceIdentity GetDeviceIdentityOfAuthenticationProvider() + { + return _deviceIdentity; + } } -} +} \ No newline at end of file diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionPool.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionPool.cs index 61baf37613..1916e4e4b0 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionPool.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionPool.cs @@ -36,6 +36,21 @@ public AmqpUnit CreateAmqpUnit( { AmqpConnectionHolder[] amqpConnectionHolders = ResolveConnectionGroup(deviceIdentity); amqpConnectionHolder = ResolveConnectionByHashing(amqpConnectionHolders, deviceIdentity); + + // For group sas token authenticated devices over a multiplexed connection, the TokenRefresher + // of the first client connecting will be used for generating the group sas tokens + // and will be associated with the connection itself. + // For this reason, if the device identity of the client is not the one associated with the + // connection, the associated TokenRefresher can be safely disposed. + // Note - This does not cause any identity related issues since the group sas tokens are generated + // against the hub host as the intended audience (without the "device Id"). + if (deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped + && !ReferenceEquals(amqpConnectionHolder.GetDeviceIdentityOfAuthenticationProvider(), deviceIdentity) + && deviceIdentity.IotHubConnectionString?.TokenRefresher != null + && deviceIdentity.IotHubConnectionString.TokenRefresher.InstanceCreatedBySdk) + { + deviceIdentity.IotHubConnectionString.TokenRefresher.Dispose(); + } } if (Logging.IsEnabled) diff --git a/iothub/device/src/Transport/Amqp/AmqpUnit.cs b/iothub/device/src/Transport/Amqp/AmqpUnit.cs index 359424eb1a..5708b3eba3 100644 --- a/iothub/device/src/Transport/Amqp/AmqpUnit.cs +++ b/iothub/device/src/Transport/Amqp/AmqpUnit.cs @@ -850,6 +850,11 @@ private void Dispose(bool disposing) _amqpConnectionHolder?.Dispose(); } + // For device sas authenticated clients the authentication refresher is associated with the AMQP unit itself, + // so it needs to be explicitly disposed. + _amqpAuthenticationRefresher?.StopLoop(); + _amqpAuthenticationRefresher?.Dispose(); + _sessionSemaphore?.Dispose(); _messageReceivingLinkSemaphore?.Dispose(); _messageReceivingCallbackSemaphore?.Dispose(); diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotCbsTokenProvider.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotCbsTokenProvider.cs index 6aaae2affa..047f2558b4 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotCbsTokenProvider.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotCbsTokenProvider.cs @@ -9,9 +9,10 @@ namespace Microsoft.Azure.Devices.Client.Transport.AmqpIot { - internal class AmqpIotCbsTokenProvider : ICbsTokenProvider + internal class AmqpIotCbsTokenProvider : ICbsTokenProvider, IDisposable { private readonly IotHubConnectionString _connectionString; + private bool _isDisposed; public AmqpIotCbsTokenProvider(IotHubConnectionString connectionString) { @@ -57,5 +58,29 @@ public async Task GetTokenAsync(Uri namespaceAddress, string appliesTo } } } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (!_isDisposed) + { + if (disposing) + { + if (_connectionString?.TokenRefresher != null + && _connectionString.TokenRefresher.InstanceCreatedBySdk) + { + _connectionString.TokenRefresher.Dispose(); + } + } + + _isDisposed = true; + } + } } } diff --git a/iothub/device/src/Transport/HttpClientHelper.cs b/iothub/device/src/Transport/HttpClientHelper.cs index 24c17cb92d..b93f38a63d 100644 --- a/iothub/device/src/Transport/HttpClientHelper.cs +++ b/iothub/device/src/Transport/HttpClientHelper.cs @@ -41,6 +41,7 @@ internal sealed class HttpClientHelper : IHttpClientHelper private HttpClientHandler _httpClientHandler; private bool _isDisposed; private readonly ProductInfo _productInfo; + private readonly bool _isClientPrimaryTransportHandler; public HttpClientHelper( Uri baseAddress, @@ -51,7 +52,8 @@ public HttpClientHelper( X509Certificate2 clientCert, HttpClientHandler httpClientHandler, ProductInfo productInfo, - IWebProxy proxy + IWebProxy proxy, + bool isClientPrimaryTransportHandler = false ) { _baseAddress = baseAddress; @@ -115,6 +117,7 @@ IWebProxy proxy preRequestActionForAllRequests?.Invoke(_httpClientObj); _productInfo = productInfo; + _isClientPrimaryTransportHandler = isClientPrimaryTransportHandler; } public Task GetAsync( @@ -538,6 +541,20 @@ public void Dispose() _httpClientHandler = null; } + // The associated TokenRefresher should be disposed by the http client helper only when the http client + // is the primary transport handler. + // For eg. we create HttpTransportHandler instances for file upload operations even though the client might be + // initialized via MQTT/ AMQP. In those scenarios, since the shared TokenRefresher resource would be primarily used by the + // corresponding transport layers (MQTT/ AMQP), the diposal should be delegated to them and it should not be disposed here. + // The only scenario where the TokenRefresher should be disposed here is when the client has been initialized using HTTP. + if (_isClientPrimaryTransportHandler + && _authenticationHeaderProvider is IotHubConnectionString iotHubConnectionString + && iotHubConnectionString.TokenRefresher != null + && iotHubConnectionString.TokenRefresher.InstanceCreatedBySdk) + { + iotHubConnectionString.TokenRefresher.Dispose(); + } + _isDisposed = true; } } diff --git a/iothub/device/src/Transport/HttpTransportHandler.cs b/iothub/device/src/Transport/HttpTransportHandler.cs index b57150cdc9..cee9c71665 100644 --- a/iothub/device/src/Transport/HttpTransportHandler.cs +++ b/iothub/device/src/Transport/HttpTransportHandler.cs @@ -60,7 +60,12 @@ internal sealed class HttpTransportHandler : TransportHandler private readonly string _deviceId; private readonly string _moduleId; - internal HttpTransportHandler(IPipelineContext context, IotHubConnectionString iotHubConnectionString, Http1TransportSettings transportSettings, HttpClientHandler httpClientHandler = null) + internal HttpTransportHandler( + IPipelineContext context, + IotHubConnectionString iotHubConnectionString, + Http1TransportSettings transportSettings, + HttpClientHandler httpClientHandler = null, + bool isClientPrimaryTransportHandler = false) : base(context, transportSettings) { ProductInfo productInfo = context.Get(); @@ -75,7 +80,8 @@ internal HttpTransportHandler(IPipelineContext context, IotHubConnectionString i transportSettings.ClientCertificate, httpClientHandler, productInfo, - transportSettings.Proxy); + transportSettings.Proxy, + isClientPrimaryTransportHandler); } public override Task OpenAsync(TimeoutHelper timeoutHelper) diff --git a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs index af9a82fa73..93c322dca5 100644 --- a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs +++ b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs @@ -1017,6 +1017,13 @@ private async void ShutdownAsync(IChannelHandlerContext context) } finally { + if (_passwordProvider is IotHubConnectionString iotHubConnectionString + && iotHubConnectionString.TokenRefresher != null + && iotHubConnectionString.TokenRefresher.InstanceCreatedBySdk) + { + iotHubConnectionString.TokenRefresher.Dispose(); + } + if (Logging.IsEnabled) Logging.Exit(this, context.Name, nameof(ShutdownAsync)); } diff --git a/iothub/device/src/Transport/TransportHandlerFactory.cs b/iothub/device/src/Transport/TransportHandlerFactory.cs index 93969eabda..eb096a7dcd 100644 --- a/iothub/device/src/Transport/TransportHandlerFactory.cs +++ b/iothub/device/src/Transport/TransportHandlerFactory.cs @@ -37,7 +37,7 @@ public IDelegatingHandler Create(IPipelineContext context) new Func(onDeviceMessageReceivedCallback)); case TransportType.Http1: - return new HttpTransportHandler(context, connectionString, transportSetting as Http1TransportSettings); + return new HttpTransportHandler(context, connectionString, transportSetting as Http1TransportSettings, isClientPrimaryTransportHandler: true); case TransportType.Mqtt_Tcp_Only: case TransportType.Mqtt_WebSocket_Only: