1
0
mirror of https://github.com/eclipse/mosquitto.git synced 2025-05-09 01:01:11 +08:00

Library and client support for topic-alias.

This commit is contained in:
Roger A. Light 2018-11-01 15:47:21 +00:00
parent 55b46037da
commit 8aa936936e
5 changed files with 62 additions and 17 deletions

View File

@ -112,6 +112,9 @@ int cfg_parse_property(struct mosq_config *cfg, int argc, char *argv[], int *idx
break;
case CMD_PUBLISH:
if(identifier == MQTT_PROP_TOPIC_ALIAS){
cfg->have_topic_alias = true;
}
if(identifier == MQTT_PROP_SUBSCRIPTION_IDENTIFIER){
fprintf(stderr, "Error: %s property not supported for %s in --property argument.\n\n", propname, cmdname);
return MOSQ_ERR_INVAL;

View File

@ -98,6 +98,7 @@ struct mosq_config {
mosquitto_property *unsubscribe_props;
mosquitto_property *disconnect_props;
mosquitto_property *will_props;
bool have_topic_alias; /* pub */
};
int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);

View File

@ -55,6 +55,18 @@ static char *password = NULL;
static bool disconnect_sent = false;
static bool quiet = false;
static struct mosq_config cfg;
static bool first_publish = true;
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
{
if(cfg.protocol_version == MQTT_PROTOCOL_V5 && cfg.have_topic_alias && first_publish == false){
return mosquitto_publish_with_properties(mosq, mid, NULL, payloadlen, payload, qos, retain, cfg.publish_props);
}else{
first_publish = false;
return mosquitto_publish_with_properties(mosq, mid, topic, payloadlen, payload, qos, retain, cfg.publish_props);
}
}
void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
@ -65,10 +77,10 @@ void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
case MSGMODE_CMD:
case MSGMODE_FILE:
case MSGMODE_STDIN_FILE:
rc = mosquitto_publish_with_properties(mosq, &mid_sent, topic, msglen, message, qos, retain, cfg.publish_props);
rc = my_publish(mosq, &mid_sent, topic, msglen, message, qos, retain);
break;
case MSGMODE_NULL:
rc = mosquitto_publish_with_properties(mosq, &mid_sent, topic, 0, NULL, qos, retain, cfg.publish_props);
rc = my_publish(mosq, &mid_sent, topic, 0, NULL, qos, retain);
break;
case MSGMODE_STDIN_LINE:
status = STATUS_CONNACK_RECVD;
@ -416,7 +428,7 @@ int main(int argc, char *argv[])
buf_len_actual = strlen(buf);
if(buf[buf_len_actual-1] == '\n'){
buf[buf_len_actual-1] = '\0';
rc2 = mosquitto_publish_with_properties(mosq, &mid_sent, topic, buf_len_actual-1, buf, qos, retain, cfg.publish_props);
rc2 = my_publish(mosq, &mid_sent, topic, buf_len_actual-1, buf, qos, retain);
if(rc2){
if(!quiet) fprintf(stderr, "Error: Publish returned %d, disconnecting.\n", rc2);
mosquitto_disconnect_with_properties(mosq, cfg.disconnect_props);

View File

@ -38,14 +38,35 @@ int mosquitto_publish_with_properties(struct mosquitto *mosq, int *mid, const ch
struct mosquitto_message_all *message;
uint16_t local_mid;
int queue_status;
const mosquitto_property *p;
bool have_topic_alias;
if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
if(STREMPTY(topic)) return MOSQ_ERR_INVAL;
if(mosquitto_validate_utf8(topic, strlen(topic))) return MOSQ_ERR_MALFORMED_UTF8;
if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
if(!mosq || qos<0 || qos>2) return MOSQ_ERR_INVAL;
if(!topic || STREMPTY(topic)){
if(topic) topic = NULL;
if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){
return MOSQ_ERR_INVAL;
if(mosq->protocol == mosq_p_mqtt5){
p = properties;
have_topic_alias = false;
while(p){
if(p->identifier == MQTT_PROP_TOPIC_ALIAS){
have_topic_alias = true;
break;
}
p = p->next;
}
if(have_topic_alias == false){
return MOSQ_ERR_INVAL;
}
}else{
return MOSQ_ERR_INVAL;
}
}else{
if(mosquitto_validate_utf8(topic, strlen(topic))) return MOSQ_ERR_MALFORMED_UTF8;
if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){
return MOSQ_ERR_INVAL;
}
}
local_mid = mosquitto__mid_generate(mosq);
@ -62,10 +83,12 @@ int mosquitto_publish_with_properties(struct mosquitto *mosq, int *mid, const ch
message->next = NULL;
message->timestamp = mosquitto_time();
message->msg.mid = local_mid;
message->msg.topic = mosquitto__strdup(topic);
if(!message->msg.topic){
message__cleanup(&message);
return MOSQ_ERR_NOMEM;
if(topic){
message->msg.topic = mosquitto__strdup(topic);
if(!message->msg.topic){
message__cleanup(&message);
return MOSQ_ERR_NOMEM;
}
}
if(payloadlen){
message->msg.payloadlen = payloadlen;

View File

@ -51,7 +51,6 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
#endif
#endif
assert(mosq);
assert(topic);
#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS)
if(mosq->sock == INVALID_SOCKET && !mosq->wsi) return MOSQ_ERR_NO_CONN;
@ -137,9 +136,12 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
int rc;
assert(mosq);
assert(topic);
packetlen = 2+strlen(topic) + payloadlen;
if(topic){
packetlen = 2+strlen(topic) + payloadlen;
}else{
packetlen = 2 + payloadlen;
}
if(qos > 0) packetlen += 2; /* For message id */
if(mosq->protocol == mosq_p_mqtt5){
proplen = property__get_length_all(properties);
@ -163,7 +165,11 @@ int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic,
return rc;
}
/* Variable header (topic string) */
packet__write_string(packet, topic, strlen(topic));
if(topic){
packet__write_string(packet, topic, strlen(topic));
}else{
packet__write_uint16(packet, 0);
}
if(qos > 0){
packet__write_uint16(packet, mid);
}