1
0
mirror of https://github.com/eclipse/paho.mqtt.cpp.git synced 2025-05-09 19:31:22 +08:00

Reworked 'create_options' for all params (serverURI, clientId, etc); new persistence_type variant; reqorked async_client constructors

This commit is contained in:
fpagliughi 2024-06-22 20:00:06 -04:00
parent 54e4e5caf3
commit d386d30be9
11 changed files with 471 additions and 353 deletions

View File

@ -86,22 +86,23 @@ uint64_t timestamp()
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
// The server URI (address) // The server URI (address)
string address = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS; string serverURI = (argc > 1) ? string(argv[1]) : DFLT_SERVER_ADDRESS;
// The amount of time to run (in ms). Zero means "run forever". // The amount of time to run (in ms). Zero means "run forever".
uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL; uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL;
cout << "Initializing for server '" << address << "'..." << endl; cout << "Initializing for server '" << serverURI << "'..." << endl;
// We configure to allow publishing to the client while off-line, // We configure to allow publishing to the client while off-line,
// and that it's OK to do so before the 1st successful connection. // and that it's OK to do so before the 1st successful connection.
auto createOpts = mqtt::create_options_builder() auto createOpts = mqtt::create_options_builder()
.server_uri(serverURI)
.send_while_disconnected(true, true) .send_while_disconnected(true, true)
.max_buffered_messages(MAX_BUFFERED_MESSAGES) .max_buffered_messages(MAX_BUFFERED_MESSAGES)
.delete_oldest_messages() .delete_oldest_messages()
.finalize(); .finalize();
mqtt::async_client cli(address, "", createOpts); mqtt::async_client cli(createOpts);
// Set callbacks for when connected and connection lost. // Set callbacks for when connected and connection lost.
@ -129,7 +130,8 @@ int main(int argc, char* argv[])
auto top = mqtt::topic(cli, "data/time", QOS); auto top = mqtt::topic(cli, "data/time", QOS);
cout << "Publishing data..." << endl; cout << "Publishing data..." << endl;
while (timestamp() % DELTA_MS != 0); while (timestamp() % DELTA_MS != 0)
;
uint64_t t = timestamp(), tlast = t, tstart = t; uint64_t t = timestamp(), tlast = t, tstart = t;

View File

@ -205,7 +205,8 @@ int main(int argc, char* argv[])
// Just block till user tells us to quit. // Just block till user tells us to quit.
while (std::tolower(std::cin.get()) != 'q'); while (std::tolower(std::cin.get()) != 'q')
;
// Disconnect // Disconnect

View File

@ -67,7 +67,7 @@ int main(int argc, char* argv[])
std::string chatUser{argv[1]}, chatGroup{argv[2]}, chatTopic{"chat/" + chatGroup}; std::string chatUser{argv[1]}, chatGroup{argv[2]}, chatTopic{"chat/" + chatGroup};
mqtt::async_client cli(SERVER_ADDRESS, "", mqtt::create_options(MQTTVERSION_5)); mqtt::async_client cli(SERVER_ADDRESS);
// LWT message is broadcast to other users if out connection is lost // LWT message is broadcast to other users if out connection is lost

View File

@ -75,7 +75,7 @@ public:
// "Open" the store // "Open" the store
void open(const std::string& clientId, const std::string& serverURI) override void open(const std::string& clientId, const std::string& serverURI) override
{ {
std::cout << "[Opening persistence store for '" << clientId << "' at '" << serverURI std::cout << " [Opening persistence store for '" << clientId << "' at '" << serverURI
<< "']" << std::endl; << "']" << std::endl;
open_ = true; open_ = true;
} }
@ -83,14 +83,14 @@ public:
// Close the persistent store that was previously opened. // Close the persistent store that was previously opened.
void close() override void close() override
{ {
std::cout << "[Closing persistence store.]" << std::endl; std::cout << " [Closing persistence store.]" << std::endl;
open_ = false; open_ = false;
} }
// Clears persistence, so that it no longer contains any persisted data. // Clears persistence, so that it no longer contains any persisted data.
void clear() override void clear() override
{ {
std::cout << "[Clearing persistence store.]" << std::endl; std::cout << " [Clearing persistence store.]" << std::endl;
store_.clear(); store_.clear();
} }
@ -111,7 +111,7 @@ public:
// Puts the specified data into the persistent store. // Puts the specified data into the persistent store.
void put(const std::string& key, const std::vector<mqtt::string_view>& bufs) override void put(const std::string& key, const std::vector<mqtt::string_view>& bufs) override
{ {
std::cout << "[Persisting data with key '" << key << "']" << std::endl; std::cout << " [Persisting data with key '" << key << "']" << std::endl;
std::string str; std::string str;
for (const auto& b : bufs) str.append(b.data(), b.size()); // += b.str(); for (const auto& b : bufs) str.append(b.data(), b.size()); // += b.str();
store_[key] = std::move(str); store_[key] = std::move(str);
@ -120,11 +120,11 @@ public:
// Gets the specified data out of the persistent store. // Gets the specified data out of the persistent store.
std::string get(const std::string& key) const override std::string get(const std::string& key) const override
{ {
std::cout << "[Searching persistence for key '" << key << "']" << std::endl; std::cout << " [Searching persistence for key '" << key << "']" << std::endl;
auto p = store_.find(key); auto p = store_.find(key);
if (p == store_.end()) if (p == store_.end())
throw mqtt::persistence_exception(); throw mqtt::persistence_exception();
std::cout << "[Found persistence data for key '" << key << "']" << std::endl; std::cout << " [Found persistence data for key '" << key << "']" << std::endl;
return p->second; return p->second;
} }
@ -132,12 +132,12 @@ public:
// Remove the data for the specified key. // Remove the data for the specified key.
void remove(const std::string& key) override void remove(const std::string& key) override
{ {
std::cout << "[Persistence removing key '" << key << "']" << std::endl; std::cout << " [Persistence removing key '" << key << "']" << std::endl;
auto p = store_.find(key); auto p = store_.find(key);
if (p == store_.end()) if (p == store_.end())
throw mqtt::persistence_exception(); throw mqtt::persistence_exception();
store_.erase(p); store_.erase(p);
std::cout << "[Persistence key removed '" << key << "']" << std::endl; std::cout << " [Persistence key removed '" << key << "']" << std::endl;
} }
}; };
@ -155,7 +155,7 @@ class user_callback : public virtual mqtt::callback
void delivery_complete(mqtt::delivery_token_ptr tok) override void delivery_complete(mqtt::delivery_token_ptr tok) override
{ {
std::cout << "\n\t[Delivery complete for token: " std::cout << "\n [Delivery complete for token: "
<< (tok ? tok->get_message_id() : -1) << "]" << std::endl; << (tok ? tok->get_message_id() : -1) << "]" << std::endl;
} }

View File

@ -132,16 +132,14 @@ private:
mutable std::mutex lock_; mutable std::mutex lock_;
/** The underlying C-lib client. */ /** The underlying C-lib client. */
MQTTAsync cli_; MQTTAsync cli_;
/** The server URI string. */ /** The options used to create the client */
string serverURI_; const create_options createOpts_;
/** The client ID string that we provided to the server. */ /** The MQTT protocol version of the connection */
string clientId_;
/** The MQTT protocol version we're connected at */
int mqttVersion_; int mqttVersion_;
/** A user persistence wrapper (if any) */ /** A user persistence wrapper (if any) */
std::unique_ptr<MQTTClient_persistence> persist_; std::unique_ptr<MQTTClient_persistence> persist_{};
/** Callback supplied by the user (if any) */ /** Callback supplied by the user (if any) */
callback* userCallback_; callback* userCallback_{};
/** Connection handler */ /** Connection handler */
connection_handler connHandler_; connection_handler connHandler_;
/** Connection lost handler */ /** Connection lost handler */
@ -193,6 +191,15 @@ private:
if (rc != MQTTASYNC_SUCCESS) if (rc != MQTTASYNC_SUCCESS)
throw exception(rc); throw exception(rc);
} }
/**
* Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering.
* This allows the caller to specify a user-defined persistence object,
* or use no persistence.
* @param opts The create options
* @throw exception if an argument is invalid
*/
void create();
public: public:
/** /**
@ -203,10 +210,10 @@ public:
* as a URI. * as a URI.
* @param clientId a client identifier that is unique on the server * @param clientId a client identifier that is unique on the server
* being connected to * being connected to
* @param persistDir The directory to use for persistence data
* @throw exception if an argument is invalid * @throw exception if an argument is invalid
*/ */
async_client(const string& serverURI, const string& clientId, const string& persistDir); explicit async_client(const string& serverURI, const string& clientId = string{})
: async_client(serverURI, clientId, NO_PERSISTENCE) {}
/** /**
* Create an async_client that can be used to communicate with an MQTT * Create an async_client that can be used to communicate with an MQTT
* server. * server.
@ -221,9 +228,11 @@ public:
* @throw exception if an argument is invalid * @throw exception if an argument is invalid
*/ */
async_client( async_client(
const string& serverURI, const string& clientId, const string& serverURI, const string& clientId, const persistence_type& persistence
iclient_persistence* persistence = nullptr )
); : createOpts_{serverURI, clientId, persistence} {
create();
}
/** /**
* Create an async_client that can be used to communicate with an MQTT * Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering. * server, which allows for off-line message buffering.
@ -239,27 +248,11 @@ public:
*/ */
async_client( async_client(
const string& serverURI, const string& clientId, int maxBufferedMessages, const string& serverURI, const string& clientId, int maxBufferedMessages,
const string& persistDir const persistence_type& persistence
); )
/** : createOpts_{serverURI, clientId, maxBufferedMessages, persistence} {
* Create an async_client that can be used to communicate with an MQTT create();
* server, which allows for off-line message buffering. }
* This allows the caller to specify a user-defined persistence object,
* or use no persistence.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param maxBufferedMessages the maximum number of messages allowed to
* be buffered while not connected
* @param persistence The user persistence structure. If this is null,
* then no persistence is used.
* @throw exception if an argument is invalid
*/
async_client(
const string& serverURI, const string& clientId, int maxBufferedMessages,
iclient_persistence* persistence = nullptr
);
/** /**
* Create an async_client that can be used to communicate with an MQTT * Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering. * server, which allows for off-line message buffering.
@ -274,26 +267,20 @@ public:
*/ */
async_client( async_client(
const string& serverURI, const string& clientId, const create_options& opts, const string& serverURI, const string& clientId, const create_options& opts,
const string& persistDir const persistence_type& persistence
); )
: createOpts_{serverURI, clientId, opts, persistence} {
create();
}
/** /**
* Create an async_client that can be used to communicate with an MQTT * Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering. * server, which allows for off-line message buffering.
* This allows the caller to specify a user-defined persistence object, * This allows the caller to specify a user-defined persistence object,
* or use no persistence. * or use no persistence.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param opts The create options * @param opts The create options
* @param persistence The user persistence structure. If this is null,
* then no persistence is used.
* @throw exception if an argument is invalid * @throw exception if an argument is invalid
*/ */
async_client( async_client(const create_options& opts) : createOpts_{opts} { create(); }
const string& serverURI, const string& clientId, const create_options& opts,
iclient_persistence* persistence = nullptr
);
/** /**
* Destructor * Destructor
*/ */
@ -501,12 +488,12 @@ public:
* Returns the client ID used by this client. * Returns the client ID used by this client.
* @return The client ID used by this client. * @return The client ID used by this client.
*/ */
string get_client_id() const override { return clientId_; } string get_client_id() const override { return createOpts_.get_client_id(); }
/** /**
* Returns the address of the server used by this client. * Returns the address of the server used by this client.
* @return The server's address, as a URI String. * @return The server's address, as a URI String.
*/ */
string get_server_uri() const override { return serverURI_; } string get_server_uri() const override { return createOpts_.get_server_uri(); }
/** /**
* Gets the MQTT version used by the client. * Gets the MQTT version used by the client.
* @return The MQTT version used by the client * @return The MQTT version used by the client

View File

@ -24,23 +24,51 @@
#ifndef __mqtt_create_options_h #ifndef __mqtt_create_options_h
#define __mqtt_create_options_h #define __mqtt_create_options_h
#include <variant>
#include "MQTTAsync.h" #include "MQTTAsync.h"
#include "mqtt/iclient_persistence.h"
#include "mqtt/types.h" #include "mqtt/types.h"
namespace mqtt { namespace mqtt {
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
/** An empty type that can be used as a `persistent_type` variant opiton. */
struct no_persistence
{
};
/** A constant used to indicate that no persistence is desired */
constexpr no_persistence NO_PERSISTENCE{};
/**
* A variant for the different type of persistence:
* @li no_persistence: Any object of this type indicates no persistence.
* @li string: Indicates file persistence. The string specifies the
* directory for the persistence store.
* @li iclient_persistence*: User-defined persistence
*/
using persistence_type = std::variant<no_persistence, string, iclient_persistence*>;
/////////////////////////////////////////////////////////////////////////////
/** /**
* Options for creating a client object. * Options for creating a client object.
*/ */
class create_options class create_options
{ {
/** The default C struct */
static const MQTTAsync_createOptions DFLT_C_STRUCT;
/** The underlying C options */ /** The underlying C options */
MQTTAsync_createOptions opts_; MQTTAsync_createOptions opts_ MQTTAsync_createOptions_initializer5;
/** The address of the server to connect to, specified as a URI */
string serverURI_{};
/** A client identifier that is unique on the server */
string clientId_{};
/** The persistence for the client */
persistence_type persistence_{};
/** The client and tests have special access */ /** The client and tests have special access */
friend class async_client; friend class async_client;
@ -55,14 +83,14 @@ public:
/** /**
* Default set of client create options. * Default set of client create options.
*/ */
create_options() : opts_(DFLT_C_STRUCT) {} create_options() {}
/** /**
* Default create options for the specified version of MQTT. * Default create options for the specified version of MQTT.
* @param mqttVersion The MQTT version used to create the client. * @param mqttVersion The MQTT version used to create the client.
*/ */
explicit create_options(int mqttVersion) : create_options() { explicit create_options(int mqttVersion) : create_options() {
opts_.MQTTVersion = mqttVersion; opts_.MQTTVersion = mqttVersion;
} }
/** /**
* Default create options, but with off-line buffering enabled. * Default create options, but with off-line buffering enabled.
* @param mqttVersion The MQTT version used to create the client. * @param mqttVersion The MQTT version used to create the client.
@ -70,6 +98,122 @@ public:
* be buffered while not connected * be buffered while not connected
*/ */
create_options(int mqttVersion, int maxBufferedMessages); create_options(int mqttVersion, int maxBufferedMessages);
/**
* Cretae options for the specified server and client ID.
* This uses file-based persistence in the specified directory.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @throw exception if an argument is invalid
*/
explicit create_options(const string& serverURI, const string& clientId = string{})
: serverURI_{serverURI}, clientId_{clientId} {}
/**
* Create options for the specified server and client ID, with optional
* persistence.
* This allows the caller to specify a user-defined persistence object,
* or use no persistence.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param persistence The desired persistence structure.
* @throw exception if an argument is invalid
*/
create_options(
const string& serverURI, const string& clientId, const persistence_type& persistence
)
: serverURI_{serverURI}, clientId_{clientId}, persistence_{persistence} {}
/**
* Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering.
* This uses file-based persistence in the specified directory.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param maxBufferedMessages the maximum number of messages allowed to
* be buffered while not connected
* @param persistDir The directory to use for persistence data
* @throw exception if an argument is invalid
*/
create_options(
const string& serverURI, const string& clientId, int maxBufferedMessages,
const persistence_type& persistence
)
: serverURI_{serverURI}, clientId_{clientId}, persistence_{persistence} {
opts_.maxBufferedMessages = maxBufferedMessages;
}
/**
* Create an async_client that can be used to communicate with an MQTT
* server, which allows for off-line message buffering.
* This uses file-based persistence in the specified directory.
* @param serverURI the address of the server to connect to, specified
* as a URI.
* @param clientId a client identifier that is unique on the server
* being connected to
* @param opts The create options
* @param persistDir The directory to use for persistence data
* @throw exception if an argument is invalid
*/
create_options(
const string& serverURI, const string& clientId, const create_options& opts,
const persistence_type& persistence
)
: opts_{opts.opts_},
serverURI_{serverURI},
clientId_{clientId},
persistence_{persistence} {}
/**
* Copy constructor.
* @param opts The other options.
*/
create_options(const create_options& opts)
: opts_{opts.opts_},
serverURI_{opts.serverURI_},
clientId_{opts.clientId_},
persistence_{opts.persistence_} {}
/**
* Move constructor.
* @param opts The other options.
*/
create_options(create_options&& opts)
: opts_{opts.opts_},
serverURI_{std::move(opts.serverURI_)},
clientId_{std::move(opts.clientId_)},
persistence_{std::move(opts.persistence_)} {}
/**
* Set the address of the server to connect to, specified as a URI
* @param serverURI The URI of the server.
*/
void set_server_uri(const string& serverURI) { serverURI_ = serverURI; };
/**
* Get the address of the server to connect to, specified as a URI.
* @return The URI of the server.
*/
const string& get_server_uri() const noexcept { return serverURI_; };
/**
* Set the client identifier.
* @param The client identifier.
*/
void set_client_id(const string& clientId) { clientId_ = clientId; }
/**
* Get the client identifier.
* @return The client identifier.
*/
const string& get_client_id() const noexcept { return clientId_; }
/**
* Set the persistence for the client.
* @param persistence The persistence for the client
*/
void set_persistence(const persistence_type& persistence) { persistence_ = persistence; }
/**
* Get the persistence for the client.
* @return The persistence for the client
*/
const persistence_type& get_persistence() const noexcept { return persistence_; }
/** /**
* Gets whether the client will accept message to publish while * Gets whether the client will accept message to publish while
* disconnected. * disconnected.
@ -174,7 +318,31 @@ public:
*/ */
create_options_builder() {} create_options_builder() {}
/** /**
* * Set the server URI.
* @param serverURI The address of the server to connect to, specified
* as a URI
*/
auto server_uri(const string& serverURI) -> self& {
opts_.set_server_uri(serverURI);
return *this;
}
/**
* Sets the client ID.
* @param clientId A client identifier that is unique on the server
*/
auto client_id(const string& clientId) -> self& {
opts_.set_client_id(clientId);
return *this;
}
/**
* Sets the persistence.
* @param persistence The persistence the client should use.
*/
auto persistence(const persistence_type& persistence) -> self& {
opts_.set_persistence(persistence);
return *this;
}
/**
* Sets whether the client will accept message to publish while * Sets whether the client will accept message to publish while
* disconnected. * disconnected.
* *

View File

@ -34,96 +34,39 @@
namespace mqtt { namespace mqtt {
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// Constructors
async_client::async_client( void async_client::create()
const string& serverURI, const string& clientId, const string& persistDir
)
: async_client(serverURI, clientId, 0, persistDir)
{
}
async_client::async_client(
const string& serverURI, const string& clientId,
iclient_persistence* persistence /*=nullptr*/
)
: async_client(serverURI, clientId, 0, persistence)
{
}
async_client::async_client(
const string& serverURI, const string& clientId, int maxBufferedMessages,
const string& persistDir
)
: serverURI_(serverURI),
clientId_(clientId),
mqttVersion_(MQTTVERSION_DEFAULT),
userCallback_(nullptr)
{
create_options opts(MQTTVERSION_5, maxBufferedMessages);
int rc = MQTTAsync_createWithOptions(
&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_DEFAULT,
const_cast<char*>(persistDir.c_str()), &opts.opts_
);
if (rc != 0)
throw exception(rc);
}
async_client::async_client(
const string& serverURI, const string& clientId, int maxBufferedMessages,
iclient_persistence* persistence /*=nullptr*/
)
: async_client(
serverURI, clientId, create_options(MQTTVERSION_DEFAULT, maxBufferedMessages),
persistence
)
{
}
async_client::async_client(
const string& serverURI, const string& clientId, const create_options& opts,
const string& persistDir
)
: serverURI_(serverURI),
clientId_(clientId),
mqttVersion_(opts.opts_.MQTTVersion),
userCallback_(nullptr)
{
create_options v5opts{opts};
v5opts.set_mqtt_version(MQTTVERSION_5);
int rc = MQTTAsync_createWithOptions(
&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_DEFAULT,
const_cast<char*>(persistDir.c_str()), &v5opts.opts_
);
if (rc != 0)
throw exception(rc);
}
async_client::async_client(
const string& serverURI, const string& clientId, const create_options& opts,
iclient_persistence* persistence /*=nullptr*/
)
: serverURI_(serverURI),
clientId_(clientId),
mqttVersion_(opts.opts_.MQTTVersion),
userCallback_(nullptr)
{ {
int rc = MQTTASYNC_SUCCESS; int rc = MQTTASYNC_SUCCESS;
create_options v5opts{opts}; const auto& opts = createOpts_;
v5opts.set_mqtt_version(MQTTVERSION_5); mqttVersion_ = opts.mqtt_version();
if (!persistence) { // The C client, when created for v5, can accommodate any version for
// connections. This leaves the version solely to the connection.
auto copts{opts.opts_};
copts.MQTTVersion = MQTTVERSION_5;
auto serverURI = opts.get_server_uri();
auto clientId = opts.get_client_id();
const auto userp{std::get_if<iclient_persistence*>(&opts.persistence_)};
if (std::get_if<no_persistence>(&opts.persistence_) || (userp && !*userp)) {
rc = MQTTAsync_createWithOptions( rc = MQTTAsync_createWithOptions(
&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr, &cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr,
&v5opts.opts_ &copts
);
}
else if (const auto dir{std::get_if<string>(&opts.persistence_)}; dir) {
rc = MQTTAsync_createWithOptions(
&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_DEFAULT,
const_cast<char*>(dir->c_str()), &copts
); );
} }
else { else {
persist_.reset(new MQTTClient_persistence{ persist_.reset(new MQTTClient_persistence{
persistence, &iclient_persistence::persistence_open, *userp, &iclient_persistence::persistence_open,
&iclient_persistence::persistence_close, &iclient_persistence::persistence_put, &iclient_persistence::persistence_close, &iclient_persistence::persistence_put,
&iclient_persistence::persistence_get, &iclient_persistence::persistence_remove, &iclient_persistence::persistence_get, &iclient_persistence::persistence_remove,
&iclient_persistence::persistence_keys, &iclient_persistence::persistence_clear, &iclient_persistence::persistence_keys, &iclient_persistence::persistence_clear,
@ -132,10 +75,10 @@ async_client::async_client(
rc = MQTTAsync_createWithOptions( rc = MQTTAsync_createWithOptions(
&cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_USER, &cli_, serverURI.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_USER,
persist_.get(), &v5opts.opts_ persist_.get(), &copts
); );
} }
if (rc != 0) if (rc != MQTTASYNC_SUCCESS)
throw exception(rc); throw exception(rc);
} }

View File

@ -22,10 +22,10 @@ namespace mqtt {
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
const MQTTAsync_createOptions create_options::DFLT_C_STRUCT = // const MQTTAsync_createOptions create_options::DFLT_C_STRUCT =
MQTTAsync_createOptions_initializer5; // MQTTAsync_createOptions_initializer5;
create_options::create_options(int mqttVersion, int maxBufferedMessages) : create_options() create_options::create_options(int mqttVersion, int maxBufferedMessages)
{ {
opts_.MQTTVersion = mqttVersion; opts_.MQTTVersion = mqttVersion;

View File

@ -242,7 +242,6 @@ public:
}; };
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
} // namespace mqtt } // namespace mqtt
#endif // __mqtt_test_mock_async_client_h #endif // __mqtt_test_mock_async_client_h

View File

@ -4,8 +4,8 @@
// //
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2020-2024 Frank Pagliughi <fpagliughi@mindspring.com>
* Copyright (c) 2017 Guilherme M. Ferreira <guilherme.maciel.ferreira@gmail.com> * Copyright (c) 2017 Guilherme M. Ferreira <guilherme.maciel.ferreira@gmail.com>
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0 * are made available under the terms of the Eclipse Public License v2.0
@ -45,18 +45,18 @@ static const std::string GOOD_SSL_SERVER_URI{"ssl://localhost:18885"};
static const std::string BAD_SERVER_URI{"one://invalid.address"}; static const std::string BAD_SERVER_URI{"one://invalid.address"};
static const std::string CLIENT_ID{"test_async_client"}; static const std::string CLIENT_ID{"test_async_client"};
static const std::string PERSISTENCE_DIR{"persist"}; static const std::string PERSISTENCE_DIR{"persist"};
static const std::string TOPIC{"TOPIC"}; static const std::string TOPIC{"topic"};
static const int GOOD_QOS{0}; static const int GOOD_QOS{0};
static const int BAD_QOS{3}; static const int BAD_QOS{3};
static const_string_collection_ptr TOPIC_COLL{ static const_string_collection_ptr TOPIC_COLL{
string_collection::create({"TOPIC0", "TOPIC1", "TOPIC2"}) string_collection::create({"topic0", "topic1", "topic2"})
}; };
static iasync_client::qos_collection GOOD_QOS_COLL{0, 1, 2}; static iasync_client::qos_collection GOOD_QOS_COLL{0, 1, 2};
static iasync_client::qos_collection BAD_QOS_COLL{BAD_QOS, 1, 2}; static iasync_client::qos_collection BAD_QOS_COLL{BAD_QOS, 1, 2};
static const std::string PAYLOAD{"PAYLOAD"}; static const std::string PAYLOAD{"some payload"};
static const int TIMEOUT{1000}; static const int TIMEOUT{1000};
static int CONTEXT{4}; static int CONTEXT{4};
static mock_action_listener listener; static mock_action_listener listener;
@ -128,9 +128,9 @@ TEST_CASE("async_client connect 0 arg", "[client]")
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
try { try {
token_ptr token_conn = cli.connect(); token_ptr conn_tok = cli.connect();
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
} }
catch (const std::exception& exc) { catch (const std::exception& exc) {
@ -144,9 +144,9 @@ TEST_CASE("async_client connect 1 arg", "[client]")
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
connect_options co; connect_options co;
token_ptr token_conn{cli.connect(co)}; token_ptr conn_tok{cli.connect(co)};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
} }
@ -155,20 +155,20 @@ TEST_CASE("async_client connect 1 arg failure", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn; //{ nullptr }; token_ptr conn_tok; //{ nullptr };
connect_options co; connect_options co;
will_options wo; will_options wo;
wo.set_qos(BAD_QOS); // Invalid QoS causes connection failure wo.set_qos(BAD_QOS); // Invalid QoS causes connection failure
co.set_will(wo); co.set_will(wo);
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_conn = cli.connect(co); conn_tok = cli.connect(co);
REQUIRE(token_conn); REQUIRE(conn_tok);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
} }
REQUIRE(nullptr == token_conn); REQUIRE(nullptr == conn_tok);
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
REQUIRE(MQTTASYNC_BAD_QOS == return_code); REQUIRE(MQTTASYNC_BAD_QOS == return_code);
} }
@ -179,11 +179,11 @@ TEST_CASE("async_client connect 2 args", "[client]")
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
mock_action_listener listener; mock_action_listener listener;
token_ptr token_conn{cli.connect(&CONTEXT, listener)}; token_ptr conn_tok{cli.connect(&CONTEXT, listener)};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
REQUIRE(CONTEXT == *static_cast<int*>(token_conn->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(conn_tok->get_user_context()));
REQUIRE(listener.succeeded()); REQUIRE(listener.succeeded());
} }
@ -194,11 +194,11 @@ TEST_CASE("async_client connect 3 args", "[client]")
connect_options co; connect_options co;
mock_action_listener listener; mock_action_listener listener;
token_ptr token_conn{cli.connect(co, &CONTEXT, listener)}; token_ptr conn_tok{cli.connect(co, &CONTEXT, listener)};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
REQUIRE(CONTEXT == *static_cast<int*>(token_conn->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(conn_tok->get_user_context()));
REQUIRE(listener.succeeded()); REQUIRE(listener.succeeded());
} }
@ -207,7 +207,7 @@ TEST_CASE("async_client connect 3 args failure", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn; //{ nullptr }; token_ptr conn_tok; //{ nullptr };
connect_options co; connect_options co;
will_options wo; will_options wo;
wo.set_qos(BAD_QOS); // Invalid QoS causes connection failure wo.set_qos(BAD_QOS); // Invalid QoS causes connection failure
@ -215,14 +215,14 @@ TEST_CASE("async_client connect 3 args failure", "[client]")
mock_action_listener listener; mock_action_listener listener;
int reasonCode = MQTTASYNC_SUCCESS; int reasonCode = MQTTASYNC_SUCCESS;
try { try {
token_conn = cli.connect(co, &CONTEXT, listener); conn_tok = cli.connect(co, &CONTEXT, listener);
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
reasonCode = ex.get_return_code(); reasonCode = ex.get_return_code();
} }
REQUIRE(nullptr == token_conn); REQUIRE(nullptr == conn_tok);
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
REQUIRE(MQTTASYNC_BAD_QOS == reasonCode); REQUIRE(MQTTASYNC_BAD_QOS == reasonCode);
// TODO Why listener.on_failure() is not called? // TODO Why listener.on_failure() is not called?
@ -263,14 +263,14 @@ TEST_CASE("async_client disconnect 0 arg", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -279,14 +279,14 @@ TEST_CASE("async_client disconnect 1 arg", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
token_ptr token_disconn{cli.disconnect(0)}; token_ptr disconn_tok{cli.disconnect(0)};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -295,11 +295,11 @@ TEST_CASE("async_client disconnect 1 arg failure", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_disconn; //{ nullptr }; token_ptr disconn_tok; //{ nullptr };
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_disconn = cli.disconnect(0); disconn_tok = cli.disconnect(0);
REQUIRE(token_disconn); REQUIRE(disconn_tok);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -313,17 +313,17 @@ TEST_CASE("async_client disconnect 2 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
mock_action_listener listener; mock_action_listener listener;
token_ptr token_disconn{cli.disconnect(&CONTEXT, listener)}; token_ptr disconn_tok{cli.disconnect(&CONTEXT, listener)};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
REQUIRE(CONTEXT == *static_cast<int*>(token_disconn->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(disconn_tok->get_user_context()));
} }
TEST_CASE("async_client disconnect 3 args", "[client]") TEST_CASE("async_client disconnect 3 args", "[client]")
@ -331,17 +331,17 @@ TEST_CASE("async_client disconnect 3 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
mock_action_listener listener; mock_action_listener listener;
token_ptr token_disconn{cli.disconnect(0, &CONTEXT, listener)}; token_ptr disconn_tok{cli.disconnect(0, &CONTEXT, listener)};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
REQUIRE(CONTEXT == *static_cast<int*>(token_disconn->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(disconn_tok->get_user_context()));
} }
TEST_CASE("async_client disconnect 3 args failure", "[client]") TEST_CASE("async_client disconnect 3 args failure", "[client]")
@ -349,12 +349,12 @@ TEST_CASE("async_client disconnect 3 args failure", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_disconn; //{ nullptr }; token_ptr disconn_tok; //{ nullptr };
mock_action_listener listener; mock_action_listener listener;
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_disconn = cli.disconnect(0, &CONTEXT, listener); disconn_tok = cli.disconnect(0, &CONTEXT, listener);
REQUIRE(token_disconn); REQUIRE(disconn_tok);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -376,9 +376,9 @@ TEST_CASE("async_client get pending delivery token", "[client]")
REQUIRE(1 == GOOD_QOS_COLL[1]); REQUIRE(1 == GOOD_QOS_COLL[1]);
REQUIRE(2 == GOOD_QOS_COLL[2]); REQUIRE(2 == GOOD_QOS_COLL[2]);
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
// NOTE: async_client::publish() is the only method that adds // NOTE: async_client::publish() is the only method that adds
@ -427,9 +427,9 @@ TEST_CASE("async_client get pending delivery token", "[client]")
token_pending = cli.get_pending_delivery_token(message_id++); token_pending = cli.get_pending_delivery_token(message_id++);
REQUIRE(!token_pending); REQUIRE(!token_pending);
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -442,9 +442,9 @@ TEST_CASE("async_client get pending delivery tokens", "[client]")
REQUIRE(1 == GOOD_QOS_COLL[1]); REQUIRE(1 == GOOD_QOS_COLL[1]);
REQUIRE(2 == GOOD_QOS_COLL[2]); REQUIRE(2 == GOOD_QOS_COLL[2]);
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
delivery_token_ptr token_pub; // { nullptr }; delivery_token_ptr token_pub; // { nullptr };
@ -473,9 +473,9 @@ TEST_CASE("async_client get pending delivery tokens", "[client]")
std::vector<delivery_token_ptr> tokens_pending{cli.get_pending_delivery_tokens()}; std::vector<delivery_token_ptr> tokens_pending{cli.get_pending_delivery_tokens()};
REQUIRE(2 == static_cast<int>(tokens_pending.size())); REQUIRE(2 == static_cast<int>(tokens_pending.size()));
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -488,9 +488,9 @@ TEST_CASE("async_client publish 2 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
message_ptr msg{message::create(TOPIC, PAYLOAD)}; message_ptr msg{message::create(TOPIC, PAYLOAD)};
@ -498,9 +498,9 @@ TEST_CASE("async_client publish 2 args", "[client]")
REQUIRE(token_pub); REQUIRE(token_pub);
token_pub->wait_for(TIMEOUT); token_pub->wait_for(TIMEOUT);
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -527,9 +527,9 @@ TEST_CASE("async_client publish 4 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
message_ptr msg{message::create(TOPIC, PAYLOAD)}; message_ptr msg{message::create(TOPIC, PAYLOAD)};
@ -539,9 +539,9 @@ TEST_CASE("async_client publish 4 args", "[client]")
token_pub->wait_for(TIMEOUT); token_pub->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(token_pub->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(token_pub->get_user_context()));
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -569,9 +569,9 @@ TEST_CASE("async_client publish 5 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
const void* payload{PAYLOAD.data()}; const void* payload{PAYLOAD.data()};
@ -581,9 +581,9 @@ TEST_CASE("async_client publish 5 args", "[client]")
REQUIRE(token_pub); REQUIRE(token_pub);
token_pub->wait_for(TIMEOUT); token_pub->wait_for(TIMEOUT);
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -592,9 +592,9 @@ TEST_CASE("async_client publish 7 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
const void* payload{PAYLOAD.c_str()}; const void* payload{PAYLOAD.c_str()};
@ -607,9 +607,9 @@ TEST_CASE("async_client publish 7 args", "[client]")
token_pub->wait_for(TIMEOUT); token_pub->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(token_pub->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(token_pub->get_user_context()));
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -637,18 +637,18 @@ TEST_CASE("async_client subscribe single topic 2 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
token_ptr token_sub{cli.subscribe(TOPIC, GOOD_QOS)}; token_ptr sub_tok{cli.subscribe(TOPIC, GOOD_QOS)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -659,9 +659,9 @@ TEST_CASE("async_client subscribe single topic 2 args failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_ptr token_sub{cli.subscribe(TOPIC, BAD_QOS)}; token_ptr sub_tok{cli.subscribe(TOPIC, BAD_QOS)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -674,20 +674,20 @@ TEST_CASE("async_client subscribe single topic 4 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
mock_action_listener listener; mock_action_listener listener;
token_ptr token_sub{cli.subscribe(TOPIC, GOOD_QOS, &CONTEXT, listener)}; token_ptr sub_tok{cli.subscribe(TOPIC, GOOD_QOS, &CONTEXT, listener)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(token_sub->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(sub_tok->get_user_context()));
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -699,9 +699,9 @@ TEST_CASE("async_client subscribe single topic 4 args failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
mock_action_listener listener; mock_action_listener listener;
token_ptr token_sub{cli.subscribe(TOPIC, BAD_QOS, &CONTEXT, listener)}; token_ptr sub_tok{cli.subscribe(TOPIC, BAD_QOS, &CONTEXT, listener)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -722,9 +722,9 @@ TEST_CASE("async_client subscribe many topics 2 args", "[client]")
FAIL(exc.what()); FAIL(exc.what());
} }
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -745,9 +745,9 @@ TEST_CASE("async_client subscribe many topics 2 args_single", "[client]")
FAIL(exc.what()); FAIL(exc.what());
} }
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -757,9 +757,9 @@ TEST_CASE("async_client subscribe many topics 2 args failure", "[client]")
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
try { try {
token_ptr token_sub{cli.subscribe(TOPIC_COLL, BAD_QOS_COLL)}; token_ptr sub_tok{cli.subscribe(TOPIC_COLL, BAD_QOS_COLL)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
} }
catch (const mqtt::exception& /*ex*/) { catch (const mqtt::exception& /*ex*/) {
// REQUIRE(MQTTASYNC_BAD_QOS == ex.get_return_code()); // REQUIRE(MQTTASYNC_BAD_QOS == ex.get_return_code());
@ -767,9 +767,9 @@ TEST_CASE("async_client subscribe many topics 2 args failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_ptr token_sub{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL)}; token_ptr sub_tok{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -782,20 +782,20 @@ TEST_CASE("async_client subscribe many topics 4 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok{cli.connect()};
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
mock_action_listener listener; mock_action_listener listener;
token_ptr token_sub{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL, &CONTEXT, listener)}; token_ptr sub_tok{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL, &CONTEXT, listener)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(token_sub->get_user_context())); REQUIRE(CONTEXT == *static_cast<int*>(sub_tok->get_user_context()));
token_ptr token_disconn{cli.disconnect()}; token_ptr disconn_tok{cli.disconnect()};
REQUIRE(token_disconn); REQUIRE(disconn_tok);
token_disconn->wait(); disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -815,9 +815,9 @@ TEST_CASE("async_client subscribe many topics 4 args failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_ptr token_sub{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL, &CONTEXT, listener)}; token_ptr sub_tok{cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL, &CONTEXT, listener)};
REQUIRE(token_sub); REQUIRE(sub_tok);
token_sub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -834,18 +834,22 @@ TEST_CASE("async_client unsubscribe single topic 1 arg", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok = cli.connect();
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
token_ptr token_unsub{cli.unsubscribe(TOPIC)}; token_ptr sub_tok = cli.subscribe(TOPIC, 1);
REQUIRE(token_unsub); REQUIRE(sub_tok);
token_unsub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
token_ptr token_disconn{cli.disconnect()}; token_ptr unsub_tok = cli.unsubscribe(TOPIC);
REQUIRE(token_disconn); REQUIRE(unsub_tok);
token_disconn->wait(); sub_tok->wait_for(TIMEOUT);
token_ptr disconn_tok = cli.disconnect();
REQUIRE(disconn_tok);
disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -856,9 +860,9 @@ TEST_CASE("async_client unsubscribe single topic 1 arg failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_ptr token_unsub{cli.unsubscribe(TOPIC)}; token_ptr unsub_tok{cli.unsubscribe(TOPIC)};
REQUIRE(token_unsub); REQUIRE(unsub_tok);
token_unsub->wait_for(TIMEOUT); unsub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -871,20 +875,24 @@ TEST_CASE("async_client unsubscribe single topic 3 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok = cli.connect();
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
mock_action_listener listener; token_ptr sub_tok = cli.subscribe(TOPIC, 1);
token_ptr token_unsub{cli.unsubscribe(TOPIC, &CONTEXT, listener)}; REQUIRE(sub_tok);
REQUIRE(token_unsub); sub_tok->wait_for(TIMEOUT);
token_unsub->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(token_unsub->get_user_context()));
token_ptr token_disconn{cli.disconnect()}; mock_action_listener listener;
REQUIRE(token_disconn); token_ptr unsub_tok = cli.unsubscribe(TOPIC, &CONTEXT, listener);
token_disconn->wait(); REQUIRE(unsub_tok);
unsub_tok->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(unsub_tok->get_user_context()));
token_ptr disconn_tok = cli.disconnect();
REQUIRE(disconn_tok);
disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -896,9 +904,9 @@ TEST_CASE("async_client unsubscribe single topic 3 args failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
mock_action_listener listener; mock_action_listener listener;
token_ptr token_unsub{cli.unsubscribe(TOPIC, &CONTEXT, listener)}; token_ptr unsub_tok{cli.unsubscribe(TOPIC, &CONTEXT, listener)};
REQUIRE(token_unsub); REQUIRE(unsub_tok);
token_unsub->wait_for(TIMEOUT); unsub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -911,18 +919,22 @@ TEST_CASE("async_client unsubscribe many topics 1 arg", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok = cli.connect();
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
token_ptr token_unsub{cli.unsubscribe(TOPIC_COLL)}; token_ptr sub_tok = cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL);
REQUIRE(token_unsub); REQUIRE(sub_tok);
token_unsub->wait_for(TIMEOUT); sub_tok->wait_for(TIMEOUT);
token_ptr token_disconn{cli.disconnect()}; token_ptr unsub_tok = cli.unsubscribe(TOPIC_COLL);
REQUIRE(token_disconn); REQUIRE(unsub_tok);
token_disconn->wait(); unsub_tok->wait_for(TIMEOUT);
token_ptr disconn_tok = cli.disconnect();
REQUIRE(disconn_tok);
disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -933,9 +945,9 @@ TEST_CASE("async_client unsubscribe many topics 1 arg_failure", "[client]")
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_ptr token_unsub{cli.unsubscribe(TOPIC_COLL)}; token_ptr unsub_tok{cli.unsubscribe(TOPIC_COLL)};
REQUIRE(token_unsub); REQUIRE(unsub_tok);
token_unsub->wait_for(TIMEOUT); unsub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();
@ -948,20 +960,24 @@ TEST_CASE("async_client unsubscribe many topics 3 args", "[client]")
async_client cli{GOOD_SERVER_URI, CLIENT_ID}; async_client cli{GOOD_SERVER_URI, CLIENT_ID};
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
token_ptr token_conn{cli.connect()}; token_ptr conn_tok = cli.connect();
REQUIRE(token_conn); REQUIRE(conn_tok);
token_conn->wait(); conn_tok->wait();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
mock_action_listener listener; token_ptr sub_tok = cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL);
token_ptr token_unsub{cli.unsubscribe(TOPIC_COLL, &CONTEXT, listener)}; REQUIRE(sub_tok);
REQUIRE(token_unsub); sub_tok->wait_for(TIMEOUT);
token_unsub->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(token_unsub->get_user_context()));
token_ptr token_disconn{cli.disconnect()}; mock_action_listener listener;
REQUIRE(token_disconn); token_ptr unsub_tok = cli.unsubscribe(TOPIC_COLL, &CONTEXT, listener);
token_disconn->wait(); REQUIRE(unsub_tok);
unsub_tok->wait_for(TIMEOUT);
REQUIRE(CONTEXT == *static_cast<int*>(unsub_tok->get_user_context()));
token_ptr disconn_tok = cli.disconnect();
REQUIRE(disconn_tok);
disconn_tok->wait();
REQUIRE(!cli.is_connected()); REQUIRE(!cli.is_connected());
} }
@ -973,9 +989,9 @@ TEST_CASE("async_client unsubscribe many topics 3 args failure", "[client]")
mock_action_listener listener; mock_action_listener listener;
int return_code = MQTTASYNC_SUCCESS; int return_code = MQTTASYNC_SUCCESS;
try { try {
token_ptr token_unsub{cli.unsubscribe(TOPIC_COLL, &CONTEXT, listener)}; token_ptr unsub_tok{cli.unsubscribe(TOPIC_COLL, &CONTEXT, listener)};
REQUIRE(token_unsub); REQUIRE(unsub_tok);
token_unsub->wait_for(TIMEOUT); unsub_tok->wait_for(TIMEOUT);
} }
catch (mqtt::exception& ex) { catch (mqtt::exception& ex) {
return_code = ex.get_return_code(); return_code = ex.get_return_code();

View File

@ -4,8 +4,8 @@
// //
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2020-2024 Frank Pagliughi <fpagliughi@mindspring.com>
* Copyright (c) 2017 Guilherme M. Ferreira <guilherme.maciel.ferreira@gmail.com> * Copyright (c) 2017 Guilherme M. Ferreira <guilherme.maciel.ferreira@gmail.com>
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0 * are made available under the terms of the Eclipse Public License v2.0
@ -461,6 +461,7 @@ TEST_CASE("client unsubscribe single topic 1 arg", "[client]")
cli.connect(); cli.connect();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
cli.subscribe(TOPIC, 1);
cli.unsubscribe(TOPIC); cli.unsubscribe(TOPIC);
cli.disconnect(); cli.disconnect();
@ -490,6 +491,7 @@ TEST_CASE("client unsubscribe many topics 1 arg", "[client]")
cli.connect(); cli.connect();
REQUIRE(cli.is_connected()); REQUIRE(cli.is_connected());
cli.subscribe(TOPIC_COLL, GOOD_QOS_COLL);
cli.unsubscribe(TOPIC_COLL); cli.unsubscribe(TOPIC_COLL);
cli.disconnect(); cli.disconnect();