1
0
mirror of https://github.com/eclipse/mosquitto.git synced 2025-05-09 01:01:11 +08:00

Add support for sub/rr v5 prop output in JSON mode

This commit is contained in:
Roger Light 2019-10-15 14:26:51 +01:00
parent 2f8573b456
commit e5237ae7e5
6 changed files with 144 additions and 16 deletions

View File

@ -22,6 +22,8 @@ Clients:
- Add support for connecting to brokers through Unix domain sockets with the
`--unix` argument.
- Use cJSON library for producing JSON output, where available. Closes #1222.
- Add support for outputting MQTT v5 property information to mosquitto_sub/rr
JSON output.
1.6.7 - 20190925

View File

@ -47,7 +47,7 @@ pub_shared.o : pub_shared.c ${SHARED_DEP}
sub_client.o : sub_client.c ${SHARED_DEP}
${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@
sub_client_output.o : sub_client_output.c ${SHARED_DEP}
sub_client_output.o : sub_client_output.c sub_client_output.h ${SHARED_DEP}
${CROSS_COMPILE}${CC} $(CLIENT_CPPFLAGS) $(CLIENT_CFLAGS) -c $< -o $@
rr_client.o : rr_client.c ${SHARED_DEP}

View File

@ -35,6 +35,7 @@ Contributors:
#include <mqtt_protocol.h>
#include "client_shared.h"
#include "pub_shared.h"
#include "sub_client_output.h"
enum rr__state {
rr_s_new,
@ -64,8 +65,6 @@ void my_signal_handler(int signum)
}
#endif
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message);
int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, void *payload, int qos, bool retain)
{
@ -75,7 +74,7 @@ int my_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadl
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message, const mosquitto_property *properties)
{
print_message(&cfg, message);
print_message(&cfg, message, properties);
switch(cfg.pub_mode){
case MSGMODE_CMD:
case MSGMODE_FILE:

View File

@ -34,6 +34,7 @@ Contributors:
#include <mosquitto.h>
#include <mqtt_protocol.h>
#include "client_shared.h"
#include "sub_client_output.h"
struct mosq_config cfg;
bool process_messages = true;
@ -53,8 +54,6 @@ void my_signal_handler(int signum)
}
#endif
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message);
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
{
@ -98,7 +97,7 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit
}
}
print_message(&cfg, message);
print_message(&cfg, message, properties);
if(cfg.msg_count>0){
msg_count++;

View File

@ -39,7 +39,9 @@ Contributors:
#endif
#include <mosquitto.h>
#include <mqtt_protocol.h>
#include "client_shared.h"
#include "sub_client_output.h"
extern struct mosq_config cfg;
@ -116,7 +118,97 @@ static void write_json_payload(const char *payload, int payloadlen)
#endif
static int json_print(const struct mosquitto_message *message, const struct tm *ti, bool escaped)
#ifdef WITH_CJSON
static int json_print_properties(cJSON *root, const mosquitto_property *properties)
{
int identifier;
uint8_t i8value;
uint16_t i16value;
uint32_t i32value;
char *strname, *strvalue;
char *binvalue;
cJSON *tmp, *prop_json, *user_json = NULL;
const mosquitto_property *prop;
prop_json = cJSON_CreateObject();
if(prop_json == NULL){
cJSON_Delete(prop_json);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(root, "properties", prop_json);
for(prop=properties; prop != NULL; prop = mosquitto_property_next(prop)){
tmp = NULL;
identifier = mosquitto_property_identifier(prop);
switch(identifier){
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
mosquitto_property_read_byte(prop, MQTT_PROP_PAYLOAD_FORMAT_INDICATOR, &i8value, false);
tmp = cJSON_CreateNumber(i8value);
break;
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
mosquitto_property_read_int32(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i32value, false);
tmp = cJSON_CreateNumber(i32value);
break;
case MQTT_PROP_CONTENT_TYPE:
case MQTT_PROP_RESPONSE_TOPIC:
mosquitto_property_read_string(prop, identifier, &strvalue, false);
if(strvalue == NULL) return MOSQ_ERR_NOMEM;
tmp = cJSON_CreateString(strvalue);
free(strvalue);
break;
case MQTT_PROP_CORRELATION_DATA:
mosquitto_property_read_binary(prop, MQTT_PROP_CORRELATION_DATA, (void **)&binvalue, &i16value, false);
if(binvalue == NULL) return MOSQ_ERR_NOMEM;
tmp = cJSON_CreateString(binvalue);
free(binvalue);
break;
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
mosquitto_property_read_varint(prop, MQTT_PROP_SUBSCRIPTION_IDENTIFIER, &i32value, false);
tmp = cJSON_CreateNumber(i32value);
break;
case MQTT_PROP_TOPIC_ALIAS:
mosquitto_property_read_int16(prop, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &i16value, false);
tmp = cJSON_CreateNumber(i16value);
break;
case MQTT_PROP_USER_PROPERTY:
if(user_json == NULL){
user_json = cJSON_CreateObject();
if(user_json == NULL){
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(prop_json, "user-properties", user_json);
}
mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &strname, &strvalue, false);
if(strname == NULL || strvalue == NULL) return MOSQ_ERR_NOMEM;
tmp = cJSON_CreateString(strvalue);
free(strvalue);
if(tmp == NULL){
free(strname);
return MOSQ_ERR_NOMEM;
}
cJSON_AddItemToObject(user_json, strname, tmp);
free(strname);
tmp = NULL; /* Don't add this to prop_json below */
break;
}
if(tmp != NULL){
cJSON_AddItemToObject(prop_json, mosquitto_property_identifier_to_string(identifier), tmp);
}
}
return MOSQ_ERR_SUCCESS;
}
#endif
static int json_print(const struct mosquitto_message *message, const mosquitto_property *properties, const struct tm *ti, bool escaped)
{
#ifdef WITH_CJSON
cJSON *root;
@ -136,7 +228,7 @@ static int json_print(const struct mosquitto_message *message, const struct tm *
}
cJSON_AddItemToObject(root, "tst", tmp);
tmp = cJSON_CreateStringReference(message->topic);
tmp = cJSON_CreateString(message->topic);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
@ -173,8 +265,18 @@ static int json_print(const struct mosquitto_message *message, const struct tm *
}
cJSON_AddItemToObject(root, "mid", tmp);
}
/* Properties */
if(properties){
if(json_print_properties(root, properties)){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
}
}
/* Payload */
if(escaped){
tmp = cJSON_CreateStringReference(message->payload);
tmp = cJSON_CreateString(message->payload);
if(tmp == NULL){
cJSON_Delete(root);
return MOSQ_ERR_NOMEM;
@ -190,7 +292,8 @@ static int json_print(const struct mosquitto_message *message, const struct tm *
cJSON_AddItemToObject(root, "payload", tmp);
}
json_str = cJSON_PrintUnformatted(root);
//json_str = cJSON_PrintUnformatted(root);
json_str = cJSON_Print(root);
cJSON_Delete(root);
fputs(json_str, stdout);
@ -220,7 +323,7 @@ static int json_print(const struct mosquitto_message *message, const struct tm *
}
static void formatted_print(const struct mosq_config *lcfg, const struct mosquitto_message *message)
static void formatted_print(const struct mosq_config *lcfg, const struct mosquitto_message *message, const mosquitto_property *properties)
{
int len;
int i;
@ -260,7 +363,7 @@ static void formatted_print(const struct mosq_config *lcfg, const struct mosquit
return;
}
}
if(json_print(message, ti, true) != MOSQ_ERR_SUCCESS){
if(json_print(message, properties, ti, true) != MOSQ_ERR_SUCCESS){
err_printf(lcfg, "Error: Out of memory.\n");
return;
}
@ -273,7 +376,7 @@ static void formatted_print(const struct mosq_config *lcfg, const struct mosquit
return;
}
}
rc = json_print(message, ti, false);
rc = json_print(message, properties, ti, false);
if(rc == MOSQ_ERR_NOMEM){
err_printf(lcfg, "Error: Out of memory.\n");
return;
@ -406,10 +509,10 @@ static void formatted_print(const struct mosq_config *lcfg, const struct mosquit
}
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message)
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message, const mosquitto_property *properties)
{
if(cfg->format){
formatted_print(cfg, message);
formatted_print(cfg, message, properties);
}else if(cfg->verbose){
if(message->payloadlen){
printf("%s ", message->topic);

View File

@ -0,0 +1,25 @@
/*
Copyright (c) 2019 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#ifndef SUB_CLIENT_OUTPUT_H
#define SUB_CLIENT_OUTPUT_H
#include "mosquitto.h"
#include "client_shared.h"
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message, const mosquitto_property *properties);
#endif