diff --git a/ChangeLog.txt b/ChangeLog.txt index d969fff9..9c2e437e 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -12,6 +12,8 @@ Broker: being removed from memory if they are not subscribed to. Closes #3221. - Produce an error if invalid combinations of cafile/capath/certfile/keyfile are used. Closes #1836. Closes #3130. +- Backport keepalive checking from develop to fix problems in current + implementation. Closes #3138. Client library: - Fix potential deadlock in mosquitto_sub if `-W` is used. Closes #3175. diff --git a/config.mk b/config.mk index c931ee84..effc568b 100644 --- a/config.mk +++ b/config.mk @@ -120,6 +120,10 @@ WITH_JEMALLOC:=no # probably of no particular interest to end users. WITH_XTREPORT=no +# Use the old O(n) keepalive check routine, instead of the new O(1) keepalive +# check routine. See src/keepalive.c for notes on this. +WITH_OLD_KEEPALIVE=no + # Build using clang and with address sanitiser enabled WITH_ASAN=no @@ -388,6 +392,10 @@ ifeq ($(WITH_XTREPORT),yes) BROKER_CFLAGS:=$(BROKER_CFLAGS) -DWITH_XTREPORT endif +ifeq ($(WITH_OLD_KEEPALIVE),yes) + BROKER_CPPFLAGS:=$(BROKER_CPPFLAGS) -DWITH_OLD_KEEPALIVE +endif + BROKER_LDADD:=${BROKER_LDADD} ${LDADD} CLIENT_LDADD:=${CLIENT_LDADD} ${LDADD} PASSWD_LDADD:=${PASSWD_LDADD} ${LDADD} diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 31120258..48c8c108 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -353,6 +353,10 @@ struct mosquitto { struct mosquitto *for_free_next; struct session_expiry_list *expiry_list_item; uint16_t remote_port; +# ifndef WITH_OLD_KEEPALIVE + struct mosquitto *keepalive_next; + struct mosquitto *keepalive_prev; +# endif #endif uint32_t events; }; diff --git a/lib/net_mosq.c b/lib/net_mosq.c index fb0c7ac8..18a9bcef 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -198,6 +198,15 @@ void net__init_tls(void) } #endif +bool net__is_connected(struct mosquitto *mosq) +{ +#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS) + 
return mosq->sock != INVALID_SOCKET || mosq->wsi != NULL; +#else + return mosq->sock != INVALID_SOCKET; +#endif +} + /* Close a socket associated with a context and set it to -1. * Returns 1 on failure (context is NULL) * Returns 0 on success. diff --git a/lib/net_mosq.h b/lib/net_mosq.h index ded98760..2e6155b3 100644 --- a/lib/net_mosq.h +++ b/lib/net_mosq.h @@ -75,6 +75,7 @@ int net__try_connect_step2(struct mosquitto *mosq, uint16_t port, mosq_sock_t *s int net__socket_connect_step3(struct mosquitto *mosq, const char *host); int net__socket_nonblock(mosq_sock_t *sock); int net__socketpair(mosq_sock_t *sp1, mosq_sock_t *sp2); +bool net__is_connected(struct mosquitto *mosq); ssize_t net__read(struct mosquitto *mosq, void *buf, size_t count); ssize_t net__write(struct mosquitto *mosq, const void *buf, size_t count); diff --git a/src/context.c b/src/context.c index d838fc76..b2ea7408 100644 --- a/src/context.c +++ b/src/context.c @@ -139,6 +139,7 @@ void context__cleanup(struct mosquitto *context, bool force_free) #endif alias__free_all(context); + keepalive__remove(context); context__cleanup_out_packets(context); mosquitto__free(context->auth_method); diff --git a/src/handle_connect.c b/src/handle_connect.c index cb22e358..a441ba88 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -269,7 +269,9 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 if(db.config->max_keepalive && (context->keepalive > db.config->max_keepalive || context->keepalive == 0)){ + keepalive__remove(context); context->keepalive = db.config->max_keepalive; + keepalive__add(context); if(context->protocol == mosq_p_mqtt5){ if(mosquitto_property_add_int16(&connack_props, MQTT_PROP_SERVER_KEEP_ALIVE, context->keepalive)){ rc = MOSQ_ERR_NOMEM; @@ -312,8 +314,6 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1 free(auth_data_out); auth_data_out = NULL; - keepalive__add(context); - mosquitto__set_state(context, 
mosq_cs_active); rc = send__connack(context, connect_ack, CONNACK_ACCEPTED, connack_props); mosquitto_property_free_all(&connack_props); @@ -577,10 +577,15 @@ int handle__connect(struct mosquitto *context) goto handle_connect_error; } + /* _remove here because net__socket_accept() uses _add and we must have the + * correct keepalive value */ + keepalive__remove(context); + if(packet__read_uint16(&context->in_packet, &(context->keepalive))){ rc = MOSQ_ERR_PROTOCOL; goto handle_connect_error; } + keepalive__add(context); if(protocol_version == PROTOCOL_VERSION_v5){ rc = property__read_all(CMD_CONNECT, &context->in_packet, &properties); diff --git a/src/keepalive.c b/src/keepalive.c index f9321854..03eace93 100644 --- a/src/keepalive.c +++ b/src/keepalive.c @@ -1,5 +1,5 @@ /* -Copyright (c) 2009-2020 Roger Light +Copyright (c) 2009-2021 Roger Light All rights reserved. This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 @@ -19,30 +19,147 @@ Contributors: #include "config.h" #include #include "mosquitto_broker_internal.h" +#include +/* This contains code for checking whether clients have exceeded their keepalive timeouts. + * There are two versions. + * + * The old version can be used by compiling with `make WITH_OLD_KEEPALIVE=yes`. + * It will scan the entire list of connected clients every 5 seconds to see if + * they have expired. Hence it scales with O(n) and with e.g. 60000 clients can + * have a measurable effect on CPU usage in the low single digit percent range. + * + * The new version scales with O(1). It uses a ring buffer that contains + * max_keepalive*1.5+1 entries. The current time in integer seconds, modulus + * the number of entries, points to the head of the ring buffer. 
Any clients + * will appear after this point at the position indexed by the time at which + * they will expire if they do not send another message, assuming they do not + * have keepalive==0 - in which case they are not part of this check. So a + * client that connects with keepalive=60 will be added at `now + 60*1.5`. + * + * A client is added to an entry with a doubly linked list. When the client + * sends a new message, it is removed from the old position and added to the + * new. + * + * As time moves on, if the linked list at the current entry is not empty, all + * of the clients are expired. + * + * The ring buffer size is determined by max_keepalive. At the default, it is + * 65535*1.5+1=98303 entries long. On a 64-bit machine that is 786424 bytes. + * If this is too big a burden and you do not need many clients connected, then + * the old check is sufficient. You can reduce the number of entries by setting + * a lower max_keepalive value. A value as low as 600 still gives a 10 minute + * keepalive and reduces the memory for the ring buffer to 7208 bytes. + * + * *NOTE* It is likely that the old check routine will be removed in the + * future, and max_keepalive set to a sensible default value. If this is a + * problem for you please get in touch. + */ + static time_t last_keepalive_check = 0; +#ifndef WITH_OLD_KEEPALIVE +static int keepalive_list_max = 0; +static struct mosquitto **keepalive_list = NULL; +#endif -/* FIXME - this is the prototype for the future tree/trie based keepalive check implementation. 
*/ +#ifndef WITH_OLD_KEEPALIVE +static int calc_index(struct mosquitto *context) +{ + return (int)(context->last_msg_in + context->keepalive*3/2) % keepalive_list_max; +} +#endif + +int keepalive__init(void) +{ +#ifndef WITH_OLD_KEEPALIVE + struct mosquitto *context, *ctxt_tmp; + + last_keepalive_check = db.now_s; + if(db.config->max_keepalive <= 0){ + keepalive_list_max = (UINT16_MAX * 3)/2 + 1; + }else{ + keepalive_list_max = (db.config->max_keepalive * 3)/2 + 1; + } + keepalive_list = mosquitto_calloc((size_t)keepalive_list_max, sizeof(struct mosquitto *)); + if(keepalive_list == NULL){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + keepalive_list_max = 0; + return MOSQ_ERR_NOMEM; + } + + /* Add existing clients - should only be applicable on MOSQ_EVT_RELOAD */ + HASH_ITER(hh_sock, db.contexts_by_sock, context, ctxt_tmp){ + if(net__is_connected(context) && !context->bridge && context->keepalive > 0){ + keepalive__add(context); + } + } +#endif + return MOSQ_ERR_SUCCESS; +} + +void keepalive__cleanup(void) +{ +#ifndef WITH_OLD_KEEPALIVE + mosquitto_free(keepalive_list); + keepalive_list = NULL; + keepalive_list_max = 0; +#endif +} int keepalive__add(struct mosquitto *context) { - UNUSED(context); +#ifndef WITH_OLD_KEEPALIVE + if(context->keepalive <= 0 || !net__is_connected(context)) return MOSQ_ERR_SUCCESS; +#ifdef WITH_BRIDGE + if(context->bridge) return MOSQ_ERR_SUCCESS; +#endif + DL_APPEND2(keepalive_list[calc_index(context)], context, keepalive_prev, keepalive_next); +#else + UNUSED(context); +#endif return MOSQ_ERR_SUCCESS; } +#ifndef WITH_OLD_KEEPALIVE void keepalive__check(void) { struct mosquitto *context, *ctxt_tmp; - if(last_keepalive_check + 5 < db.now_s){ + for(time_t i=last_keepalive_check; isock != INVALID_SOCKET){ + if(net__is_connected(context)){ /* Local bridges never time out in this fashion. 
*/ if(!(context->keepalive) || context->bridge @@ -56,23 +173,38 @@ void keepalive__check(void) } } } +#endif int keepalive__remove(struct mosquitto *context) { +#ifndef WITH_OLD_KEEPALIVE + int idx; + + if(context->keepalive <= 0 || context->keepalive_prev == NULL) return MOSQ_ERR_SUCCESS; + + idx = calc_index(context); + if(keepalive_list[idx]){ + DL_DELETE2(keepalive_list[idx], context, keepalive_prev, keepalive_next); + context->keepalive_next = NULL; + context->keepalive_prev = NULL; + } +#else UNUSED(context); - +#endif return MOSQ_ERR_SUCCESS; } -void keepalive__remove_all(void) -{ -} - - int keepalive__update(struct mosquitto *context) { +#ifndef WITH_OLD_KEEPALIVE + keepalive__remove(context); + /* coverity[missing_lock] - broker is single threaded, so no lock required */ context->last_msg_in = db.now_s; + keepalive__add(context); +#else + context->last_msg_in = db.now_s; /* WITH_OLD_KEEPALIVE: keep the pre-patch behaviour of recording when the last message arrived */ +#endif return MOSQ_ERR_SUCCESS; } diff --git a/src/loop.c b/src/loop.c index 0940bb77..084b2f3d 100644 --- a/src/loop.c +++ b/src/loop.c @@ -238,6 +238,8 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens mosquitto_security_apply(); log__close(db.config); log__init(db.config); + keepalive__cleanup(); + keepalive__init(); flag_reload = false; } if(flag_tree_print){ diff --git a/src/mosquitto.c b/src/mosquitto.c index f015d142..de5d4c10 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -509,6 +509,11 @@ int main(int argc, char *argv[]) if(rc != MOSQ_ERR_SUCCESS) return rc; db.config = &config; + rc = keepalive__init(); + if(rc){ + return rc; + } + /* Drop privileges permanently immediately after the config is loaded. * This requires the user to ensure that all certificates, log locations, * etc. are accessible my the `mosquitto` or other unprivileged user. 
@@ -618,6 +623,7 @@ int main(int argc, char *argv[]) mosquitto__free(db.bridges); #endif context__free_disused(); + keepalive__cleanup(); db__close(); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 4458434f..7516f312 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -772,10 +772,11 @@ void plugin__handle_tick(void); /* ============================================================ * Property related functions * ============================================================ */ +int keepalive__init(void); +void keepalive__cleanup(void); int keepalive__add(struct mosquitto *context); void keepalive__check(void); int keepalive__remove(struct mosquitto *context); -void keepalive__remove_all(void); int keepalive__update(struct mosquitto *context); /* ============================================================ diff --git a/src/net.c b/src/net.c index 2e4ee6e1..a4efdb00 100644 --- a/src/net.c +++ b/src/net.c @@ -246,6 +246,8 @@ struct mosquitto *net__socket_accept(struct mosquitto__listener_sock *listensock new_context->address, new_context->remote_port, new_context->listener->port); } + keepalive__add(new_context); + return new_context; }