diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c2db4e..ce92a15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - `property` can now report the `typeid` of its contained value. - The `properties` list implements a const iterator - Added a `to_string()` and `operator<<()` for reason codes. +- Cleaned up and fixed a number of example apps. +- Reorganized the source repository - Completely reformat the sources and added a .clang-format file (a project master and a slightly-different one for headers). - Added GitHub CI Action, removing legacy Travis and Appveyor files diff --git a/examples/async_consume.cpp b/examples/async_consume.cpp index ed66a6f..cc802d0 100644 --- a/examples/async_consume.cpp +++ b/examples/async_consume.cpp @@ -7,13 +7,15 @@ // and status updates. // // The sample demonstrates: -// - Connecting to an MQTT server/broker. +// - Connecting to an MQTT v3 server/broker. // - Subscribing to a topic +// - Persistent subscriber session // - Receiving messages through the synchronous queuing API +// - Auto reconnecting // /******************************************************************************* - * Copyright (c) 2013-2023 Frank Pagliughi + * Copyright (c) 2013-2024 Frank Pagliughi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 @@ -40,22 +42,34 @@ using namespace std; -const string SERVER_ADDRESS{"mqtt://localhost:1883"}; +const string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const string CLIENT_ID{"paho_cpp_async_consume"}; -const string TOPIC{"hello"}; +const string TOPIC{"hello"}; const int QOS = 1; ///////////////////////////////////////////////////////////////////////////// int main(int argc, char* argv[]) { - mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID); + auto serverUri = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI; - auto connOpts = mqtt::connect_options_builder().clean_session(false).finalize(); + mqtt::async_client cli(serverUri, CLIENT_ID); + + auto connOpts = mqtt::connect_options_builder::v3() + .keep_alive_interval(30s) + .clean_session(false) + .automatic_reconnect() + .finalize(); + + // The client will handle automatic reconnects, but we add this + // callbacks to let the user know when we're reconnected. + cli.set_connected_handler([](const std::string&) { + cout << "\n*** Connected ***" << endl; + }); try { - // Start consumer before connecting to make sure to not miss messages + // Start consumer before connecting to make sure to not miss any messages cli.start_consuming(); @@ -71,36 +85,24 @@ int main(int argc, char* argv[]) // If there is no session present, then we need to subscribe, but if // there is a session, then the server remembers us and our // subscriptions. - if (!rsp.is_session_present()) + if (!rsp.is_session_present()) { + cout << " No session present on server. Subscribing..." << flush; cli.subscribe(TOPIC, QOS)->wait(); + } cout << "OK" << endl; // Consume messages - // This just exits if the client is disconnected. - // (See some other examples for auto or manual reconnect) - cout << "Waiting for messages on topic: '" << TOPIC << "'" << endl; + cout << "\nWaiting for messages on topic: '" << TOPIC << "'" << endl; while (true) { auto msg = cli.consume_message(); - if (!msg) - break; - cout << msg->get_topic() << ": " << msg->to_string() << endl; - } - // If we're here, the client was almost certainly disconnected. - // But we check, just to make sure. - - if (cli.is_connected()) { - cout << "\nShutting down and disconnecting from the MQTT server..." << flush; - cli.unsubscribe(TOPIC)->wait(); - cli.stop_consuming(); - cli.disconnect()->wait(); - cout << "OK" << endl; - } - else { - cout << "\nClient was disconnected" << endl; + if (msg) + cout << msg->get_topic() << ": " << msg->to_string() << endl; + else + cout << "*** Connection Lost ***" << endl; } } catch (const mqtt::exception& exc) { diff --git a/examples/async_consume_v5.cpp b/examples/async_consume_v5.cpp index b4259cd..a7d9566 100644 --- a/examples/async_consume_v5.cpp +++ b/examples/async_consume_v5.cpp @@ -40,17 +40,19 @@ using namespace std; -const string SERVER_ADDRESS{"mqtt://localhost:1883"}; +const string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const string CLIENT_ID{"PahoCppAsyncConsumeV5"}; -const string TOPIC{"hello"}; +const string TOPIC{"hello"}; const int QOS = 1; ///////////////////////////////////////////////////////////////////////////// int main(int argc, char* argv[]) { - mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID); + auto serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI; + + mqtt::async_client cli(serverURI, CLIENT_ID); auto connOpts = mqtt::connect_options_builder::v5() .clean_start(false) @@ -63,8 +65,8 @@ int main(int argc, char* argv[]) }); cli.set_disconnected_handler([](const mqtt::properties&, mqtt::ReasonCode reason) { - cout << "*** Disconnected. Reason [0x" - << hex << int{reason} << "]: " << reason << " ***" << endl; + cout << "*** Disconnected. Reason [0x" << hex << int{reason} << "]: " << reason + << " ***" << endl; }); // Start consumer before connecting to make sure to not miss messages diff --git a/examples/async_publish.cpp b/examples/async_publish.cpp index 8f57a8f..b7ac68a 100644 --- a/examples/async_publish.cpp +++ b/examples/async_publish.cpp @@ -42,7 +42,7 @@ using namespace std; -const string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"}; +const string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const string CLIENT_ID{"paho_cpp_async_publish"}; const string PERSIST_DIR{"./persist"}; @@ -134,11 +134,11 @@ int main(int argc, char* argv[]) // session or Client ID unless it's using persistence, then the local // library requires an ID to identify the persistence files. - string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS, - clientID = (argc > 2) ? string(argv[2]) : CLIENT_ID; + string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI, + clientID = (argc > 2) ? string{argv[2]} : CLIENT_ID; - cout << "Initializing for server '" << address << "'..." << endl; - mqtt::async_client client(address, clientID, PERSIST_DIR); + cout << "Initializing for server '" << serverURI << "'..." << endl; + mqtt::async_client client(serverURI, clientID, PERSIST_DIR); callback cb; client.set_callback(cb); diff --git a/examples/async_publish_time.cpp b/examples/async_publish_time.cpp index fe14549..9d6f27b 100644 --- a/examples/async_publish_time.cpp +++ b/examples/async_publish_time.cpp @@ -55,7 +55,7 @@ using namespace std; using namespace std::chrono; -const std::string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"}; +const std::string DFLT_SERVER_URI{"mqtt://localhost:1883"}; // The QoS for sending data const int QOS = 1; @@ -86,7 +86,7 @@ uint64_t timestamp() int main(int argc, char* argv[]) { // The server URI (address) - string serverURI = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS; + string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI; // The amount of time to run (in ms). Zero means "run forever". uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL; @@ -130,7 +130,8 @@ int main(int argc, char* argv[]) auto top = mqtt::topic(cli, "data/time", QOS); cout << "Publishing data..." << endl; - while (timestamp() % DELTA_MS != 0); + while (timestamp() % DELTA_MS != 0) + ; uint64_t t = timestamp(), tlast = t, tstart = t; diff --git a/examples/async_subscribe.cpp b/examples/async_subscribe.cpp index 917874a..ffdf442 100644 --- a/examples/async_subscribe.cpp +++ b/examples/async_subscribe.cpp @@ -40,7 +40,7 @@ #include "mqtt/async_client.h" -const std::string SERVER_ADDRESS("mqtt://localhost:1883"); +const std::string DFLT_SERVER_URI("mqtt://localhost:1883"); const std::string CLIENT_ID("paho_cpp_async_subscribe"); const std::string TOPIC("hello"); @@ -181,7 +181,9 @@ int main(int argc, char* argv[]) // disconnected. In that case, it needs a unique ClientID and a // non-clean session. - mqtt::async_client cli(SERVER_ADDRESS, CLIENT_ID); + auto serverURI = (argc > 1) ? std::string{argv[1]} : DFLT_SERVER_URI; + + mqtt::async_client cli(serverURI, CLIENT_ID); mqtt::connect_options connOpts; connOpts.set_clean_session(false); @@ -194,18 +196,19 @@ int main(int argc, char* argv[]) // When completed, the callback will subscribe to topic. try { - std::cout << "Connecting to the MQTT server..." << std::flush; + std::cout << "Connecting to the MQTT server '" << serverURI << "'..." << std::flush; cli.connect(connOpts, nullptr, cb); } catch (const mqtt::exception& exc) { - std::cerr << "\nERROR: Unable to connect to MQTT server: '" << SERVER_ADDRESS << "'" - << exc << std::endl; + std::cerr << "\nERROR: Unable to connect to MQTT server: '" << serverURI << "'" << exc + << std::endl; return 1; } // Just block till user tells us to quit. - while (std::tolower(std::cin.get()) != 'q'); + while (std::tolower(std::cin.get()) != 'q') + ; // Disconnect diff --git a/examples/data_publish.cpp b/examples/data_publish.cpp index 038f45c..e1d78bb 100644 --- a/examples/data_publish.cpp +++ b/examples/data_publish.cpp @@ -65,7 +65,7 @@ using namespace std; using namespace std::chrono; -const std::string DFLT_ADDRESS{"mqtt://localhost:1883"}; +const std::string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const std::string CLIENT_ID{"paho-cpp-data-publish"}; const string TOPIC{"data/rand"}; @@ -264,13 +264,13 @@ public: int main(int argc, char* argv[]) { - string address = (argc > 1) ? string(argv[1]) : DFLT_ADDRESS; + string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI; #if defined(_WIN32) - mqtt::async_client cli(address, CLIENT_ID, MAX_BUFFERED_MSGS); + mqtt::async_client cli(serverURI, CLIENT_ID, MAX_BUFFERED_MSGS); #else encoded_file_persistence persist("elephant"); - mqtt::async_client cli(address, CLIENT_ID, MAX_BUFFERED_MSGS, &persist); + mqtt::async_client cli(serverURI, CLIENT_ID, MAX_BUFFERED_MSGS, &persist); #endif auto connOpts = mqtt::connect_options_builder() @@ -290,7 +290,7 @@ int main(int argc, char* argv[]) try { // Connect to the MQTT broker - cout << "Connecting to server '" << address << "'..." << flush; + cout << "Connecting to server '" << serverURI << "'..." << flush; cli.connect(connOpts)->wait(); cout << "OK\n" << endl; diff --git a/examples/ssl_publish.cpp b/examples/ssl_publish.cpp index 19408b9..c3f37a5 100644 --- a/examples/ssl_publish.cpp +++ b/examples/ssl_publish.cpp @@ -50,7 +50,7 @@ #include "mqtt/async_client.h" -const std::string DFLT_SERVER_ADDRESS{"mqtts://localhost:18884"}; +const std::string DFLT_SERVER_URI{"mqtts://localhost:18884"}; const std::string DFLT_CLIENT_ID{"ssl_publish_cpp"}; const std::string KEY_STORE{"client.pem"}; @@ -90,8 +90,8 @@ using namespace std; int main(int argc, char* argv[]) { - string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS, - clientID = (argc > 2) ? string(argv[2]) : DFLT_CLIENT_ID; + string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI, + clientID = (argc > 2) ? string{argv[2]} : DFLT_CLIENT_ID; // Note that we don't actually need to open the trust or key stores. // We just need a quick, portable way to check that they exist. @@ -112,8 +112,8 @@ int main(int argc, char* argv[]) } } - cout << "Initializing for server '" << address << "'..." << endl; - mqtt::async_client client(address, clientID); + cout << "Initializing for server '" << serverURI << "'..." << endl; + mqtt::async_client client(serverURI, clientID); callback cb; client.set_callback(cb); diff --git a/examples/sync_consume.cpp b/examples/sync_consume.cpp index 85c0503..0ee1c24 100644 --- a/examples/sync_consume.cpp +++ b/examples/sync_consume.cpp @@ -44,16 +44,18 @@ using namespace std; using namespace std::chrono; -const string SERVER_ADDRESS{"mqtt://localhost:1883"}; +const string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const string CLIENT_ID{"paho_cpp_sync_consume"}; ///////////////////////////////////////////////////////////////////////////// int main(int argc, char* argv[]) { - mqtt::client cli(SERVER_ADDRESS, CLIENT_ID); + auto serverURI = (argc > 1) ? std::string{argv[1]} : DFLT_SERVER_URI; - auto connOpts = mqtt::connect_options_builder() + mqtt::client cli(serverURI, CLIENT_ID); + + auto connOpts = mqtt::connect_options_builder::v3() .user_name("user") .password("passwd") .keep_alive_interval(seconds(30)) diff --git a/examples/sync_consume_v5.cpp b/examples/sync_consume_v5.cpp index 7663207..2a5afe8 100644 --- a/examples/sync_consume_v5.cpp +++ b/examples/sync_consume_v5.cpp @@ -46,7 +46,7 @@ using namespace std; using namespace std::chrono; -const string SERVER_ADDRESS{"mqtt://localhost:1883"}; +const string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const string CLIENT_ID{"paho_cpp_sync_consume5"}; constexpr int QOS_0 = 0; @@ -82,7 +82,9 @@ bool command_handler(const mqtt::message& msg) int main(int argc, char* argv[]) { - mqtt::client cli(SERVER_ADDRESS, CLIENT_ID, mqtt::create_options(MQTTVERSION_5)); + auto serverURI = (argc > 1) ? std::string{argv[1]} : DFLT_SERVER_URI; + + mqtt::client cli(serverURI, CLIENT_ID, mqtt::create_options(MQTTVERSION_5)); auto connOpts = mqtt::connect_options_builder::v5() .automatic_reconnect(seconds(2), seconds(30)) diff --git a/examples/sync_publish.cpp b/examples/sync_publish.cpp index fd975f5..28d8f36 100644 --- a/examples/sync_publish.cpp +++ b/examples/sync_publish.cpp @@ -36,7 +36,7 @@ #include "mqtt/client.h" -const std::string SERVER_ADDRESS{"mqtt://localhost:1883"}; +const std::string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const std::string TOPIC{"hello"}; const std::string PAYLOAD1{"Hello World!"}; @@ -165,9 +165,11 @@ public: int main(int argc, char* argv[]) { + auto serverURI = (argc > 1) ? std::string{argv[1]} : DFLT_SERVER_URI; + std::cout << "Initializing..." << std::endl; sample_mem_persistence persist; - mqtt::client client(SERVER_ADDRESS, "", &persist); + mqtt::client client(serverURI, "", &persist); user_callback cb; client.set_callback(cb); diff --git a/examples/sync_reconnect.cpp b/examples/sync_reconnect.cpp index 772af98..4be37de 100644 --- a/examples/sync_reconnect.cpp +++ b/examples/sync_reconnect.cpp @@ -49,7 +49,7 @@ using namespace std; using namespace std::chrono; -const std::string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"}; +const std::string DFLT_SERVER_URI{"mqtt://localhost:1883"}; // The QoS for sending data const int QOS = 1; @@ -74,14 +74,14 @@ uint64_t timestamp() int main(int argc, char* argv[]) { // The server URI (address) - string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS; + string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI; // The amount of time to run (in sec). Zero means "run forever". uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL; - cout << "Initializing for server '" << address << "'..." << endl; + cout << "Initializing for server '" << serverURI << "'..." << endl; - mqtt::client cli(address, ""); + mqtt::client cli(serverURI, ""); auto connOpts = mqtt::connect_options_builder().clean_session().finalize(); diff --git a/examples/topic_publish.cpp b/examples/topic_publish.cpp index 9663637..a6ae6c4 100644 --- a/examples/topic_publish.cpp +++ b/examples/topic_publish.cpp @@ -40,7 +40,7 @@ using namespace std; -const string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"}; +const string DFLT_SERVER_URI{"mqtt://localhost:1883"}; const string TOPIC{"test"}; const int QOS = 1; @@ -56,10 +56,10 @@ const auto TIMEOUT = std::chrono::seconds(10); int main(int argc, char* argv[]) { - string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS; + string serverURI = (argc > 1) ? string(argv[1]) : DFLT_SERVER_URI; - cout << "Initializing for server '" << address << "'..." << endl; - mqtt::async_client cli(address, ""); + cout << "Initializing for server '" << serverURI << "'..." << endl; + mqtt::async_client cli(serverURI, ""); cout << " ...OK" << endl; diff --git a/examples/ws_publish.cpp b/examples/ws_publish.cpp index eec8436..617f3fc 100644 --- a/examples/ws_publish.cpp +++ b/examples/ws_publish.cpp @@ -40,7 +40,7 @@ #include "mqtt/async_client.h" // Assume a local server with websocket support on port 8080 -const std::string DFLT_SERVER_ADDRESS{"ws://localhost:8080"}; +const std::string DFLT_SERVER_URI{"ws://localhost:8080"}; // A local proxy, like squid on port 3128 // Here assuming basic authentication with user "user" and password "pass". @@ -58,14 +58,14 @@ using namespace std; int main(int argc, char* argv[]) { - string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS, - proxy = (argc > 2) ? string(argv[2]) : DFLT_PROXY_ADDRESS; + string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI, + proxy = (argc > 2) ? string{argv[2]} : DFLT_PROXY_ADDRESS; - cout << "Initializing for server '" << address << "'..." << endl; + cout << "Initializing for server '" << serverURI << "'..." << endl; if (!proxy.empty()) cout << " with proxy '" << proxy << "'" << endl; - mqtt::async_client client(address, ""); + mqtt::async_client client(serverURI, ""); // Build the connect options. diff --git a/src/async_client.cpp b/src/async_client.cpp index 8b5ea8e..5314bd7 100644 --- a/src/async_client.cpp +++ b/src/async_client.cpp @@ -379,20 +379,13 @@ token_ptr async_client::connect(connect_options opts) { // TODO: We should update the MQTT version from the response // (when the server confirms the requested version) - - // If the options specified a new MQTT protocol version, we should - // use it, otherwise default to the version requested when the client - // was created. - if (opts.opts_.MQTTVersion == 0 && mqttVersion_ >= 5) - opts.opts_.MQTTVersion = mqttVersion_; - else - mqttVersion_ = opts.opts_.MQTTVersion; + mqttVersion_ = opts.opts_.MQTTVersion; // The C lib is very picky about version and clean start/session - if (opts.opts_.MQTTVersion >= 5) - opts.opts_.cleansession = 0; - else + if (opts.opts_.MQTTVersion < 5) opts.opts_.cleanstart = 0; + else + opts.opts_.cleansession = 0; // TODO: If connTok_ is non-null, there could be a pending connect // which might complete after creating/assigning a new one. If that diff --git a/src/reason_code.cpp b/src/reason_code.cpp index 78bf927..9d3416d 100644 --- a/src/reason_code.cpp +++ b/src/reason_code.cpp @@ -22,16 +22,15 @@ namespace mqtt { ///////////////////////////////////////////////////////////////////////////// - std::string to_string(ReasonCode reasonCode) { - return std::string{MQTTReasonCode_toString(MQTTReasonCodes(reasonCode))}; + return std::string{MQTTReasonCode_toString(MQTTReasonCodes(reasonCode))}; } std::ostream& operator<<(std::ostream& os, ReasonCode reasonCode) { - os << MQTTReasonCode_toString(MQTTReasonCodes(reasonCode)); - return os; + os << MQTTReasonCode_toString(MQTTReasonCodes(reasonCode)); + return os; } ///////////////////////////////////////////////////////////////////////////// diff --git a/src/token.cpp b/src/token.cpp index e7781e6..d611366 100644 --- a/src/token.cpp +++ b/src/token.cpp @@ -259,17 +259,17 @@ void token::reset() void token::set_action_callback(iaction_listener& listener) { - unique_lock g{lock_}; - listener_ = &listener; + unique_lock g{lock_}; + listener_ = &listener; - if (complete_) { - g.unlock(); + if (complete_) { + g.unlock(); - if (rc_ == MQTTASYNC_SUCCESS) - listener.on_success(*this); - else - listener.on_failure(*this); - } + if (rc_ == MQTTASYNC_SUCCESS) + listener.on_success(*this); + else + listener.on_failure(*this); + } } void token::wait()