diff --git a/ChangeLog.txt b/ChangeLog.txt index 1701c5e4..880ef88a 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1787,6 +1787,8 @@ Client library: - Add support for MQTT v3.1.1. - Don't quit mosquitto_loop_forever() if broker not available on first connect. Closes bug #453293, but requires more work. +- Don't reset queued messages state on CONNACK. Fixes bug with duplicate + messages on connection. 1.3.5 - 20141008 diff --git a/lib/connect.c b/lib/connect.c index 196fde47..c668bf3e 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -189,7 +189,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking) packet__cleanup_all(mosq); - message__reconnect_reset(mosq); + message__reconnect_reset(mosq, false); if(mosq->sock != INVALID_SOCKET){ net__socket_close(mosq); //close socket diff --git a/lib/handle_connack.c b/lib/handle_connack.c index 6dca1395..5380243d 100644 --- a/lib/handle_connack.c +++ b/lib/handle_connack.c @@ -106,7 +106,7 @@ int handle__connack(struct mosquitto *mosq) mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false); mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum; - message__reconnect_reset(mosq); + message__reconnect_reset(mosq, true); connack_callback(mosq, reason_code, connect_flags, properties); mosquitto_property_free_all(&properties); diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index ff28a915..8773c968 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -137,7 +137,7 @@ int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message return message__release_to_inflight(mosq, dir); } -void message__reconnect_reset(struct mosquitto *mosq) +void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only) { struct mosquitto_message_all *message, *tmp; assert(mosq); @@ -169,15 +169,17 @@ void message__reconnect_reset(struct mosquitto *mosq) message->timestamp = 0; if(mosq->msgs_out.inflight_quota != 0){ util__decrement_send_quota(mosq); - if(message->msg.qos == 1){ - message->state = mosq_ms_publish_qos1; - }else if(message->msg.qos == 2){ - if(message->state == mosq_ms_wait_for_pubrec){ - message->state = mosq_ms_publish_qos2; - }else if(message->state == mosq_ms_wait_for_pubcomp){ - message->state = mosq_ms_resend_pubrel; + if (update_quota_only == false){ + if(message->msg.qos == 1){ + message->state = mosq_ms_publish_qos1; + }else if(message->msg.qos == 2){ + if(message->state == mosq_ms_wait_for_pubrec){ + message->state = mosq_ms_publish_qos2; + }else if(message->state == mosq_ms_wait_for_pubcomp){ + message->state = mosq_ms_resend_pubrel; + } + /* Should be able to preserve state. */ } - /* Should be able to preserve state. */ } }else{ message->state = mosq_ms_invalid; diff --git a/lib/messages_mosq.h b/lib/messages_mosq.h index 5689b49e..ee5ba5cd 100644 --- a/lib/messages_mosq.h +++ b/lib/messages_mosq.h @@ -25,7 +25,7 @@ void message__cleanup_all(struct mosquitto *mosq); void message__cleanup(struct mosquitto_message_all **message); int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos); int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir); -void message__reconnect_reset(struct mosquitto *mosq); +void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only); int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_direction dir); int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos); void message__retry_check(struct mosquitto *mosq);