diff --git a/library.json b/library.json index 575fa8d7..14514c0e 100644 --- a/library.json +++ b/library.json @@ -6,7 +6,7 @@ "type": "git", "url": "https://github.com/thingsboard/pubsubclient.git" }, - "version": "2.9.3", + "version": "2.9.4", "exclude": "tests", "examples": "examples/*/*.ino", "frameworks": "arduino", diff --git a/library.properties b/library.properties index 9f269137..064c1e7f 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=TBPubSubClient -version=2.9.3 +version=2.9.4 author=ThingsBoard maintainer=ThingsBoard Team sentence=A client library for MQTT messaging. diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 17d3d225..a9ab0ba9 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -196,7 +196,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (result == 1) { nextMsgId = 1; // Leave room in the buffer for header and variable length field - size_t length = MQTT_MAX_HEADER_SIZE; + uint16_t length = MQTT_MAX_HEADER_SIZE; unsigned int j; #if MQTT_VERSION == MQTT_VERSION_3_1 @@ -299,8 +299,8 @@ boolean PubSubClient::readByte(uint8_t * result) { } // reads a byte into result[*index] and increments index -boolean PubSubClient::readByte(uint8_t * result, size_t * index){ - size_t current_index = *index; +boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ + uint16_t current_index = *index; uint8_t * write_address = &(result[current_index]); if(readByte(write_address)){ *index = current_index + 1; @@ -310,7 +310,7 @@ boolean PubSubClient::readByte(uint8_t * result, size_t * index){ } uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { - size_t len = 0; + uint16_t len = 0; if(!readByte(this->buffer, &len)) return 0; bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; @@ -344,9 +344,9 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { skip += 2; } } - size_t idx = len; + uint32_t idx = len; - for (size_t i = start;istream) { if (isPublish && idx-*lengthLength-2>skip) { @@ -367,7 +367,82 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { return len; } +boolean PubSubClient::loop_read() { + if (_client == NULL) { + return false; + } + if (!_client->available()) { + return false; + } + uint8_t llen; + uint16_t len = readPacket(&llen); + if (len == 0) { + return false; + } + unsigned long t = millis(); + lastInActivity = t; + uint8_t type = buffer[0]&0xF0; + + switch(type) { + case MQTTPUBLISH: + { + if (callback) { + const boolean msgId_present = (buffer[0]&0x06) == MQTTQOS1; + const uint16_t tl_offset = llen+1; + const uint16_t tl = (buffer[tl_offset]<<8)+buffer[tl_offset+1]; /* topic length in bytes */ + const uint16_t topic_offset = tl_offset+2; + const uint16_t msgId_offset = topic_offset+tl; + const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset; + if ((payload_offset) >= this->bufferSize) {return false;} + if (len < payload_offset) {return false;} + memmove(buffer+topic_offset-1,buffer+topic_offset,tl); /* move topic inside buffer 1 byte to front */ + buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) buffer+topic_offset-1; + uint8_t *payload; + // msgId only present for QOS>0 + if (msgId_present) { + const uint16_t msgId = (buffer[msgId_offset]<<8)+buffer[msgId_offset+1]; + payload = buffer+payload_offset; + callback(topic,payload,len-payload_offset); + if (_client->connected()) { + buffer[0] = MQTTPUBACK; + buffer[1] = 2; + buffer[2] = (msgId >> 8); + buffer[3] = (msgId & 0xFF); + if (_client->write(buffer,4) != 0) { + lastOutActivity = t; + } + } + } else { + // No msgId + payload = buffer+payload_offset; + callback(topic,payload,len-payload_offset); + } + } + break; + } + case MQTTPINGREQ: + { + if (_client->connected()) { + buffer[0] = MQTTPINGRESP; + buffer[1] = 0; + _client->write(buffer,2); + } + break; + } + case MQTTPINGRESP: + { + pingOutstanding = false; + break; + } + default: + return false; + } + return true; +} + boolean PubSubClient::loop() { + loop_read(); if (connected()) { unsigned long t = millis(); if (((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) && keepAlive != 0) { @@ -376,56 +451,13 @@ boolean PubSubClient::loop() { _client->stop(); return false; } else { - this->buffer[0] = MQTTPINGREQ; - this->buffer[1] = 0; - _client->write(this->buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint8_t llen; - size_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = this->buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ - memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) this->buffer+llen+2; - // msgId only present for QOS>0 - if ((this->buffer[0]&0x06) == MQTTQOS1) { - msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; - payload = this->buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - this->buffer[0] = MQTTPUBACK; - this->buffer[1] = 2; - this->buffer[2] = (msgId >> 8); - this->buffer[3] = (msgId & 0xFF); - _client->write(this->buffer,4); - lastOutActivity = t; - - } else { - payload = this->buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - this->buffer[0] = MQTTPINGRESP; - this->buffer[1] = 0; - _client->write(this->buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; + buffer[0] = MQTTPINGREQ; + buffer[1] = 0; + if (_client->write(buffer,2) != 0) { + lastOutActivity = t; + lastInActivity = t; } - } else if (!connected()) { - // readPacket has closed the connection - return false; + pingOutstanding = true; } } return true; @@ -452,11 +484,11 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne return false; } // Leave room in the buffer for header and variable length field - size_t length = MQTT_MAX_HEADER_SIZE; + uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,this->buffer,length); // Add payload - size_t i; + uint16_t i; for (i=0;ibuffer[length++] = payload[i]; } @@ -479,7 +511,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig uint8_t llen = 0; uint8_t digit; unsigned int rc = 0; - size_t tlen; + uint16_t tlen; unsigned int pos = 0; unsigned int i; uint8_t header; @@ -526,14 +558,14 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { if (connected()) { // Send the header and variable length field - size_t length = MQTT_MAX_HEADER_SIZE; + uint16_t length = MQTT_MAX_HEADER_SIZE; length = writeString(topic,this->buffer,length); uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); - size_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); lastOutActivity = millis(); return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); } @@ -541,7 +573,7 @@ boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, bool } int PubSubClient::endPublish() { - return 1; + return 1; } size_t PubSubClient::write(uint8_t data) { @@ -554,12 +586,12 @@ size_t PubSubClient::write(const uint8_t *buffer, size_t size) { return _client->write(buffer,size); } -size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) { +size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) { uint8_t lenBuf[4]; uint8_t llen = 0; uint8_t digit; uint8_t pos = 0; - size_t len = length; + uint16_t len = length; do { digit = len & 127; //digit = len %128 @@ -578,13 +610,13 @@ size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, size_t length) { return llen+1; // Full header size is variable length bit plus the 1-byte fixed header } -boolean PubSubClient::write(uint8_t header, uint8_t* buf, size_t length) { - size_t rc; - size_t hlen = buildHeader(header, buf, length); +boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { + uint16_t rc; + uint8_t hlen = buildHeader(header, buf, length); #ifdef MQTT_MAX_TRANSFER_SIZE uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen); - size_t bytesRemaining = length+hlen; //Match the length type + uint16_t bytesRemaining = length+hlen; //Match the length type uint8_t bytesToWrite; boolean result = true; while((bytesRemaining > 0) && result) { @@ -620,7 +652,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { } if (connected()) { // Leave room in the buffer for header and variable length field - size_t length = MQTT_MAX_HEADER_SIZE; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; @@ -635,7 +667,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { } boolean PubSubClient::unsubscribe(const char* topic) { - size_t topicLength = strnlen(topic, this->bufferSize); + size_t topicLength = strnlen(topic, this->bufferSize); if (topic == 0) { return false; } @@ -644,7 +676,7 @@ boolean PubSubClient::unsubscribe(const char* topic) { return false; } if (connected()) { - size_t length = MQTT_MAX_HEADER_SIZE; + uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; @@ -667,9 +699,9 @@ void PubSubClient::disconnect() { lastInActivity = lastOutActivity = millis(); } -size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos) { +uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { const char* idp = string; - size_t i = 0; + uint16_t i = 0; pos += 2; while (*idp) { buf[pos++] = *idp++; @@ -683,7 +715,7 @@ size_t PubSubClient::writeString(const char* string, uint8_t* buf, size_t pos) { boolean PubSubClient::connected() { boolean rc; - if (_client == NULL ) { + if (_client == NULL) { rc = false; } else { rc = (int)_client->connected(); @@ -737,7 +769,7 @@ int PubSubClient::state() { return this->_state; } -boolean PubSubClient::setBufferSize(size_t size) { +boolean PubSubClient::setBufferSize(uint16_t size) { if (size == 0) { // Cannot set it back to 0 return false; @@ -756,13 +788,15 @@ boolean PubSubClient::setBufferSize(size_t size) { return (this->buffer != NULL); } -size_t PubSubClient::getBufferSize() { +uint16_t PubSubClient::getBufferSize() { return this->bufferSize; } + PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { this->keepAlive = keepAlive; return *this; } + PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) { this->socketTimeout = timeout; return *this; diff --git a/src/PubSubClient.h b/src/PubSubClient.h index faec421c..04053e4a 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -93,7 +93,7 @@ class PubSubClient : public Print { private: Client* _client; uint8_t* buffer; - size_t bufferSize; + uint16_t bufferSize; uint16_t keepAlive; uint16_t socketTimeout; uint16_t nextMsgId; @@ -103,14 +103,14 @@ class PubSubClient : public Print { MQTT_CALLBACK_SIGNATURE; uint32_t readPacket(uint8_t*); boolean readByte(uint8_t * result); - boolean readByte(uint8_t * result, size_t * index); - boolean write(uint8_t header, uint8_t* buf, size_t length); - size_t writeString(const char* string, uint8_t* buf, size_t pos); + boolean readByte(uint8_t * result, uint16_t * index); + boolean write(uint8_t header, uint8_t* buf, uint16_t length); + uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos); // Build up the header ready to send // Returns the size of the header // Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start // (MQTT_MAX_HEADER_SIZE - ) bytes into the buffer - size_t buildHeader(uint8_t header, uint8_t* buf, size_t length); + size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length); IPAddress ip; const char* domain; uint16_t port; @@ -143,8 +143,8 @@ class PubSubClient : public Print { PubSubClient& setKeepAlive(uint16_t keepAlive); PubSubClient& setSocketTimeout(uint16_t timeout); - boolean setBufferSize(size_t size); - size_t getBufferSize(); + boolean setBufferSize(uint16_t size); + uint16_t getBufferSize(); boolean connect(const char* id); boolean connect(const char* id, const char* user, const char* pass); @@ -178,6 +178,7 @@ class PubSubClient : public Print { boolean subscribe(const char* topic); boolean subscribe(const char* topic, uint8_t qos); boolean unsubscribe(const char* topic); + boolean loop_read(); boolean loop(); boolean connected(); int state();