diff --git a/iothub/device/src/AmqpTransportSettings.cs b/iothub/device/src/AmqpTransportSettings.cs
index 84a427e4eb..f5cfaa0cef 100644
--- a/iothub/device/src/AmqpTransportSettings.cs
+++ b/iothub/device/src/AmqpTransportSettings.cs
@@ -146,6 +146,12 @@ public TimeSpan OpenTimeout
set => SetOpenTimeout(value);
}
+ ///
+ /// A keep-alive for the transport layer in sending ping/pong control frames when using web sockets.
+ ///
+ ///
+ public TimeSpan? WebSocketKeepAlive { get; set; }
+
///
/// The pre-fetch count
///
diff --git a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
index 46d1b245dc..66c72af8c6 100644
--- a/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
+++ b/iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
@@ -29,9 +29,7 @@ public AmqpConnectionHolder(IDeviceIdentity deviceIdentity)
_deviceIdentity = deviceIdentity;
_amqpIotConnector = new AmqpIotConnector(deviceIdentity.AmqpTransportSettings, deviceIdentity.IotHubConnectionString.HostName);
if (Logging.IsEnabled)
- {
Logging.Associate(this, _deviceIdentity, nameof(_deviceIdentity));
- }
}
public AmqpUnit CreateAmqpUnit(
@@ -43,9 +41,7 @@ public AmqpUnit CreateAmqpUnit(
Action onUnitDisconnected)
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, deviceIdentity, nameof(CreateAmqpUnit));
- }
var amqpUnit = new AmqpUnit(
deviceIdentity,
@@ -59,10 +55,9 @@ public AmqpUnit CreateAmqpUnit(
{
_amqpUnits.Add(amqpUnit);
}
+
if (Logging.IsEnabled)
- {
Logging.Exit(this, deviceIdentity, nameof(CreateAmqpUnit));
- }
return amqpUnit;
}
@@ -70,9 +65,7 @@ public AmqpUnit CreateAmqpUnit(
private void OnConnectionClosed(object o, EventArgs args)
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, o, nameof(OnConnectionClosed));
- }
if (_amqpIotConnection != null && ReferenceEquals(_amqpIotConnection, o))
{
@@ -87,25 +80,21 @@ private void OnConnectionClosed(object o, EventArgs args)
unit.OnConnectionDisconnected();
}
}
+
if (Logging.IsEnabled)
- {
Logging.Exit(this, o, nameof(OnConnectionClosed));
- }
}
public void Shutdown()
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, _amqpIotConnection, nameof(Shutdown));
- }
_amqpAuthenticationRefresher?.StopLoop();
_amqpIotConnection?.SafeClose();
+
if (Logging.IsEnabled)
- {
Logging.Exit(this, _amqpIotConnection, nameof(Shutdown));
- }
}
public void Dispose()
@@ -122,9 +111,7 @@ private void Dispose(bool disposing)
}
if (Logging.IsEnabled)
- {
Logging.Info(this, disposing, nameof(Dispose));
- }
if (disposing)
{
@@ -144,18 +131,15 @@ private void Dispose(bool disposing)
public async Task CreateRefresherAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, deviceIdentity, nameof(CreateRefresherAsync));
- }
AmqpIotConnection amqpIotConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);
IAmqpAuthenticationRefresher amqpAuthenticator = await amqpIotConnection
.CreateRefresherAsync(deviceIdentity, cancellationToken)
.ConfigureAwait(false);
+
if (Logging.IsEnabled)
- {
Logging.Exit(this, deviceIdentity, nameof(CreateRefresherAsync));
- }
return amqpAuthenticator;
}
@@ -163,19 +147,13 @@ public async Task CreateRefresherAsync(IDeviceIden
public async Task OpenSessionAsync(IDeviceIdentity deviceIdentity, CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, deviceIdentity, nameof(OpenSessionAsync));
- }
AmqpIotConnection amqpIotConnection = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);
AmqpIotSession amqpIotSession = await amqpIotConnection.OpenSessionAsync(cancellationToken).ConfigureAwait(false);
if (Logging.IsEnabled)
{
Logging.Associate(amqpIotConnection, amqpIotSession, nameof(OpenSessionAsync));
- }
-
- if (Logging.IsEnabled)
- {
Logging.Exit(this, deviceIdentity, nameof(OpenSessionAsync));
}
@@ -185,9 +163,7 @@ public async Task OpenSessionAsync(IDeviceIdentity deviceIdentit
public async Task EnsureConnectionAsync(CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, nameof(EnsureConnectionAsync));
- }
AmqpIotConnection amqpIotConnection = null;
IAmqpAuthenticationRefresher amqpAuthenticationRefresher = null;
@@ -205,18 +181,15 @@ public async Task EnsureConnectionAsync(CancellationToken can
if (_amqpIotConnection == null || _amqpIotConnection.IsClosing())
{
if (Logging.IsEnabled)
- {
Logging.Info(this, "Creating new AmqpConnection", nameof(EnsureConnectionAsync));
- }
+
// Create AmqpConnection
amqpIotConnection = await _amqpIotConnector.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped)
{
if (Logging.IsEnabled)
- {
Logging.Info(this, "Creating connection wide AmqpAuthenticationRefresher", nameof(EnsureConnectionAsync));
- }
amqpAuthenticationRefresher = new AmqpAuthenticationRefresher(_deviceIdentity, amqpIotConnection.GetCbsLink());
await amqpAuthenticationRefresher.InitLoopAsync(cancellationToken).ConfigureAwait(false);
@@ -226,9 +199,7 @@ public async Task EnsureConnectionAsync(CancellationToken can
_amqpAuthenticationRefresher = amqpAuthenticationRefresher;
_amqpIotConnection.Closed += OnConnectionClosed;
if (Logging.IsEnabled)
- {
Logging.Associate(this, _amqpIotConnection, nameof(_amqpIotConnection));
- }
}
else
{
@@ -245,10 +216,9 @@ public async Task EnsureConnectionAsync(CancellationToken can
{
_lock.Release();
}
+
if (Logging.IsEnabled)
- {
Logging.Exit(this, nameof(EnsureConnectionAsync));
- }
return amqpIotConnection;
}
@@ -256,9 +226,7 @@ public async Task EnsureConnectionAsync(CancellationToken can
public void RemoveAmqpUnit(AmqpUnit amqpUnit)
{
if (Logging.IsEnabled)
- {
Logging.Enter(this, amqpUnit, nameof(RemoveAmqpUnit));
- }
lock (_unitsLock)
{
@@ -269,10 +237,9 @@ public void RemoveAmqpUnit(AmqpUnit amqpUnit)
Shutdown();
}
}
+
if (Logging.IsEnabled)
- {
Logging.Exit(this, amqpUnit, nameof(RemoveAmqpUnit));
- }
}
internal IDeviceIdentity GetDeviceIdentityOfAuthenticationProvider()
diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs
index 3c80df7c55..2b83cacae5 100644
--- a/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs
+++ b/iothub/device/src/Transport/AmqpIot/AmqpIotConnector.cs
@@ -37,7 +37,8 @@ internal AmqpIotConnector(AmqpTransportSettings amqpTransportSettings, string ho
public async Task OpenConnectionAsync(CancellationToken cancellationToken)
{
- Logging.Enter(this, nameof(OpenConnectionAsync));
+ if (Logging.IsEnabled)
+ Logging.Enter(this, nameof(OpenConnectionAsync));
var amqpTransportProvider = new AmqpTransportProvider();
amqpTransportProvider.Versions.Add(s_amqpVersion_1_0_0);
@@ -63,7 +64,8 @@ public async Task OpenConnectionAsync(CancellationToken cance
amqpConnection.Closed += amqpIotConnection.AmqpConnectionClosed;
await amqpConnection.OpenAsync(cancellationToken).ConfigureAwait(false);
- Logging.Exit(this, $"{nameof(OpenConnectionAsync)}");
+ if (Logging.IsEnabled)
+ Logging.Exit(this, $"{nameof(OpenConnectionAsync)}");
return amqpIotConnection;
}
@@ -75,7 +77,8 @@ public async Task OpenConnectionAsync(CancellationToken cance
}
finally
{
- Logging.Exit(this, nameof(OpenConnectionAsync));
+ if (Logging.IsEnabled)
+ Logging.Exit(this, nameof(OpenConnectionAsync));
}
}
diff --git a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs
index 3406e2a13d..7e3c3ef2ca 100644
--- a/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs
+++ b/iothub/device/src/Transport/AmqpIot/AmqpIotTransport.cs
@@ -77,7 +77,8 @@ public void Dispose()
internal async Task InitializeAsync(CancellationToken cancellationToken)
{
- Logging.Enter(this, nameof(InitializeAsync));
+ if (Logging.IsEnabled)
+ Logging.Enter(this, nameof(InitializeAsync));
TransportBase transport;
@@ -96,7 +97,8 @@ internal async Task InitializeAsync(CancellationToken cancellatio
default:
throw new InvalidOperationException("AmqpTransportSettings must specify WebSocketOnly or TcpOnly");
}
- Logging.Exit(this, nameof(InitializeAsync));
+ if (Logging.IsEnabled)
+ Logging.Exit(this, nameof(InitializeAsync));
return transport;
}
@@ -107,7 +109,8 @@ private async Task CreateClientWebSocketTransportAsync(Cancellati
{
cancellationToken.ThrowIfCancellationRequested();
- Logging.Enter(this, nameof(CreateClientWebSocketTransportAsync));
+ if (Logging.IsEnabled)
+ Logging.Enter(this, nameof(CreateClientWebSocketTransportAsync));
string additionalQueryParams = "";
var websocketUri = new Uri($"{WebSocketConstants.Scheme}{_hostName}:{WebSocketConstants.SecurePort}{WebSocketConstants.UriSuffix}{additionalQueryParams}");
@@ -139,7 +142,8 @@ private async Task CreateClientWebSocketTransportAsync(Cancellati
}
finally
{
- Logging.Exit(this, $"{nameof(CreateClientWebSocketTransportAsync)}");
+ if (Logging.IsEnabled)
+ Logging.Exit(this, $"{nameof(CreateClientWebSocketTransportAsync)}");
}
}
@@ -161,7 +165,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri,
{
try
{
- Logging.Enter(this, nameof(CreateClientWebSocketAsync));
+ if (Logging.IsEnabled)
+ Logging.Enter(this, nameof(CreateClientWebSocketAsync));
var websocket = new ClientWebSocket();
@@ -177,13 +182,22 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri,
{
// Configure proxy server
websocket.Options.Proxy = webProxy;
- Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting ClientWebSocket.Options.Proxy");
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Set ClientWebSocket.Options.Proxy to {webProxy}");
}
}
catch (PlatformNotSupportedException)
{
// .NET Core 2.0 doesn't support proxy. Ignore this setting.
- Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy");
+ if (Logging.IsEnabled)
+ Logging.Error(this, $"{nameof(CreateClientWebSocketAsync)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy");
+ }
+
+ if (_amqpTransportSettings.WebSocketKeepAlive.HasValue)
+ {
+ websocket.Options.KeepAliveInterval = _amqpTransportSettings.WebSocketKeepAlive.Value;
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Set websocket keep-alive to {_amqpTransportSettings.WebSocketKeepAlive}");
}
if (_amqpTransportSettings.ClientCertificate != null)
@@ -196,7 +210,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri,
if (_amqpTransportSettings.RemoteCertificateValidationCallback != null)
{
websocket.Options.RemoteCertificateValidationCallback = _amqpTransportSettings.RemoteCertificateValidationCallback;
- Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting RemoteCertificateValidationCallback");
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"{nameof(CreateClientWebSocketAsync)} Setting RemoteCertificateValidationCallback");
}
#endif
await websocket.ConnectAsync(websocketUri, cancellationToken).ConfigureAwait(false);
@@ -205,7 +220,8 @@ private async Task CreateClientWebSocketAsync(Uri websocketUri,
}
finally
{
- Logging.Exit(this, nameof(CreateClientWebSocketAsync));
+ if (Logging.IsEnabled)
+ Logging.Exit(this, nameof(CreateClientWebSocketAsync));
}
}
diff --git a/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs b/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs
index 677f1eaaac..89b6025aa7 100644
--- a/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs
+++ b/iothub/device/src/Transport/Mqtt/ClientWebSocketChannel.cs
@@ -354,8 +354,8 @@ private async Task DoReadBytesAsync(IByteBuffer byteBuffer)
try
{
WebSocketReceiveResult receiveResult = await _webSocket
- .ReceiveAsync(new ArraySegment(byteBuffer.Array, byteBuffer.ArrayOffset + byteBuffer.WriterIndex, byteBuffer.WritableBytes), CancellationToken.None)
- .ConfigureAwait(false);
+ .ReceiveAsync(new ArraySegment(byteBuffer.Array, byteBuffer.ArrayOffset + byteBuffer.WriterIndex, byteBuffer.WritableBytes), CancellationToken.None)
+ .ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Text)
{
throw new ProtocolViolationException("Mqtt over WS message cannot be in text");
diff --git a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs
index f26b7a7ce1..d5c28ca79f 100644
--- a/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs
+++ b/iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs
@@ -429,7 +429,9 @@ private async void ScheduleCheckConnectTimeoutAsync(IChannelHandlerContext conte
try
{
- await context.Channel.EventLoop.ScheduleAsync(s_checkConnAckTimeoutCallback, context, _mqttTransportSettings.ConnectArrivalTimeout).ConfigureAwait(true);
+ await context.Channel.EventLoop
+ .ScheduleAsync(s_checkConnAckTimeoutCallback, context, _mqttTransportSettings.ConnectArrivalTimeout)
+ .ConfigureAwait(true);
}
catch (Exception ex) when (!ex.IsFatal())
{
diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
index 405cc05a52..5c35743c3a 100644
--- a/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
+++ b/iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
@@ -1265,13 +1265,17 @@ private Func> CreateChannelFactory(IotHubConnec
};
}
- private Func> CreateWebSocketChannelFactory(IotHubConnectionString iotHubConnectionString, MqttTransportSettings settings, ProductInfo productInfo, ClientOptions options)
+ private Func> CreateWebSocketChannelFactory(
+ IotHubConnectionString iotHubConnectionString,
+ MqttTransportSettings settings,
+ ProductInfo productInfo,
+ ClientOptions options)
{
return async (address, port) =>
{
string additionalQueryParams = "";
- var websocketUri = new Uri(WebSocketConstants.Scheme + iotHubConnectionString.HostName + ":" + WebSocketConstants.SecurePort + WebSocketConstants.UriSuffix + additionalQueryParams);
+ var websocketUri = new Uri($"{WebSocketConstants.Scheme}{iotHubConnectionString.HostName}:{WebSocketConstants.SecurePort}{WebSocketConstants.UriSuffix}{additionalQueryParams}");
var websocket = new ClientWebSocket();
websocket.Options.AddSubProtocol(WebSocketConstants.SubProtocols.Mqtt);
@@ -1282,7 +1286,7 @@ private Func> CreateWebSocketChannelFactory(Iot
// Configure proxy server
websocket.Options.Proxy = _webProxy;
if (Logging.IsEnabled)
- Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Setting ClientWebSocket.Options.Proxy");
+ Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Set ClientWebSocket.Options.Proxy to {_webProxy}");
}
}
catch (PlatformNotSupportedException)
@@ -1292,13 +1296,20 @@ private Func> CreateWebSocketChannelFactory(Iot
Logging.Error(this, $"{nameof(CreateWebSocketChannelFactory)} PlatformNotSupportedException thrown as .NET Core 2.0 doesn't support proxy");
}
+ if (settings.WebSocketKeepAlive.HasValue)
+ {
+ websocket.Options.KeepAliveInterval = settings.WebSocketKeepAlive.Value;
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"{nameof(CreateWebSocketChannelFactory)} Set websocket keep-alive to {settings.WebSocketKeepAlive}");
+ }
+
if (settings.ClientCertificate != null)
{
websocket.Options.ClientCertificates.Add(settings.ClientCertificate);
}
// Support for RemoteCertificateValidationCallback for ClientWebSocket is introduced in .NET Standard 2.1
-#if NETSTANDARD2_1
+#if NETSTANDARD2_1_OR_GREATER
if (settings.RemoteCertificateValidationCallback != null)
{
websocket.Options.RemoteCertificateValidationCallback = settings.RemoteCertificateValidationCallback;
diff --git a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs
index ed9c960a79..d72da354c2 100644
--- a/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs
+++ b/iothub/device/src/Transport/Mqtt/MqttTransportSettings.cs
@@ -182,7 +182,7 @@ public bool CertificateRevocationCheck
public bool CleanSession { get; set; }
///
- /// The interval, in seconds, that the client establishes with the service, for sending keep-alive pings.
+ /// The interval, in seconds, that the client establishes with the service, for sending protocol-level keep-alive pings.
/// The default is 300 seconds.
///
///
@@ -193,6 +193,15 @@ public bool CertificateRevocationCheck
///
public int KeepAliveInSeconds { get; set; }
+ ///
+ /// A keep-alive for the transport layer in sending ping/pong control frames when using web sockets.
+ ///
+ ///
+ /// This value is different from the protocol-level keep-alive packets that are sent over the overlaying MQTT transport protocol.
+ ///
+ ///
+ public TimeSpan? WebSocketKeepAlive { get; set; }
+
///
/// Indicates whether the transport has a will message.
///
diff --git a/iothub/device/tests/Amqp/MoqableAmqpUnit.cs b/iothub/device/tests/Amqp/MoqableAmqpUnit.cs
index 60a2f1f088..9afc0ef678 100644
--- a/iothub/device/tests/Amqp/MoqableAmqpUnit.cs
+++ b/iothub/device/tests/Amqp/MoqableAmqpUnit.cs
@@ -16,8 +16,19 @@ namespace Microsoft.Azure.Devices.Client.Test.Transport
{
internal class MoqableAmqpUnit : AmqpUnit
{
- public MoqableAmqpUnit() : this(new DeviceIdentity(IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), new ProductInfo(), new ClientOptions()),
- new AmqpConnectionHolder(new DeviceIdentity(IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString), new AmqpTransportSettings(TransportType.Amqp_Tcp_Only), new ProductInfo(), new ClientOptions())))
+ public MoqableAmqpUnit()
+ : this(
+ new DeviceIdentity(
+ IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString),
+ new AmqpTransportSettings(TransportType.Amqp_Tcp_Only),
+ new ProductInfo(),
+ new ClientOptions()),
+ new AmqpConnectionHolder(
+ new DeviceIdentity(
+ IotHubConnectionStringExtensions.Parse(AmqpTransportHandlerTests.TestConnectionString),
+ new AmqpTransportSettings(TransportType.Amqp_Tcp_Only),
+ new ProductInfo(),
+ new ClientOptions())))
{
}