diff --git a/main/ZgatewayBT.ino b/main/ZgatewayBT.ino index 5e6523180b..ae0a63c931 100644 --- a/main/ZgatewayBT.ino +++ b/main/ZgatewayBT.ino @@ -852,7 +852,7 @@ void setupBTTasksAndBLE() { procBLETask, /* Function to implement the task */ "procBLETask", /* Name of the task */ # if defined(USE_ESP_IDF) || defined(USE_BLUFI) - 12500, + 13500, # else 8500, /* Stack size in bytes */ # endif diff --git a/main/ZgatewaySERIAL.ino b/main/ZgatewaySERIAL.ino index 8e4dd22928..422e21dfd0 100644 --- a/main/ZgatewaySERIAL.ino +++ b/main/ZgatewaySERIAL.ino @@ -34,6 +34,20 @@ SoftwareSerial SERIALSoftSerial(SERIAL_RX_GPIO, SERIAL_TX_GPIO); // RX, TX # endif +# ifdef ESP32 +SemaphoreHandle_t serialSemaphore = NULL; +const TickType_t semaphoreTimeout = pdMS_TO_TICKS(1000); // 1 second timeout +# undef SEMAPHORE_SERIAL +# undef SEMAPHORE_SERIAL_GIVE +# define SEMAPHORE_SERIAL xSemaphoreTake(serialSemaphore, semaphoreTimeout) == pdTRUE +# define SEMAPHORE_SERIAL_GIVE xSemaphoreGive(serialSemaphore) +# else +# undef SEMAPHORE_SERIAL +# undef SEMAPHORE_SERIAL_GIVE +# define SEMAPHORE_SERIAL true +# define SEMAPHORE_SERIAL_GIVE +# endif + // use pointer to stream class for serial communication to make code // compatible with both softwareSerial as hardwareSerial. Stream* SERIALStream = NULL; @@ -43,7 +57,9 @@ bool receiverReady = false; unsigned long lastHeartbeatReceived = 0; unsigned long lastHeartbeatAckReceived = 0; unsigned long lastHeartbeatSent = 0; +unsigned long lastHeartbeatAckReceivedCheck = 0; const unsigned long heartbeatTimeout = 15000; // 15 seconds timeout for ack +const unsigned long heartbeatAckCheckInterval = 5000; // Check for ack every 5 seconds const unsigned long maxHeartbeatInterval = 60000; // Maximum interval of 1 minute unsigned long heartbeatInterval = 5000; // 5 seconds bool isOverflow = false; @@ -102,6 +118,13 @@ void setupSERIAL() { Log.notice(F("SERIAL_TX_GPIO: %d" CR), SERIAL_TX_GPIO); # endif +# ifdef ESP32 + serialSemaphore = xSemaphoreCreateMutex(); + if (serialSemaphore == NULL) { + Log.error(F("Failed to create serialSemaphore" CR)); + } +# endif + // Flush all bytes in the "link" serial port buffer while (SERIALStream->available() > 0) SERIALStream->read(); @@ -137,42 +160,60 @@ void SERIALtoX() { # elif SERIALtoMQTTmode == 1 // Convert received JSON data to one or multiple MQTT topics void sendHeartbeat() { - Log.trace(F("Sending Serial heartbeat" CR)); - SERIALStream->print(SERIALPre); - SERIALStream->print("{\"type\":\"heartbeat\"}"); - SERIALStream->print(SERIALPost); - SERIALStream->flush(); + if (SEMAPHORE_SERIAL) { + SERIALStream->print(SERIALPre); + SERIALStream->print("{\"type\":\"heartbeat\"}"); + SERIALStream->print(SERIALPost); + SERIALStream->flush(); + Log.notice(F("Sent Serial heartbeat" CR)); + SEMAPHORE_SERIAL_GIVE; + } else { + Log.error(F("Failed to take serialSemaphore" CR)); + } } void sendHeartbeatAck() { - SERIALStream->print(SERIALPre); - SERIALStream->print("{\"type\":\"heartbeat_ack\"}"); - SERIALStream->print(SERIALPost); - SERIALStream->flush(); - Log.trace(F("Sent heartbeat ack" CR)); + if (SEMAPHORE_SERIAL) { + SERIALStream->print(SERIALPre); + SERIALStream->print("{\"type\":\"heartbeat_ack\"}"); + SERIALStream->print(SERIALPost); + SERIALStream->flush(); + Log.notice(F("Sent heartbeat ack" CR)); + SEMAPHORE_SERIAL_GIVE; + } else { + Log.error(F("Failed to take serialSemaphore" CR)); + } } void SERIALtoX() { static String buffer = ""; // Static buffer to store incomplete messages - unsigned long currentTime = millis(); +# ifdef SENDER_SERIAL_HEARTBEAT // Check if it's time to send a heartbeat and we're not in overflow if (!isOverflow && currentTime - lastHeartbeatSent > heartbeatInterval) { + sendHeartbeat(); + lastHeartbeatSent = currentTime; + } + if (currentTime - lastHeartbeatAckReceivedCheck > heartbeatAckCheckInterval) { + lastHeartbeatAckReceivedCheck = currentTime; // Check if we received an ack for the last heartbeat if (currentTime - lastHeartbeatAckReceived > heartbeatTimeout) { // No ack received, increase the interval (with a maximum limit) - heartbeatInterval = min(heartbeatInterval * 2, maxHeartbeatInterval); + unsigned long newHeartbeatInterval = heartbeatInterval * 1.25; + heartbeatInterval = min(newHeartbeatInterval, maxHeartbeatInterval); Log.warning(F("No heartbeat ack received. Increasing interval to %lu ms" CR), heartbeatInterval); + receiverReady = false; } else { // Ack received, reset the interval heartbeatInterval = 5000; } - sendHeartbeat(); - lastHeartbeatSent = currentTime; } - +# else + receiverReady = true; +# endif while (SERIALStream->available()) { + unsigned long now = millis(); char c = SERIALStream->read(); buffer += c; @@ -194,7 +235,8 @@ void SERIALtoX() { if (SERIALdata.containsKey("type") && strcmp(SERIALdata["type"], "heartbeat") == 0) { handleHeartbeat(); } else if (SERIALdata.containsKey("type") && strcmp(SERIALdata["type"], "heartbeat_ack") == 0) { - lastHeartbeatAckReceived = currentTime; + lastHeartbeatAckReceived = now; + receiverReady = true; Log.notice(F("Heartbeat ack received" CR)); } else { // Process normal messages @@ -205,7 +247,7 @@ void SERIALtoX() { } else { // send as json if (SERIALdata.containsKey("origin") || SERIALdata.containsKey("topic")) { -# ifdef ZmqttDiscovery +# ifdef SecondaryModule // We need to assign the discovery message to the primary module instead of the secondary module if (SERIALdata.containsKey("device") && SERIALdata["device"].containsKey("via_device")) { SERIALdata["device"]["via_device"] = gateway_name; @@ -231,6 +273,10 @@ void SERIALtoX() { // Clear the buffer for the next message buffer = ""; + } else if (buffer.endsWith(SERIALPost)) { + // If the buffer ends with the postfix but does not start with the prefix, clear it + Log.error(F("Buffer error, clearing buffer. Partial content: %s" CR), buffer.c_str()); + buffer = ""; } else if (buffer.length() > JSON_MSG_BUFFER) { // If the buffer gets too large without finding a complete message, clear it Log.error(F("Buffer overflow, clearing buffer. Partial content: %s" CR), buffer.c_str()); @@ -280,40 +326,37 @@ void sendMQTTfromNestedJson(JsonVariant obj, char* topic, int level, int maxLeve bool XtoSERIAL(const char* topicOri, JsonObject& SERIALdata) { bool res = false; - unsigned long currentTime = millis(); - - // Check if receiver is still ready (heartbeat check) - if (currentTime - lastHeartbeatReceived > heartbeatTimeout) { - receiverReady = false; - Log.error(F("Heartbeat timeout. Receiver is not ready." CR)); - } + if (SEMAPHORE_SERIAL) { + if (receiverReady && (cmpToMainTopic(topicOri, subjectMQTTtoSERIAL) || + (SYSConfig.serial && SERIALdata.containsKey("origin") && SERIALdata["origin"].is()) || + (SYSConfig.serial && SERIALdata.containsKey("topic") && SERIALdata["topic"].is()))) { + Log.trace(F("XtoSERIAL" CR)); + // Prepare the data string + std::string data; + if (SYSConfig.serial || + (SERIALdata.containsKey("origin") && SERIALdata["origin"].is()) || // Module like BT to SERIAL + (SERIALdata.containsKey("target") && SERIALdata["target"].is())) { // Command to send to a specific target example MQTTtoBT through SERIAL + //SERIALdata["msgcount"] = msgCount++; + serializeJson(SERIALdata, data); + } else if (SERIALdata.containsKey("value")) { + data = SERIALdata["value"].as(); + } - if (receiverReady && (cmpToMainTopic(topicOri, subjectMQTTtoSERIAL) || (SYSConfig.serial && SERIALdata.containsKey("origin") && SERIALdata["origin"].is()) || (SYSConfig.serial && SERIALdata.containsKey("topic") && SERIALdata["topic"].is()))) { - Log.trace(F("XtoSERIAL" CR)); - - // Prepare the data string - std::string data; - if (SYSConfig.serial || - (SERIALdata.containsKey("origin") && SERIALdata["origin"].is()) || // Module like BT to SERIAL - (SERIALdata.containsKey("target") && SERIALdata["target"].is())) { // Command to send to a specific target example MQTTtoBT through SERIAL - //SERIALdata["msgcount"] = msgCount++; - serializeJson(SERIALdata, data); - } else if (SERIALdata.containsKey("value")) { - data = SERIALdata["value"].as(); + // Send the message + const char* prefix = SERIALdata["prefix"] | SERIALPre; + const char* postfix = SERIALdata["postfix"] | SERIALPost; + SERIALStream->print(prefix); + SERIALStream->print(data.c_str()); + SERIALStream->print(postfix); + SERIALStream->flush(); + + Log.notice(F("[ OMG->SERIAL ] data sent: %s" CR), data.c_str()); + res = true; + delay(100); } - - // Send the message - const char* prefix = SERIALdata["prefix"] | SERIALPre; - const char* postfix = SERIALdata["postfix"] | SERIALPost; - - SERIALStream->print(prefix); - SERIALStream->print(data.c_str()); - SERIALStream->print(postfix); - SERIALStream->flush(); - - Log.notice(F("[ OMG->SERIAL ] data sent: %s" CR), data.c_str()); - res = true; - delay(100); + SEMAPHORE_SERIAL_GIVE; + } else { + Log.error(F("Failed to take serialSemaphore" CR)); } return res; } @@ -322,11 +365,11 @@ bool isSerialReady() { return receiverReady; } -// This function should be called when a heartbeat is received from the receiver +// This function should be called when a heartbeat is received from the emitter void handleHeartbeat() { - receiverReady = true; lastHeartbeatReceived = millis(); - Log.trace(F("Heartbeat received. Receiver is ready." CR)); - sendHeartbeatAck(); + if (gatewayState == GatewayState::BROKER_CONNECTED) { + sendHeartbeatAck(); + } } #endif \ No newline at end of file diff --git a/main/main.ino b/main/main.ino index 635f40b720..9464e455b6 100644 --- a/main/main.ino +++ b/main/main.ino @@ -1766,6 +1766,13 @@ void setupTLS(int index) { 9 - SELFTEST end */ void ESPRestart(byte reason) { +#ifdef SecondaryModule + // Erase the secondary module config + String restartCmdStr = "{\"cmd\":\"" + String(restartCmd) + "\"}"; + Log.notice(F("Restarting secondary module : %s" CR), restartCmdStr.c_str()); + receivingDATA(subjectMQTTtoSYSsetSecondaryModule, restartCmdStr.c_str()); + delay(2000); +#endif StaticJsonDocument<128> jsonBuffer; JsonObject jsondata = jsonBuffer.to(); jsondata["reason"] = reason; @@ -1873,13 +1880,6 @@ void blockingWaitForReset() { } # endif ledManager.setMode(-1, -1, LEDManager::STATIC, LED_WAITING_ONBOARD_COLOR, -1); -# ifdef SecondaryModule - // Erase the secondary module config - String eraseCmdStr = "{\"cmd\":\"" + String(eraseCmd) + "\"}"; - Log.notice(F("Erasing secondary module config: %s" CR), eraseCmdStr.c_str()); - receivingDATA(subjectMQTTtoSYSsetSecondaryModule, eraseCmdStr.c_str()); - delay(5000); -# endif // Checking if the flash has already been erased to identify if we erase it or go into failsafe mode // going to failsafe mode is done by doing a long button press from a state where the flash has already been erased if (SPIFFS.begin()) { @@ -2737,6 +2737,13 @@ float intTemperatureRead() { Erase flash and restart the ESP */ void erase(bool restart) { +#ifdef SecondaryModule + // Erase the secondary module config + String eraseCmdStr = "{\"cmd\":\"" + String(eraseCmd) + "\"}"; + Log.notice(F("Erasing secondary module config: %s" CR), eraseCmdStr.c_str()); + receivingDATA(subjectMQTTtoSYSsetSecondaryModule, eraseCmdStr.c_str()); + delay(2000); +#endif Log.trace(F("Formatting requested, result: %d" CR), SPIFFS.format()); #if defined(ESP8266)