From e98b4db922e5fa95854880252e41806768916ec9 Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Tue, 26 Apr 2022 15:07:13 -0700 Subject: [PATCH] Allow users to configure websocket keep-alive (#2352) --- iothub/device/src/AmqpTransportSettings.cs | 6 +++ .../Transport/Amqp/AmqpConnectionHolder.cs | 47 +++---------------- .../src/Transport/AmqpIot/AmqpIotConnector.cs | 9 ++-- .../src/Transport/AmqpIot/AmqpIotTransport.cs | 34 ++++++++++---- .../Transport/Mqtt/ClientWebSocketChannel.cs | 4 +- .../src/Transport/Mqtt/MqttIotHubAdapter.cs | 4 +- .../Transport/Mqtt/MqttTransportHandler.cs | 19 ++++++-- .../Transport/Mqtt/MqttTransportSettings.cs | 11 ++++- iothub/device/tests/Amqp/MoqableAmqpUnit.cs | 15 +++++- 9 files changed, 87 insertions(+), 62 deletions(-) diff --git a/iothub/device/src/AmqpTransportSettings.cs b/iothub/device/src/AmqpTransportSettings.cs index 84a427e4eb..f5cfaa0cef 100644 --- a/iothub/device/src/AmqpTransportSettings.cs +++ b/iothub/device/src/AmqpTransportSettings.cs @@ -146,6 +146,12 @@ public TimeSpan OpenTimeout set => SetOpenTimeout(value); } + /// + /// A keep-alive for the transport layer in sending ping/pong control frames when using web sockets. + /// + /// + public TimeSpan? WebSocketKeepAlive { get; set; } + /// /// The pre-fetch count /// diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs index 46d1b245dc..66c72af8c6 100644 --- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs +++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs @@ -29,9 +29,7 @@ public AmqpConnectionHolder(IDeviceIdentity deviceIdentity) _deviceIdentity = deviceIdentity; _amqpIotConnector = new AmqpIotConnector(deviceIdentity.AmqpTransportSettings, deviceIdentity.IotHubConnectionString.HostName); if (Logging.IsEnabled) - { Logging.Associate(this, _deviceIdentity, nameof(_deviceIdentity)); - } } public AmqpUnit CreateAmqpUnit( @@ -43,9 +41,7 @@ public AmqpUnit CreateAmqpUnit( Action onUnitDisconnected) { if (Logging.IsEnabled) - { Logging.Enter(this, deviceIdentity, nameof(CreateAmqpUnit)); - } var amqpUnit = new AmqpUnit( deviceIdentity, @@ -59,10 +55,9 @@ public AmqpUnit CreateAmqpUnit( { _amqpUnits.Add(amqpUnit); } + if (Logging.IsEnabled) - { Logging.Exit(this, deviceIdentity, nameof(CreateAmqpUnit)); - } return amqpUnit; } @@ -70,9 +65,7 @@ public AmqpUnit CreateAmqpUnit( private void OnConnectionClosed(object o, EventArgs args) { if (Logging.IsEnabled) - { Logging.Enter(this, o, nameof(OnConnectionClosed)); - } if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o)) { @@ -87,25 +80,21 @@ private void OnConnectionClosed(object o, EventArgs args) unit.OnConnectionDisconnected(); } } + if (Logging.IsEnabled) - { Logging.Exit(this, o, nameof(OnConnectionClosed)); - } } public void Shutdown() { if (Logging.IsEnabled) - { Logging.Enter(this, _amqpIotConnection, nameof(Shutdown)); - } _amqpAuthenticationRefresher?.StopLoop(); _amqpIotConnection?.SafeClose(); + if (Logging.IsEnabled) - { Logging.Exit(this, _amqpIotConnection, nameof(Shutdown)); - } } public void Dispose() @@ -122,9 +111,7 @@ private void Dispose(bool disposing) } if (Logging.IsEnabled) - { Logging.Info(this, disposing, nameof(Dispose)); - } if (disposing) { @@ -144,18 +131,15 @@ private void Dispose(bool disposing) public async Task CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken) { if (Logging.IsEnabled) - { Logging.Enter(this, deviceIdentity, nameof(CreateRefresherAsync)); - } AmqpIotConnection amqpIotConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false); IAmqpAuthenticationRefresher amqpAuthenticator = await amqpIotConnection .CreateRefresherAsync(deviceIdentity, cancellationToken) .ConfigureAwait(false); + if (Logging.IsEnabled) - { Logging.Exit(this, deviceIdentity, nameof(CreateRefresherAsync)); - } return amqpAuthenticator; } @@ -163,19 +147,13 @@ public async Task CreateRefresherAsync(IDeviceIden public async Task OpenSessionAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken) { if (Logging.IsEnabled) - { Logging.Enter(this, deviceIdentity, nameof(OpenSessionAsync)); - } AmqpIotConnection amqpIotConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false); AmqpIotSession amqpIotSession = await amqpIotConnection.OpenSessionAsync(cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) { Logging.Associate(amqpIotConnection, amqpIotSession, nameof(OpenSessionAsync)); - } - - if (Logging.IsEnabled) - { Logging.Exit(this, deviceIdentity, nameof(OpenSessionAsync)); } @@ -185,9 +163,7 @@ public async Task OpenSessionAsync(IDeviceIdentity deviceIdentit public async Task EnsureConnectionAsync(CancellationToken cancellationToken) { if (Logging.IsEnabled) - { Logging.Enter(this, nameof(EnsureConnectionAsync)); - } AmqpIotConnection amqpIotConnection = null; IAmqpAuthenticationRefresher amqpAuthenticationRefresher = null; @@ -205,18 +181,15 @@ public async Task EnsureConnectionAsync(CancellationToken can if (_amqpIotConnection == null || _amqpIotConnection.IsClosing()) { if (Logging.IsEnabled) - { Logging.Info(this, "Creating new AmqpConnection", nameof(EnsureConnectionAsync)); - } + // Create AmqpConnection amqpIotConnection = await _amqpIotConnector.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped) { if (Logging.IsEnabled) - { Logging.Info(this, "Creating connection wide AmqpAuthenticationRefresher", nameof(EnsureConnectionAsync)); - } amqpAuthenticationRefresher = new AmqpAuthenticationRefresher(_deviceIdentity, amqpIotConnection.GetCbsLink()); await amqpAuthenticationRefresher.InitLoopAsync(cancellationToken).ConfigureAwait(false); @@ -226,9 +199,7 @@ public async Task EnsureConnectionAsync(CancellationToken can _amqpAuthenticationRefresher = amqpAuthenticationRefresher; _amqpIotConnection.Closed += OnConnectionClosed; if (Logging.IsEnabled) - { Logging.Associate(this, _amqpIotConnection, nameof(_amqpIotConnection)); - } } else { @@ -245,10 +216,9 @@ public async Task EnsureConnectionAsync(CancellationToken can { _lock.Release(); } + if (Logging.IsEnabled) - { Logging.Exit(this, nameof(EnsureConnectionAsync)); - } return amqpIotConnection; } @@ -256,9 +226,7 @@ public async Task EnsureConnectionAsync(CancellationToken can public void RemoveAmqpUnit(AmqpUnit amqpUnit) { if (Logging.IsEnabled) - { Logging.Enter(this, amqpUnit, nameof(RemoveAmqpUnit)); - } lock (_unitsLock) { @@ -269,10 +237,9 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit) Shutdown(); } } + if (Logging.IsEnabled) - { Logging.Exit(this, amqpUnit, nameof(RemoveAmqpUnit)); - } } internal IDeviceIdentity GetDeviceIdentityOfAuthenticationProvider() diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs index 3c80df7c55..2b83cacae5 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs @@ -37,7 +37,8 @@ internal AmqpIotConnector(AmqpTransportSettings amqpTransportSettings, string ho public async Task OpenConnectionAsync(CancellationToken cancellationToken) { - Logging.Enter(this, nameof(OpenConnectionAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(OpenConnectionAsync)); var amqpTransportProvider = new AmqpTransportProvider(); amqpTransportProvider.Versions.Add(s_amqpVersion_1_0_0); @@ -63,7 +64,8 @@ public async Task OpenConnectionAsync(CancellationToken cance amqpConnection.Closed += amqpIotConnection.AmqpConnectionClosed; await amqpConnection.OpenAsync(cancellationToken).ConfigureAwait(false); - Logging.Exit(this, $"{nameof(OpenConnectionAsync)}"); + if (Logging.IsEnabled) + Logging.Exit(this, $"{nameof(OpenConnectionAsync)}"); return amqpIotConnection; } @@ -75,7 +77,8 @@ public async Task OpenConnectionAsync(CancellationToken cance } finally { - Logging.Exit(this, nameof(OpenConnectionAsync)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(OpenConnectionAsync)); } } diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs index 3406e2a13d..7e3c3ef2ca 100644 --- a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs +++ b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs @@ -77,7 +77,8 @@ public void Dispose() internal async Task InitializeAsync(CancellationToken cancellationToken) { - Logging.Enter(this, nameof(InitializeAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(InitializeAsync)); TransportBase transport; @@ -96,7 +97,8 @@ internal async Task InitializeAsync(CancellationToken cancellatio default: throw new InvalidOperationException("AmqpTransportSettings must specify WebSocketOnly or TcpOnly"); } - Logging.Exit(this, nameof(InitializeAsync)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(InitializeAsync)); return transport; } @@ -107,7 +109,8 @@ private async Task CreateClientWebSocketTransportAsync(Cancellati { cancellationToken.ThrowIfCancellationRequested(); - Logging.Enter(this, nameof(CreateClientWebSocketTransportAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CreateClientWebSocketTransportAsync)); string additionalQueryParams = ""; var websocketUri = new Uri($"{WebSocketConstants.Scheme}{_hostName}:{WebSocketConstants.SecurePort}{WebSocketConstants.UriSuffix}{additionalQueryParams}"); @@ -139,7 +142,8 @@ private async Task CreateClientWebSocketTransportAsync(Cancellati } finally { - Logging.Exit(this, $"{nameof(CreateClientWebSocketTransportAsync)}"); + if (Logging.IsEnabled) + Logging.Exit(this, $"{nameof(CreateClientWebSocketTransportAsync)}"); } } @@ -161,7 +165,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, { try { - Logging.Enter(this, nameof(CreateClientWebSocketAsync)); + if (Logging.IsEnabled) + Logging.Enter(this, nameof(CreateClientWebSocketAsync)); var websocket = new ClientWebSocket(); @@ -177,13 +182,22 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, { // Configure proxy server websocket.Options.Proxy = webProxy; - Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting ClientWebSocket.Options.Proxy"); + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Set ClientWebSocket.Options.Proxy to {webProxy}"); } } catch (PlatformNotSupportedException) { // .NET Core 2.0 doesn't support proxy. Ignore this setting. - Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); + if (Logging.IsEnabled) + Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); + } + + if (_amqpTransportSettings.WebSocketKeepAlive.HasValue) + { + websocket.Options.KeepAliveInterval = _amqpTransportSettings.WebSocketKeepAlive.Value; + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Set websocket keep-alive to {_amqpTransportSettings.WebSocketKeepAlive}"); } if (_amqpTransportSettings.ClientCertificate != null) @@ -196,7 +210,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, if (_amqpTransportSettings.RemoteCertificateValidationCallback != null) { websocket.Options.RemoteCertificateValidationCallback = _amqpTransportSettings.RemoteCertificateValidationCallback; - Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting RemoteCertificateValidationCallback"); + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting RemoteCertificateValidationCallback"); } #endif await websocket.ConnectAsync(websocketUri, cancellationToken).ConfigureAwait(false); @@ -205,7 +220,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri, } finally { - Logging.Exit(this, nameof(CreateClientWebSocketAsync)); + if (Logging.IsEnabled) + Logging.Exit(this, nameof(CreateClientWebSocketAsync)); } } diff --git a/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs b/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs index 677f1eaaac..89b6025aa7 100644 --- a/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs +++ b/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs @@ -354,8 +354,8 @@ private async Task DoReadBytesAsync(IByteBuffer byteBuffer) try { WebSocketReceiveResult receiveResult = await _webSocket - .ReceiveAsync(new ArraySegment(byteBuffer.Array, byteBuffer.ArrayOffset + byteBuffer.WriterIndex, byteBuffer.WritableBytes), CancellationToken.None) - .ConfigureAwait(false); + .ReceiveAsync(new ArraySegment(byteBuffer.Array, byteBuffer.ArrayOffset + byteBuffer.WriterIndex, byteBuffer.WritableBytes), CancellationToken.None) + .ConfigureAwait(false); if (receiveResult.MessageType == WebSocketMessageType.Text) { throw new ProtocolViolationException("Mqtt over WS message cannot be in text"); diff --git a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs index f26b7a7ce1..d5c28ca79f 100644 --- a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs +++ b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs @@ -429,7 +429,9 @@ private async void ScheduleCheckConnectTimeoutAsync(IChannelHandlerContext conte try { - await context.Channel.EventLoop.ScheduleAsync(s_checkConnAckTimeoutCallback, context, _mqttTransportSettings.ConnectArrivalTimeout).ConfigureAwait(true); + await context.Channel.EventLoop + .ScheduleAsync(s_checkConnAckTimeoutCallback, context, _mqttTransportSettings.ConnectArrivalTimeout) + .ConfigureAwait(true); } catch (Exception ex) when (!ex.IsFatal()) { diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs index 405cc05a52..5c35743c3a 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs @@ -1265,13 +1265,17 @@ private Func> CreateChannelFactory(IotHubConnec }; } - private Func> CreateWebSocketChannelFactory(IotHubConnectionString iotHubConnectionString, MqttTransportSettings settings, ProductInfo productInfo, ClientOptions options) + private Func> CreateWebSocketChannelFactory( + IotHubConnectionString iotHubConnectionString, + MqttTransportSettings settings, + ProductInfo productInfo, + ClientOptions options) { return async (address, port) => { string additionalQueryParams = ""; - var websocketUri = new Uri(WebSocketConstants.Scheme + iotHubConnectionString.HostName + ":" + WebSocketConstants.SecurePort + WebSocketConstants.UriSuffix + additionalQueryParams); + var websocketUri = new Uri($"{WebSocketConstants.Scheme}{iotHubConnectionString.HostName}:{WebSocketConstants.SecurePort}{WebSocketConstants.UriSuffix}{additionalQueryParams}"); var websocket = new ClientWebSocket(); websocket.Options.AddSubProtocol(WebSocketConstants.SubProtocols.Mqtt); @@ -1282,7 +1286,7 @@ private Func> CreateWebSocketChannelFactory(Iot // Configure proxy server websocket.Options.Proxy = _webProxy; if (Logging.IsEnabled) - Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Setting ClientWebSocket.Options.Proxy"); + Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Set ClientWebSocket.Options.Proxy to {_webProxy}"); } } catch (PlatformNotSupportedException) @@ -1292,13 +1296,20 @@ private Func> CreateWebSocketChannelFactory(Iot Logging.Error(this, $"{nameof(CreateWebSocketChannelFactory)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy"); } + if (settings.WebSocketKeepAlive.HasValue) + { + websocket.Options.KeepAliveInterval = settings.WebSocketKeepAlive.Value; + if (Logging.IsEnabled) + Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Set websocket keep-alive to {settings.WebSocketKeepAlive}"); + } + if (settings.ClientCertificate != null) { websocket.Options.ClientCertificates.Add(settings.ClientCertificate); } // Support for RemoteCertificateValidationCallback for ClientWebSocket is introduced in .NET Standard 2.1 -#if NETSTANDARD2_1 +#if NETSTANDARD2_1_OR_GREATER if (settings.RemoteCertificateValidationCallback != null) { websocket.Options.RemoteCertificateValidationCallback = settings.RemoteCertificateValidationCallback; diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs index ed9c960a79..d72da354c2 100644 --- a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs +++ b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs @@ -182,7 +182,7 @@ public bool CertificateRevocationCheck public bool CleanSession { get; set; } /// - /// The interval, in seconds, that the client establishes with the service, for sending keep-alive pings. + /// The interval, in seconds, that the client establishes with the service, for sending protocol-level keep-alive pings. /// The default is 300 seconds. /// /// @@ -193,6 +193,15 @@ public bool CertificateRevocationCheck /// public int KeepAliveInSeconds { get; set; } + /// + /// A keep-alive for the transport layer in sending ping/pong control frames when using web sockets. + /// + /// + /// This value is different from the protocol-level keep-alive packets that are sent over the overlaying MQTT transport protocol. + /// + /// + public TimeSpan? WebSocketKeepAlive { get; set; } + /// /// Indicates whether the transport has a will message. /// diff --git a/iothub/device/tests/Amqp/MoqableAmqpUnit.cs b/iothub/device/tests/Amqp/MoqableAmqpUnit.cs index 60a2f1f088..9afc0ef678 100644 --- a/iothub/device/tests/Amqp/MoqableAmqpUnit.cs +++ b/iothub/device/tests/Amqp/MoqableAmqpUnit.cs @@ -16,8 +16,19 @@ namespace Microsoft.Azure.Devices.Client.Test.Transport { internal class MoqableAmqpUnit : AmqpUnit { - public MoqableAmqpUnit() : this(new DeviceIdentity(IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), new ProductInfo(), new ClientOptions()), - new AmqpConnectionHolder(new DeviceIdentity(IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), new ProductInfo(), new ClientOptions()))) + public MoqableAmqpUnit() + : this( + new DeviceIdentity( + IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), + new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), + new ProductInfo(), + new ClientOptions()), + new AmqpConnectionHolder( + new DeviceIdentity( + IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), + new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), + new ProductInfo(), + new ClientOptions()))) { }