1
0
mirror of https://github.com/eclipse/mosquitto.git synced 2025-05-09 01:01:11 +08:00

Fix potential deadlock in mosquitto_sub if -W is used.

Closes #3175. Thanks to Audric Schiltknecht
This commit is contained in:
Roger A. Light 2025-02-27 00:02:26 +00:00
parent 167d0c3550
commit cd0987a661
14 changed files with 191 additions and 128 deletions

View File

@ -9,6 +9,9 @@ Broker:
- Fix mismatched wrapped/unwrapped memory alloc/free in properties. Closes #3192. - Fix mismatched wrapped/unwrapped memory alloc/free in properties. Closes #3192.
- Fix `allow_anonymous false` not being applied in local only mode. Closes #3198. - Fix `allow_anonymous false` not being applied in local only mode. Closes #3198.
Client library:
- Fix potential deadlock in mosquitto_sub if `-W` is used. Closes #3175.
Apps: Apps:
- mosquitto_ctrl dynsec now also allows `-i` to specify a clientid as well as - mosquitto_ctrl dynsec now also allows `-i` to specify a clientid as well as
`-c`. This matches the documentation which states `-i`. Closes #3219. `-c`. This matches the documentation which states `-i`. Closes #3219.

View File

@ -267,6 +267,9 @@ int mosquitto_disconnect_v5(struct mosquitto *mosq, int reason_code, const mosqu
void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties) void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquitto_property *properties)
{ {
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
mosquitto__set_state(mosq, mosq_cs_disconnected); mosquitto__set_state(mosq, mosq_cs_disconnected);
net__socket_close(mosq); net__socket_close(mosq);
@ -287,17 +290,20 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit
COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex); COMPAT_pthread_mutex_unlock(&mosq->msgtime_mutex);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){ on_disconnect = mosq->on_disconnect;
mosq->in_callback = true; on_disconnect_v5 = mosq->on_disconnect_v5;
mosq->on_disconnect(mosq, mosq->userdata, reason_code);
mosq->in_callback = false;
}
if(mosq->on_disconnect_v5){
mosq->in_callback = true;
mosq->on_disconnect_v5(mosq, mosq->userdata, reason_code, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_disconnect){
mosq->in_callback = true;
on_disconnect(mosq, mosq->userdata, reason_code);
mosq->in_callback = false;
}
if(on_disconnect_v5){
mosq->in_callback = true;
on_disconnect_v5(mosq, mosq->userdata, reason_code, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->current_out_packet_mutex); COMPAT_pthread_mutex_unlock(&mosq->current_out_packet_mutex);
} }

View File

@ -32,27 +32,35 @@ Contributors:
static void connack_callback(struct mosquitto *mosq, uint8_t reason_code, uint8_t connect_flags, const mosquitto_property *properties) static void connack_callback(struct mosquitto *mosq, uint8_t reason_code, uint8_t connect_flags, const mosquitto_property *properties)
{ {
void (*on_connect)(struct mosquitto *, void *userdata, int rc);
void (*on_connect_with_flags)(struct mosquitto *, void *userdata, int rc, int flags);
void (*on_connect_v5)(struct mosquitto *, void *userdata, int rc, int flags, const mosquitto_property *props);
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", SAFE_PRINT(mosq->id), reason_code); log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", SAFE_PRINT(mosq->id), reason_code);
if(reason_code == MQTT_RC_SUCCESS){ if(reason_code == MQTT_RC_SUCCESS){
mosq->reconnects = 0; mosq->reconnects = 0;
} }
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_connect){ on_connect = mosq->on_connect;
mosq->in_callback = true; on_connect_with_flags = mosq->on_connect_with_flags;
mosq->on_connect(mosq, mosq->userdata, reason_code); on_connect_v5 = mosq->on_connect_v5;
mosq->in_callback = false;
}
if(mosq->on_connect_with_flags){
mosq->in_callback = true;
mosq->on_connect_with_flags(mosq, mosq->userdata, reason_code, connect_flags);
mosq->in_callback = false;
}
if(mosq->on_connect_v5){
mosq->in_callback = true;
mosq->on_connect_v5(mosq, mosq->userdata, reason_code, connect_flags, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_connect){
mosq->in_callback = true;
on_connect(mosq, mosq->userdata, reason_code);
mosq->in_callback = false;
}
if(on_connect_with_flags){
mosq->in_callback = true;
on_connect_with_flags(mosq, mosq->userdata, reason_code, connect_flags);
mosq->in_callback = false;
}
if(on_connect_v5){
mosq->in_callback = true;
on_connect_v5(mosq, mosq->userdata, reason_code, connect_flags, properties);
mosq->in_callback = false;
}
} }

View File

@ -136,19 +136,25 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type)
rc = message__delete(mosq, mid, mosq_md_out, qos); rc = message__delete(mosq, mid, mosq_md_out, qos);
if(rc == MOSQ_ERR_SUCCESS){ if(rc == MOSQ_ERR_SUCCESS){
void (*on_publish)(struct mosquitto *, void *userdata, int mid);
void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
/* Only inform the client the message has been sent once. */ /* Only inform the client the message has been sent once. */
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){ on_publish = mosq->on_publish;
mosq->in_callback = true; on_publish_v5 = mosq->on_publish_v5;
mosq->on_publish(mosq, mosq->userdata, mid);
mosq->in_callback = false;
}
if(mosq->on_publish_v5){
mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_publish){
mosq->in_callback = true;
on_publish(mosq, mosq->userdata, mid);
mosq->in_callback = false;
}
if(on_publish_v5){
mosq->in_callback = true;
on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
mosq->in_callback = false;
}
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
}else if(rc != MOSQ_ERR_NOT_FOUND){ }else if(rc != MOSQ_ERR_NOT_FOUND){
return rc; return rc;

View File

@ -120,20 +120,24 @@ int handle__publish(struct mosquitto *mosq)
(long)message->msg.payloadlen); (long)message->msg.payloadlen);
message->timestamp = mosquitto_time(); message->timestamp = mosquitto_time();
void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
void (*on_message_v5)(struct mosquitto *, void *userdata, const struct mosquitto_message *message, const mosquitto_property *props);
switch(message->msg.qos){ switch(message->msg.qos){
case 0: case 0:
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){ on_message = mosq->on_message;
on_message_v5 = mosq->on_message_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_message){
mosq->in_callback = true; mosq->in_callback = true;
mosq->on_message(mosq, mosq->userdata, &message->msg); on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false; mosq->in_callback = false;
} }
if(mosq->on_message_v5){ if(mosq->on_message_v5){
mosq->in_callback = true; mosq->in_callback = true;
mosq->on_message_v5(mosq, mosq->userdata, &message->msg, properties); on_message_v5(mosq, mosq->userdata, &message->msg, properties);
mosq->in_callback = false; mosq->in_callback = false;
} }
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
message__cleanup(&message); message__cleanup(&message);
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
@ -141,17 +145,19 @@ int handle__publish(struct mosquitto *mosq)
util__decrement_receive_quota(mosq); util__decrement_receive_quota(mosq);
rc = send__puback(mosq, mid, 0, NULL); rc = send__puback(mosq, mid, 0, NULL);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){ on_message = mosq->on_message;
mosq->in_callback = true; on_message_v5 = mosq->on_message_v5;
mosq->on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(mosq->on_message_v5){
mosq->in_callback = true;
mosq->on_message_v5(mosq, mosq->userdata, &message->msg, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_message){
mosq->in_callback = true;
on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(on_message_v5){
mosq->in_callback = true;
on_message_v5(mosq, mosq->userdata, &message->msg, properties);
mosq->in_callback = false;
}
message__cleanup(&message); message__cleanup(&message);
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
return rc; return rc;

View File

@ -107,13 +107,16 @@ int handle__pubrec(struct mosquitto *mosq)
}else{ }else{
if(!message__delete(mosq, mid, mosq_md_out, 2)){ if(!message__delete(mosq, mid, mosq_md_out, 2)){
/* Only inform the client the message has been sent once. */ /* Only inform the client the message has been sent once. */
void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish_v5){ on_publish_v5 = mosq->on_publish_v5;
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_publish_v5){
mosq->in_callback = true; mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties); on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
mosq->in_callback = false; mosq->in_callback = false;
} }
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
} }
util__increment_send_quota(mosq); util__increment_send_quota(mosq);
COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex); COMPAT_pthread_mutex_lock(&mosq->msgs_out.mutex);

View File

@ -116,18 +116,22 @@ int handle__pubrel(struct mosquitto *mosq)
if(rc == MOSQ_ERR_SUCCESS){ if(rc == MOSQ_ERR_SUCCESS){
/* Only pass the message on if we have removed it from the queue - this /* Only pass the message on if we have removed it from the queue - this
* prevents multiple callbacks for the same message. */ * prevents multiple callbacks for the same message. */
void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
void (*on_message_v5)(struct mosquitto *, void *userdata, const struct mosquitto_message *message, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_message){ on_message = mosq->on_message;
mosq->in_callback = true; on_message_v5 = mosq->on_message_v5;
mosq->on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(mosq->on_message_v5){
mosq->in_callback = true;
mosq->on_message_v5(mosq, mosq->userdata, &message->msg, message->properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_message){
mosq->in_callback = true;
on_message(mosq, mosq->userdata, &message->msg);
mosq->in_callback = false;
}
if(on_message_v5){
mosq->in_callback = true;
on_message_v5(mosq, mosq->userdata, &message->msg, message->properties);
mosq->in_callback = false;
}
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
message__cleanup(&message); message__cleanup(&message);
}else if(rc == MOSQ_ERR_NOT_FOUND){ }else if(rc == MOSQ_ERR_NOT_FOUND){

View File

@ -97,18 +97,22 @@ int handle__suback(struct mosquitto *mosq)
/* Immediately free, we don't do anything with Reason String or User Property at the moment */ /* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
#else #else
void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
void (*on_subscribe_v5)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_subscribe){ on_subscribe = mosq->on_subscribe;
mosq->in_callback = true; on_subscribe_v5 = mosq->on_subscribe_v5;
mosq->on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
mosq->in_callback = false;
}
if(mosq->on_subscribe_v5){
mosq->in_callback = true;
mosq->on_subscribe_v5(mosq, mosq->userdata, mid, qos_count, granted_qos, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_subscribe){
mosq->in_callback = true;
on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
mosq->in_callback = false;
}
if(on_subscribe_v5){
mosq->in_callback = true;
on_subscribe_v5(mosq, mosq->userdata, mid, qos_count, granted_qos, properties);
mosq->in_callback = false;
}
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
#endif #endif
mosquitto__free(granted_qos); mosquitto__free(granted_qos);

View File

@ -76,18 +76,22 @@ int handle__unsuback(struct mosquitto *mosq)
/* Immediately free, we don't do anything with Reason String or User Property at the moment */ /* Immediately free, we don't do anything with Reason String or User Property at the moment */
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
#else #else
void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
void (*on_unsubscribe_v5)(struct mosquitto *, void *userdata, int mid, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_unsubscribe){ on_unsubscribe = mosq->on_unsubscribe;
mosq->in_callback = true; on_unsubscribe_v5 = mosq->on_unsubscribe_v5;
mosq->on_unsubscribe(mosq, mosq->userdata, mid);
mosq->in_callback = false;
}
if(mosq->on_unsubscribe_v5){
mosq->in_callback = true;
mosq->on_unsubscribe_v5(mosq, mosq->userdata, mid, properties);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_unsubscribe){
mosq->in_callback = true;
on_unsubscribe(mosq, mosq->userdata, mid);
mosq->in_callback = false;
}
if(on_unsubscribe_v5){
mosq->in_callback = true;
on_unsubscribe_v5(mosq, mosq->userdata, mid, properties);
mosq->in_callback = false;
}
mosquitto_property_free_all(&properties); mosquitto_property_free_all(&properties);
#endif #endif

View File

@ -33,16 +33,19 @@ int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt,
va_list va; va_list va;
char *s; char *s;
size_t len; size_t len;
void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
assert(mosq); assert(mosq);
assert(fmt); assert(fmt);
COMPAT_pthread_mutex_lock(&mosq->log_callback_mutex); COMPAT_pthread_mutex_lock(&mosq->log_callback_mutex);
if(mosq->on_log){ on_log = mosq->on_log;
COMPAT_pthread_mutex_unlock(&mosq->log_callback_mutex);
if(on_log){
len = strlen(fmt) + 500; len = strlen(fmt) + 500;
s = mosquitto__malloc(len*sizeof(char)); s = mosquitto__malloc(len*sizeof(char));
if(!s){ if(!s){
COMPAT_pthread_mutex_unlock(&mosq->log_callback_mutex);
return MOSQ_ERR_NOMEM; return MOSQ_ERR_NOMEM;
} }
@ -51,11 +54,10 @@ int log__printf(struct mosquitto *mosq, unsigned int priority, const char *fmt,
va_end(va); va_end(va);
s[len-1] = '\0'; /* Ensure string is null terminated. */ s[len-1] = '\0'; /* Ensure string is null terminated. */
mosq->on_log(mosq, mosq->userdata, (int)priority, s); on_log(mosq, mosq->userdata, (int)priority, s);
mosquitto__free(s); mosquitto__free(s);
} }
COMPAT_pthread_mutex_unlock(&mosq->log_callback_mutex);
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }

View File

@ -335,18 +335,23 @@ static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){ if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
rc = MOSQ_ERR_SUCCESS; rc = MOSQ_ERR_SUCCESS;
} }
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){ on_disconnect = mosq->on_disconnect;
mosq->in_callback = true; on_disconnect_v5 = mosq->on_disconnect_v5;
mosq->on_disconnect(mosq, mosq->userdata, rc);
mosq->in_callback = false;
}
if(mosq->on_disconnect_v5){
mosq->in_callback = true;
mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_disconnect){
mosq->in_callback = true;
on_disconnect(mosq, mosq->userdata, rc);
mosq->in_callback = false;
}
if(on_disconnect_v5){
mosq->in_callback = true;
on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
mosq->in_callback = false;
}
} }
return rc; return rc;
} }

View File

@ -296,20 +296,24 @@ int packet__write(struct mosquitto *mosq)
if(((packet->command)&0xF6) == CMD_PUBLISH){ if(((packet->command)&0xF6) == CMD_PUBLISH){
G_PUB_MSGS_SENT_INC(1); G_PUB_MSGS_SENT_INC(1);
#ifndef WITH_BROKER #ifndef WITH_BROKER
void (*on_publish)(struct mosquitto *, void *userdata, int mid);
void (*on_publish_v5)(struct mosquitto *, void *userdata, int mid, int reason_code, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){ on_publish = mosq->on_publish;
/* This is a QoS=0 message */ on_publish_v5 = mosq->on_publish_v5;
mosq->in_callback = true;
mosq->on_publish(mosq, mosq->userdata, packet->mid);
mosq->in_callback = false;
}
if(mosq->on_publish_v5){
/* This is a QoS=0 message */
mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, packet->mid, 0, NULL);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_publish){
/* This is a QoS=0 message */
mosq->in_callback = true;
on_publish(mosq, mosq->userdata, packet->mid);
mosq->in_callback = false;
}
if(on_publish_v5){
/* This is a QoS=0 message */
mosq->in_callback = true;
on_publish_v5(mosq, mosq->userdata, packet->mid, 0, NULL);
mosq->in_callback = false;
}
}else if(((packet->command)&0xF0) == CMD_DISCONNECT){ }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL); do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
packet__cleanup(packet); packet__cleanup(packet);

View File

@ -49,18 +49,22 @@ static void srv_callback(void *arg, int status, int timeouts, unsigned char *abu
}else{ }else{
log__printf(mosq, MOSQ_LOG_ERR, "Error: SRV lookup failed (%d).", status); log__printf(mosq, MOSQ_LOG_ERR, "Error: SRV lookup failed (%d).", status);
/* FIXME - calling on_disconnect here isn't correct. */ /* FIXME - calling on_disconnect here isn't correct. */
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){ on_disconnect = mosq->on_disconnect;
mosq->in_callback = true; on_disconnect_v5 = mosq->on_disconnect_v5;
mosq->on_disconnect(mosq, mosq->userdata, MOSQ_ERR_LOOKUP);
mosq->in_callback = false;
}
if(mosq->on_disconnect_v5){
mosq->in_callback = true;
mosq->on_disconnect_v5(mosq, mosq->userdata, MOSQ_ERR_LOOKUP, NULL);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_disconnect){
mosq->in_callback = true;
on_disconnect(mosq, mosq->userdata, MOSQ_ERR_LOOKUP);
mosq->in_callback = false;
}
if(on_disconnect_v5){
mosq->in_callback = true;
on_disconnect_v5(mosq, mosq->userdata, MOSQ_ERR_LOOKUP, NULL);
mosq->in_callback = false;
}
} }
} }
#endif #endif

View File

@ -118,18 +118,22 @@ int mosquitto__check_keepalive(struct mosquitto *mosq)
}else{ }else{
rc = MOSQ_ERR_KEEPALIVE; rc = MOSQ_ERR_KEEPALIVE;
} }
void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
void (*on_disconnect_v5)(struct mosquitto *, void *userdata, int rc, const mosquitto_property *props);
COMPAT_pthread_mutex_lock(&mosq->callback_mutex); COMPAT_pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_disconnect){ on_disconnect = mosq->on_disconnect;
mosq->in_callback = true; on_disconnect_v5 = mosq->on_disconnect_v5;
mosq->on_disconnect(mosq, mosq->userdata, rc);
mosq->in_callback = false;
}
if(mosq->on_disconnect_v5){
mosq->in_callback = true;
mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
mosq->in_callback = false;
}
COMPAT_pthread_mutex_unlock(&mosq->callback_mutex); COMPAT_pthread_mutex_unlock(&mosq->callback_mutex);
if(on_disconnect){
mosq->in_callback = true;
on_disconnect(mosq, mosq->userdata, rc);
mosq->in_callback = false;
}
if(on_disconnect_v5){
mosq->in_callback = true;
on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
mosq->in_callback = false;
}
return rc; return rc;
#endif #endif