In some circumstances DeviceClient.SendEventAsync can go wild, consume all available memory #3342

Closed
ccruden-aspire opened this issue Jun 17, 2023 · 4 comments
Labels
bug Something isn't working. fix-checked-in Fix checked into main or preview, but not yet released. IoTSDK Tracks all IoT SDK issues across the board


@ccruden-aspire

Context

  • OS: Windows Server 2022 Datacenter 21H2
  • .NET framework version: .NET 6.0
  • Device: Azure VM
  • SDK: Microsoft.Azure.Devices.Client 1.40 originally; upgraded to 1.41, which still exhibits the problem.

Description of the issue

We have a service whose only job is to read messages from files in a directory and send the contents of those files as IoT hub messages. It has run at dozens of sites for several years, transferring gigabytes a day, without seeing this problem. However, starting a day ago, two IoT hubs seem to have entered a state in which a message sent with SendEventAsync blocks or loops and consumes all available memory. Within seconds the process goes from using 70 MB of memory to 25 GB, and either the process has to be killed or the machine rebooted. This only seems to occur under particular circumstances:

  • Multiple threads must be calling SendEventAsync at once; when only one thread calls it, the problem doesn't occur. (Our default installation has 8 threads calling SendEventAsync.)
  • DotNetty trace logging must not be enabled. If I capture a trace with every provider from https://github.com/Azure/azure-iot-sdk-csharp/blob/main/tools/CaptureLogs/iot_providers.txt except DotNetty, the problem recurs. If I capture a trace with only DotNetty, or with DotNetty plus any of the others, it does not.

At these two sites it takes five minutes or less for the service to enter the "consume all memory" state. The sites also appear to have other, probably related, problems: the service starts up, sends for about a minute, then logs that it has been disconnected from IoT hub and is reconnecting. A minute later it starts sending again, and a minute after that it is disconnected again. We are still working on getting access to the IoT hub logs.
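Because the runaway allocation was only observed with several threads calling SendEventAsync at the same time, one possible stopgap (an untested assumption, not the fix adopted later in this thread) is to funnel all sends through a single gate so that only one is in flight per DeviceClient. A minimal sketch, assuming a hypothetical `SerializedSender<T>` wrapper that is not part of the Azure IoT SDK:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not an Azure IoT SDK type): serializes calls to an
// async send delegate so at most one send is in flight at any time.
public sealed class SerializedSender<T> : IDisposable
{
    private readonly Func<T, Task> _send;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public SerializedSender(Func<T, Task> send)
    {
        _send = send ?? throw new ArgumentNullException(nameof(send));
    }

    public async Task SendAsync(T item, CancellationToken cancellationToken = default)
    {
        // Acquire before the try block so finally only releases a slot we hold.
        await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await _send(item).ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }

    public void Dispose() => _gate.Dispose();
}
```

In the IotHubClient code below, this could be wired up as `new SerializedSender<Message>(m => _IOTHubClient.SendEventAsync(m))`. The lambda reads `_IOTHubClient` at call time, so it follows reconnects. The trade-off is throughput: the 8 worker threads would queue behind one another instead of sending in parallel.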

Code sample exhibiting the issue

I can't isolate this to a minimal repro, since the issue only happens at a couple of production sites and we have no way to experiment with those IoT hubs. The relevant code is below. The service pulls IQueuedMessage items from a BlockingCollection and awaits a call to SendToHub for each one.

    private const int MaxMessageSize = 262144;

    public bool IsSending { get; private set; }
    private DeviceClient _IOTHubClient;
    private ILogger<IotHubClient> _log;
    private SemaphoreSlim _ioHubLock;
    private DateTime firstDisabled;
    public event EventHandler UnrecoverableError;
    private int messagesSent;
    private int bytesSent;
    public IotHubClient(ILogger<IotHubClient> log)
    {
        _log = log;
        _ioHubLock = new SemaphoreSlim(1, 1);
        firstDisabled = DateTime.MaxValue; // no "device disabled" episode in progress yet
        IsSending = true; //so that we at least try once to process some files on startup
    }

    public string ConnectionString { get; set; }
    public string ContentType { get; set; }
    public string ContentEncoding { get; set; }
    public IDictionary<string, string> Properties { get; set; }

    private async Task ConnectToHub()
    {
        if (_IOTHubClient == null)
        {
            // Acquire before the try block so finally never releases a lock we don't hold.
            await _ioHubLock.WaitAsync();
            try
            {
                if (_IOTHubClient == null)
                {
                    (string host, string device) = GetHostAndDevice(ConnectionString);
                    _log.LogDebug("IOTHubConsumer Store And Forward created for host {host}, device {device}", host, device);
                    DeviceClient client = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
                    await client.OpenAsync();
                    IRetryPolicy retryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));
                    client.SetRetryPolicy(retryPolicy);
                    client.SetConnectionStatusChangesHandler(ConnectionStatusChangedHandler);
                    _IOTHubClient = client;
                    _log.LogDebug("Successfully opened IOT hub");
                    firstDisabled = DateTime.MaxValue;
                    IsSending = true;
                }
            }
            catch (Exception ex)
            {
                _log.LogError(ex, "Exception opening IOT hub");
                _IOTHubClient = null;
                throw;
            }
            finally
            {
                _ioHubLock.Release();
            }
        }
    }

    private async Task DisconnectClient()
    {
        IsSending = false;
        DeviceClient local = _IOTHubClient;
        _IOTHubClient = null;
        if (local != null)
        {
            await local.CloseAsync();
            local.Dispose();
        }
    }

    private Task Bail()
    {
        IsSending = false;
        UnrecoverableError?.Invoke(this, new EventArgs());
        return Task.CompletedTask;
    }

    private async void ConnectionStatusChangedHandler(ConnectionStatus status, ConnectionStatusChangeReason reason)
    {
        switch (status)
        {
            case ConnectionStatus.Connected:
                _log.LogDebug("Successfully connected to IOT hub");
                IsSending = true;
                break;

            case ConnectionStatus.Disconnected_Retrying:
                _log.LogDebug("IOT hub disconnected - retrying.");
                IsSending = false;
                break;

            case ConnectionStatus.Disabled:
                _log.LogInformation("IOT hub disabled - reconnecting manually.");
                await DisconnectClient();
                break;

            case ConnectionStatus.Disconnected:
                switch (reason)
                {
                    case ConnectionStatusChangeReason.Bad_Credential:
                        _log.LogInformation("IOT hub credentials rejected. Retrying.");
                        try
                        {
                            DeviceClient client = DeviceClient.CreateFromConnectionString(ConnectionString, TransportType.Mqtt);
                            await client.OpenAsync();
                            client.SetConnectionStatusChangesHandler(ConnectionStatusChangedHandler);
                            IRetryPolicy retryPolicy = new ExponentialBackoff(int.MaxValue, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(100));
                            client.SetRetryPolicy(retryPolicy);
                            await _ioHubLock.WaitAsync();
                            _IOTHubClient = client;
                            _log.LogInformation("Retry successful.");
                            _ioHubLock.Release();
                        }
                        catch (Exception ex)
                        {
                            _log.LogCritical(ex, "Exception re-establishing connection.");
                            await Bail();
                        }
                        break;

                    case ConnectionStatusChangeReason.Client_Close:
                        _log.LogDebug("IOT hub closed gracefully.");
                        break;

                    case ConnectionStatusChangeReason.Communication_Error:
                        _log.LogInformation("IOT hub disconnected because of non-retryable exception. Restarting.");
                        await DisconnectClient();
                        break;

                    case ConnectionStatusChangeReason.Device_Disabled:
                        // This is gonna lose data, but at this point that's unavoidable.
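                        // Abort only once a disabled episode has started and lasted more than 5 minutes
                        // (the original firstDisabled - DateTime.Now was inverted and bailed immediately).
                        if (firstDisabled != DateTime.MaxValue && DateTime.Now - firstDisabled > TimeSpan.FromMinutes(5))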
                        {
                            _log.LogCritical($"The IOT hub device for the store and forward has been disabled or deleted for more than 5 minutes. Aborting.");
                            await Bail();
                        }
                        else
                        {
                            // Hopefully the device has just been temporarily disabled and it will be enabled before the data piling up kills us off.
                            if (firstDisabled == DateTime.MaxValue)
                            {
                                firstDisabled = DateTime.Now;
                            }
                            _log.LogCritical($"The IOT hub device for the store and forward has been disabled or deleted. Retrying, but it's not looking good.");
                            await DisconnectClient();
                        }
                        break;

                    case ConnectionStatusChangeReason.Retry_Expired:
                        _log.LogInformation("IOT hub disconnected because retry expired. Retrying more forcibly.");
                        await DisconnectClient();
                        break;

                    // No_Network is not used.
                    case ConnectionStatusChangeReason.No_Network:
                    // Expired_SAS_Token is not used
                    case ConnectionStatusChangeReason.Expired_SAS_Token:
                    default:
                        _log.LogError($"ConnectionStatus {status}, ConnectionStatusChangeReason {reason}: this should never happen. Contact support if you see this message.");
                        break;
                }
                break;

            default:
                _log.LogError($"ConnectionStatus {status}, ConnectionStatusChangeReason {reason}: this should never happen. Contact support if you see this message.");
                break;
        }
    }

    public async Task SendToHub(IQueuedMessage message)
    {
        try
        {
            await ConnectToHub();
            byte[] messageBytes = await message.GetBytes();
            if (messageBytes.Length == 0)
            {
                await message.MarkProcessed();
            }
            else if (messageBytes.Length < MaxMessageSize) // zero-length messages were handled above
            {
                using Message msg = new(messageBytes);
                if (!string.IsNullOrEmpty(ContentEncoding))
                {
                    msg.ContentEncoding = ContentEncoding;
                }
                if (!string.IsNullOrEmpty(ContentType))
                {
                    msg.ContentType = ContentType;
                }
                if (Properties != null)
                {
                    foreach(KeyValuePair<string,string> pair in Properties)
                    {
                        msg.Properties[pair.Key] = pair.Value;
                    }
                }

                await _IOTHubClient.SendEventAsync(msg);
                _log.LogDebug($"Sent {messageBytes.Length} byte message to IoTHub");
                Interlocked.Increment(ref messagesSent);
                Interlocked.Add(ref bytesSent, messageBytes.Length);
                await message.MarkProcessed();
            }
            else
            {
                await message.MarkPoisoned();
            }
        }
        catch (Exception ex)
        {
            _log.LogWarning(ex, "Error encountered while sending data to IoTHub");
            await message.Requeue();
        }
    }

Console log of the issue

The service's log at Debug level ends on a line with "Sent # byte message to IoTHub" several seconds before the process had to be killed off. Before that, that message was occurring multiple times a second.
adc_000001.zip

@ccruden-aspire ccruden-aspire added the bug Something isn't working. label Jun 17, 2023
@github-actions github-actions bot added the IoTSDK Tracks all IoT SDK issues across the board label Jun 17, 2023
@ccruden-aspire
Author

On a hunch, I replaced the DotNetty 0.7.0 assemblies included with Microsoft.Azure.Devices.Client (MADC) 1.40 with DotNetty 0.7.5 assemblies, and that appears to have resolved the main problem in this issue: the runaway memory consumption. The service still stops and restarts sending at regular intervals, but that's probably IoT hub related.

@romandres

I'm encountering a similar issue when calling OpenAsync() for a device that is disabled in the IoT hub. The process then allocates all available memory almost immediately.

I'm on

  • .NET 6.0
  • Microsoft.Azure.Devices.Client 1.42

Updating to DotNetty 0.7.5 seems to have fixed that issue, thanks @ccruden-aspire!

@timtay-microsoft
Member

We've upgraded the DotNetty dependency to the latest version in #3401, so I'll mark this as fix-checked-in.

@timtay-microsoft timtay-microsoft added the fix-checked-in Fix checked into main or preview, but not yet released. label Dec 1, 2023
@timtay-microsoft
Member

This issue has been fixed as of this release, so I'll close this thread
