-
Notifications
You must be signed in to change notification settings - Fork 463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
edgeHub Sometimes Loses Subscriptions When Restarted #4050
Comments
From the linked azure-iot-sdk-python issue:
|
EDIT: hold this if you were about to try, now I have a theory so at this moment I am not curious if it works with older versions: @jackt-moran I see it is 1.0.10 version of edgehub. Is it hard to try with an older version, e.g. 1.0.9? I see that it happens in production, so I assume a downgrade for a test would be complicated - only asking because that would help a lot if it turned out that this is a regression. |
@jackt-moran can you give me an edgeHub log from your staging environment where restarts work correctly? I need the first segment of the log (with the logo and then the next 10 seconds). |
@jackt-moran in the staging environment do the clients use MQTT (vs AMPQ) to connect? MTC0304/IoTEdgeASA They were able to restore their session state from a store, and that session state contained the subscriptions (and for IoTEdgeASA, one of the subscription was for module messages). Later that is the reason IoTEdgeASA was able to receive M2M messages. For the rest of the clients I don't see in the logs that it recovers the session state. It may be caused by different things, e.g if it is an MQTT client and it has its clean session flag on, then the session state will not be stored. Or for some reason the session state was not stored. If it works in staging, I wonder if it uses the same protocol (MQTT), and if we looked at those logs, we would find lines for the rest of the devices like: Set subscriptions from session state for MTC0304/IoTEdgeASA Is it something you could search for in the staging logs? (e.g. for device MessageLogger) |
Hi @vipeller ,
Unfortunately I may have miscommunicated things between the two issues I've opened. What I got to work in my staging environment and not production was a workaround implemented in my custom IoT Edge modules using the Python SDK. The gist of the workaround is a coroutine that monitors the state of the Python module client's connection and forces the container to exit if the client becomes disconnected (thereby forcing edgeHub and the module to "start over", so to speak). The workaround's difference in behavior in staging/production was surprising and there is likely a clue contained in that discrepancy. Since testing that workaround I have reverted our deployments to versions without the workaround, and our staging and production environments now behave the same (which is what I shared above).
Both in staging and production the clients that don't reconnect properly use the Python SDK. As a result, they must be using MQTT https://github.com/Azure/azure-iot-sdk-python#features.
Seeing as both our staging and production environments exhibit the issue and our clients use MQTT in both environments, I assume it will be helpful to know if there was a regression and this is easy for me to test in our staging environment. I have started a deployment (reverting edgeHub and edgeAgent back to version 1.0.9.5-linux-amd64) , but it is taking some time for the deployment to go through so I will get back to you ASAP on if it appears to be a regression or not. |
@BertKleewein I need your help with the Python SDK: What I see right now is that two modules (IoTEdgeASA/blobstorageiotedge) are able to recover their states after edgeHub restarted. The way it works is the following: MQTT has the concept of persistent sessions (that is not an official name, but many people refers it like that). These persistent sessions are created when a client connects and uses 0 value for the CleanSession flag. When a client connects with persistent session, edgeHub is going to store all subscriptions in a persistent store for that given client. It stores it immediately when a client subscribes/unsubscribes. If a client such that gets disconnected and then reconnects (and its CleanSession flag is still 0), then EdgeHub reads the persistent store, it gets the stored subscription list and resubscribes to those topics. This can be seen in the logs (for IotEdgeASA and blobstorageiotedge): 2020-12-04 16:12:08.964 +00:00 [INF] [Microsoft.Azure.Devices.Edge.Hub.Mqtt.SessionStateStoragePersistenceProvider] - Set subscriptions from session state for MTC0304/IoTEdgeASA No lines like this can be found for the other modules, so it seems that edgehub does not try to reapply any stored state. For me it seems that it is because these module don't connect with CleanSession flag 0. Unfortunately the log does not tell if a client comes with CleanSession 0, also it does not log when it stores a subscription. The only thing we see is that it restores from persistent store for those two modules and not for the rest. So my question for you is that what are the options to control the CleanSession flag when we use Python SDK and what is its default value. I know that in case of the C# SDK the flag can be set and its default value is 0 (meaning that when the user does not do anything, it will create persistent sessions, so subscriptions will be reapplied if EdgeHub restarts) Now what I am hoping is that you would say that Python also lets this flag control and its default value is 1, so all @jackt-moran needs to do is to set the flag to 0 and that solves all our problems :) |
@vipeller - the azure-iot-sdk-python library hard codes the clean_session flag to False. I'm going to read into the transport to see if there are any code paths that can change this. |
@vipeller - Paho logs this for us. It's false in @jackt-moran 's logs:
u,p,wr,wq,wf, and c are the bitfields in the MQTT connect_flags byte, with c being |
@jackt-moran - just to be 100% clear:
|
@vipeller - I'm having some trouble rolling edgeHub back to version 1.0.9.5 as something from the downgrade is causing it to fail. I'll keep working on getting it up and running just keeping you up to date. |
@vipeller - I can confirm that the issue is not a regression, at least not from version 1.0.9 to 1.0.10. I tested with the following versions: When following the steps to reproduce the behavior of edgeHub 1.0.9 was the same as 1.0.10, as far as I could tell. While I have version 1.0.9 installed please let me know if any additional tests or logs would be useful. |
@jackt-moran thank you. There is a special setting (an environment variable) which turns on a low level debug mode of that mqtt library we use internally, so then we can see the mqtt packets logged. Unfortunately, I could not find it yet (it is not documented), but I want to apply that first, so we can check how the connection/communication goes. |
@vipeller did you end up finding the environment variable to turn on the debug mode of the mqtt library? Is there anything else I can do to help? |
@vipeller @BertKleewein hi there folks. I got a chance to try and reproduce the issue with a different language/SDK, namely C#. I used the cookie cutter sample module that the VS Code extension produces, which is pasted below. I was not able to reproduce, which may suggest we were going down the wrong path. It's also worth noting a couple of things:
Here's the C# module, which simply logs data coming through to the console: namespace SampleModule
{
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Runtime.Loader;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
class Program
{
static int counter;
static void Main(string[] args)
{
Init().Wait();
// Wait until the app unloads or is cancelled
var cts = new CancellationTokenSource();
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
WhenCancelled(cts.Token).Wait();
}
/// <summary>
/// Handles cleanup operations when app is cancelled or unloads
/// </summary>
public static Task WhenCancelled(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
return tcs.Task;
}
/// <summary>
/// Initializes the ModuleClient and sets up the callback to receive
/// messages containing temperature information
/// </summary>
static async Task Init()
{
MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
ITransportSettings[] settings = { mqttSetting };
// Open a connection to the Edge runtime
ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
await ioTHubModuleClient.OpenAsync();
Console.WriteLine("IoT Hub module client initialized.");
// Register callback to be called when a message is received by the module
await ioTHubModuleClient.SetInputMessageHandlerAsync("input1", PipeMessage, ioTHubModuleClient);
}
/// <summary>
/// This method is called whenever the module is sent a message from the EdgeHub.
/// It just pipe the messages without any change.
/// It prints all the incoming messages.
/// </summary>
static async Task<MessageResponse> PipeMessage(Message message, object userContext)
{
int counterValue = Interlocked.Increment(ref counter);
var moduleClient = userContext as ModuleClient;
if (moduleClient == null)
{
throw new InvalidOperationException("UserContext doesn't contain " + "expected values");
}
byte[] messageBytes = message.GetBytes();
string messageString = Encoding.UTF8.GetString(messageBytes);
Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");
return MessageResponse.Completed;
}
}
} Along with the following .csproj file:
For comparison, I am able to reproduce the issue with the following Python (azure-iot-device==2.4.0): import asyncio
from asyncio.tasks import wait, wait_for
import concurrent
from azure.iot.device.aio import IoTHubModuleClient
from azure.iot.device import Message
from azure.iot.device.exceptions import (
CredentialError,
ConnectionFailedError,
ConnectionDroppedError,
ClientError,
)
logging.basicConfig(level=logging.DEBUG)
async def shutdown(signal, loop):
"""Cleanup tasks tied to the service's shutdown."""
print("Received exit signal %s..." % (signal))
print("Clearing out messages in message buffer")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
[task.cancel() for task in tasks]
print("Cancelling outstanding tasks")
await asyncio.gather(*tasks, return_exceptions=True)
async def setup_shutdown():
"""
Adds the shutdown coroutine as a signal handler in the event loop
for SIGHUP, SIGTERM, and SIGINT signals.
"""
if sys.platform.startswith("linux"):
loop = asyncio.get_running_loop()
# May want to catch other signals too
signals = (signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(shutdown(s, loop))
)
async def monitor_client_connection(client: IoTHubModuleClient):
""""""
while True:
if not client.connected:
raise asyncio.CancelledError
else:
print("INFO: IoT Edge module client is connected.")
await asyncio.sleep(60)
def init_module_client() -> IoTHubModuleClient:
"""Wrapper for IoTHubModuleClient.create_from_edge_environment()
Used for testing purposes.
Returns:
The result of the azure-iot-device API call."""
return IoTHubModuleClient.create_from_edge_environment()
async def connect_module_client(module_client: IoTHubModuleClient):
"""
Connects a Module Client to the IoT Edge runtime's Edge Hub.
This coroutine is interruptible, but will otherwise attempt
to connect forever until successful.
Returns:
None.
"""
connected = False
while not connected:
try:
await module_client.connect()
connected = True
except CredentialError:
traceback.print_exc()
raise
except (ConnectionFailedError, ConnectionDroppedError):
traceback.print_exc()
print("Attempting to connect again")
except ClientError:
traceback.print_exc()
raise
await asyncio.sleep(1)
async def wait_forever():
while True:
await asyncio.sleep(60)
async def main():
try:
await setup_shutdown()
module_client = init_module_client()
await connect_module_client(module_client)
connected = True
async def message_handler(message: Message):
if message.input_name == "log":
print(message.data)
else:
print("message received on unknown input")
module_client.on_message_received = message_handler
listeners = asyncio.gather(
wait_forever(),
)
await listeners
except (asyncio.CancelledError, concurrent.futures.CancelledError):
if connected:
await module_client.disconnect()
print("Disconnected module client.")
print("Caught cancellation in main coroutine gracefully exiting.")
if __name__ == "__main__":
asyncio.run(main()) The main "symptom" I can observe in edgeHub's logs, which is either highlighted above or in the linked issue, is the following reccurring log after edgeHub is restarted:
On the other hand, there is no such message for the sample C# module I shared above. It seems that there is some sort of incompatibility between the Python SDK and edgeHub that does not exist between the C# SDK and edgeHub. Hopefully this additional info is helpful in tracking down the root cause, and please let me know if I can provide further info such as logs. |
@jackt-moran Thank you for trying this out. The C# SDK is implemented the way that it automatically resubscribes to topics when it loses the connection and connects back. That means that even if edgehub does not able to reload the old subscriptions, it receives new ones, so it will not miss the subscription as the log message says you found. Regarding the switch I was talking about, it turns on only the upstream mqtt communication, not between the device-edgehub, so that cannot be used to check the flags that I wanted to see. What I can do is to use your test code and debug edgehub locally on my desktop to see what happens with the subscriptions (if it tries to store it or not, and if it tries, why does not it have any effect later) |
@vipeller thanks for getting back, it would definitely be good to know if someone else can reproduce or if it is just me doing something silly. In case you'd use iotedgedev for testing, I can confirm the issue occurs for me there as well. Just to make sure we're completely on the same page, here is my requirements file for the sample Python module provided above:
and the Dockerfile (amd64): FROM amd64/python:3.7-slim-buster
WORKDIR /app
COPY . .
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
RUN useradd -ms /bin/bash moduleuser
RUN chown -R moduleuser:moduleuser .
RUN chmod -R 700 .
USER moduleuser
CMD [ "python3", "-u", "./main.py" ] |
@jackt-moran a quick update: I managed to repro this in my environment. At this moment I don't know what causes it, what I know is that the subscription (for module-to-module messages) gets retrieved from the persistent store of edgeHub when the python client reconnects - this means that edgeHub knows about this subscription when it recovers, however apparently this information is not get propagated later (it gets lost somewhere) I am keep debugging. |
@vipeller thanks for the update, it is nice to hear the issue is reproducible. Let me know if there's anything I can do to help. |
This issue is being marked as stale because it has been open for 30 days with no activity. |
@jackt-moran Hi, sorry to not coming back, but the update is that we found the problem in the code and it is fixed, it will be included in one of the upcoming releases |
@vipeller hi, no problem at all. Thanks for the update and I appreciate the fix! Will this issue be linked to the release when the time comes? |
I just observed this bug too, still running 1.0.10.4. Has the fix been deployed and which release is this linked to? |
Currently, this has been fixed in 1.2, but has not yet been cherry-picked into 1.1: PR for 1.1 branch - #5048 |
Expected Behavior
When the edgeHub container is restarted it should not lose subscriptions and/or it should gracefully reestablish connection with module clients.
Current Behavior
Sometimes edgeHub loses subscriptions upon restart, which causes module clients to stop receiving messages from edgeHub indefinitely.
Steps to Reproduce
Provide a detailed set of steps to reproduce the bug.
iotedge restart edgeHub
).Context (Environment)
Output of
iotedge check
Click here
Device Information
Runtime Versions
iotedged [run
iotedge version
]: 1.0.10.2Edge Agent [image tag (e.g. 1.0.0)]: azureiotedge-agent:1.0.10-linux-amd64
Edge Hub [image tag (e.g. 1.0.0)]: azureiotedge-hub:1.0.10-linux-amd64
Docker/Moby [run
docker version
]:Client:
Version: 19.03.13+azure
API version: 1.40
Go version: go1.13.15
Git commit: cd8016b6bcc51a51f577bcffb37e6a870f7813f9
Built: Tue Sep 15 23:02:04 2020
OS/Arch: linux/amd64
Experimental: false
Server:
Engine:
Version: 19.03.13+azure
API version: 1.40 (minimum version 1.12)
Go version: go1.13.15
Git commit: bd33bbf0497b2327516dc799a5e541b720822a4c
Built: Mon Mar 12 00:00:00 2018
OS/Arch: linux/amd64
Experimental: false
containerd:
Version: 1.3.9+azure
GitCommit: ea765aba0d05254012b0b9e595e995c09186427f
runc:
Version: 1.0.0-rc92
GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff
docker-init:
Version: 0.18.0
GitCommit:
Note: when using Windows containers on Windows, run
docker -H npipe:////./pipe/iotedge_moby_engine version
insteadLogs
iotedged logs
edge-agent logs
edge-hub logs
Gist here (too large to include here).
Additional Information
Please provide any additional information that may be helpful in understanding the issue.
I have not checked if I can reproduce this issue with another language and SDK, but I did initially open this issue in the Python SDK repository here: Azure/azure-iot-sdk-python#713. @BertKleewein from the Python SDK helped me out in tracking down the issue and we believe it to be an issue on the edgeHub side. In the linked, original issue you can find our discussion as well as some additional logs from a different reproduction of the issue.
The text was updated successfully, but these errors were encountered: