diff --git a/mqttclient/mqttclient.c b/mqttclient/mqttclient.c index 9e04ca0..73074e3 100644 --- a/mqttclient/mqttclient.c +++ b/mqttclient/mqttclient.c @@ -24,7 +24,22 @@ static client_state_t mqtt_get_client_state(mqtt_client_t* c) static void mqtt_set_client_state(mqtt_client_t* c, client_state_t state) { platform_mutex_lock(&c->mqtt_global_lock); - c->mqtt_client_state = state; + if(c->mqtt_client_state == CLIENT_STATE_CLEAN_SESSION) + { + /*CLIENT_STATE_CLEAN_SESSION can only change to CLIENT_STATE_INVALID zhaoshimin 20220620*/ + if(state == CLIENT_STATE_INVALID) + { + c->mqtt_client_state = CLIENT_STATE_INVALID; + } + else + { + /*no change the c->mqtt_client_state*/ + } + } + else + { + c->mqtt_client_state = state; + } platform_mutex_unlock(&c->mqtt_global_lock); } @@ -491,11 +506,18 @@ static void mqtt_clean_session(mqtt_client_t* c) ack_handlers_t *ack_handler; message_handlers_t *msg_handler; + platform_mutex_lock(&c->mqtt_write_lock); /* release all ack_handler_list memory */ if (!(mqtt_list_is_empty(&c->mqtt_ack_handler_list))) { LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_ack_handler_list) { ack_handler = LIST_ENTRY(curr, ack_handlers_t, list); + if(ack_handler->handler) + { + platform_memory_free(ack_handler->handler); + ack_handler->handler = RT_NULL; + } platform_memory_free(ack_handler); + mqtt_subtract_ack_handler_num(c); } mqtt_list_del_init(&c->mqtt_ack_handler_list); } @@ -509,6 +531,7 @@ static void mqtt_clean_session(mqtt_client_t* c) } mqtt_list_del_init(&c->mqtt_msg_handler_list); } + platform_mutex_unlock(&c->mqtt_write_lock); mqtt_set_client_state(c, CLIENT_STATE_INVALID); } @@ -523,6 +546,7 @@ static void mqtt_ack_list_scan(mqtt_client_t* c, uint8_t flag) { mqtt_list_t *curr, *next; ack_handlers_t *ack_handler; + message_handlers_t *msg_handler; if ((mqtt_list_is_empty(&c->mqtt_ack_handler_list)) || (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))) return; @@ -543,8 +567,14 @@ static void mqtt_ack_list_scan(mqtt_client_t* c, uint8_t flag) if(flag) { - /*when flag == 0, rconnet the broker, resubscribe creat the type SUBACK ack_handler_list, it not been destroyed zhaoshimin 20200629*/ + /*when flag == 0, reconncet the broker, resubscribe creat the type SUBACK ack_handler_list, it not been destroyed zhaoshimin 20200629*/ platform_mutex_lock(&c->mqtt_write_lock); + msg_handler = ack_handler->handler; + if(msg_handler) + { + mqtt_msg_handler_destory(msg_handler); + msg_handler = RT_NULL; + } mqtt_ack_handler_destroy(ack_handler); mqtt_subtract_ack_handler_num(c); platform_mutex_unlock(&c->mqtt_write_lock); @@ -558,7 +588,7 @@ static int mqtt_try_resubscribe(mqtt_client_t* c) mqtt_list_t *curr, *next; message_handlers_t *msg_handler; - KAWAII_MQTT_LOG_W("%s:%d %s()... mqtt try resubscribe ...", __FILE__, __LINE__, __FUNCTION__); + KAWAII_MQTT_LOG_I("%s:%d %s()... mqtt try resubscribe ...", __FILE__, __LINE__, __FUNCTION__); if (mqtt_list_is_empty(&c->mqtt_msg_handler_list)) RETURN_ERROR(KAWAII_MQTT_SUCCESS_ERROR); @@ -587,8 +617,10 @@ static int mqtt_try_do_reconnect(mqtt_client_t* c) /* process these ack messages immediately after reconnecting */ mqtt_ack_list_scan(c, 0); } - - KAWAII_MQTT_LOG_D("%s:%d %s()... mqtt try connect result is -0x%04x", __FILE__, __LINE__, __FUNCTION__, -rc); + else + { + KAWAII_MQTT_LOG_E("%s:%d %s()... mqtt try connect result is -0x%04x", __FILE__, __LINE__, __FUNCTION__, -rc); + } RETURN_ERROR(rc); } @@ -608,8 +640,14 @@ static int mqtt_try_reconnect(mqtt_client_t* c) /*connect fail must delay reconnect try duration time and let cpu time go out, the lowest priority task can run */ mqtt_sleep_ms(c->mqtt_reconnect_try_duration); RETURN_ERROR(KAWAII_MQTT_RECONNECT_TIMEOUT_ERROR); - } - else { + } else { + + /* when connect server success, call the connect success callback function*/ + if(c->mqtt_connect_handler) + { + c->mqtt_connect_handler(c, c->mqtt_connect_data); + } + RETURN_ERROR(rc); } @@ -832,6 +870,8 @@ static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer) switch (packet_type) { case 0: /* timed out reading packet */ + /*when mqtt closed by server, funtion mqtt_read_pacek return no delay time, so add mqtt_sleep_ms to let cpu time zhaoshimin 20200723*/ + mqtt_sleep_ms(100); break; case CONNACK: /* has been processed */ @@ -939,7 +979,7 @@ static int mqtt_yield(mqtt_client_t* c, int timeout_ms) static void mqtt_yield_thread(void *arg) { - int rc; + int rc = 0; mqtt_client_t *c = (mqtt_client_t *)arg; @@ -947,7 +987,7 @@ static void mqtt_yield_thread(void *arg) while (1) { rc = mqtt_yield(c, c->mqtt_cmd_timeout); if (KAWAII_MQTT_CLEAN_SESSION_ERROR == rc) { - KAWAII_MQTT_LOG_E("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__); + KAWAII_MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__); network_disconnect(c->mqtt_network); mqtt_clean_session(c); break; @@ -957,6 +997,7 @@ static void mqtt_yield_thread(void *arg) } /*let the rtos recycles thread resources zhaoshimin 20200629*/ platform_thread_destroy(c->mqtt_thread); + c->mqtt_thread = RT_NULL; } @@ -966,14 +1007,25 @@ static int mqtt_connect_with_results(mqtt_client_t* c) int rc = KAWAII_MQTT_CONNECT_FAILED_ERROR; platform_timer_t connect_timer; mqtt_connack_data_t connack_data = {0}; + client_state_t state; MQTTPacket_connectData connect_data = MQTTPacket_connectData_initializer; if (NULL == c) + { RETURN_ERROR(KAWAII_MQTT_NULL_VALUE_ERROR); + } - if (CLIENT_STATE_CONNECTED == mqtt_get_client_state(c)) + state = mqtt_get_client_state(c); + if (CLIENT_STATE_CONNECTED == state) + { RETURN_ERROR(KAWAII_MQTT_SUCCESS_ERROR); - + } + else if(CLIENT_STATE_CLEAN_SESSION == state) + { + RETURN_ERROR(KAWAII_MQTT_CLEAN_SESSION_ERROR); + } + /*protect the socket zhaoshimin 20220622 */ + platform_mutex_lock(&c->mqtt_write_lock); #ifdef KAWAII_MQTT_NETWORK_TYPE_TLS rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca); @@ -985,10 +1037,10 @@ static int mqtt_connect_with_results(mqtt_client_t* c) if (KAWAII_MQTT_SUCCESS_ERROR != rc) { /*when connect faile, you should call network_release to release socket file descriptor zhaoshimin 20200629*/ network_release(c->mqtt_network); + platform_mutex_unlock(&c->mqtt_write_lock); RETURN_ERROR(rc); } - KAWAII_MQTT_LOG_I("%s:%d %s()... mqtt connect success...", __FILE__, __LINE__, __FUNCTION__); connect_data.keepAliveInterval = c->mqtt_keep_alive_interval; connect_data.cleansession = c->mqtt_clean_session; @@ -1007,8 +1059,6 @@ static int mqtt_connect_with_results(mqtt_client_t* c) platform_timer_cutdown(&c->mqtt_last_received, (c->mqtt_keep_alive_interval * 1000)); - platform_mutex_lock(&c->mqtt_write_lock); - /* serialize connect packet */ if ((len = MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)) <= 0) goto exit; @@ -1025,8 +1075,9 @@ static int mqtt_connect_with_results(mqtt_client_t* c) rc = connack_data.rc; else rc = KAWAII_MQTT_CONNECT_FAILED_ERROR; - } else + } else { rc = KAWAII_MQTT_CONNECT_FAILED_ERROR; + } exit: if (rc == KAWAII_MQTT_SUCCESS_ERROR) { @@ -1053,7 +1104,15 @@ static int mqtt_connect_with_results(mqtt_client_t* c) c->mqtt_ping_outstanding = 0; /* reset ping outstanding */ + /* call the connect success callback function*/ + if((rc == KAWAII_MQTT_SUCCESS_ERROR) && (c->mqtt_connect_handler)) + { + c->mqtt_connect_handler(c, c->mqtt_connect_data); + } + } else { + /*when server ack error, it must close the mqtt socket zhaoshimin 20200724 */ + network_release(c->mqtt_network); mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED); /* connect failed */ } @@ -1072,6 +1131,7 @@ static int mqtt_init(mqtt_client_t* c) RETURN_ERROR(KAWAII_MQTT_MEM_NOT_ENOUGH_ERROR); } memset(c->mqtt_network, 0, sizeof(network_t)); + c->mqtt_network->socket = -1; c->mqtt_packet_id = 1; c->mqtt_clean_session = 0; //no clear session by default @@ -1092,6 +1152,8 @@ static int mqtt_init(mqtt_client_t* c) c->mqtt_reconnect_data = NULL; c->mqtt_reconnect_handler = NULL; c->mqtt_interceptor_handler = NULL; + c->mqtt_connect_data = NULL; + c->mqtt_connect_handler = NULL; /*only malloc write buf and read buf when call mqtt_init function */ if ((KAWAII_MQTT_MIN_PAYLOAD_SIZE >= c->mqtt_read_buf_size) || (KAWAII_MQTT_MAX_PAYLOAD_SIZE <= c->mqtt_read_buf_size)) @@ -1139,6 +1201,8 @@ KAWAII_MQTT_CLIENT_SET_DEFINE(write_buf_size, uint32_t, 0) KAWAII_MQTT_CLIENT_SET_DEFINE(reconnect_try_duration, uint32_t, 0) KAWAII_MQTT_CLIENT_SET_DEFINE(reconnect_handler, reconnect_handler_t, NULL) KAWAII_MQTT_CLIENT_SET_DEFINE(interceptor_handler, interceptor_handler_t, NULL) +KAWAII_MQTT_CLIENT_SET_DEFINE(connect_handler, connect_handler_t, NULL) +KAWAII_MQTT_CLIENT_SET_DEFINE(connect_data, void*, NULL) void mqtt_sleep_ms(int ms) { @@ -1252,6 +1316,20 @@ int mqtt_disconnect(mqtt_client_t* c) platform_timer_cutdown(&timer, c->mqtt_cmd_timeout); + if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c)) + { + /*��δ���ӣ�����mqtt�̻߳������е�����²�ִ�� */ + if(c->mqtt_thread) + { + mqtt_set_client_state(c, CLIENT_STATE_CLEAN_SESSION); + return KAWAII_MQTT_FAILED_ERROR; + } + else + { + return KAWAII_MQTT_SUCCESS_ERROR; + } + } + platform_mutex_lock(&c->mqtt_write_lock); /* serialize disconnect packet and send it */ @@ -1302,7 +1380,10 @@ int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, m } rc = mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler); - + if(KAWAII_MQTT_SUCCESS_ERROR != rc) + { + platform_memory_free(msg_handler); + } exit: platform_mutex_unlock(&c->mqtt_write_lock); @@ -1341,7 +1422,10 @@ int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter) } rc = mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler); - + if(KAWAII_MQTT_SUCCESS_ERROR != rc) + { + platform_memory_free(msg_handler); + } exit: platform_mutex_unlock(&c->mqtt_write_lock); @@ -1359,7 +1443,7 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c)) { rc = KAWAII_MQTT_NOT_CONNECT_ERROR; - goto exit; + RETURN_ERROR(rc); } if ((NULL != msg->payload) && (0 == msg->payloadlen)) @@ -1402,7 +1486,8 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg platform_mutex_unlock(&c->mqtt_write_lock); - if ((KAWAII_MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR == rc) || (KAWAII_MQTT_MEM_NOT_ENOUGH_ERROR == rc)) { + if ((KAWAII_MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR == rc) || (KAWAII_MQTT_MEM_NOT_ENOUGH_ERROR == rc) || + (KAWAII_MQTT_SEND_PACKET_ERROR == rc)) { KAWAII_MQTT_LOG_W("%s:%d %s()... there is not enough memory space to record...", __FILE__, __LINE__, __FUNCTION__); /*must realse the socket file descriptor zhaoshimin 20200629*/ @@ -1418,7 +1503,6 @@ int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg int mqtt_list_subscribe_topic(mqtt_client_t* c) { - int i = 0; mqtt_list_t *curr, *next; message_handlers_t *msg_handler; diff --git a/mqttclient/mqttclient.h b/mqttclient/mqttclient.h index 8a63225..a674a68 100644 --- a/mqttclient/mqttclient.h +++ b/mqttclient/mqttclient.h @@ -61,6 +61,7 @@ typedef struct message_data { typedef void (*interceptor_handler_t)(void* client, message_data_t* msg); typedef void (*message_handler_t)(void* client, message_data_t* msg); typedef void (*reconnect_handler_t)(void* client, void* reconnect_date); +typedef void (*connect_handler_t)(void* client, void* connect_date); typedef struct message_handlers { mqtt_list_t list; @@ -94,6 +95,7 @@ typedef struct mqtt_client { char *mqtt_port; char *mqtt_ca; void *mqtt_reconnect_data; + void *mqtt_connect_data; uint8_t *mqtt_read_buf; uint8_t *mqtt_write_buf; uint16_t mqtt_keep_alive_interval; @@ -122,6 +124,7 @@ typedef struct mqtt_client { platform_timer_t mqtt_last_received; reconnect_handler_t mqtt_reconnect_handler; interceptor_handler_t mqtt_interceptor_handler; + connect_handler_t mqtt_connect_handler; } mqtt_client_t; @@ -156,6 +159,8 @@ KAWAII_MQTT_CLIENT_SET_STATEMENT(write_buf_size, uint32_t) KAWAII_MQTT_CLIENT_SET_STATEMENT(reconnect_try_duration, uint32_t) KAWAII_MQTT_CLIENT_SET_STATEMENT(reconnect_handler, reconnect_handler_t) KAWAII_MQTT_CLIENT_SET_STATEMENT(interceptor_handler, interceptor_handler_t) +KAWAII_MQTT_CLIENT_SET_STATEMENT(connect_data, void*) +KAWAII_MQTT_CLIENT_SET_STATEMENT(connect_handler, connect_handler_t) void mqtt_sleep_ms(int ms); mqtt_client_t *mqtt_lease(void); diff --git a/network/nettype_tcp.c b/network/nettype_tcp.c index 2fc1bd0..666f544 100644 --- a/network/nettype_tcp.c +++ b/network/nettype_tcp.c @@ -29,7 +29,10 @@ int nettype_tcp_connect(network_t* n) void nettype_tcp_disconnect(network_t* n) { - if (NULL != n) + if((NULL != n) && (n->socket >= 0)) + { platform_net_socket_close(n->socket); - n->socket = -1; + n->socket = -1; + } + } diff --git a/platform/RT-Thread/platform_memory.c b/platform/RT-Thread/platform_memory.c index dd62b95..bdd6740 100644 --- a/platform/RT-Thread/platform_memory.c +++ b/platform/RT-Thread/platform_memory.c @@ -10,9 +10,12 @@ void *platform_memory_alloc(size_t size) { - char *ptr; + char *ptr = RT_NULL; ptr = rt_malloc(size); - memset(ptr, 0, size); + if(ptr) + { + memset(ptr, 0, size); + } return (void *)ptr; } diff --git a/test/test.c b/test/test.c index e4dbd3d..cd14a68 100644 --- a/test/test.c +++ b/test/test.c @@ -34,7 +34,7 @@ static int mqtt_publish_handle1(mqtt_client_t *client) } -int main(void) +int main_mqtt(void) { mqtt_client_t *client = NULL;