From ed2e1094b26b73d383393d83550f0993e71a61a6 Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Tue, 19 Apr 2022 13:21:58 -0700 Subject: [PATCH 1/5] Allow users to configure websocket keep-alive --- iothub/device/src/ClientOptions.cs | 6 +++ .../Transport/Amqp/AmqpConnectionHolder.cs | 49 +++---------------- .../src/Transport/AmqpIot/AmqpIotConnector.cs | 15 ++++-- .../src/Transport/AmqpIot/AmqpIotTransport.cs | 35 +++++++++---- .../Transport/Mqtt/ClientWebSocketChannel.cs | 4 +- .../src/Transport/Mqtt/MqttIotHubAdapter.cs | 4 +- .../Transport/Mqtt/MqttTransportHandler.cs | 15 ++++-- .../Transport/Mqtt/MqttTransportSettings.cs | 2 +- iothub/device/tests/Amqp/MoqableAmqpUnit.cs | 15 +++++- 9 files changed, 81 insertions(+), 64 deletions(-) diff --git a/iothub/device/src/ClientOptions.cs b/iothub/device/src/ClientOptions.cs index 1953bded96..66da76be31 100644 --- a/iothub/device/src/ClientOptions.cs +++ b/iothub/device/src/ClientOptions.cs @@ -54,5 +54,11 @@ public class ClientOptions /// or the flow. /// public int SasTokenRenewalBuffer { get; set; } + + /// + /// A keep-alive for when using web sockets. + /// + /// + public TimeSpan WebSocketKeepAlive { get; set; } } } diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 46d1b245dc..599ab201af 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -27,11 +27,9 @@ internal class AmqpConnectionHolder : IAmqpConnectionHolder, IAmqpUnitManager public AmqpConnectionHolder(IDeviceIdentity deviceIdentity) { _deviceIdentity = deviceIdentity; - _amqpIotConnector = new AmqpIotConnector(deviceIdentity.AmqpTransportSettings, deviceIdentity.IotHubConnectionString.HostName); + _amqpIotConnector = new AmqpIotConnector(deviceIdentity.AmqpTransportSettings, deviceIdentity.Options, deviceIdentity.IotHubConnectionString.HostName); if (Logging.IsEnabled) - { Logging.Associate(this, _deviceIdentity, nameof(_deviceIdentity)); - } } public AmqpUnit CreateAmqpUnit( @@ -43,9 +41,7 @@ public AmqpUnit CreateAmqpUnit( Action onUnitDisconnected) { if (Logging.IsEnabled) - { Logging.Enter(this, deviceIdentity, nameof(CreateAmqpUnit)); - } var amqpUnit = new AmqpUnit( deviceIdentity, @@ -59,10 +55,9 @@ public AmqpUnit CreateAmqpUnit( { _amqpUnits.Add(amqpUnit); } + if (Logging.IsEnabled) - { Logging.Exit(this, deviceIdentity, nameof(CreateAmqpUnit)); - } return amqpUnit; } @@ -70,9 +65,7 @@ public AmqpUnit CreateAmqpUnit( private void OnConnectionClosed(object o, EventArgs args) { if (Logging.IsEnabled) - { Logging.Enter(this, o, nameof(OnConnectionClosed)); - } if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { @@ -87,25 +80,21 @@ private void OnConnectionClosed(object o, EventArgs args) unit.OnConnectionDisconnected(); } } + if (Logging.IsEnabled) - { Logging.Exit(this, o, nameof(OnConnectionClosed)); - } } public void Shutdown() { if (Logging.IsEnabled) - { Logging.Enter(this, _amqpIotConnection, nameof(Shutdown)); - } _amqpAuthenticationRefresher?.StopLoop(); _amqpIotConnection?.SafeClose(); + if (Logging.IsEnabled) - { Logging.Exit(this, _amqpIotConnection, nameof(Shutdown)); - } } public void Dispose() @@ -122,9 +111,7 @@ private void Dispose(bool disposing) } if (Logging.IsEnabled) - { Logging.Info(this, disposing, nameof(Dispose)); - } if (disposing) { @@ -144,18 +131,15 @@ private void Dispose(bool disposing) public async Task CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken) { if (Logging.IsEnabled) - { Logging.Enter(this, deviceIdentity, nameof(CreateRefresherAsync)); - } AmqpIotConnection amqpIotConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false); IAmqpAuthenticationRefresher amqpAuthenticator = await amqpIotConnection .CreateRefresherAsync(deviceIdentity, cancellationToken) .ConfigureAwait(false); + if (Logging.IsEnabled) - { Logging.Exit(this, deviceIdentity, nameof(CreateRefresherAsync)); - } return amqpAuthenticator; } @@ -163,19 +147,13 @@ public async Task CreateRefresherAsync(IDeviceIden public async Task OpenSessionAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken) { if (Logging.IsEnabled) - { Logging.Enter(this, deviceIdentity, nameof(OpenSessionAsync)); - } AmqpIotConnection amqpIotConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false); AmqpIotSession amqpIotSession = await amqpIotConnection.OpenSessionAsync(cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) { Logging.Associate(amqpIotConnection, amqpIotSession, nameof(OpenSessionAsync)); - } - - if (Logging.IsEnabled) - { Logging.Exit(this, deviceIdentity, nameof(OpenSessionAsync)); } @@ -185,9 +163,7 @@ public async Task OpenSessionAsync(IDeviceIdentity deviceIdentit public async Task EnsureConnectionAsync(CancellationToken cancellationToken) { if (Logging.IsEnabled) - { Logging.Enter(this, nameof(EnsureConnectionAsync)); - } AmqpIotConnection amqpIotConnection = null; IAmqpAuthenticationRefresher amqpAuthenticationRefresher = null; @@ -205,18 +181,15 @@ public async Task EnsureConnectionAsync(CancellationToken can if (_amqpIotConnection == null || _amqpIotConnection.IsClosing()) { if (Logging.IsEnabled) - { Logging.Info(this, "Creating new AmqpConnection", nameof(EnsureConnectionAsync)); - } + // Create AmqpConnection amqpIotConnection = await _amqpIotConnector.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped) { if (Logging.IsEnabled) - { Logging.Info(this, "Creating connection wide AmqpAuthenticationRefresher", nameof(EnsureConnectionAsync)); - } amqpAuthenticationRefresher = new AmqpAuthenticationRefresher(_deviceIdentity, amqpIotConnection.GetCbsLink()); await amqpAuthenticationRefresher.InitLoopAsync(cancellationToken).ConfigureAwait(false); @@ -226,9 +199,7 @@ public async Task EnsureConnectionAsync(CancellationToken can _amqpAuthenticationRefresher = amqpAuthenticationRefresher; _amqpIotConnection.Closed += OnConnectionClosed; if (Logging.IsEnabled) - { Logging.Associate(this, _amqpIotConnection, nameof(_amqpIotConnection)); - } } else { @@ -245,10 +216,9 @@ public async Task EnsureConnectionAsync(CancellationToken can { _lock.Release(); } + if (Logging.IsEnabled) - { Logging.Exit(this, nameof(EnsureConnectionAsync)); - } return amqpIotConnection; } @@ -256,9 +226,7 @@ public async Task EnsureConnectionAsync(CancellationToken can public void RemoveAmqpUnit(AmqpUnit amqpUnit) { if (Logging.IsEnabled) - { Logging.Enter(this, amqpUnit, nameof(RemoveAmqpUnit)); - } lock (_unitsLock) { @@ -269,10 +237,9 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) Shutdown(); } } + if (Logging.IsEnabled) - { Logging.Exit(this, amqpUnit, nameof(RemoveAmqpUnit)); - } } internal IDeviceIdentity GetDeviceIdentityOfAuthenticationProvider() diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs index 3c80df7c55..1a0e5dcde9 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs @@ -24,20 +24,23 @@ internal class AmqpIotConnector : IDisposable private static readonly AmqpVersion s_amqpVersion_1_0_0 = new AmqpVersion(1, 0, 0); private static readonly bool s_disableServerCertificateValidation = InitializeDisableServerCertificateValidation(); + private readonly ClientOptions _clientOptions; private readonly AmqpTransportSettings _amqpTransportSettings; private readonly string _hostName; private AmqpIotTransport _amqpIotTransport; - internal AmqpIotConnector(AmqpTransportSettings amqpTransportSettings, string hostName) + internal AmqpIotConnector(AmqpTransportSettings amqpTransportSettings, ClientOptions clientOptions, string hostName) { _amqpTransportSettings = amqpTransportSettings; + _clientOptions = clientOptions; _hostName = hostName; } public async Task OpenConnectionAsync(CancellationToken cancellationToken) { - Logging.Enter(this, nameof(OpenConnectionAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(OpenConnectionAsync)); var amqpTransportProvider = new AmqpTransportProvider(); amqpTransportProvider.Versions.Add(s_amqpVersion_1_0_0); @@ -53,7 +56,7 @@ public async Task OpenConnectionAsync(CancellationToken cance IdleTimeOut = Convert.ToUInt32(_amqpTransportSettings.IdleTimeout.TotalMilliseconds), }; - _amqpIotTransport = new AmqpIotTransport(amqpSettings, _amqpTransportSettings, _hostName, s_disableServerCertificateValidation); + _amqpIotTransport = new AmqpIotTransport(amqpSettings, _amqpTransportSettings, _clientOptions, _hostName, s_disableServerCertificateValidation); TransportBase transportBase = await _amqpIotTransport.InitializeAsync(cancellationToken).ConfigureAwait(false); try @@ -63,7 +66,8 @@ public async Task OpenConnectionAsync(CancellationToken cance amqpConnection.Closed += amqpIotConnection.AmqpConnectionClosed; await amqpConnection.OpenAsync(cancellationToken).ConfigureAwait(false); - Logging.Exit(this, $"{nameof(OpenConnectionAsync)}"); + if (Logging.IsEnabled) + Logging.Exit(this, $"{nameof(OpenConnectionAsync)}"); return amqpIotConnection; } @@ -75,7 +79,8 @@ public async Task OpenConnectionAsync(CancellationToken cance } finally { - Logging.Exit(this, nameof(OpenConnectionAsync)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(OpenConnectionAsync)); } } diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs index 3406e2a13d..872aebe6c6 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs @@ -23,17 +23,20 @@ internal class AmqpIotTransport : IDisposable private readonly AmqpSettings _amqpSettings; private readonly AmqpTransportSettings _amqpTransportSettings; private readonly TlsTransportSettings _tlsTransportSettings; + private readonly ClientOptions _clientOptions; private ClientWebSocketTransport _clientWebSocketTransport; public AmqpIotTransport( AmqpSettings amqpSettings, AmqpTransportSettings amqpTransportSettings, + ClientOptions clientOptions, string hostName, bool disableServerCertificateValidation) { _amqpSettings = amqpSettings; _amqpTransportSettings = amqpTransportSettings; + _clientOptions = clientOptions; _hostName = hostName; _disableServerCertificateValidation = disableServerCertificateValidation; @@ -77,7 +80,8 @@ public void Dispose() internal async Task InitializeAsync(CancellationToken cancellationToken) { - Logging.Enter(this, nameof(InitializeAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(InitializeAsync)); TransportBase transport; @@ -96,7 +100,8 @@ internal async Task InitializeAsync(CancellationToken cancellatio default: throw new InvalidOperationException("AmqpTransportSettings must specify WebSocketOnly or TcpOnly"); } - Logging.Exit(this, nameof(InitializeAsync)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(InitializeAsync)); return transport; } @@ -107,7 +112,8 @@ private async Task CreateClientWebSocketTransportAsync(Cancellati { cancellationToken.ThrowIfCancellationRequested(); - Logging.Enter(this, nameof(CreateClientWebSocketTransportAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CreateClientWebSocketTransportAsync)); string additionalQueryParams = ""; var websocketUri = new Uri($"{WebSocketConstants.Scheme}{_hostName}:{WebSocketConstants.SecurePort}{WebSocketConstants.UriSuffix}{additionalQueryParams}"); @@ -139,7 +145,8 @@ private async Task CreateClientWebSocketTransportAsync(Cancellati } finally { - Logging.Exit(this, $"{nameof(CreateClientWebSocketTransportAsync)}"); + if (Logging.IsEnabled) + Logging.Exit(this, $"{nameof(CreateClientWebSocketTransportAsync)}"); } } @@ -161,7 +168,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, { try { - Logging.Enter(this, nameof(CreateClientWebSocketAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CreateClientWebSocketAsync)); var websocket = new ClientWebSocket(); @@ -177,13 +185,20 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, { // Configure proxy server websocket.Options.Proxy = webProxy; - Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting ClientWebSocket.Options.Proxy"); + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting ClientWebSocket.Options.Proxy"); } } catch (PlatformNotSupportedException) { // .NET Core 2.0 doesn't support proxy. Ignore this setting. - Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); + if (Logging.IsEnabled) + Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); + } + + if (_clientOptions?.WebSocketKeepAlive != null) + { + websocket.Options.KeepAliveInterval = _clientOptions.WebSocketKeepAlive; } if (_amqpTransportSettings.ClientCertificate != null) @@ -196,7 +211,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, if (_amqpTransportSettings.RemoteCertificateValidationCallback != null) { websocket.Options.RemoteCertificateValidationCallback = _amqpTransportSettings.RemoteCertificateValidationCallback; - Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting RemoteCertificateValidationCallback"); + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting RemoteCertificateValidationCallback"); } #endif await websocket.ConnectAsync(websocketUri, cancellationToken).ConfigureAwait(false); @@ -205,7 +221,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, } finally { - Logging.Exit(this, nameof(CreateClientWebSocketAsync)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(CreateClientWebSocketAsync)); } } diff --git a/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs b/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs index 677f1eaaac..89b6025aa7 100644 --- a/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs +++ b/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs @@ -354,8 +354,8 @@ private async Task DoReadBytesAsync(IByteBuffer byteBuffer) try { WebSocketReceiveResult receiveResult = await _webSocket - .ReceiveAsync(new ArraySegment(byteBuffer.Array, byteBuffer.ArrayOffset + byteBuffer.WriterIndex, byteBuffer.WritableBytes), CancellationToken.None) - .ConfigureAwait(false); + .ReceiveAsync(new ArraySegment(byteBuffer.Array, byteBuffer.ArrayOffset + byteBuffer.WriterIndex, byteBuffer.WritableBytes), CancellationToken.None) + .ConfigureAwait(false); if (receiveResult.MessageType == WebSocketMessageType.Text) { throw new ProtocolViolationException("Mqtt over WS message cannot be in text"); diff --git a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs index f26b7a7ce1..d5c28ca79f 100644 --- a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs +++ b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs @@ -429,7 +429,9 @@ private async void ScheduleCheckConnectTimeoutAsync(IChannelHandlerContext conte try { - await context.Channel.EventLoop.ScheduleAsync(s_checkConnAckTimeoutCallback, context, _mqttTransportSettings.ConnectArrivalTimeout).ConfigureAwait(true); + await context.Channel.EventLoop + .ScheduleAsync(s_checkConnAckTimeoutCallback, context, _mqttTransportSettings.ConnectArrivalTimeout) + .ConfigureAwait(true); } catch (Exception ex) when (!ex.IsFatal()) { diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs index 405cc05a52..1739e1261d 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs @@ -1265,13 +1265,17 @@ private Func> CreateChannelFactory(IotHubConnec }; } - private Func> CreateWebSocketChannelFactory(IotHubConnectionString iotHubConnectionString, MqttTransportSettings settings, ProductInfo productInfo, ClientOptions options) + private Func> CreateWebSocketChannelFactory( + IotHubConnectionString iotHubConnectionString, + MqttTransportSettings settings, + + ProductInfo productInfo, ClientOptions options) { return async (address, port) => { string additionalQueryParams = ""; - var websocketUri = new Uri(WebSocketConstants.Scheme + iotHubConnectionString.HostName + ":" + WebSocketConstants.SecurePort + WebSocketConstants.UriSuffix + additionalQueryParams); + var websocketUri = new Uri($"{WebSocketConstants.Scheme}{iotHubConnectionString.HostName}:{WebSocketConstants.SecurePort}{WebSocketConstants.UriSuffix}{additionalQueryParams}"); var websocket = new ClientWebSocket(); websocket.Options.AddSubProtocol(WebSocketConstants.SubProtocols.Mqtt); @@ -1292,13 +1296,18 @@ private Func> CreateWebSocketChannelFactory(Iot Logging.Error(this, $"{nameof(CreateWebSocketChannelFactory)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); } + if (options?.WebSocketKeepAlive != null) + { + websocket.Options.KeepAliveInterval = options.WebSocketKeepAlive; + } + if (settings.ClientCertificate != null) { websocket.Options.ClientCertificates.Add(settings.ClientCertificate); } // Support for RemoteCertificateValidationCallback for ClientWebSocket is introduced in .NET Standard 2.1 -#if NETSTANDARD2_1 +#if NETSTANDARD2_1_OR_GREATER if (settings.RemoteCertificateValidationCallback != null) { websocket.Options.RemoteCertificateValidationCallback = settings.RemoteCertificateValidationCallback; diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs index ed9c960a79..8ef98e257b 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs @@ -182,7 +182,7 @@ public bool CertificateRevocationCheck public bool CleanSession { get; set; } /// - /// The interval, in seconds, that the client establishes with the service, for sending keep-alive pings. + /// The interval, in seconds, that the client establishes with the service, for sending keep-alive pings over TCP. /// The default is 300 seconds. /// /// diff --git a/iothub/device/tests/Amqp/MoqableAmqpUnit.cs b/iothub/device/tests/Amqp/MoqableAmqpUnit.cs index 60a2f1f088..9afc0ef678 100644 --- a/iothub/device/tests/Amqp/MoqableAmqpUnit.cs +++ b/iothub/device/tests/Amqp/MoqableAmqpUnit.cs @@ -16,8 +16,19 @@ namespace Microsoft.Azure.Devices.Client.Test.Transport { internal class MoqableAmqpUnit : AmqpUnit { - public MoqableAmqpUnit() : this(new DeviceIdentity(IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), new ProductInfo(), new ClientOptions()), - new AmqpConnectionHolder(new DeviceIdentity(IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), new ProductInfo(), new ClientOptions()))) + public MoqableAmqpUnit() + : this( + new DeviceIdentity( + IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), + new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), + new ProductInfo(), + new ClientOptions()), + new AmqpConnectionHolder( + new DeviceIdentity( + IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), + new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), + new ProductInfo(), + new ClientOptions()))) { } From 5118b79c74848ed59eebd556e08817a3d210cf8d Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Tue, 19 Apr 2022 13:54:08 -0700 Subject: [PATCH 2/5] TimeSpan not nullable unless specified --- iothub/device/src/ClientOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iothub/device/src/ClientOptions.cs b/iothub/device/src/ClientOptions.cs index 66da76be31..ffd944bc1b 100644 --- a/iothub/device/src/ClientOptions.cs +++ b/iothub/device/src/ClientOptions.cs @@ -59,6 +59,6 @@ public class ClientOptions /// A keep-alive for when using web sockets. /// /// - public TimeSpan WebSocketKeepAlive { get; set; } + public TimeSpan? WebSocketKeepAlive { get; set; } } } From 8845b368aef93fc449b838ee19f750b067c9d352 Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Tue, 19 Apr 2022 14:11:00 -0700 Subject: [PATCH 3/5] Move keep-alive setting to AMQP and MQTT transport settings --- iothub/device/src/AmqpTransportSettings.cs | 6 ++++++ iothub/device/src/ClientOptions.cs | 6 ------ iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs | 2 +- iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs | 6 ++---- iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs | 7 ++----- iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs | 8 ++++---- iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs | 6 ++++++ 7 files changed, 21 insertions(+), 20 deletions(-) diff --git a/iothub/device/src/AmqpTransportSettings.cs b/iothub/device/src/AmqpTransportSettings.cs index 84a427e4eb..fcf78d96f7 100644 --- a/iothub/device/src/AmqpTransportSettings.cs +++ b/iothub/device/src/AmqpTransportSettings.cs @@ -146,6 +146,12 @@ public TimeSpan OpenTimeout set => SetOpenTimeout(value); } + /// + /// A keep-alive for when using web sockets. + /// + /// + public TimeSpan? WebSocketKeepAlive { get; set; } + /// /// The pre-fetch count /// diff --git a/iothub/device/src/ClientOptions.cs b/iothub/device/src/ClientOptions.cs index ffd944bc1b..1953bded96 100644 --- a/iothub/device/src/ClientOptions.cs +++ b/iothub/device/src/ClientOptions.cs @@ -54,11 +54,5 @@ public class ClientOptions /// or the flow. /// public int SasTokenRenewalBuffer { get; set; } - - /// - /// A keep-alive for when using web sockets. - /// - /// - public TimeSpan? WebSocketKeepAlive { get; set; } } } diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 599ab201af..66c72af8c6 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -27,7 +27,7 @@ internal class AmqpConnectionHolder : IAmqpConnectionHolder, IAmqpUnitManager public AmqpConnectionHolder(IDeviceIdentity deviceIdentity) { _deviceIdentity = deviceIdentity; - _amqpIotConnector = new AmqpIotConnector(deviceIdentity.AmqpTransportSettings, deviceIdentity.Options, deviceIdentity.IotHubConnectionString.HostName); + _amqpIotConnector = new AmqpIotConnector(deviceIdentity.AmqpTransportSettings, deviceIdentity.IotHubConnectionString.HostName); if (Logging.IsEnabled) Logging.Associate(this, _deviceIdentity, nameof(_deviceIdentity)); } diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs index 1a0e5dcde9..2b83cacae5 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs @@ -24,16 +24,14 @@ internal class AmqpIotConnector : IDisposable private static readonly AmqpVersion s_amqpVersion_1_0_0 = new AmqpVersion(1, 0, 0); private static readonly bool s_disableServerCertificateValidation = InitializeDisableServerCertificateValidation(); - private readonly ClientOptions _clientOptions; private readonly AmqpTransportSettings _amqpTransportSettings; private readonly string _hostName; private AmqpIotTransport _amqpIotTransport; - internal AmqpIotConnector(AmqpTransportSettings amqpTransportSettings, ClientOptions clientOptions, string hostName) + internal AmqpIotConnector(AmqpTransportSettings amqpTransportSettings, string hostName) { _amqpTransportSettings = amqpTransportSettings; - _clientOptions = clientOptions; _hostName = hostName; } @@ -56,7 +54,7 @@ public async Task OpenConnectionAsync(CancellationToken cance IdleTimeOut = Convert.ToUInt32(_amqpTransportSettings.IdleTimeout.TotalMilliseconds), }; - _amqpIotTransport = new AmqpIotTransport(amqpSettings, _amqpTransportSettings, _clientOptions, _hostName, s_disableServerCertificateValidation); + _amqpIotTransport = new AmqpIotTransport(amqpSettings, _amqpTransportSettings, _hostName, s_disableServerCertificateValidation); TransportBase transportBase = await _amqpIotTransport.InitializeAsync(cancellationToken).ConfigureAwait(false); try diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs index 872aebe6c6..27d0de8267 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs @@ -23,20 +23,17 @@ internal class AmqpIotTransport : IDisposable private readonly AmqpSettings _amqpSettings; private readonly AmqpTransportSettings _amqpTransportSettings; private readonly TlsTransportSettings _tlsTransportSettings; - private readonly ClientOptions _clientOptions; private ClientWebSocketTransport _clientWebSocketTransport; public AmqpIotTransport( AmqpSettings amqpSettings, AmqpTransportSettings amqpTransportSettings, - ClientOptions clientOptions, string hostName, bool disableServerCertificateValidation) { _amqpSettings = amqpSettings; _amqpTransportSettings = amqpTransportSettings; - _clientOptions = clientOptions; _hostName = hostName; _disableServerCertificateValidation = disableServerCertificateValidation; @@ -196,9 +193,9 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); } - if (_clientOptions?.WebSocketKeepAlive != null) + if (_amqpTransportSettings.WebSocketKeepAlive.HasValue) { - websocket.Options.KeepAliveInterval = _clientOptions.WebSocketKeepAlive; + websocket.Options.KeepAliveInterval = _amqpTransportSettings.WebSocketKeepAlive.Value; } if (_amqpTransportSettings.ClientCertificate != null) diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs index 1739e1261d..cbe87aad1b 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs @@ -1268,8 +1268,8 @@ private Func> CreateChannelFactory(IotHubConnec private Func> CreateWebSocketChannelFactory( IotHubConnectionString iotHubConnectionString, MqttTransportSettings settings, - - ProductInfo productInfo, ClientOptions options) + ProductInfo productInfo, + ClientOptions options) { return async (address, port) => { @@ -1296,9 +1296,9 @@ private Func> CreateWebSocketChannelFactory( Logging.Error(this, $"{nameof(CreateWebSocketChannelFactory)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); } - if (options?.WebSocketKeepAlive != null) + if (settings.WebSocketKeepAlive.HasValue) { - websocket.Options.KeepAliveInterval = options.WebSocketKeepAlive; + websocket.Options.KeepAliveInterval = settings.WebSocketKeepAlive.Value; } if (settings.ClientCertificate != null) diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs index 8ef98e257b..5ee202c8d4 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs @@ -193,6 +193,12 @@ public bool CertificateRevocationCheck /// public int KeepAliveInSeconds { get; set; } + /// + /// A keep-alive for when using web sockets. + /// + /// + public TimeSpan? WebSocketKeepAlive { get; set; } + /// /// Indicates whether the transport has a will message. /// From 899a1116af223d1ec8585b10382d7d4e506d0356 Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Tue, 19 Apr 2022 14:23:37 -0700 Subject: [PATCH 4/5] Add logging --- iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs | 4 +++- iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs index 27d0de8267..7e3c3ef2ca 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs @@ -183,7 +183,7 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, // Configure proxy server websocket.Options.Proxy = webProxy; if (Logging.IsEnabled) - Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting ClientWebSocket.Options.Proxy"); + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Set ClientWebSocket.Options.Proxy to {webProxy}"); } } catch (PlatformNotSupportedException) @@ -196,6 +196,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, if (_amqpTransportSettings.WebSocketKeepAlive.HasValue) { websocket.Options.KeepAliveInterval = _amqpTransportSettings.WebSocketKeepAlive.Value; + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Set websocket keep-alive to {_amqpTransportSettings.WebSocketKeepAlive}"); } if (_amqpTransportSettings.ClientCertificate != null) diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs index cbe87aad1b..5c35743c3a 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs @@ -1286,7 +1286,7 @@ private Func> CreateWebSocketChannelFactory( // Configure proxy server websocket.Options.Proxy = _webProxy; if (Logging.IsEnabled) - Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Setting ClientWebSocket.Options.Proxy"); + Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Set ClientWebSocket.Options.Proxy to {_webProxy}"); } } catch (PlatformNotSupportedException) @@ -1299,6 +1299,8 @@ private Func> CreateWebSocketChannelFactory( if (settings.WebSocketKeepAlive.HasValue) { websocket.Options.KeepAliveInterval = settings.WebSocketKeepAlive.Value; + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Set websocket keep-alive to {settings.WebSocketKeepAlive}"); } if (settings.ClientCertificate != null) From 2bb786b4a291744ceb4a8b2ee37191acd4b63abe Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Tue, 19 Apr 2022 15:47:45 -0700 Subject: [PATCH 5/5] PR feedback --- iothub/device/src/AmqpTransportSettings.cs | 2 +- iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/iothub/device/src/AmqpTransportSettings.cs b/iothub/device/src/AmqpTransportSettings.cs index fcf78d96f7..f5cfaa0cef 100644 --- a/iothub/device/src/AmqpTransportSettings.cs +++ b/iothub/device/src/AmqpTransportSettings.cs @@ -147,7 +147,7 @@ public TimeSpan OpenTimeout } /// - /// A keep-alive for when using web sockets. + /// A keep-alive for the transport layer in sending ping/pong control frames when using web sockets. /// /// public TimeSpan? WebSocketKeepAlive { get; set; } diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs index 5ee202c8d4..d72da354c2 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs @@ -182,7 +182,7 @@ public bool CertificateRevocationCheck public bool CleanSession { get; set; } /// - /// The interval, in seconds, that the client establishes with the service, for sending keep-alive pings over TCP. + /// The interval, in seconds, that the client establishes with the service, for sending protocol-level keep-alive pings. /// The default is 300 seconds. /// /// @@ -194,8 +194,11 @@ public bool CertificateRevocationCheck public int KeepAliveInSeconds { get; set; } /// - /// A keep-alive for when using web sockets. + /// A keep-alive for the transport layer in sending ping/pong control frames when using web sockets. /// + /// + /// This value is different from the protocol-level keep-alive packets that are sent over the overlaying MQTT transport protocol. + /// /// public TimeSpan? WebSocketKeepAlive { get; set; }