1
0
mirror of https://github.com/eclipse/mosquitto.git synced 2025-05-09 01:01:11 +08:00

Merge branch 'fix2908' of github.com:ckrey/mosquitto into ckrey-fix2908

This commit is contained in:
Roger A. Light 2025-02-25 12:34:41 +00:00
commit a8d887f30b
5 changed files with 137 additions and 125 deletions

View File

@ -70,7 +70,7 @@ int mosquitto_pub_topic_check(const char *str)
len++; len++;
str = &str[1]; str = &str[1];
} }
if(len > 65535) return MOSQ_ERR_INVAL; if(len == 0 || len > 65535) return MOSQ_ERR_INVAL;
#ifdef WITH_BROKER #ifdef WITH_BROKER
if(hier_count > TOPIC_HIERARCHY_LIMIT) return MOSQ_ERR_INVAL; if(hier_count > TOPIC_HIERARCHY_LIMIT) return MOSQ_ERR_INVAL;
#endif #endif
@ -85,7 +85,7 @@ int mosquitto_pub_topic_check2(const char *str, size_t len)
int hier_count = 0; int hier_count = 0;
#endif #endif
if(str == NULL || len > 65535){ if(str == NULL || len == 0 || len > 65535){
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
} }
@ -144,7 +144,7 @@ int mosquitto_sub_topic_check(const char *str)
c = str[0]; c = str[0];
str = &str[1]; str = &str[1];
} }
if(len > 65535) return MOSQ_ERR_INVAL; if(len == 0 || len > 65535) return MOSQ_ERR_INVAL;
#ifdef WITH_BROKER #ifdef WITH_BROKER
if(hier_count > TOPIC_HIERARCHY_LIMIT) return MOSQ_ERR_INVAL; if(hier_count > TOPIC_HIERARCHY_LIMIT) return MOSQ_ERR_INVAL;
#endif #endif
@ -160,7 +160,7 @@ int mosquitto_sub_topic_check2(const char *str, size_t len)
int hier_count = 0; int hier_count = 0;
#endif #endif
if(str == NULL || len > 65535){ if(str == NULL || len == 0 || len > 65535){
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
} }

View File

@ -59,9 +59,11 @@ static int bridge__create_prefix(char **full_prefix, const char *topic, const ch
{ {
size_t len; size_t len;
if(mosquitto_pub_topic_check(prefix) != MOSQ_ERR_SUCCESS){ if(!prefix || strlen(prefix) != 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", prefix); if(mosquitto_pub_topic_check(prefix) != MOSQ_ERR_SUCCESS){
return MOSQ_ERR_INVAL; log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", prefix);
return MOSQ_ERR_INVAL;
}
} }
if(topic){ if(topic){

View File

@ -100,136 +100,137 @@ int handle__subscribe(struct mosquitto *context)
return MOSQ_ERR_MALFORMED_PACKET; return MOSQ_ERR_MALFORMED_PACKET;
} }
if(sub){ if(!slen){
if(!slen){ log__printf(NULL, MOSQ_LOG_INFO,
log__printf(NULL, MOSQ_LOG_INFO, "Empty subscription string from %s, disconnecting.",
"Empty subscription string from %s, disconnecting.", context->address);
context->address);
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(mosquitto_sub_topic_check(sub)){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid subscription string from %s, disconnecting.",
context->address);
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(packet__read_byte(&context->in_packet, &subscription_options)){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){
qos = subscription_options;
if(context->is_bridge){
subscription_options = MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_NO_LOCAL;
}
}else{
qos = subscription_options & 0x03;
subscription_options &= 0xFC;
if((subscription_options & MQTT_SUB_OPT_NO_LOCAL) && !strncmp(sub, "$share/", 7)){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_PROTOCOL;
}
retain_handling = (subscription_options & 0x30);
if(retain_handling == 0x30 || (subscription_options & 0xC0) != 0){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
}
if(qos > 2){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid QoS in subscription command from %s, disconnecting.",
context->address);
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(qos > context->max_qos){
qos = context->max_qos;
}
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + slen + 1;
sub_mount = mosquitto__malloc(len+1);
if(!sub_mount){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_NOMEM;
}
snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub);
sub_mount[len] = '\0';
mosquitto__free(sub);
sub = sub_mount;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub, qos);
allowed = true;
rc2 = mosquitto_acl_check(context, sub, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE);
switch(rc2){
case MOSQ_ERR_SUCCESS:
break;
case MOSQ_ERR_ACL_DENIED:
allowed = false;
if(context->protocol == mosq_p_mqtt5){
qos = MQTT_RC_NOT_AUTHORIZED;
}else if(context->protocol == mosq_p_mqtt311){
qos = 0x80;
}
break;
default:
mosquitto__free(sub);
return rc2;
}
if(allowed){
rc2 = sub__add(context, sub, qos, subscription_identifier, subscription_options);
if(rc2 > 0){
mosquitto__free(sub);
return rc2;
}
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){
if(retain__queue(context, sub, qos, 0)) rc = 1;
}
}else{
if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS)
|| (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){
if(retain__queue(context, sub, qos, subscription_identifier)) rc = 1;
}
}
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
}
mosquitto__free(sub); mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(mosquitto_sub_topic_check(sub)){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid subscription string from %s, disconnecting.",
context->address);
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
tmp_payload = mosquitto__realloc(payload, payloadlen + 1); if(packet__read_byte(&context->in_packet, &subscription_options)){
if(tmp_payload){ mosquitto__free(sub);
payload = tmp_payload; mosquitto__free(payload);
payload[payloadlen] = qos; return MOSQ_ERR_MALFORMED_PACKET;
payloadlen++; }
}else{ if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){
qos = subscription_options;
if(context->is_bridge){
subscription_options = MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_NO_LOCAL;
}
}else{
qos = subscription_options & 0x03;
subscription_options &= 0xFC;
if((subscription_options & MQTT_SUB_OPT_NO_LOCAL) && !strncmp(sub, "$share/", 7)){
mosquitto__free(sub);
mosquitto__free(payload); mosquitto__free(payload);
return MOSQ_ERR_PROTOCOL;
}
retain_handling = (subscription_options & 0x30);
if(retain_handling == 0x30 || (subscription_options & 0xC0) != 0){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
}
if(qos > 2){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid QoS in subscription command from %s, disconnecting.",
context->address);
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(qos > context->max_qos){
qos = context->max_qos;
}
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + slen + 1;
sub_mount = mosquitto__malloc(len+1);
if(!sub_mount){
mosquitto__free(sub);
mosquitto__free(payload);
return MOSQ_ERR_NOMEM; return MOSQ_ERR_NOMEM;
} }
snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub);
sub_mount[len] = '\0';
mosquitto__free(sub);
sub = sub_mount;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub, qos);
allowed = true;
rc2 = mosquitto_acl_check(context, sub, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE);
switch(rc2){
case MOSQ_ERR_SUCCESS:
break;
case MOSQ_ERR_ACL_DENIED:
allowed = false;
if(context->protocol == mosq_p_mqtt5){
qos = MQTT_RC_NOT_AUTHORIZED;
}else if(context->protocol == mosq_p_mqtt311){
qos = 0x80;
}
break;
default:
mosquitto__free(sub);
mosquitto__free(payload);
return rc2;
}
if(allowed){
rc2 = sub__add(context, sub, qos, subscription_identifier, subscription_options);
if(rc2 > 0){
mosquitto__free(sub);
mosquitto__free(payload);
return rc2;
}
if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){
if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){
if(retain__queue(context, sub, qos, 0)) rc = 1;
}
}else{
if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS)
|| (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){
if(retain__queue(context, sub, qos, subscription_identifier)) rc = 1;
}
}
log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub);
}
mosquitto__free(sub);
tmp_payload = mosquitto__realloc(payload, payloadlen + 1);
if(tmp_payload){
payload = tmp_payload;
payload[payloadlen] = qos;
payloadlen++;
}else{
mosquitto__free(payload);
return MOSQ_ERR_NOMEM;
} }
} }
if(context->protocol != mosq_p_mqtt31){ if(context->protocol != mosq_p_mqtt31){
if(payloadlen == 0){ if(payloadlen == 0){
/* No subscriptions specified, protocol error. */ /* No subscriptions specified, protocol error. */
fprintf(stderr, "no payload\n");
return MOSQ_ERR_MALFORMED_PACKET; return MOSQ_ERR_MALFORMED_PACKET;
} }
} }

View File

@ -17,6 +17,9 @@ def write_config(filename, port1, port2, protocol_version):
f.write("topic +/value in 0 local3/topic/ remote3/topic/\n") f.write("topic +/value in 0 local3/topic/ remote3/topic/\n")
f.write("topic ic/+ in 0 local4/top remote4/tip\n") f.write("topic ic/+ in 0 local4/top remote4/tip\n")
f.write("topic clients/total in 0 test/mosquitto/org $SYS/broker/\n") f.write("topic clients/total in 0 test/mosquitto/org $SYS/broker/\n")
f.write('topic rmapped in 0 "" remote/mapped/\n')
f.write('topic lmapped in 0 local/mapped/ ""\n')
f.write('topic "" in 0 local/single remote/single\n')
f.write("notifications false\n") f.write("notifications false\n")
f.write("restart_timeout 5\n") f.write("restart_timeout 5\n")
f.write("bridge_protocol_version %s\n" % (protocol_version)) f.write("bridge_protocol_version %s\n" % (protocol_version))
@ -70,6 +73,9 @@ def inner_test(bridge, sock, proto_ver):
('local3/topic/something/value', 'remote3/topic/something/value'), ('local3/topic/something/value', 'remote3/topic/something/value'),
('local4/topic/something', 'remote4/tipic/something'), ('local4/topic/something', 'remote4/tipic/something'),
('test/mosquitto/orgclients/total', '$SYS/broker/clients/total'), ('test/mosquitto/orgclients/total', '$SYS/broker/clients/total'),
('local/mapped/lmapped', 'lmapped'),
('rmapped', 'remote/mapped/rmapped'),
('local/single', 'remote/single'),
] ]
for (local_topic, remote_topic) in cases: for (local_topic, remote_topic) in cases:

View File

@ -196,6 +196,7 @@ static void TEST_invalid(void)
no_match_helper(MOSQ_ERR_INVAL, "foo/#abc", "foo"); no_match_helper(MOSQ_ERR_INVAL, "foo/#abc", "foo");
no_match_helper(MOSQ_ERR_INVAL, "#abc", "foo"); no_match_helper(MOSQ_ERR_INVAL, "#abc", "foo");
no_match_helper(MOSQ_ERR_INVAL, "/#a", "foo/bar"); no_match_helper(MOSQ_ERR_INVAL, "/#a", "foo/bar");
no_match_helper(MOSQ_ERR_INVAL, "", "foo/bar/#");
} }
/* ======================================================================== /* ========================================================================
@ -233,6 +234,7 @@ static void TEST_pub_topic_invalid(void)
pub_topic_helper("pub/topic#", MOSQ_ERR_INVAL); pub_topic_helper("pub/topic#", MOSQ_ERR_INVAL);
pub_topic_helper("pub/topic/#", MOSQ_ERR_INVAL); pub_topic_helper("pub/topic/#", MOSQ_ERR_INVAL);
pub_topic_helper("+/pub/topic", MOSQ_ERR_INVAL); pub_topic_helper("+/pub/topic", MOSQ_ERR_INVAL);
pub_topic_helper("", MOSQ_ERR_INVAL);
} }
@ -278,6 +280,7 @@ static void TEST_sub_topic_invalid(void)
sub_topic_helper("sub/#topic", MOSQ_ERR_INVAL); sub_topic_helper("sub/#topic", MOSQ_ERR_INVAL);
sub_topic_helper("sub/topic#", MOSQ_ERR_INVAL); sub_topic_helper("sub/topic#", MOSQ_ERR_INVAL);
sub_topic_helper("#/sub/topic", MOSQ_ERR_INVAL); sub_topic_helper("#/sub/topic", MOSQ_ERR_INVAL);
sub_topic_helper("", MOSQ_ERR_INVAL);
} }
/* ======================================================================== /* ========================================================================