nuttx-apps/examples/mqttc/mqttc_pub.c
Alin Jerpelea a91768e715 examples: migrate to SPDX identifier
Most tools used for compliance and SBOM generation use SPDX identifiers
This change brings us a step closer to an easy SBOM generation.

Signed-off-by: Alin Jerpelea <alin.jerpelea@sony.com>
2024-12-30 18:02:50 +08:00

637 lines
17 KiB
C

/****************************************************************************
* apps/examples/mqttc/mqttc_pub.c
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <netdb.h>
#include <errno.h>
#include <mqtt.h>
#ifdef MQTT_USE_MBEDTLS
# include <inttypes.h>
# include <mbedtls/error.h>
# include <mbedtls/entropy.h>
# include <mbedtls/ctr_drbg.h>
# include <mbedtls/net_sockets.h>
# include <mbedtls/ssl.h>
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
/* This is taken from mbedtls/tests/data_files/test-ca-sha256.crt. */
/* BEGIN FILE string macro TEST_CA_CRT_RSA_SHA256_PEM
* mbedtls/tests/data_files/test-ca-sha256.crt
*/
# define TEST_CA_CRT_RSA_SHA256_PEM \
"-----BEGIN CERTIFICATE-----\r\n" \
"MIIDQTCCAimgAwIBAgIBAzANBgkqhkiG9w0BAQsFADA7MQswCQYDVQQGEwJOTDER\r\n" \
"MA8GA1UECgwIUG9sYXJTU0wxGTAXBgNVBAMMEFBvbGFyU1NMIFRlc3QgQ0EwHhcN\r\n" \
"MTkwMjEwMTQ0NDAwWhcNMjkwMjEwMTQ0NDAwWjA7MQswCQYDVQQGEwJOTDERMA8G\r\n" \
"A1UECgwIUG9sYXJTU0wxGTAXBgNVBAMMEFBvbGFyU1NMIFRlc3QgQ0EwggEiMA0G\r\n" \
"CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDA3zf8F7vglp0/ht6WMn1EpRagzSHx\r\n" \
"mdTs6st8GFgIlKXsm8WL3xoemTiZhx57wI053zhdcHgH057Zk+i5clHFzqMwUqny\r\n" \
"50BwFMtEonILwuVA+T7lpg6z+exKY8C4KQB0nFc7qKUEkHHxvYPZP9al4jwqj+8n\r\n" \
"YMPGn8u67GB9t+aEMr5P+1gmIgNb1LTV+/Xjli5wwOQuvfwu7uJBVcA0Ln0kcmnL\r\n" \
"R7EUQIN9Z/SG9jGr8XmksrUuEvmEF/Bibyc+E1ixVA0hmnM3oTDPb5Lc9un8rNsu\r\n" \
"KNF+AksjoBXyOGVkCeoMbo4bF6BxyLObyavpw/LPh5aPgAIynplYb6LVAgMBAAGj\r\n" \
"UDBOMAwGA1UdEwQFMAMBAf8wHQYDVR0OBBYEFLRa5KWz3tJS9rnVppUP6z68x/3/\r\n" \
"MB8GA1UdIwQYMBaAFLRa5KWz3tJS9rnVppUP6z68x/3/MA0GCSqGSIb3DQEBCwUA\r\n" \
"A4IBAQA4qFSCth2q22uJIdE4KGHJsJjVEfw2/xn+MkTvCMfxVrvmRvqCtjE4tKDl\r\n" \
"oK4MxFOek07oDZwvtAT9ijn1hHftTNS7RH9zd/fxNpfcHnMZXVC4w4DNA1fSANtW\r\n" \
"5sY1JB5Je9jScrsLSS+mAjyv0Ow3Hb2Bix8wu7xNNrV5fIf7Ubm+wt6SqEBxu3Kb\r\n" \
"+EfObAT4huf3czznhH3C17ed6NSbXwoXfby7stWUDeRJv08RaFOykf/Aae7bY5PL\r\n" \
"yTVrkAnikMntJ9YI+hNNYt3inqq11A5cN0+rVTst8UKCxzQ4GpvroSwPKTFkbMw4\r\n" \
"/anT1dVxr/BtwJfiESoK3/4CeXR1\r\n" \
"-----END CERTIFICATE-----\r\n"
/* END FILE */
# define EXTRA_OPT "c:"
#else
# define EXTRA_OPT ""
#endif
/****************************************************************************
* Private Types
****************************************************************************/
/* Settings for one publisher run.  main() fills in the defaults and
 * parsearg() overrides some of them from the command line.
 */
struct mqttc_cfg_s
{
/* Broker host name; used by init_conn() to resolve and connect */
FAR const char *host;
/* Broker port, kept as a string because it is passed on as a service name */
FAR const char *port;
/* Topic handed to mqtt_publish() */
FAR const char *topic;
/* Payload handed to mqtt_publish(); its length is taken with strlen() */
FAR const char *msg;
/* Client identifier handed to mqtt_connect(); main() defaults it to NULL */
FAR const char *id;
/* User name handed to mqtt_connect(); main() defaults it to NULL */
FAR const char *user;
/* Password handed to mqtt_connect(); main() defaults it to NULL */
FAR const char *pass;
#ifdef MQTT_USE_MBEDTLS
/* CA certificate file (-c option).  When NULL, init_conn() falls back to
 * the built-in TEST_CA_CRT_RSA_SHA256_PEM certificate.
 */
FAR const char *ca_file;
#endif
/* Passed as the last argument of mqtt_connect(); presumably the keep-alive
 * interval in seconds -- TODO confirm against the MQTT-C API.
 */
uint32_t tmo;
/* Connect flags handed to mqtt_connect() */
uint8_t flags;
/* One of MQTT_PUBLISH_QOS_x; handed to mqtt_publish() as publish flags */
uint8_t qos;
};
/* State of one broker connection: the MQTT-C client, the TLS objects (TLS
 * builds only) and the buffers the client works on.
 */
struct mqtt_conn_context_s
{
/* MQTT-C client instance, initialized by mqtt_init() in main() */
struct mqtt_client client;
#ifdef MQTT_USE_MBEDTLS
/* mbedTLS objects; set up by init_conn() and released by close_conn() */
mbedtls_net_context net_ctx;
mbedtls_ssl_context ssl_ctx;
mbedtls_ssl_config ssl_conf;
mbedtls_x509_crt ca_crt;
mbedtls_entropy_context entropy;
mbedtls_ctr_drbg_context ctr_drbg;
#endif
/* Buffers handed to mqtt_init() as the client's send and receive buffers */
uint8_t sendbuf[CONFIG_EXAMPLES_MQTTC_TXSIZE];
uint8_t recvbuf[CONFIG_EXAMPLES_MQTTC_RXSIZE];
};
/****************************************************************************
* Private Function Prototypes
****************************************************************************/
/* Thread entry point that keeps calling mqtt_sync() on the client */

static FAR void *client_refresher(FAR void *data);

/* Command line parsing; updates *cfg and the publish count *n */

static void parsearg(int argc, FAR char *argv[],
                     FAR struct mqttc_cfg_s *cfg, FAR int *n);

/* Connection set-up and tear-down.  The parameter names match the ones
 * used by the definitions.
 */

static int init_conn(FAR const struct mqttc_cfg_s *cfg,
                     FAR struct mqtt_conn_context_s *conn,
                     FAR mqtt_pal_socket_handle *socketfd);
static void close_conn(FAR struct mqtt_conn_context_s *conn);
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
 * Name: client_refresher
 *
 * Description:
 *   Thread entry point that services the MQTT client.  It calls mqtt_sync()
 *   on the client and then sleeps for 100 ms, over and over, so that
 *   traffic to and from the broker keeps flowing.
 *
 * Input Parameters:
 *   data - The struct mqtt_client to service, passed as an opaque pointer
 *
 * Returned Value:
 *   The loop has no exit condition, so the function does not return by
 *   itself; the trailing return only satisfies the thread entry signature.
 *
 ****************************************************************************/

static FAR void *client_refresher(FAR void *data)
{
  FAR struct mqtt_client *client = (FAR struct mqtt_client *)data;

  for (; ; )
    {
      mqtt_sync(client);
      usleep(100000U);
    }

  return NULL;
}
/****************************************************************************
 * Name: parsearg
 *
 * Description:
 *   Parse the command line and store the result in the configuration.
 *   Recognized options:
 *
 *     -h <host>   Broker host name
 *     -p <port>   Broker port
 *     -m <msg>    Message to publish
 *     -t <topic>  Topic to publish to
 *     -n <count>  Number of times the message is published
 *     -q <qos>    QoS level: 0, 1 or 2
 *     -c <file>   CA certificate file (only when MQTT_USE_MBEDTLS is set)
 *
 *   Options that are not given leave the corresponding setting untouched.
 *   The string settings point into argv[]; nothing is copied.
 *
 * Input Parameters:
 *   argc - Number of command line arguments
 *   argv - Command line arguments
 *   cfg  - Configuration to update
 *   n    - Location of the publish count
 *
 * Returned Value:
 *   None.  Problems are reported on stderr and the offending option is
 *   skipped.
 *
 ****************************************************************************/

static void parsearg(int argc, FAR char *argv[],
                     FAR struct mqttc_cfg_s *cfg, FAR int *n)
{
  FAR char *end;
  long level;
  int opt;

  while ((opt = getopt(argc, argv, "h:p:m:t:n:q:" EXTRA_OPT)) != ERROR)
    {
      switch (opt)
        {
          case 'h':
            cfg->host = optarg;
            break;

          case 'p':
            cfg->port = optarg;
            break;

          case 'm':
            cfg->msg = optarg;
            break;

          case 't':
            cfg->topic = optarg;
            break;

          case 'n':
            *n = strtol(optarg, NULL, 10);
            break;

          case 'q':

            /* strtol() yields the numeric level, so it has to be compared
             * with the integers 0..2.  Comparing with the characters
             * '0'..'2' (48..50) would never match a valid level.
             */

            level = strtol(optarg, &end, 10);
            if (end == optarg || *end != '\0')
              {
                /* Not a number at all: force the error branch below */

                level = -1;
              }

            switch (level)
              {
                case 0:
                  cfg->qos = MQTT_PUBLISH_QOS_0;
                  break;

                case 1:
                  cfg->qos = MQTT_PUBLISH_QOS_1;
                  break;

                case 2:
                  cfg->qos = MQTT_PUBLISH_QOS_2;
                  break;

                default:
                  fprintf(stderr, "ERROR: Invalid QoS level: %s\n", optarg);
                  break;
              }
            break;

#ifdef MQTT_USE_MBEDTLS
          case 'c':
            cfg->ca_file = optarg;
            break;
#endif

          default:
            fprintf(stderr, "ERROR: Unrecognized option\n");
            break;
        }
    }
}
/****************************************************************************
 * Name: init_conn
 *
 * Description:
 *   Resolve server's name and try to establish a connection.
 *
 *   With MQTT_USE_MBEDTLS defined the connection is a TLS session: the
 *   random generator is seeded, the CA certificate is loaded (from
 *   cfg->ca_file if set, otherwise from the built-in test certificate), a
 *   TCP connection is opened and switched to non-blocking mode, the TLS
 *   handshake is run and finally the certificate verification result is
 *   checked.
 *
 *   Without MQTT_USE_MBEDTLS a plain IPv4 TCP socket is connected to the
 *   first address returned by getaddrinfo() that accepts the connection
 *   and is then switched to non-blocking mode.
 *
 * Input Parameters:
 *   cfg      - Configuration; host, port and (TLS only) ca_file are used
 *   conn     - Connection context; only its TLS members are used here
 *   socketfd - Receives the handle for mqtt_init(): the SSL context in TLS
 *              builds, the socket descriptor otherwise
 *
 * Returned Value:
 *   0 on success, -1 on failure.  On failure everything that was set up
 *   by this function has been released again and *socketfd is not
 *   written.
 *
 ****************************************************************************/
static int init_conn(FAR const struct mqttc_cfg_s *cfg,
FAR struct mqtt_conn_context_s *conn,
FAR mqtt_pal_socket_handle *socketfd)
{
#ifdef MQTT_USE_MBEDTLS
/* Short-hands for the TLS objects that live in the connection context */
FAR mbedtls_net_context *net_ctx = &conn->net_ctx;
FAR mbedtls_ssl_context *ssl_ctx = &conn->ssl_ctx;
FAR mbedtls_ssl_config *ssl_conf = &conn->ssl_conf;
FAR mbedtls_x509_crt *ca_crt = &conn->ca_crt;
FAR mbedtls_entropy_context *entropy = &conn->entropy;
FAR mbedtls_ctr_drbg_context *ctr_drbg = &conn->ctr_drbg;
uint32_t result;
/* Extra data mixed into the seed of the random generator; the length is
 * the string length without the terminating NUL.
 */
FAR const unsigned char *additional = (FAR const unsigned char *)"MQTT-C";
size_t additional_len = 6;
#else
struct addrinfo hints;
FAR struct addrinfo *servinfo;
FAR struct addrinfo *itr;
int fd;
#endif
int ret;
printf("Connecting to %s:%s...\n", cfg->host, cfg->port);
#ifdef MQTT_USE_MBEDTLS
/* Step 1: random number generator, seeded from the entropy context */
mbedtls_entropy_init(entropy);
mbedtls_ctr_drbg_init(ctr_drbg);
ret = mbedtls_ctr_drbg_seed(ctr_drbg, mbedtls_entropy_func, entropy,
additional, additional_len);
if (ret != 0)
{
printf("ERROR! mbedtls_ctr_drbg_seed() failed: %d\n", ret);
goto err_with_ctr_drbg;
}
/* Step 2: trusted CA certificate, from the user's file or the built-in
 * test certificate.  Both paths share the error message below.
 */
mbedtls_x509_crt_init(ca_crt);
if (cfg->ca_file != NULL)
{
ret = mbedtls_x509_crt_parse_file(ca_crt, cfg->ca_file);
}
else
{
/* sizeof() includes the terminating NUL of the PEM string */
ret = mbedtls_x509_crt_parse(ca_crt,
(FAR const unsigned char *)TEST_CA_CRT_RSA_SHA256_PEM,
sizeof(TEST_CA_CRT_RSA_SHA256_PEM));
}
if (ret != 0)
{
printf("ERROR! mbedtls_x509_crt_parse_file() failed: %d\n", ret);
goto err_with_x509_crt;
}
/* Step 3: TLS configuration for a stream (TCP) client */
mbedtls_ssl_config_init(ssl_conf);
ret = mbedtls_ssl_config_defaults(ssl_conf, MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT);
if (ret != 0)
{
printf("ERROR! mbedtls_ssl_config_defaults() failed: %d\n", ret);
goto err_with_ssl_conf;
}
/* Verification is set to "optional"; the outcome is inspected explicitly
 * after the handshake (see mbedtls_ssl_get_verify_result() below).
 */
mbedtls_ssl_conf_ca_chain(ssl_conf, ca_crt, NULL);
mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
mbedtls_ssl_conf_rng(ssl_conf, mbedtls_ctr_drbg_random, ctr_drbg);
/* Step 4: TCP connection to the broker, switched to non-blocking mode */
mbedtls_net_init(net_ctx);
ret = mbedtls_net_connect(net_ctx, cfg->host, cfg->port,
MBEDTLS_NET_PROTO_TCP);
if (ret != 0)
{
printf("ERROR! mbedtls_net_connect() failed: %d\n", ret);
goto err_with_net;
}
ret = mbedtls_net_set_nonblock(net_ctx);
if (ret != 0)
{
printf("ERROR! mbedtls_net_set_nonblock() failed: %d\n", ret);
goto err_with_net;
}
/* Step 5: SSL context bound to the configuration, the expected host name
 * and the network context.
 */
mbedtls_ssl_init(ssl_ctx);
ret = mbedtls_ssl_setup(ssl_ctx, ssl_conf);
if (ret != 0)
{
printf("ERROR! mbedtls_ssl_setup() failed: %d\n", ret);
goto err_with_ssl;
}
ret = mbedtls_ssl_set_hostname(ssl_ctx, cfg->host);
if (ret != 0)
{
printf("ERROR! mbedtls_ssl_set_hostname() failed: %d\n", ret);
goto err_with_ssl;
}
mbedtls_ssl_set_bio(ssl_ctx, net_ctx,
mbedtls_net_send, mbedtls_net_recv, NULL);
/* Step 6: handshake.  The socket is non-blocking, so the handshake is
 * retried for as long as it reports WANT_READ/WANT_WRITE, waiting for the
 * socket to become ready in between.  Any other return value (0 included)
 * ends the loop and is evaluated afterwards.
 */
for (; ; )
{
uint32_t want = 0;
ret = mbedtls_ssl_handshake(ssl_ctx);
if (ret == MBEDTLS_ERR_SSL_WANT_READ)
{
want |= MBEDTLS_NET_POLL_READ;
}
else if (ret == MBEDTLS_ERR_SSL_WANT_WRITE)
{
want |= MBEDTLS_NET_POLL_WRITE;
}
else
{
break;
}
/* NOTE(review): a timeout of -1 presumably means "wait forever" --
 * confirm against the mbedtls_net_poll() documentation.
 */
ret = mbedtls_net_poll(net_ctx, want, -1);
if (ret < 0)
{
printf("ERROR! mbedtls_net_poll() failed: %d\n", ret);
goto err_with_ssl;
}
}
if (ret != 0)
{
printf("ERROR! mbedtls_ssl_handshake() failed: %d\n", ret);
goto err_with_ssl;
}
/* Step 7: certificate verification result; 0 means verified */
result = mbedtls_ssl_get_verify_result(ssl_ctx);
if (result != 0)
{
/* An all-ones value is treated as a failure of the query itself */
if (result == 0xffffffff)
{
printf("ERROR! mbedtls_ssl_get_verify_result() failed\n");
goto err_with_ssl;
}
else
{
/* Print the verification flags, with a textual explanation unless
 * mbedTLS was built with MBEDTLS_X509_REMOVE_INFO.
 */
#if defined(MBEDTLS_X509_REMOVE_INFO)
const char *buf = "";
#else
char buf[512];
mbedtls_x509_crt_verify_info(buf, sizeof(buf), "\t", result);
#endif
printf("Certificate verification failed (%0" PRIx32 ")\n%s\n",
result, buf);
/* With CONFIG_EXAMPLES_MQTTC_ALLOW_UNVERIFIED_TLS the failure is only
 * reported and the connection is used anyway.
 */
#ifndef CONFIG_EXAMPLES_MQTTC_ALLOW_UNVERIFIED_TLS
goto err_with_ssl;
#endif
}
}
/* Success: the SSL context is the handle given to the caller */
*socketfd = ssl_ctx;
return 0;
/* Error exits: each label releases one more object, in reverse order of
 * initialization, and falls through to the labels below it.
 */
err_with_ssl:
mbedtls_ssl_free(ssl_ctx);
err_with_net:
mbedtls_net_free(net_ctx);
err_with_ssl_conf:
mbedtls_ssl_config_free(ssl_conf);
err_with_x509_crt:
mbedtls_x509_crt_free(ca_crt);
err_with_ctr_drbg:
mbedtls_ctr_drbg_free(ctr_drbg);
mbedtls_entropy_free(entropy);
return -1;
#else
/* Plain TCP: look up IPv4 stream addresses for host:port */
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
ret = getaddrinfo(cfg->host, cfg->port, &hints, &servinfo);
if (ret != 0)
{
printf("ERROR! getaddrinfo() failed: %s\n", gai_strerror(ret));
return -1;
}
/* Try the returned addresses in order until one of them connects.  Note
 * that "continue" in a do-while jumps to the loop condition, i.e. it moves
 * on to the next address.  After the loop fd is either a connected socket
 * or negative.
 */
itr = servinfo;
do
{
fd = socket(itr->ai_family, itr->ai_socktype, itr->ai_protocol);
if (fd < 0)
{
continue;
}
ret = connect(fd, itr->ai_addr, itr->ai_addrlen);
if (ret == 0)
{
break;
}
close(fd);
fd = -1;
}
while ((itr = itr->ai_next) != NULL);
freeaddrinfo(servinfo);
/* This also covers the case where sockets could be created but none of
 * them could be connected.
 */
if (fd < 0)
{
printf("ERROR! Couldn't create socket\n");
return -1;
}
/* Add O_NONBLOCK to the current file status flags of the socket */
ret = fcntl(fd, F_GETFL, 0);
if (ret < 0)
{
close(fd);
printf("ERROR! fcntl() F_GETFL failed, errno: %d\n", errno);
return -1;
}
ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
if (ret < 0)
{
close(fd);
printf("ERROR! fcntl() F_SETFL failed, errno: %d\n", errno);
return -1;
}
/* Success: the socket descriptor is the handle given to the caller */
*socketfd = fd;
return 0;
#endif
}
/****************************************************************************
 * Name: close_conn
 *
 * Description:
 *   Release the connection that init_conn() established.  In TLS builds
 *   all mbedTLS objects of the connection context are freed; otherwise the
 *   socket held by the MQTT client is closed.
 *
 * Input Parameters:
 *   conn - Connection context to shut down
 *
 * Returned Value:
 *   None
 *
 ****************************************************************************/

static void close_conn(FAR struct mqtt_conn_context_s *conn)
{
#ifdef MQTT_USE_MBEDTLS
  mbedtls_net_free(&conn->net_ctx);
  mbedtls_ssl_free(&conn->ssl_ctx);
  mbedtls_ssl_config_free(&conn->ssl_conf);
  mbedtls_x509_crt_free(&conn->ca_crt);
  mbedtls_ctr_drbg_free(&conn->ctr_drbg);
  mbedtls_entropy_free(&conn->entropy);
#else
  close(conn->client.socketfd);
#endif
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
 * Name: main
 *
 * Description:
 *   Publish a message to an MQTT broker.  The program connects to the
 *   broker (see init_conn()), queues the MQTT CONNECT, starts a thread
 *   that keeps the client synchronized, waits up to about one second
 *   (100 polls of 10 ms) for the connect event, publishes the message "n"
 *   times with a pause of 5 seconds after each one, and disconnects.
 *
 * Input Parameters:
 *   argc - Number of command line arguments
 *   argv - Command line arguments, see parsearg()
 *
 * Returned Value:
 *   0 if the connection, all publications and the disconnect succeeded;
 *   -1 otherwise.
 *
 ****************************************************************************/

int main(int argc, FAR char *argv[])
{
  struct mqtt_conn_context_s mqtt_conn;
  mqtt_pal_socket_handle socketfd;
  int timeout = 100;
  enum MQTTErrors mqtterr;
  pthread_attr_t attr;
  pthread_t thrdid;
  int thrderr;
  int n = 1;
  int ret = -1;
  struct mqttc_cfg_s mqtt_cfg =
  {
    .host = "broker.hivemq.com",
#ifndef MQTT_USE_MBEDTLS
    .port = "1883",
#else
    .port = "8883",
#endif
    .topic = "test",
    .msg = "test",
    .flags = MQTT_CONNECT_CLEAN_SESSION,
    .tmo = 400,
    .id = NULL,
    .user = NULL,
    .pass = NULL,
    .qos = MQTT_PUBLISH_QOS_0
  };

  parsearg(argc, argv, &mqtt_cfg, &n);

  if (init_conn(&mqtt_cfg, &mqtt_conn, &socketfd) < 0)
    {
      return -1;
    }

  mqtterr = mqtt_init(&mqtt_conn.client, socketfd,
                      mqtt_conn.sendbuf, sizeof(mqtt_conn.sendbuf),
                      mqtt_conn.recvbuf, sizeof(mqtt_conn.recvbuf),
                      NULL);
  if (mqtterr != MQTT_OK)
    {
      printf("ERROR! mqtt_init() failed.\n");
      goto err_with_conn;
    }

  mqtterr = mqtt_connect(&mqtt_conn.client, mqtt_cfg.id,
                         NULL,           /* Will topic */
                         NULL,           /* Will message */
                         0,              /* Will message size */
                         mqtt_cfg.user,  /* User name */
                         mqtt_cfg.pass,  /* Password */
                         mqtt_cfg.flags, mqtt_cfg.tmo);
  if (mqtterr != MQTT_OK)
    {
      printf("ERROR! mqtt_connect() failed\n");
      goto err_with_conn;
    }

  if (mqtt_conn.client.error != MQTT_OK)
    {
      printf("error: %s\n", mqtt_error_str(mqtt_conn.client.error));
      goto err_with_conn;
    }
  else
    {
      printf("Success: Connected to broker!\n");
    }

  /* Start a thread to refresh the client (handle egress and ingress client
   * traffic)
   */

  if (pthread_attr_init(&attr) != 0)
    {
      printf("ERROR! pthread_attr_init() failed.\n");
      goto err_with_conn;
    }

  pthread_attr_setstacksize(&attr, CONFIG_EXAMPLES_MQTTC_STACKSIZE);
  thrderr = pthread_create(&thrdid, &attr, client_refresher,
                           &mqtt_conn.client);

  /* The attributes are only needed while the thread is being created */

  pthread_attr_destroy(&attr);
  if (thrderr != 0)
    {
      printf("ERROR! pthread_create() failed.\n");
      goto err_with_conn;
    }

  /* Wait for MQTT ACK or time-out */

  while (!mqtt_conn.client.event_connect && --timeout > 0)
    {
      usleep(10000);
    }

  if (timeout == 0)
    {
      printf("ERROR! Timed out waiting for the connect event\n");
      goto err_with_thrd;
    }

  /* "n > 0" instead of "n != 0": a negative count from the command line
   * must not turn this into a practically endless loop.
   */

  while (n-- > 0)
    {
      mqtterr = mqtt_publish(&mqtt_conn.client, mqtt_cfg.topic,
                             mqtt_cfg.msg, strlen(mqtt_cfg.msg),
                             mqtt_cfg.qos);
      if (mqtterr != MQTT_OK)
        {
          printf("ERROR! mqtt_publish() failed\n");
          goto err_with_thrd;
        }

      if (mqtt_conn.client.error != MQTT_OK)
        {
          printf("error: %s\n", mqtt_error_str(mqtt_conn.client.error));
          goto err_with_thrd;
        }
      else
        {
          printf("Success: Published to broker!\n");
        }

      sleep(5);
    }

  printf("\nDisconnecting from %s\n\n", mqtt_cfg.host);
  mqtterr = mqtt_disconnect(&mqtt_conn.client);
  if (mqtterr != MQTT_OK)
    {
      printf("ERROR! mqtt_disconnect() failed\n");
    }
  else
    {
      ret = 0;
    }

  /* Force sending the DISCONNECT, the thread will be canceled before getting
   * the chance to sync this last packet.
   * Note however that close() would cleanly close the connection but only
   * through TCP (i.e. no MQTT DISCONNECT packet).
   */

  mqtt_sync(&mqtt_conn.client);

err_with_thrd:

  /* Wait until the refresher thread is really gone.  Otherwise it could
   * still be using the client while the connection is torn down below.
   */

  pthread_cancel(thrdid);
  pthread_join(thrdid, NULL);

err_with_conn:
  close_conn(&mqtt_conn);
  return ret;
}