mirror of
https://github.com/eclipse/mosquitto.git
synced 2025-05-10 01:28:55 +08:00
Merge pull request #2144 from abiliojr/fix_duplicates_on_connect
fix duplication of messages during connect
This commit is contained in:
commit
1c6571a83a
@ -1787,6 +1787,8 @@ Client library:
|
|||||||
- Add support for MQTT v3.1.1.
|
- Add support for MQTT v3.1.1.
|
||||||
- Don't quit mosquitto_loop_forever() if broker not available on first
|
- Don't quit mosquitto_loop_forever() if broker not available on first
|
||||||
connect. Closes bug #453293, but requires more work.
|
connect. Closes bug #453293, but requires more work.
|
||||||
|
- Don't reset queued messages state on CONNACK. Fixes bug with duplicate
|
||||||
|
messages on connection.
|
||||||
|
|
||||||
|
|
||||||
1.3.5 - 20141008
|
1.3.5 - 20141008
|
||||||
|
@ -189,7 +189,7 @@ static int mosquitto__reconnect(struct mosquitto *mosq, bool blocking)
|
|||||||
|
|
||||||
packet__cleanup_all(mosq);
|
packet__cleanup_all(mosq);
|
||||||
|
|
||||||
message__reconnect_reset(mosq);
|
message__reconnect_reset(mosq, false);
|
||||||
|
|
||||||
if(mosq->sock != INVALID_SOCKET){
|
if(mosq->sock != INVALID_SOCKET){
|
||||||
net__socket_close(mosq); //close socket
|
net__socket_close(mosq); //close socket
|
||||||
|
@ -106,7 +106,7 @@ int handle__connack(struct mosquitto *mosq)
|
|||||||
mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false);
|
mosquitto_property_read_int32(properties, MQTT_PROP_MAXIMUM_PACKET_SIZE, &mosq->maximum_packet_size, false);
|
||||||
|
|
||||||
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;
|
mosq->msgs_out.inflight_quota = mosq->msgs_out.inflight_maximum;
|
||||||
message__reconnect_reset(mosq);
|
message__reconnect_reset(mosq, true);
|
||||||
|
|
||||||
connack_callback(mosq, reason_code, connect_flags, properties);
|
connack_callback(mosq, reason_code, connect_flags, properties);
|
||||||
mosquitto_property_free_all(&properties);
|
mosquitto_property_free_all(&properties);
|
||||||
|
@ -137,7 +137,7 @@ int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message
|
|||||||
return message__release_to_inflight(mosq, dir);
|
return message__release_to_inflight(mosq, dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
void message__reconnect_reset(struct mosquitto *mosq)
|
void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only)
|
||||||
{
|
{
|
||||||
struct mosquitto_message_all *message, *tmp;
|
struct mosquitto_message_all *message, *tmp;
|
||||||
assert(mosq);
|
assert(mosq);
|
||||||
@ -169,6 +169,7 @@ void message__reconnect_reset(struct mosquitto *mosq)
|
|||||||
message->timestamp = 0;
|
message->timestamp = 0;
|
||||||
if(mosq->msgs_out.inflight_quota != 0){
|
if(mosq->msgs_out.inflight_quota != 0){
|
||||||
util__decrement_send_quota(mosq);
|
util__decrement_send_quota(mosq);
|
||||||
|
if (update_quota_only == false){
|
||||||
if(message->msg.qos == 1){
|
if(message->msg.qos == 1){
|
||||||
message->state = mosq_ms_publish_qos1;
|
message->state = mosq_ms_publish_qos1;
|
||||||
}else if(message->msg.qos == 2){
|
}else if(message->msg.qos == 2){
|
||||||
@ -179,6 +180,7 @@ void message__reconnect_reset(struct mosquitto *mosq)
|
|||||||
}
|
}
|
||||||
/* Should be able to preserve state. */
|
/* Should be able to preserve state. */
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}else{
|
}else{
|
||||||
message->state = mosq_ms_invalid;
|
message->state = mosq_ms_invalid;
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ void message__cleanup_all(struct mosquitto *mosq);
|
|||||||
void message__cleanup(struct mosquitto_message_all **message);
|
void message__cleanup(struct mosquitto_message_all **message);
|
||||||
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos);
|
int message__delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, int qos);
|
||||||
int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir);
|
int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir);
|
||||||
void message__reconnect_reset(struct mosquitto *mosq);
|
void message__reconnect_reset(struct mosquitto *mosq, bool update_quota_only);
|
||||||
int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_direction dir);
|
int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_direction dir);
|
||||||
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos);
|
int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message, int qos);
|
||||||
void message__retry_check(struct mosquitto *mosq);
|
void message__retry_check(struct mosquitto *mosq);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user