1
0
mirror of https://github.com/eclipse/mosquitto.git synced 2025-05-09 01:01:11 +08:00

Fix $share subscriptions not being recovered for durable clients.

If a plugin had granted ACL subscription access to a
durable/non-clean-session client, then removed that access, the client would
keep its existing subscription. This has been fixed.
This commit is contained in:
Roger Light 2021-08-22 23:17:33 +01:00 committed by Roger A. Light
parent 376226c129
commit 32af599c81
9 changed files with 221 additions and 109 deletions

View File

@ -11,6 +11,9 @@ Security:
remotely accessible listener to be opened that was not confined to the local
machine but did have anonymous access enabled, contrary to the
documentation. This has been fixed. Closes #2283.
- If a plugin had granted ACL subscription access to a
durable/non-clean-session client, then removed that access, the client would
keep its existing subscription. This has been fixed.
Broker:
- Fix possible out of bounds memory reads when reading a corrupt/crafted
@ -29,6 +32,8 @@ Broker:
- Fix listener mount_point not being removed on outgoing messages.
Closes #2244.
- Strict protocol compliance fixes, plus test suite.
- Fix $share subscriptions not being recovered for durable clients that
reconnect.
Client library:
- If a client uses TLS-PSK then force the default cipher list to use "PSK"

View File

@ -287,11 +287,9 @@ struct mosquitto {
struct mosquitto__acl_user *acl_list;
struct mosquitto__listener *listener;
struct mosquitto__packet *out_packet_last;
struct mosquitto__subhier **subs;
struct mosquitto__subshared_ref **shared_subs;
struct mosquitto__client_sub **subs;
char *auth_method;
int sub_count;
int shared_sub_count;
# ifndef WITH_EPOLL
int pollfd_index;
# endif

View File

@ -102,7 +102,6 @@ static void connection_check_acl(struct mosquitto *context, struct mosquitto_cli
}
}
int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint16_t auth_data_out_len)
{
struct mosquitto *found_context;
@ -163,13 +162,23 @@ int connect__on_authorised(struct mosquitto *context, void *auth_data_out, uint1
for(i=0; i<context->sub_count; i++){
if(context->subs[i]){
leaf = context->subs[i]->subs;
leaf = context->subs[i]->hier->subs;
while(leaf){
if(leaf->context == found_context){
leaf->context = context;
}
leaf = leaf->next;
}
if(context->subs[i]->shared){
leaf = context->subs[i]->shared->subs;
while(leaf){
if(leaf->context == found_context){
leaf->context = context;
}
leaf = leaf->next;
}
}
}
}
}

View File

@ -306,6 +306,7 @@ struct mosquitto__config {
struct mosquitto__security_options security_options;
};
struct mosquitto__subleaf {
struct mosquitto__subleaf *prev;
struct mosquitto__subleaf *next;
@ -317,12 +318,6 @@ struct mosquitto__subleaf {
};
struct mosquitto__subshared_ref {
struct mosquitto__subhier *hier;
struct mosquitto__subshared *shared;
};
struct mosquitto__subshared {
UT_hash_handle hh;
char *name;
@ -339,6 +334,12 @@ struct mosquitto__subhier {
uint16_t topic_len;
};
struct mosquitto__client_sub {
struct mosquitto__subhier *hier;
struct mosquitto__subshared *shared;
char topic_filter[];
};
struct sub__token {
struct sub__token *next;
char *topic;

View File

@ -265,6 +265,33 @@ int mosquitto_set_username(struct mosquitto *client, const char *username)
}
/* Check to see whether durable clients still have rights to their subscriptions. */
static void check_subscription_acls(struct mosquitto *context)
{
int i;
int rc;
uint8_t reason;
for(i=0; i<context->sub_count; i++){
if(context->subs[i] == NULL){
continue;
}
rc = mosquitto_acl_check(context,
context->subs[i]->topic_filter,
0,
NULL,
0, /* FIXME */
false,
MOSQ_ACL_SUBSCRIBE);
if(rc != MOSQ_ERR_SUCCESS){
sub__remove(context, context->subs[i]->topic_filter, db.subs, &reason);
}
}
}
static void disconnect_client(struct mosquitto *context, bool with_will)
{
if(context->protocol == mosq_p_mqtt5){
@ -273,6 +300,9 @@ static void disconnect_client(struct mosquitto *context, bool with_will)
if(with_will == false){
mosquitto__set_state(context, mosq_cs_disconnecting);
}
if(context->session_expiry_interval > 0){
check_subscription_acls(context);
}
do_disconnect(context, MOSQ_ERR_ADMINISTRATIVE_ACTION);
}

View File

@ -199,12 +199,12 @@ static void sub__remove_shared_leaf(struct mosquitto__subhier *subhier, struct m
}
static int sub__add_shared(struct mosquitto *context, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, const char *sharename)
static int sub__add_shared(struct mosquitto *context, const char *sub, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, const char *sharename)
{
struct mosquitto__subleaf *newleaf;
struct mosquitto__subshared *shared = NULL;
struct mosquitto__subshared_ref **shared_subs;
struct mosquitto__subshared_ref *shared_ref;
struct mosquitto__client_sub **subs;
struct mosquitto__client_sub *csub;
int i;
size_t slen;
int rc;
@ -237,32 +237,30 @@ static int sub__add_shared(struct mosquitto *context, uint8_t qos, uint32_t iden
}
if(rc != MOSQ_ERR_SUB_EXISTS){
shared_ref = mosquitto__calloc(1, sizeof(struct mosquitto__subshared_ref));
if(!shared_ref){
sub__remove_shared_leaf(subhier, shared, newleaf);
return MOSQ_ERR_NOMEM;
}
shared_ref->hier = subhier;
shared_ref->shared = shared;
slen = strlen(sub);
csub = mosquitto__calloc(1, sizeof(struct mosquitto__client_sub) + slen + 1);
if(csub == NULL) return MOSQ_ERR_NOMEM;
memcpy(csub->topic_filter, sub, slen);
csub->hier = subhier;
csub->shared = shared;
for(i=0; i<context->shared_sub_count; i++){
if(!context->shared_subs[i]){
context->shared_subs[i] = shared_ref;
shared_ref = NULL;
for(i=0; i<context->sub_count; i++){
if(!context->subs[i]){
context->subs[i] = csub;
break;
}
}
if(shared_ref){
shared_subs = mosquitto__realloc(context->shared_subs, sizeof(struct mosquitto__subshared_ref *)*(size_t)(context->shared_sub_count + 1));
if(!shared_subs){
mosquitto__free(shared_ref);
context->shared_subs[context->shared_sub_count-1] = NULL;
if(i == context->sub_count){
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__client_sub *)*(size_t)(context->sub_count + 1));
if(!subs){
sub__remove_shared_leaf(subhier, shared, newleaf);
mosquitto__free(newleaf);
mosquitto__free(csub);
return MOSQ_ERR_NOMEM;
}
context->shared_subs = shared_subs;
context->shared_sub_count++;
context->shared_subs[context->shared_sub_count-1] = shared_ref;
context->subs = subs;
context->sub_count++;
context->subs[context->sub_count-1] = csub;
}
#ifdef WITH_SYS_TREE
db.shared_subscription_count++;
@ -279,35 +277,45 @@ static int sub__add_shared(struct mosquitto *context, uint8_t qos, uint32_t iden
}
static int sub__add_normal(struct mosquitto *context, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier)
static int sub__add_normal(struct mosquitto *context, const char *sub, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier)
{
struct mosquitto__subleaf *newleaf = NULL;
struct mosquitto__subhier **subs;
struct mosquitto__client_sub **subs;
struct mosquitto__client_sub *csub;
int i;
int rc;
size_t slen;
rc = sub__add_leaf(context, qos, identifier, options, &subhier->subs, &newleaf);
if(rc > 0){
return rc;
}
slen = strlen(sub);
csub = mosquitto__calloc(1, sizeof(struct mosquitto__client_sub) + slen + 1);
if(csub == NULL) return MOSQ_ERR_NOMEM;
memcpy(csub->topic_filter, sub, slen);
csub->hier = subhier;
csub->shared = NULL;
if(rc != MOSQ_ERR_SUB_EXISTS){
for(i=0; i<context->sub_count; i++){
if(!context->subs[i]){
context->subs[i] = subhier;
context->subs[i] = csub;
break;
}
}
if(i == context->sub_count){
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__subhier *)*(size_t)(context->sub_count + 1));
subs = mosquitto__realloc(context->subs, sizeof(struct mosquitto__client_sub *)*(size_t)(context->sub_count + 1));
if(!subs){
DL_DELETE(subhier->subs, newleaf);
mosquitto__free(newleaf);
mosquitto__free(csub);
return MOSQ_ERR_NOMEM;
}
context->subs = subs;
context->sub_count++;
context->subs[context->sub_count-1] = subhier;
context->subs[context->sub_count-1] = csub;
}
#ifdef WITH_SYS_TREE
db.subscription_count++;
@ -324,7 +332,7 @@ static int sub__add_normal(struct mosquitto *context, uint8_t qos, uint32_t iden
}
static int sub__add_context(struct mosquitto *context, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, char *const *const topics, const char *sharename)
static int sub__add_context(struct mosquitto *context, const char *topic_filter, uint8_t qos, uint32_t identifier, int options, struct mosquitto__subhier *subhier, char *const *const topics, const char *sharename)
{
struct mosquitto__subhier *branch;
int topic_index = 0;
@ -349,9 +357,9 @@ static int sub__add_context(struct mosquitto *context, uint8_t qos, uint32_t ide
/* Add add our context */
if(context && context->id){
if(sharename){
return sub__add_shared(context, qos, identifier, options, subhier, sharename);
return sub__add_shared(context, topic_filter, qos, identifier, options, subhier, sharename);
}else{
return sub__add_normal(context, qos, identifier, options, subhier);
return sub__add_normal(context, topic_filter, qos, identifier, options, subhier);
}
}else{
return MOSQ_ERR_SUCCESS;
@ -378,7 +386,8 @@ static int sub__remove_normal(struct mosquitto *context, struct mosquitto__subhi
* but that would involve keeping a copy of the topic string in
* each subleaf. Might be worth considering though. */
for(i=0; i<context->sub_count; i++){
if(context->subs[i] == subhier){
if(context->subs[i] && context->subs[i]->hier == subhier){
mosquitto__free(context->subs[i]);
context->subs[i] = NULL;
break;
}
@ -413,13 +422,13 @@ static int sub__remove_shared(struct mosquitto *context, struct mosquitto__subhi
* It would be nice to be able to use the reference directly,
* but that would involve keeping a copy of the topic string in
* each subleaf. Might be worth considering though. */
for(i=0; i<context->shared_sub_count; i++){
if(context->shared_subs[i]
&& context->shared_subs[i]->hier == subhier
&& context->shared_subs[i]->shared == shared){
for(i=0; i<context->sub_count; i++){
if(context->subs[i]
&& context->subs[i]->hier == subhier
&& context->subs[i]->shared == shared){
mosquitto__free(context->shared_subs[i]);
context->shared_subs[i] = NULL;
mosquitto__free(context->subs[i]);
context->subs[i] = NULL;
break;
}
}
@ -599,7 +608,7 @@ int sub__add(struct mosquitto *context, const char *sub, uint8_t qos, uint32_t i
}
}
rc = sub__add_context(context, qos, identifier, options, subhier, topics, sharename);
rc = sub__add_context(context, sub, qos, identifier, options, subhier, topics, sharename);
mosquitto__free(local_sub);
mosquitto__free(topics);
@ -699,47 +708,6 @@ static struct mosquitto__subhier *tmp_remove_subs(struct mosquitto__subhier *sub
}
static int sub__clean_session_shared(struct mosquitto *context)
{
int i;
struct mosquitto__subleaf *leaf;
struct mosquitto__subhier *hier;
for(i=0; i<context->shared_sub_count; i++){
if(context->shared_subs[i] == NULL){
continue;
}
leaf = context->shared_subs[i]->shared->subs;
while(leaf){
if(leaf->context==context){
#ifdef WITH_SYS_TREE
db.shared_subscription_count--;
#endif
sub__remove_shared_leaf(context->shared_subs[i]->hier, context->shared_subs[i]->shared, leaf);
break;
}
leaf = leaf->next;
}
if(context->shared_subs[i]->hier->subs == NULL
&& context->shared_subs[i]->hier->children == NULL
&& context->shared_subs[i]->hier->shared == NULL
&& context->shared_subs[i]->hier->parent){
hier = context->shared_subs[i]->hier;
context->shared_subs[i]->hier = NULL;
do{
hier = tmp_remove_subs(hier);
}while(hier);
}
mosquitto__free(context->shared_subs[i]);
}
mosquitto__free(context->shared_subs);
context->shared_subs = NULL;
context->shared_sub_count = 0;
return MOSQ_ERR_SUCCESS;
}
/* Remove all subscriptions for a client.
*/
int sub__clean_session(struct mosquitto *context)
@ -752,25 +720,43 @@ int sub__clean_session(struct mosquitto *context)
if(context->subs[i] == NULL){
continue;
}
leaf = context->subs[i]->subs;
while(leaf){
if(leaf->context==context){
#ifdef WITH_SYS_TREE
db.subscription_count--;
#endif
DL_DELETE(context->subs[i]->subs, leaf);
mosquitto__free(leaf);
break;
}
leaf = leaf->next;
}
if(context->subs[i]->subs == NULL
&& context->subs[i]->children == NULL
&& context->subs[i]->shared == NULL
&& context->subs[i]->parent){
hier = context->subs[i];
context->subs[i] = NULL;
hier = context->subs[i]->hier;
if(context->subs[i]->shared){
leaf = context->subs[i]->shared->subs;
while(leaf){
if(leaf->context==context){
#ifdef WITH_SYS_TREE
db.shared_subscription_count--;
#endif
sub__remove_shared_leaf(context->subs[i]->hier, context->subs[i]->shared, leaf);
break;
}
leaf = leaf->next;
}
}else{
leaf = hier->subs;
while(leaf){
if(leaf->context==context){
#ifdef WITH_SYS_TREE
db.subscription_count--;
#endif
DL_DELETE(hier->subs, leaf);
mosquitto__free(leaf);
break;
}
leaf = leaf->next;
}
}
mosquitto__free(context->subs[i]);
context->subs[i] = NULL;
if(hier->subs == NULL
&& hier->children == NULL
&& hier->shared == NULL
&& hier->parent){
do{
hier = tmp_remove_subs(hier);
}while(hier);
@ -780,7 +766,7 @@ int sub__clean_session(struct mosquitto *context)
context->subs = NULL;
context->sub_count = 0;
return sub__clean_session_shared(context);
return MOSQ_ERR_SUCCESS;
}
void sub__tree_print(struct mosquitto__subhier *root, int level)

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
# Check whether a durable client keeps its subscriptions on reconnecting.
from mosq_test_helper import *
def publish_helper(port):
connect_packet = mosq_test.gen_connect("subpub-sub-helper")
connack_packet = mosq_test.gen_connack(rc=0)
publish1_packet = mosq_test.gen_publish("not-shared/sub", qos=0, payload="message1")
publish2_packet = mosq_test.gen_publish("shared/sub", qos=0, payload="message2")
sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
sock.send(publish1_packet)
sock.send(publish2_packet)
sock.close()
def do_test(proto_ver):
rc = 1
if proto_ver == 5:
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60)
connect_packet = mosq_test.gen_connect("subpub-sub-test", proto_ver=proto_ver, clean_session=False, properties=props)
else:
connect_packet = mosq_test.gen_connect("subpub-sub-test", proto_ver=proto_ver, clean_session=False)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver, flags=1)
mid = 1
subscribe1_packet = mosq_test.gen_subscribe(mid, "not-shared/sub", 0, proto_ver=proto_ver)
suback1_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)
mid = 2
subscribe2_packet = mosq_test.gen_subscribe(mid, "$share/name/shared/sub", 0, proto_ver=proto_ver)
suback2_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)
publish1_packet = mosq_test.gen_publish("not-shared/sub", qos=0, payload="message1", proto_ver=proto_ver)
publish2_packet = mosq_test.gen_publish("shared/sub", qos=0, payload="message2", proto_ver=proto_ver)
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock = mosq_test.do_client_connect(connect_packet, connack1_packet, timeout=2, port=port, connack_error="connack 1")
mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback1")
mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2")
publish_helper(port)
mosq_test.expect_packet(sock, "publish1", publish1_packet)
if proto_ver == 5:
mosq_test.expect_packet(sock, "publish2", publish2_packet)
sock.close()
# Reconnect, but don't resubscribe
sock = mosq_test.do_client_connect(connect_packet, connack2_packet, timeout=2, port=port, connack_error="connack 2")
publish_helper(port)
mosq_test.expect_packet(sock, "publish1", publish1_packet)
if proto_ver == 5:
mosq_test.expect_packet(sock, "publish2", publish2_packet)
sock.close()
rc = 0
sock.close()
except mosq_test.TestError:
pass
except Exception as err:
print(err)
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde.decode('utf-8'))
exit(rc)
do_test(proto_ver=4)
do_test(proto_ver=5)
exit(0)

View File

@ -56,6 +56,7 @@ msg_sequence_test:
./02-subpub-qos2-receive-maximum-1.py
./02-subpub-qos2-receive-maximum-2.py
./02-subpub-qos2.py
./02-subpub-recover-subscriptions.py
./02-subscribe-dollar-v5.py
./02-subscribe-invalid-utf8.py
./02-subscribe-long-topic.py

View File

@ -36,6 +36,7 @@ tests = [
(1, './02-subpub-qos2-receive-maximum-1.py'),
(1, './02-subpub-qos2-receive-maximum-2.py'),
(1, './02-subpub-qos2.py'),
(1, './02-subpub-recover-subscriptions.py'),
(1, './02-subscribe-dollar-v5.py'),
(1, './02-subscribe-invalid-utf8.py'),
(1, './02-subscribe-long-topic.py'),