1
0
mirror of https://github.com/eclipse/mosquitto.git synced 2025-05-08 08:40:13 +08:00

Backport keepalive check from develop branch.

Closes #3138
This commit is contained in:
Roger A. Light 2025-02-27 13:50:29 +00:00
parent 87488a27f0
commit 4304ac0af0
12 changed files with 188 additions and 15 deletions

View File

@ -12,6 +12,8 @@ Broker:
being removed from memory if they are not subscribed to. Closes #3221. being removed from memory if they are not subscribed to. Closes #3221.
- Produce an error if invalid combinations of cafile/capath/certfile/keyfile - Produce an error if invalid combinations of cafile/capath/certfile/keyfile
are used. Closes #1836. Closes #3130. are used. Closes #1836. Closes #3130.
- Backport keepalive checking from develop to fix problems in current
implementation. Closes #3138.
Client library: Client library:
- Fix potential deadlock in mosquitto_sub if `-W` is used. Closes #3175. - Fix potential deadlock in mosquitto_sub if `-W` is used. Closes #3175.

View File

@ -120,6 +120,10 @@ WITH_JEMALLOC:=no
# probably of no particular interest to end users. # probably of no particular interest to end users.
WITH_XTREPORT=no WITH_XTREPORT=no
# Use the old O(n) keepalive check routine, instead of the new O(1) keepalive
# check routine. See src/keepalive.c for notes on this.
WITH_OLD_KEEPALIVE=no
# Build using clang and with address sanitiser enabled # Build using clang and with address sanitiser enabled
WITH_ASAN=no WITH_ASAN=no
@ -388,6 +392,10 @@ ifeq ($(WITH_XTREPORT),yes)
BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_XTREPORT BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_XTREPORT
endif endif
ifeq ($(WITH_OLD_KEEPALIVE),yes)
BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_OLD_KEEPALIVE
endif
BROKER_LDADD:=${BROKER_LDADD} ${LDADD} BROKER_LDADD:=${BROKER_LDADD} ${LDADD}
CLIENT_LDADD:=${CLIENT_LDADD} ${LDADD} CLIENT_LDADD:=${CLIENT_LDADD} ${LDADD}
PASSWD_LDADD:=${PASSWD_LDADD} ${LDADD} PASSWD_LDADD:=${PASSWD_LDADD} ${LDADD}

View File

@ -353,6 +353,10 @@ struct mosquitto {
struct mosquitto *for_free_next; struct mosquitto *for_free_next;
struct session_expiry_list *expiry_list_item; struct session_expiry_list *expiry_list_item;
uint16_t remote_port; uint16_t remote_port;
# ifndef WITH_OLD_KEEPALIVE
struct mosquitto *keepalive_next;
struct mosquitto *keepalive_prev;
# endif
#endif #endif
uint32_t events; uint32_t events;
}; };

View File

@ -198,6 +198,15 @@ void net__init_tls(void)
} }
#endif #endif
bool net__is_connected(struct mosquitto *mosq)
{
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS)
return mosq->sock != INVALID_SOCKET || mosq->wsi != NULL;
#else
return mosq->sock != INVALID_SOCKET;
#endif
}
/* Close a socket associated with a context and set it to -1. /* Close a socket associated with a context and set it to -1.
* Returns 1 on failure (context is NULL) * Returns 1 on failure (context is NULL)
* Returns 0 on success. * Returns 0 on success.

View File

@ -75,6 +75,7 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s
int net__socket_connect_step3(struct mosquitto *mosq, const char *host); int net__socket_connect_step3(struct mosquitto *mosq, const char *host);
int net__socket_nonblock(mosq_sock_t *sock); int net__socket_nonblock(mosq_sock_t *sock);
int net__socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2); int net__socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2);
bool net__is_connected(struct mosquitto *mosq);
ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count); ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count);
ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count); ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count);

View File

@ -139,6 +139,7 @@ void context__cleanup(struct mosquitto *context, bool force_free)
#endif #endif
alias__free_all(context); alias__free_all(context);
keepalive__remove(context);
context__cleanup_out_packets(context); context__cleanup_out_packets(context);
mosquitto__free(context->auth_method); mosquitto__free(context->auth_method);

View File

@ -269,7 +269,9 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
if(db.config->max_keepalive && if(db.config->max_keepalive &&
(context->keepalive > db.config->max_keepalive || context->keepalive == 0)){ (context->keepalive > db.config->max_keepalive || context->keepalive == 0)){
keepalive__remove(context);
context->keepalive = db.config->max_keepalive; context->keepalive = db.config->max_keepalive;
keepalive__add(context);
if(context->protocol == mosq_p_mqtt5){ if(context->protocol == mosq_p_mqtt5){
if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_SERVER_KEEP_ALIVE, context->keepalive)){ if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_SERVER_KEEP_ALIVE, context->keepalive)){
rc = MOSQ_ERR_NOMEM; rc = MOSQ_ERR_NOMEM;
@ -312,8 +314,6 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
free(auth_data_out); free(auth_data_out);
auth_data_out = NULL; auth_data_out = NULL;
keepalive__add(context);
mosquitto__set_state(context, mosq_cs_active); mosquitto__set_state(context, mosq_cs_active);
rc = send__connack(context, connect_ack, CONNACK_ACCEPTED, connack_props); rc = send__connack(context, connect_ack, CONNACK_ACCEPTED, connack_props);
mosquitto_property_free_all(&connack_props); mosquitto_property_free_all(&connack_props);
@ -577,10 +577,15 @@ int handle__connect(struct mosquitto *context)
goto handle_connect_error; goto handle_connect_error;
} }
/* _remove here because net__socket_accept() uses _add and we must have the
* correct keepalive value */
keepalive__remove(context);
if(packet__read_uint16(&context->in_packet, &(context->keepalive))){ if(packet__read_uint16(&context->in_packet, &(context->keepalive))){
rc = MOSQ_ERR_PROTOCOL; rc = MOSQ_ERR_PROTOCOL;
goto handle_connect_error; goto handle_connect_error;
} }
keepalive__add(context);
if(protocol_version == PROTOCOL_VERSION_v5){ if(protocol_version == PROTOCOL_VERSION_v5){
rc = property__read_all(CMD_CONNECT, &context->in_packet, &properties); rc = property__read_all(CMD_CONNECT, &context->in_packet, &properties);

View File

@ -1,5 +1,5 @@
/* /*
Copyright (c) 2009-2020 Roger Light <roger@atchoo.org> Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0 are made available under the terms of the Eclipse Public License 2.0
@ -19,30 +19,147 @@ Contributors:
#include "config.h" #include "config.h"
#include <time.h> #include <time.h>
#include "mosquitto_broker_internal.h" #include "mosquitto_broker_internal.h"
#include <utlist.h>
/* This contains code for checking whether clients have exceeded their keepalive timeouts.
* There are two versions.
*
* The old version can be used by compiling with `make WITH_OLD_KEEPALIVE=yes`.
* It will scan the entire list of connected clients every 5 seconds to see if
* they have expired. Hence it scales with O(n) and with e.g. 60000 clients can
* have a measurable effect on CPU usage in the low single digit percent range.
*
* The new version scales with O(1). It uses a ring buffer that contains
* max_keepalive*1.5+1 entries. The current time in integer seconds, modulus
* the number of entries, points to the head of the ring buffer. Any clients
* will appear after this point at the position indexed by the time at which
* they will expire if they do not send another message, assuming they do not
* have keepalive==0 - in which case they are not part of this check. So a
* client that connects with keepalive=60 will be added at `now + 60*1.5`.
*
* A client is added to an entry with a doubly linked list. When the client
* sends a new message, it is removed from the old position and added to the
* new.
*
* As time moves on, if the linked list at the current entry is not empty, all
* of the clients are expired.
*
* The ring buffer size is determined by max_keepalive. At the default, it is
* 65535*1.5+1=98303 entries long. On a 64-bit machine that is 786424 bytes.
* If this is too big a burden and you do not need many clients connected, then
* the old check is sufficient. You can reduce the number of entries by setting
* a lower max_keepalive value. A value as low as 600 still gives a 10 minute
* keepalive and reduces the memory for the ring buffer to 7208 bytes.
*
* *NOTE* It is likely that the old check routine will be removed in the
* future, and max_keepalive set to a sensible default value. If this is a
* problem for you please get in touch.
*/
static time_t last_keepalive_check = 0; static time_t last_keepalive_check = 0;
#ifndef WITH_OLD_KEEPALIVE
static int keepalive_list_max = 0;
static struct mosquitto **keepalive_list = NULL;
#endif
/* FIXME - this is the prototype for the future tree/trie based keepalive check implementation. */ #ifndef WITH_OLD_KEEPALIVE
static int calc_index(struct mosquitto *context)
{
return (int)(context->last_msg_in + context->keepalive*3/2) % keepalive_list_max;
}
#endif
int keepalive__init(void)
{
#ifndef WITH_OLD_KEEPALIVE
struct mosquitto *context, *ctxt_tmp;
last_keepalive_check = db.now_s;
if(db.config->max_keepalive <= 0){
keepalive_list_max = (UINT16_MAX * 3)/2 + 1;
}else{
keepalive_list_max = (db.config->max_keepalive * 3)/2 + 1;
}
keepalive_list = mosquitto_calloc((size_t)keepalive_list_max, sizeof(struct mosquitto *));
if(keepalive_list == NULL){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
keepalive_list_max = 0;
return MOSQ_ERR_NOMEM;
}
/* Add existing clients - should only be applicable on MOSQ_EVT_RELOAD */
HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
if(net__is_connected(context) && !context->bridge && context->keepalive > 0){
keepalive__add(context);
}
}
#endif
return MOSQ_ERR_SUCCESS;
}
void keepalive__cleanup(void)
{
#ifndef WITH_OLD_KEEPALIVE
mosquitto_free(keepalive_list);
keepalive_list = NULL;
keepalive_list_max = 0;
#endif
}
int keepalive__add(struct mosquitto *context) int keepalive__add(struct mosquitto *context)
{ {
UNUSED(context); #ifndef WITH_OLD_KEEPALIVE
if(context->keepalive <= 0 || !net__is_connected(context)) return MOSQ_ERR_SUCCESS;
#ifdef WITH_BRIDGE
if(context->bridge) return MOSQ_ERR_SUCCESS;
#endif
DL_APPEND2(keepalive_list[calc_index(context)], context, keepalive_prev, keepalive_next);
#else
UNUSED(context);
#endif
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
#ifndef WITH_OLD_KEEPALIVE
void keepalive__check(void) void keepalive__check(void)
{ {
struct mosquitto *context, *ctxt_tmp; struct mosquitto *context, *ctxt_tmp;
if(last_keepalive_check + 5 < db.now_s){ for(time_t i=last_keepalive_check; i<db.now_s; i++){
int idx = (int)(i % keepalive_list_max);
if(keepalive_list[idx]){
DL_FOREACH_SAFE2(keepalive_list[idx], context, ctxt_tmp, keepalive_next){
if(net__is_connected(context)){
/* Client has exceeded keepalive*1.5 */
do_disconnect(context, MOSQ_ERR_KEEPALIVE);
}
}
}
}
last_keepalive_check = db.now_s;
}
#else
void keepalive__check(void)
{
struct mosquitto *context, *ctxt_tmp;
time_t timeout;
if(db.contexts_by_sock){
timeout = (last_keepalive_check + 5 - db.now_s);
if(timeout <= 0){
timeout = 5;
}
loop__update_next_event(timeout*1000);
}
if(last_keepalive_check + 5 <= db.now_s){
last_keepalive_check = db.now_s; last_keepalive_check = db.now_s;
/* FIXME - this needs replacing with something more efficient */
HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){ HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){
if(context->sock != INVALID_SOCKET){ if(net__is_connected(context)){
/* Local bridges never time out in this fashion. */ /* Local bridges never time out in this fashion. */
if(!(context->keepalive) if(!(context->keepalive)
|| context->bridge || context->bridge
@ -56,23 +173,38 @@ void keepalive__check(void)
} }
} }
} }
#endif
int keepalive__remove(struct mosquitto *context) int keepalive__remove(struct mosquitto *context)
{ {
#ifndef WITH_OLD_KEEPALIVE
int idx;
if(context->keepalive <= 0 || context->keepalive_prev == NULL) return MOSQ_ERR_SUCCESS;
idx = calc_index(context);
if(keepalive_list[idx]){
DL_DELETE2(keepalive_list[idx], context, keepalive_prev, keepalive_next);
context->keepalive_next = NULL;
context->keepalive_prev = NULL;
}
#else
UNUSED(context); UNUSED(context);
#endif
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }
void keepalive__remove_all(void)
{
}
int keepalive__update(struct mosquitto *context) int keepalive__update(struct mosquitto *context)
{ {
#ifndef WITH_OLD_KEEPALIVE
keepalive__remove(context);
/* coverity[missing_lock] - broker is single threaded, so no lock required */
context->last_msg_in = db.now_s; context->last_msg_in = db.now_s;
keepalive__add(context);
#else
UNUSED(context);
#endif
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }

View File

@ -238,6 +238,8 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
mosquitto_security_apply(); mosquitto_security_apply();
log__close(db.config); log__close(db.config);
log__init(db.config); log__init(db.config);
keepalive__cleanup();
keepalive__init();
flag_reload = false; flag_reload = false;
} }
if(flag_tree_print){ if(flag_tree_print){

View File

@ -509,6 +509,11 @@ int main(int argc, char *argv[])
if(rc != MOSQ_ERR_SUCCESS) return rc; if(rc != MOSQ_ERR_SUCCESS) return rc;
db.config = &config; db.config = &config;
rc = keepalive__init();
if(rc){
return rc;
}
/* Drop privileges permanently immediately after the config is loaded. /* Drop privileges permanently immediately after the config is loaded.
* This requires the user to ensure that all certificates, log locations, * This requires the user to ensure that all certificates, log locations,
* etc. are accessible my the `mosquitto` or other unprivileged user. * etc. are accessible my the `mosquitto` or other unprivileged user.
@ -618,6 +623,7 @@ int main(int argc, char *argv[])
mosquitto__free(db.bridges); mosquitto__free(db.bridges);
#endif #endif
context__free_disused(); context__free_disused();
keepalive__cleanup();
db__close(); db__close();

View File

@ -772,10 +772,11 @@ void plugin__handle_tick(void);
/* ============================================================ /* ============================================================
* Property related functions * Property related functions
* ============================================================ */ * ============================================================ */
int keepalive__init(void);
void keepalive__cleanup(void);
int keepalive__add(struct mosquitto *context); int keepalive__add(struct mosquitto *context);
void keepalive__check(void); void keepalive__check(void);
int keepalive__remove(struct mosquitto *context); int keepalive__remove(struct mosquitto *context);
void keepalive__remove_all(void);
int keepalive__update(struct mosquitto *context); int keepalive__update(struct mosquitto *context);
/* ============================================================ /* ============================================================

View File

@ -246,6 +246,8 @@ struct mosquitto *net__socket_accept(struct mosquitto__listener_sock *listensock
new_context->address, new_context->remote_port, new_context->listener->port); new_context->address, new_context->remote_port, new_context->listener->port);
} }
keepalive__add(new_context);
return new_context; return new_context;
} }