1
0
mirror of https://github.com/eclipse/paho.mqtt.cpp.git synced 2025-05-10 03:39:07 +08:00

Cleaned up some of the sample apps. Rejiggered create_options contructors, breaking previous commits

This commit is contained in:
fpagliughi 2020-12-07 18:30:23 -05:00
parent bafe6b4100
commit a2e6645533
13 changed files with 104 additions and 50 deletions

View File

@ -54,7 +54,7 @@ async_client::async_client(const string& serverURI, const string& clientId,
: serverURI_(serverURI), clientId_(clientId), : serverURI_(serverURI), clientId_(clientId),
mqttVersion_(MQTTVERSION_DEFAULT), userCallback_(nullptr) mqttVersion_(MQTTVERSION_DEFAULT), userCallback_(nullptr)
{ {
create_options opts(maxBufferedMessages); create_options opts(MQTTVERSION_DEFAULT, maxBufferedMessages);
int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(), int rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
MQTTCLIENT_PERSISTENCE_DEFAULT, MQTTCLIENT_PERSISTENCE_DEFAULT,
@ -71,7 +71,7 @@ async_client::async_client(const string& serverURI, const string& clientId,
iclient_persistence* persistence /*=nullptr*/, iclient_persistence* persistence /*=nullptr*/,
ipersistence_encoder* encoder /*=nullptr*/) ipersistence_encoder* encoder /*=nullptr*/)
: async_client(serverURI, clientId, : async_client(serverURI, clientId,
create_options(maxBufferedMessages), create_options(MQTTVERSION_DEFAULT, maxBufferedMessages),
persistence, encoder) persistence, encoder)
{ {
} }

View File

@ -31,35 +31,50 @@ const std::chrono::minutes client::DFLT_TIMEOUT = std::chrono::minutes(5);
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
client::client(const string& serverURI, const string& clientId, client::client(const string& serverURI, const string& clientId,
iclient_persistence* persistence /*=nullptr*/) iclient_persistence* persistence /*=nullptr*/,
: cli_(serverURI, clientId, persistence), ipersistence_encoder* encoder /*=nullptr*/)
: cli_(serverURI, clientId, persistence, encoder),
timeout_(DFLT_TIMEOUT), userCallback_(nullptr) timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
{ {
} }
client::client(const string& serverURI, const string& clientId, client::client(const string& serverURI, const string& clientId,
const string& persistDir) const string& persistDir,
: cli_(serverURI, clientId, persistDir), ipersistence_encoder* encoder /*=nullptr*/)
: cli_(serverURI, clientId, persistDir, encoder),
timeout_(DFLT_TIMEOUT), userCallback_(nullptr) timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
{ {
} }
client::client(const string& serverURI, const string& clientId, client::client(const string& serverURI, const string& clientId,
int maxBufferedMessages, iclient_persistence* persistence /*=nullptr*/) int maxBufferedMessages,
: cli_(serverURI, clientId, maxBufferedMessages, persistence), iclient_persistence* persistence /*=nullptr*/,
ipersistence_encoder* encoder /*=nullptr*/)
: cli_(serverURI, clientId, maxBufferedMessages, persistence, encoder),
timeout_(DFLT_TIMEOUT), userCallback_(nullptr) timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
{ {
} }
client::client(const string& serverURI, const string& clientId, client::client(const string& serverURI, const string& clientId,
int maxBufferedMessages, const string& persistDir) int maxBufferedMessages, const string& persistDir,
: cli_(serverURI, clientId, maxBufferedMessages, persistDir), ipersistence_encoder* encoder /*=nullptr*/)
: cli_(serverURI, clientId, maxBufferedMessages, persistDir, encoder),
timeout_(DFLT_TIMEOUT), userCallback_(nullptr) timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
{ {
} }
client::client(const string& serverURI, const string& clientId,
const create_options& opts,
iclient_persistence* persistence /*=nullptr*/,
ipersistence_encoder* encoder /*=nullptr*/)
: cli_(serverURI, clientId, opts, persistence, encoder),
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
{
}
// --------------------------------------------------------------------------
void client::set_callback(callback& cb) void client::set_callback(callback& cb)
{ {
userCallback_ = &cb; userCallback_ = &cb;

View File

@ -25,7 +25,8 @@ namespace mqtt {
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
const MQTTAsync_connectOptions connect_options::DFLT_C_STRUCT = MQTTAsync_connectOptions_initializer; const MQTTAsync_connectOptions connect_options::DFLT_C_STRUCT =
MQTTAsync_connectOptions_initializer;
connect_options::connect_options() : opts_(DFLT_C_STRUCT) connect_options::connect_options() : opts_(DFLT_C_STRUCT)
{ {

View File

@ -22,11 +22,18 @@ namespace mqtt {
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
const MQTTAsync_createOptions create_options::DFLT_C_STRUCT = const MQTTAsync_createOptions create_options::DFLT_C_STRUCT =
MQTTAsync_createOptions_initializer5; MQTTAsync_createOptions_initializer;
create_options::create_options(int maxBufferedMessages) : create_options() create_options::create_options(int mqttVersion) : create_options()
{ {
opts_.MQTTVersion = mqttVersion;
}
create_options::create_options(int mqttVersion, int maxBufferedMessages) : create_options()
{
opts_.MQTTVersion = mqttVersion;
if (maxBufferedMessages != 0) { if (maxBufferedMessages != 0) {
opts_.sendWhileDisconnected = to_int(true); opts_.sendWhileDisconnected = to_int(true);
opts_.maxBufferedMessages = maxBufferedMessages; opts_.maxBufferedMessages = maxBufferedMessages;

View File

@ -6,7 +6,7 @@
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com> * Copyright (c) 2013-2020 Frank Pagliughi <fpagliughi@mindspring.com>
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
@ -253,8 +253,7 @@ public:
* as a URI. * as a URI.
* @param clientId a client identifier that is unique on the server * @param clientId a client identifier that is unique on the server
* being connected to * being connected to
* @param maxBufferedMessages the maximum number of messages allowed to * @param opts The create options
* be buffered while not connected
* @param persistence The user persistence structure. If this is null, * @param persistence The user persistence structure. If this is null,
* then no persistence is used. * then no persistence is used.
* @param encoder An object to encode and decode the persistence data. * @param encoder An object to encode and decode the persistence data.

View File

@ -104,7 +104,8 @@ public:
* then no persistence is used. * then no persistence is used.
*/ */
client(const string& serverURI, const string& clientId, client(const string& serverURI, const string& clientId,
iclient_persistence* persistence=nullptr); iclient_persistence* persistence=nullptr,
ipersistence_encoder* encoder=nullptr);
/** /**
* Create an async_client that can be used to communicate with an MQTT * Create an async_client that can be used to communicate with an MQTT
* server. * server.
@ -116,7 +117,8 @@ public:
* @param persistDir The directory to use for persistence data * @param persistDir The directory to use for persistence data
*/ */
client(const string& serverURI, const string& clientId, client(const string& serverURI, const string& clientId,
const string& persistDir); const string& persistDir,
ipersistence_encoder* encoder=nullptr);
/** /**
* Create a client that can be used to communicate with an MQTT server, * Create a client that can be used to communicate with an MQTT server,
* which allows for off-line message buffering. * which allows for off-line message buffering.
@ -132,7 +134,9 @@ public:
* then no persistence is used. * then no persistence is used.
*/ */
client(const string& serverURI, const string& clientId, client(const string& serverURI, const string& clientId,
int maxBufferedMessages, iclient_persistence* persistence=nullptr); int maxBufferedMessages,
iclient_persistence* persistence=nullptr,
ipersistence_encoder* encoder=nullptr);
/** /**
* Create a client that can be used to communicate with an MQTT server, * Create a client that can be used to communicate with an MQTT server,
* which allows for off-line message buffering. * which allows for off-line message buffering.
@ -144,9 +148,30 @@ public:
* @param maxBufferedMessages the maximum number of messages allowed to * @param maxBufferedMessages the maximum number of messages allowed to
* be buffered while not connected * be buffered while not connected
* @param persistDir The directory to use for persistence data * @param persistDir The directory to use for persistence data
* @param encoder An object to encode and decode the persistence data.
*/ */
client(const string& serverURI, const string& clientId, client(const string& serverURI, const string& clientId,
int maxBufferedMessages, const string& persistDir); int maxBufferedMessages, const string& persistDir,
ipersistence_encoder* encoder=nullptr);
/**
* Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering.
* This allows the caller to specify a user-defined persistence object,
* or use no persistence.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param opts The create options
* @param persistence The user persistence structure. If this is null,
* then no persistence is used.
* @param encoder An object to encode and decode the persistence data.
* @param encoder An object to encode and decode the persistence data.
*/
client(const string& serverURI, const string& clientId,
const create_options& opts,
iclient_persistence* persistence=nullptr,
ipersistence_encoder* encoder=nullptr);
/** /**
* Virtual destructor * Virtual destructor
*/ */

View File

@ -56,12 +56,18 @@ public:
* Default set of client create options. * Default set of client create options.
*/ */
create_options() : opts_(DFLT_C_STRUCT) {} create_options() : opts_(DFLT_C_STRUCT) {}
/**
* Default create optionsfor the specified version of MQTT.
* @param mqttVersion The MQTT version used to create the client.
*/
explicit create_options(int mqttVersion);
/** /**
* Default create options, but with off-line buffering enabled. * Default create options, but with off-line buffering enabled.
* @param mqttVersion The MQTT version used to create the client.
* @param maxBufferedMessages the maximum number of messages allowed to * @param maxBufferedMessages the maximum number of messages allowed to
* be buffered while not connected * be buffered while not connected
*/ */
explicit create_options(int maxBufferedMessages); create_options(int mqttVersion, int maxBufferedMessages);
/** /**
* Gets whether the client will accept message to publish while * Gets whether the client will accept message to publish while
* disconnected. * disconnected.

View File

@ -32,7 +32,7 @@
#include <iostream> #include <iostream>
#include <cstdlib> #include <cstdlib>
#include <string> #include <string>
#include <thread> // For sleep #include <thread>
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <cstring> #include <cstring>
@ -132,16 +132,15 @@ int main(int argc, char* argv[])
callback cb; callback cb;
client.set_callback(cb); client.set_callback(cb);
mqtt::connect_options conopts; auto connopts = mqtt::connect_options_builder()
mqtt::message willmsg(TOPIC, LWT_PAYLOAD, 1, true); .will(mqtt::message(TOPIC, LWT_PAYLOAD, QOS))
mqtt::will_options will(willmsg); .finalize();
conopts.set_will(will);
cout << " ...OK" << endl; cout << " ...OK" << endl;
try { try {
cout << "\nConnecting..." << endl; cout << "\nConnecting..." << endl;
mqtt::token_ptr conntok = client.connect(conopts); mqtt::token_ptr conntok = client.connect(connopts);
cout << "Waiting for the connection..." << endl; cout << "Waiting for the connection..." << endl;
conntok->wait(); conntok->wait();
cout << " ...OK" << endl; cout << " ...OK" << endl;

View File

@ -168,12 +168,12 @@ public:
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);
mqtt::connect_options connOpts; mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20); connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true); connOpts.set_clean_session(true);
mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);
callback cb(client, connOpts); callback cb(client, connOpts);
client.set_callback(cb); client.set_callback(cb);

View File

@ -113,10 +113,11 @@ int main(int argc, char* argv[])
mqtt::async_client cli(address, CLIENT_ID, MAX_BUFFERED_MSGS, mqtt::async_client cli(address, CLIENT_ID, MAX_BUFFERED_MSGS,
PERSIST_DIR, &encoder); PERSIST_DIR, &encoder);
mqtt::connect_options connOpts; auto connOpts = mqtt::connect_options_builder()
connOpts.set_keep_alive_interval(MAX_BUFFERED_MSGS * PERIOD); .keep_alive_interval(MAX_BUFFERED_MSGS * PERIOD)
connOpts.set_clean_session(true); .clean_session(true)
connOpts.set_automatic_reconnect(true); .automatic_reconnect(true)
.finalize();
// Create a topic object. This is a conventience since we will // Create a topic object. This is a conventience since we will
// repeatedly publish messages with the same parameters. // repeatedly publish messages with the same parameters.

View File

@ -70,17 +70,19 @@ int main(int argc, char* argv[])
// LWT message is broadcast to other users if out connection is lost // LWT message is broadcast to other users if out connection is lost
auto lwt = mqtt::make_message(chatTopic, "<<<"+chatUser+" was disconnected>>>", QOS, false); auto lwt = mqtt::message(chatTopic, "<<<"+chatUser+" was disconnected>>>", QOS, false);
// Set up the connect options // Set up the connect options
mqtt::connect_options connOpts; auto connOpts = mqtt::connect_options_builder()
connOpts.set_keep_alive_interval(20); .keep_alive_interval(std::chrono::seconds(20))
connOpts.set_mqtt_version(MQTTVERSION_5); .mqtt_version(MQTTVERSION_5)
connOpts.set_clean_start(true); .clean_start(true)
connOpts.set_will_message(lwt); .will(std::move(lwt))
.finalize();
mqtt::async_client cli(SERVER_ADDRESS, ""); mqtt::async_client cli(SERVER_ADDRESS, "",
mqtt::create_options(MQTTVERSION_5));
// Set a callback for connection lost. // Set a callback for connection lost.
// This just exits the app. // This just exits the app.

View File

@ -34,7 +34,7 @@
#include <sstream> #include <sstream>
#include <cstdlib> #include <cstdlib>
#include <string> #include <string>
#include <thread> // For sleep #include <thread>
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <cstring> #include <cstring>
@ -59,9 +59,10 @@ int main(int argc, char* argv[])
constexpr int QOS = 1; constexpr int QOS = 1;
const string REQ_TOPIC_HDR { "requests/math/" }; const string REQ_TOPIC_HDR { "requests/math/" };
mqtt::async_client cli(SERVER_ADDRESS, ""); mqtt::create_options createOpts(MQTTVERSION_5);
mqtt::async_client cli(SERVER_ADDRESS, "", createOpts);
auto connopts = mqtt::connect_options_builder() auto connOpts = mqtt::connect_options_builder()
.mqtt_version(MQTTVERSION_5) .mqtt_version(MQTTVERSION_5)
.clean_start() .clean_start()
.finalize(); .finalize();
@ -70,7 +71,7 @@ int main(int argc, char* argv[])
try { try {
cout << "Connecting..." << flush; cout << "Connecting..." << flush;
mqtt::token_ptr tok = cli.connect(connopts); mqtt::token_ptr tok = cli.connect(connOpts);
auto connRsp = tok->get_connect_response(); auto connRsp = tok->get_connect_response();
cout << "OK (" << connRsp.get_server_uri() << ")" << endl; cout << "OK (" << connRsp.get_server_uri() << ")" << endl;

View File

@ -91,11 +91,8 @@ double mult(const std::vector<double>& nums)
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
//auto createOpts = mqtt::create_options_builder() mqtt::create_options createOpts(MQTTVERSION_5);
// .mqtt_version(MQTTVERSION_5) mqtt::client cli(SERVER_ADDRESS, CLIENT_ID, createOpts);
// .finalize();
mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);
auto connOpts = mqtt::connect_options_builder() auto connOpts = mqtt::connect_options_builder()
.mqtt_version(MQTTVERSION_5) .mqtt_version(MQTTVERSION_5)
@ -115,6 +112,7 @@ int main(int argc, char* argv[])
// Consume messages // Consume messages
cout << "Waiting for RPC requests..." << endl;
while (true) { while (true) {
auto msg = cli.consume_message(); auto msg = cli.consume_message();