Skip to content

Commit

Permalink
[SYS} Improve queue performance whenb Using Ethernet and low interval…
Browse files Browse the repository at this point in the history
… between scan (1technophile#2052)

Add also a msgrcv (messages received) indicators into SYStoMQTT to measure the performances

Co-authored-by: Florian <[email protected]>
  • Loading branch information
2 people authored and odoral committed Oct 18, 2024
1 parent 0ab0575 commit 9c3c69d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 55 deletions.
4 changes: 2 additions & 2 deletions main/User_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,9 +785,9 @@ void connectMQTT();

unsigned long uptime();
bool cmpToMainTopic(const char*, const char*);
void pub(const char*, const char*, bool);
bool pub(const char*, const char*, bool);
// void pub(const char*, JsonObject&);
void pub(const char*, const char*);
bool pub(const char*, const char*);
// void pub_custom_topic(const char*, JsonObject&, boolean);

#if defined(ESP32)
Expand Down
1 change: 1 addition & 0 deletions main/ZgatewayBT.ino
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ void procBLETask(void* pvParameters) {
updateDevicesStatus();
}
delete (advertisedDevice);
vTaskDelay(10);
}
}

Expand Down
117 changes: 64 additions & 53 deletions main/main.ino
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ unsigned long timer_sys_checks = 0;
int queueLength = 0;
unsigned long queueLengthSum = 0;
unsigned long blockedMessages = 0;
unsigned long receivedMessages = 0;
int maxQueueLength = 0;
#ifndef QueueSize
# define QueueSize 18
Expand Down Expand Up @@ -378,7 +379,8 @@ void Config_update(JsonObject& data, const char* key, T& var) {
* Dispatch json messages towards the communication layer
*
*/
void jsonDispatch(JsonObject& data) {
bool jsonDispatch(JsonObject& data) {
bool res = false;
if (data.containsKey("origin")) {
#if message_UTCtimestamp == true
data["UTCtime"] = TheengsUtils::UTCtimestamp();
Expand All @@ -388,26 +390,30 @@ void jsonDispatch(JsonObject& data) {
#endif
pubWebUI((char*)data["origin"].as<const char*>(), data);
if (SYSConfig.mqtt && !SYSConfig.offline) {
pub((char*)data["origin"].as<const char*>(), data);
res = pub((char*)data["origin"].as<const char*>(), data);
#ifdef ZgatewayBT
if (data.containsKey("distance")) {
String topic = String(mqtt_topic) + BTConfig.presenceTopic + String(gateway_name);
Log.trace(F("Pub HA Presence %s" CR), topic.c_str());
pub_custom_topic((char*)topic.c_str(), data, false);
res = pub_custom_topic((char*)topic.c_str(), data, false);
}
#endif
}
#ifdef ZgatewaySERIAL
if (SYSConfig.serial)
if (SYSConfig.serial) {
XtoSERIAL("", data);
res = true;
}
#endif
} else {
Log.error(F("No origin in JSON filtered" CR));
}
return res;
}

// Add a document to the queue
boolean enqueueJsonObject(const StaticJsonDocument<JSON_MSG_BUFFER>& jsonDoc, int timeout) {
receivedMessages++;
if (jsonDoc.size() == 0) {
Log.error(F("Empty JSON, skipping" CR));
return true;
Expand Down Expand Up @@ -525,9 +531,9 @@ void emptyQueue() {
if (error) {
Log.error(F("deserialize jsonQueue.front() failed: %s, buffer capacity: %u" CR), error.c_str(), jsonBuffer.capacity());
} else {
jsonDispatch(obj);
if (jsonDispatch(obj))
queueLengthSum++;
}
queueLengthSum++;
}

/**
Expand All @@ -537,9 +543,9 @@ void emptyQueue() {
* @param payload the message to sends
* @param retainFlag true if you what a retain
*/
void pub(const char* topicori, const char* payload, bool retainFlag) {
bool pub(const char* topicori, const char* payload, bool retainFlag) {
String topic = String(mqtt_topic) + String(gateway_name) + String(topicori);
pubMQTT(topic.c_str(), payload, retainFlag);
return pubMQTT(topic.c_str(), payload, retainFlag);
}

/**
Expand All @@ -548,7 +554,8 @@ void pub(const char* topicori, const char* payload, bool retainFlag) {
* @param topicori suffix to add on default MQTT Topic
* @param data The Json Object that represents the message
*/
void pub(const char* topicori, JsonObject& data) {
bool pub(const char* topicori, JsonObject& data) {
bool res = false;
bool ret = sensor_Retain;
if (data.containsKey("retain") && data["retain"].is<bool>()) {
ret = data["retain"];
Expand All @@ -559,7 +566,7 @@ void pub(const char* topicori, JsonObject& data) {
}
if (data.size() == 0) {
Log.error(F("Empty JSON, not published" CR));
return;
return res;
}
String topic = String(mqtt_topic) + String(gateway_name) + String(topicori);
#if valueAsATopic
Expand All @@ -580,7 +587,7 @@ void pub(const char* topicori, JsonObject& data) {
#if jsonPublishing
String dataAsString = "";
serializeJson(data, dataAsString);
pubMQTT(topic.c_str(), dataAsString.c_str(), ret);
res = pubMQTT(topic.c_str(), dataAsString.c_str(), ret);
#endif

#if simplePublishing
Expand All @@ -592,19 +599,20 @@ void pub(const char* topicori, JsonObject& data) {
# endif
if (p.value().is<uint64_t>() && strcmp(p.key().c_str(), "rssi") != 0) { //test rssi , bypass solution due to the fact that a int is considered as an uint64_t
if (strcmp(p.key().c_str(), "value") == 0) { // if data is a value we don't integrate the name into the topic
pubMQTT(topic, p.value().as<uint64_t>());
res = pubMQTT(topic, p.value().as<uint64_t>());
} else { // if data is not a value we integrate the name into the topic
pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<uint64_t>());
res = pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<uint64_t>());
}
} else if (p.value().is<int>()) {
pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<int>());
res = pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<int>());
} else if (p.value().is<float>()) {
pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<float>());
res = pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<float>());
} else if (p.value().is<char*>()) {
pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<const char*>());
res = pubMQTT(topic + "/" + String(p.key().c_str()), p.value().as<const char*>());
}
}
#endif
return res;
}

/**
Expand All @@ -613,9 +621,9 @@ void pub(const char* topicori, JsonObject& data) {
* @param topicori suffix to add on default MQTT Topic
* @param payload the message to sends
*/
void pub(const char* topicori, const char* payload) {
bool pub(const char* topicori, const char* payload) {
String topic = String(mqtt_topic) + String(gateway_name) + String(topicori);
pubMQTT(topic, payload);
return pubMQTT(topic, payload);
}

/**
Expand All @@ -625,10 +633,10 @@ void pub(const char* topicori, const char* payload) {
* @param data The Json Object that represents the message
* @param retain true if you what a retain
*/
void pub_custom_topic(const char* topic, JsonObject& data, boolean retain) {
bool pub_custom_topic(const char* topic, JsonObject& data, boolean retain) {
String buffer = "";
serializeJson(data, buffer);
pubMQTT(topic, buffer.c_str(), retain);
return pubMQTT(topic, buffer.c_str(), retain);
}

/**
Expand All @@ -637,8 +645,8 @@ void pub_custom_topic(const char* topic, JsonObject& data, boolean retain) {
* @param topic the topic
* @param payload the payload
*/
void pubMQTT(const char* topic, const char* payload) {
pubMQTT(topic, payload, sensor_Retain);
bool pubMQTT(const char* topic, const char* payload) {
return pubMQTT(topic, payload, sensor_Retain);
}

/**
Expand All @@ -648,18 +656,19 @@ void pubMQTT(const char* topic, const char* payload) {
* @param payload the payload
* @param retainFlag true if retain the retain Flag
*/
void pubMQTT(const char* topic, const char* payload, bool retainFlag) {
bool pubMQTT(const char* topic, const char* payload, bool retainFlag) {
bool res = false;
if (SYSConfig.mqtt && !SYSConfig.offline) {
#ifdef ESP32
if (xSemaphoreTake(xMqttMutex, pdMS_TO_TICKS(QueueSemaphoreTimeOutTask)) == pdFALSE) {
Log.error(F("xMqttMutex not taken" CR));
return;
return res;
}
#endif
if (mqtt && mqtt->connected()) {
SendReceiveIndicatorON();
Log.notice(F("[ OMG->MQTT ] topic: %s msg: %s " CR), topic, payload);
mqtt->publish(topic, payload, 0, retainFlag);
res = mqtt->publish(topic, payload, 0, retainFlag);
} else {
Log.warning(F("MQTT not connected, aborting the publication" CR));
}
Expand All @@ -669,84 +678,85 @@ void pubMQTT(const char* topic, const char* payload, bool retainFlag) {
} else {
Log.notice(F("[ OMG->MQTT deactivated or offline] topic: %s msg: %s " CR), topic, payload);
}
return res;
}

void pubMQTT(String topic, const char* payload) {
pubMQTT(topic.c_str(), payload);
bool pubMQTT(String topic, const char* payload) {
return pubMQTT(topic.c_str(), payload);
}

void pubMQTT(const char* topic, unsigned long payload) {
bool pubMQTT(const char* topic, unsigned long payload) {
char val[11];
sprintf(val, "%lu", payload);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(const char* topic, unsigned long long payload) {
bool pubMQTT(const char* topic, unsigned long long payload) {
char val[21];
sprintf(val, "%llu", payload);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(const char* topic, String payload) {
pubMQTT(topic, payload.c_str());
bool pubMQTT(const char* topic, String payload) {
return pubMQTT(topic, payload.c_str());
}

void pubMQTT(String topic, String payload) {
pubMQTT(topic.c_str(), payload.c_str());
bool pubMQTT(String topic, String payload) {
return pubMQTT(topic.c_str(), payload.c_str());
}

void pubMQTT(String topic, int payload) {
bool pubMQTT(String topic, int payload) {
char val[12];
sprintf(val, "%d", payload);
pubMQTT(topic.c_str(), val);
return pubMQTT(topic.c_str(), val);
}

void pubMQTT(String topic, unsigned long long payload) {
bool pubMQTT(String topic, unsigned long long payload) {
char val[21];
sprintf(val, "%llu", payload);
pubMQTT(topic.c_str(), val);
return pubMQTT(topic.c_str(), val);
}

void pubMQTT(String topic, float payload) {
bool pubMQTT(String topic, float payload) {
char val[12];
dtostrf(payload, 3, 1, val);
pubMQTT(topic.c_str(), val);
return pubMQTT(topic.c_str(), val);
}

void pubMQTT(const char* topic, float payload) {
bool pubMQTT(const char* topic, float payload) {
char val[12];
dtostrf(payload, 3, 1, val);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(const char* topic, int payload) {
bool pubMQTT(const char* topic, int payload) {
char val[12];
sprintf(val, "%d", payload);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(const char* topic, unsigned int payload) {
bool pubMQTT(const char* topic, unsigned int payload) {
char val[12];
sprintf(val, "%u", payload);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(const char* topic, long payload) {
bool pubMQTT(const char* topic, long payload) {
char val[11];
sprintf(val, "%ld", payload);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(const char* topic, double payload) {
bool pubMQTT(const char* topic, double payload) {
char val[16];
sprintf(val, "%f", payload);
pubMQTT(topic, val);
return pubMQTT(topic, val);
}

void pubMQTT(String topic, unsigned long payload) {
bool pubMQTT(String topic, unsigned long payload) {
char val[11];
sprintf(val, "%lu", payload);
pubMQTT(topic.c_str(), val);
return pubMQTT(topic.c_str(), val);
}

void delayWithOTA(long waitMillis) {
Expand Down Expand Up @@ -2657,6 +2667,7 @@ String stateMeasures() {
#endif
SYSdata["msgprc"] = queueLengthSum;
SYSdata["msgblck"] = blockedMessages;
SYSdata["msgrcv"] = receivedMessages;
SYSdata["maxq"] = maxQueueLength;
SYSdata["cnt_index"] = cnt_index;
#ifdef ESP32
Expand Down

0 comments on commit 9c3c69d

Please sign in to comment.