mirror of
https://github.com/eclipse/paho.mqtt.cpp.git
synced 2025-05-09 11:21:24 +08:00
Merge branch 'develop' into mqtt-c-submodule
This commit is contained in:
commit
7cfcd4bf87
12
README.md
12
README.md
@ -40,6 +40,8 @@ To keep up with the latest announcements for this project, or to ask questions:
|
|||||||
|
|
||||||
### Unreleased features in this branch
|
### Unreleased features in this branch
|
||||||
|
|
||||||
|
- Added `topic_filter` class for simple topic comparisons (w/ unit tests)
|
||||||
|
- Added `topic_matcher` class for matching topics to a collection of filters (w/ unit tests)
|
||||||
- Added Session Expiry Interval to v5 chat sample
|
- Added Session Expiry Interval to v5 chat sample
|
||||||
- Minor tweaks to prepare for C++20
|
- Minor tweaks to prepare for C++20
|
||||||
- Minor cleanup of the tests
|
- Minor cleanup of the tests
|
||||||
@ -95,9 +97,9 @@ Contributions to this project are gladly welcomed and appreciated Before submitt
|
|||||||
- This is an official Eclipse project, so it is required that all contributors sign an [Eclipse Contributor Agreement (ECA)](https://www.eclipse.org/legal/ECA.php)
|
- This is an official Eclipse project, so it is required that all contributors sign an [Eclipse Contributor Agreement (ECA)](https://www.eclipse.org/legal/ECA.php)
|
||||||
- Please submit all Pull Requests against the _develop_ branch (not master).
|
- Please submit all Pull Requests against the _develop_ branch (not master).
|
||||||
- Please sign all commits.
|
- Please sign all commits.
|
||||||
|
|
||||||
For full details, see [CONTRIBUTING.md](https://github.com/eclipse/paho.mqtt.cpp/blob/master/CONTRIBUTING.md).
|
For full details, see [CONTRIBUTING.md](https://github.com/eclipse/paho.mqtt.cpp/blob/master/CONTRIBUTING.md).
|
||||||
|
|
||||||
## Building from source
|
## Building from source
|
||||||
|
|
||||||
_CMake_ is a cross-platform build system suitable for Unix and non-Unix platforms such as Microsoft Windows. It is now the only supported build system.
|
_CMake_ is a cross-platform build system suitable for Unix and non-Unix platforms such as Microsoft Windows. It is now the only supported build system.
|
||||||
@ -137,7 +139,7 @@ $ sudo apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui
|
|||||||
If you will be using secure sockets (and you probably should):
|
If you will be using secure sockets (and you probably should):
|
||||||
|
|
||||||
```
|
```
|
||||||
$ sudo apt-get install libssl-dev
|
$ sudo apt-get install libssl-dev
|
||||||
```
|
```
|
||||||
|
|
||||||
Building the documentation requires doxygen and optionally graphviz to be installed:
|
Building the documentation requires doxygen and optionally graphviz to be installed:
|
||||||
@ -146,7 +148,7 @@ Building the documentation requires doxygen and optionally graphviz to be instal
|
|||||||
$ sudo apt-get install doxygen graphviz
|
$ sudo apt-get install doxygen graphviz
|
||||||
```
|
```
|
||||||
|
|
||||||
Unit tests are being built using _Catch2_.
|
Unit tests are being built using _Catch2_.
|
||||||
|
|
||||||
_Catch2_ can be found here: [Catch2](https://github.com/catchorg/Catch2). You must download and install _Catch2_ to build and run the unit tests locally.
|
_Catch2_ can be found here: [Catch2](https://github.com/catchorg/Catch2). You must download and install _Catch2_ to build and run the unit tests locally.
|
||||||
|
|
||||||
@ -320,7 +322,7 @@ int main(int argc, char* argv[])
|
|||||||
cli.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2)+1, 0, false);
|
cli.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2)+1, 0, false);
|
||||||
|
|
||||||
// Disconnect
|
// Disconnect
|
||||||
|
|
||||||
cli.disconnect();
|
cli.disconnect();
|
||||||
}
|
}
|
||||||
catch (const mqtt::persistence_exception& exc) {
|
catch (const mqtt::persistence_exception& exc) {
|
||||||
|
@ -37,7 +37,7 @@ else()
|
|||||||
endif()
|
endif()
|
||||||
|
|
||||||
if(PAHO_WITH_SSL)
|
if(PAHO_WITH_SSL)
|
||||||
set_target_properties(PahoMqttC::PahoMqttC PROPERTIES
|
set_target_properties(PahoMqttC::PahoMqttC PROPERTIES
|
||||||
INTERFACE_COMPILE_DEFINITIONS "OPENSSL=1"
|
INTERFACE_COMPILE_DEFINITIONS "OPENSSL=1"
|
||||||
INTERFACE_LINK_LIBRARIES "OpenSSL::SSL;OpenSSL::Crypto")
|
INTERFACE_LINK_LIBRARIES "OpenSSL::SSL;OpenSSL::Crypto")
|
||||||
endif()
|
endif()
|
||||||
|
@ -79,6 +79,14 @@ target_include_directories(paho-cpp-objs
|
|||||||
src
|
src
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# --- Warnings ---
|
||||||
|
|
||||||
|
# Maybe set '-Werror' for Release builds?
|
||||||
|
target_compile_options(paho-cpp-objs PRIVATE
|
||||||
|
$<$<CXX_COMPILER_ID:MSVC>:/W3>
|
||||||
|
$<$<CXX_COMPILER_ID:Clang>:-Wall -Wextra -Wdocumentation>
|
||||||
|
$<$<NOT:$<CXX_COMPILER_ID:MSVC,Clang>>:-Wall -Wextra>
|
||||||
|
)
|
||||||
|
|
||||||
## --- Build the shared library, if requested ---
|
## --- Build the shared library, if requested ---
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
// async_client.cpp
|
// async_client.cpp
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2013-2019 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2013-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -131,7 +131,7 @@ async_client::~async_client()
|
|||||||
|
|
||||||
|
|
||||||
// Callback for MQTTAsync_setConnected()
|
// Callback for MQTTAsync_setConnected()
|
||||||
// This is installed with the normall callbacks and with a call to
|
// This is installed with the normal callbacks and with a call to
|
||||||
// reconnect() to indicate that it succeeded. It signals the client's
|
// reconnect() to indicate that it succeeded. It signals the client's
|
||||||
// connect token then calls any registered callbacks.
|
// connect token then calls any registered callbacks.
|
||||||
void async_client::on_connected(void* context, char* cause)
|
void async_client::on_connected(void* context, char* cause)
|
||||||
@ -442,9 +442,22 @@ token_ptr async_client::connect()
|
|||||||
|
|
||||||
token_ptr async_client::connect(connect_options opts)
|
token_ptr async_client::connect(connect_options opts)
|
||||||
{
|
{
|
||||||
// TODO: We really should get (or update) this value from the response
|
// TODO: We should update the MQTT version from the response
|
||||||
// (when the server confirms the requested version)
|
// (when the server confirms the requested version)
|
||||||
mqttVersion_ = opts.opts_.MQTTVersion;
|
|
||||||
|
// If the options specified a new MQTT protocol version, we should
|
||||||
|
// use it, otherwise default to the version requested when the client
|
||||||
|
// was created.
|
||||||
|
if (opts.opts_.MQTTVersion == 0 && mqttVersion_ >= 5)
|
||||||
|
opts.opts_.MQTTVersion = mqttVersion_;
|
||||||
|
else
|
||||||
|
mqttVersion_ = opts.opts_.MQTTVersion;
|
||||||
|
|
||||||
|
// The C lib is very picky about version and clean start/session
|
||||||
|
if (opts.opts_.MQTTVersion >= 5)
|
||||||
|
opts.opts_.cleansession = 0;
|
||||||
|
else
|
||||||
|
opts.opts_.cleanstart = 0;
|
||||||
|
|
||||||
// TODO: If connTok_ is non-null, there could be a pending connect
|
// TODO: If connTok_ is non-null, there could be a pending connect
|
||||||
// which might complete after creating/assigning a new one. If that
|
// which might complete after creating/assigning a new one. If that
|
||||||
@ -469,11 +482,21 @@ token_ptr async_client::connect(connect_options opts)
|
|||||||
}
|
}
|
||||||
|
|
||||||
token_ptr async_client::connect(connect_options opts, void* userContext,
|
token_ptr async_client::connect(connect_options opts, void* userContext,
|
||||||
iaction_listener& cb)
|
iaction_listener& cb)
|
||||||
{
|
{
|
||||||
// TODO: We really should get this value from the response (when
|
// If the options specified a new MQTT protocol version, we should
|
||||||
// the server confirms the requested version)
|
// use it, otherwise default to the version requested when the client
|
||||||
mqttVersion_ = opts.opts_.MQTTVersion;
|
// was created.
|
||||||
|
if (opts.opts_.MQTTVersion == 0 && mqttVersion_ >= 5)
|
||||||
|
opts.opts_.MQTTVersion = mqttVersion_;
|
||||||
|
else
|
||||||
|
mqttVersion_ = opts.opts_.MQTTVersion;
|
||||||
|
|
||||||
|
// The C lib is very picky about version and clean start/session
|
||||||
|
if (opts.opts_.MQTTVersion >= 5)
|
||||||
|
opts.opts_.cleansession = 0;
|
||||||
|
else
|
||||||
|
opts.opts_.cleanstart = 0;
|
||||||
|
|
||||||
auto tmpTok = connTok_;
|
auto tmpTok = connTok_;
|
||||||
connTok_ = token::create(token::Type::CONNECT, *this, userContext, cb);
|
connTok_ = token::create(token::Type::CONNECT, *this, userContext, cb);
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
// connect_options.cpp
|
// connect_options.cpp
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2017-2020 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2017-2023 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
* Copyright (c) 2016 Guilherme M. Ferreira <guilherme.maciel.ferreira@gmail.com>
|
* Copyright (c) 2016 Guilherme M. Ferreira <guilherme.maciel.ferreira@gmail.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
@ -28,12 +28,19 @@ namespace mqtt {
|
|||||||
const MQTTAsync_connectOptions connect_options::DFLT_C_STRUCT =
|
const MQTTAsync_connectOptions connect_options::DFLT_C_STRUCT =
|
||||||
MQTTAsync_connectOptions_initializer;
|
MQTTAsync_connectOptions_initializer;
|
||||||
|
|
||||||
connect_options::connect_options() : opts_(DFLT_C_STRUCT)
|
const MQTTAsync_connectOptions connect_options::DFLT_C_STRUCT5 =
|
||||||
|
MQTTAsync_connectOptions_initializer5;
|
||||||
|
|
||||||
|
connect_options::connect_options(int ver /*=MQTTVERSION_DEFAULT*/)
|
||||||
{
|
{
|
||||||
|
opts_ = (ver < MQTTVERSION_5) ? DFLT_C_STRUCT : DFLT_C_STRUCT5;
|
||||||
}
|
}
|
||||||
|
|
||||||
connect_options::connect_options(string_ref userName, binary_ref password)
|
connect_options::connect_options(
|
||||||
: connect_options()
|
string_ref userName, binary_ref password,
|
||||||
|
int ver /*=MQTTVERSION_DEFAULT*/
|
||||||
|
)
|
||||||
|
: connect_options(ver)
|
||||||
{
|
{
|
||||||
set_user_name(userName);
|
set_user_name(userName);
|
||||||
set_password(password);
|
set_password(password);
|
||||||
@ -55,6 +62,9 @@ connect_options::connect_options(const connect_options& opt) : opts_(opt.opts_),
|
|||||||
if (opts_.ssl)
|
if (opts_.ssl)
|
||||||
set_ssl(opt.ssl_);
|
set_ssl(opt.ssl_);
|
||||||
|
|
||||||
|
if (opts_.connectProperties)
|
||||||
|
set_properties(opt.props_);
|
||||||
|
|
||||||
update_c_struct();
|
update_c_struct();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,12 +88,15 @@ connect_options::connect_options(connect_options&& opt) : opts_(opt.opts_),
|
|||||||
|
|
||||||
if (opts_.ssl)
|
if (opts_.ssl)
|
||||||
opts_.ssl = &ssl_.opts_;
|
opts_.ssl = &ssl_.opts_;
|
||||||
|
|
||||||
|
if (opts_.connectProperties)
|
||||||
|
opts_.connectProperties = const_cast<MQTTProperties*>(&props_.c_struct());
|
||||||
|
|
||||||
update_c_struct();
|
update_c_struct();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unfortunately, with the existing implementation, there's no way to know
|
// Unfortunately, with the existing implementation, there's no way to know
|
||||||
// if the will and ssl options were set by looking at the C++ structs.
|
// if the (connect) properties, will and ssl options were set by looking at the C++ structs.
|
||||||
// In a major update, we can consider using a pointer or optional<> to
|
// In a major update, we can consider using a pointer or optional<> to
|
||||||
// indicate that they were set.
|
// indicate that they were set.
|
||||||
// But, for now, the copy and assignment operations must handle it manually
|
// But, for now, the copy and assignment operations must handle it manually
|
||||||
@ -141,6 +154,9 @@ void connect_options::update_c_struct()
|
|||||||
|
|
||||||
connect_options& connect_options::operator=(const connect_options& opt)
|
connect_options& connect_options::operator=(const connect_options& opt)
|
||||||
{
|
{
|
||||||
|
if (&opt == this)
|
||||||
|
return *this;
|
||||||
|
|
||||||
opts_ = opt.opts_;
|
opts_ = opt.opts_;
|
||||||
|
|
||||||
if (opts_.will)
|
if (opts_.will)
|
||||||
@ -154,7 +170,8 @@ connect_options& connect_options::operator=(const connect_options& opt)
|
|||||||
|
|
||||||
tok_ = opt.tok_;
|
tok_ = opt.tok_;
|
||||||
serverURIs_ = opt.serverURIs_;
|
serverURIs_ = opt.serverURIs_;
|
||||||
props_ = opt.props_;
|
if (opts_.connectProperties)
|
||||||
|
set_properties(opt.props_);
|
||||||
|
|
||||||
httpHeaders_ = opt.httpHeaders_;
|
httpHeaders_ = opt.httpHeaders_;
|
||||||
httpProxy_ = opt.httpProxy_;
|
httpProxy_ = opt.httpProxy_;
|
||||||
@ -166,6 +183,9 @@ connect_options& connect_options::operator=(const connect_options& opt)
|
|||||||
|
|
||||||
connect_options& connect_options::operator=(connect_options&& opt)
|
connect_options& connect_options::operator=(connect_options&& opt)
|
||||||
{
|
{
|
||||||
|
if (&opt == this)
|
||||||
|
return *this;
|
||||||
|
|
||||||
opts_ = opt.opts_;
|
opts_ = opt.opts_;
|
||||||
|
|
||||||
if (opts_.will)
|
if (opts_.will)
|
||||||
@ -179,7 +199,8 @@ connect_options& connect_options::operator=(connect_options&& opt)
|
|||||||
|
|
||||||
tok_ = std::move(opt.tok_);
|
tok_ = std::move(opt.tok_);
|
||||||
serverURIs_ = std::move(opt.serverURIs_);
|
serverURIs_ = std::move(opt.serverURIs_);
|
||||||
props_ = std::move(opt.props_);
|
if (opts_.connectProperties)
|
||||||
|
set_properties(std::move(opt.props_));
|
||||||
|
|
||||||
httpHeaders_ = std::move(opt.httpHeaders_);
|
httpHeaders_ = std::move(opt.httpHeaders_);
|
||||||
httpProxy_ = std::move(opt.httpProxy_);
|
httpProxy_ = std::move(opt.httpProxy_);
|
||||||
@ -237,6 +258,20 @@ void connect_options::set_ssl(ssl_options&& ssl)
|
|||||||
opts_.ssl = &ssl_.opts_;
|
opts_.ssl = &ssl_.opts_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean sessions only apply to MQTT v3, so force it there if set.
|
||||||
|
void connect_options::set_clean_session(bool clean)
|
||||||
|
{
|
||||||
|
if (opts_.MQTTVersion < MQTTVERSION_5)
|
||||||
|
opts_.cleansession = to_int(clean);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean start only apply to MQTT v5, so force it there if set.
|
||||||
|
void connect_options::set_clean_start(bool cleanStart)
|
||||||
|
{
|
||||||
|
if (opts_.MQTTVersion >= MQTTVERSION_5)
|
||||||
|
opts_.cleanstart = to_int(cleanStart);
|
||||||
|
}
|
||||||
|
|
||||||
void connect_options::set_token(const token_ptr& tok)
|
void connect_options::set_token(const token_ptr& tok)
|
||||||
{
|
{
|
||||||
tok_ = tok;
|
tok_ = tok;
|
||||||
@ -291,6 +326,20 @@ void connect_options::set_automatic_reconnect(int minRetryInterval,
|
|||||||
opts_.maxRetryInterval = maxRetryInterval;
|
opts_.maxRetryInterval = maxRetryInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void connect_options::set_properties(const properties& props)
|
||||||
|
{
|
||||||
|
props_ = props;
|
||||||
|
opts_.connectProperties = const_cast<MQTTProperties*>(&props_.c_struct());
|
||||||
|
opts_.MQTTVersion = MQTTVERSION_5;
|
||||||
|
}
|
||||||
|
|
||||||
|
void connect_options::set_properties(properties&& props)
|
||||||
|
{
|
||||||
|
props_ = std::move(props);
|
||||||
|
opts_.connectProperties = const_cast<MQTTProperties*>(&props_.c_struct());
|
||||||
|
opts_.MQTTVersion = MQTTVERSION_5;
|
||||||
|
}
|
||||||
|
|
||||||
void connect_options::set_http_proxy(const string& httpProxy)
|
void connect_options::set_http_proxy(const string& httpProxy)
|
||||||
{
|
{
|
||||||
httpProxy_ = httpProxy;
|
httpProxy_ = httpProxy;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2020-2023 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -22,8 +22,7 @@ namespace mqtt {
|
|||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
const MQTTAsync_createOptions create_options::DFLT_C_STRUCT =
|
const MQTTAsync_createOptions create_options::DFLT_C_STRUCT =
|
||||||
MQTTAsync_createOptions_initializer;
|
MQTTAsync_createOptions_initializer5;
|
||||||
|
|
||||||
|
|
||||||
create_options::create_options(int mqttVersion) : create_options()
|
create_options::create_options(int mqttVersion) : create_options()
|
||||||
{
|
{
|
||||||
|
@ -15,12 +15,12 @@ disconnect_options::disconnect_options() : opts_(DFLT_C_STRUCT)
|
|||||||
}
|
}
|
||||||
|
|
||||||
disconnect_options::disconnect_options(const disconnect_options& opt)
|
disconnect_options::disconnect_options(const disconnect_options& opt)
|
||||||
: opts_(opt.opts_), tok_(opt.tok_)
|
: opts_(opt.opts_), tok_(opt.tok_), props_(opt.props_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
disconnect_options::disconnect_options(disconnect_options&& opt)
|
disconnect_options::disconnect_options(disconnect_options&& opt)
|
||||||
: opts_(opt.opts_), tok_(std::move(opt.tok_))
|
: opts_(opt.opts_), tok_(std::move(opt.tok_)), props_(std::move(opt.props_))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,6 +28,7 @@ disconnect_options& disconnect_options::operator=(const disconnect_options& opt)
|
|||||||
{
|
{
|
||||||
opts_ = opt.opts_;
|
opts_ = opt.opts_;
|
||||||
tok_ = opt.tok_;
|
tok_ = opt.tok_;
|
||||||
|
props_ = opt.props_;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,6 +36,7 @@ disconnect_options& disconnect_options::operator=(disconnect_options&& opt)
|
|||||||
{
|
{
|
||||||
opts_ = opt.opts_;
|
opts_ = opt.opts_;
|
||||||
tok_ = std::move(opt.tok_);
|
tok_ = std::move(opt.tok_);
|
||||||
|
props_ = std::move(opt.props_);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2013-2020 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2013-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -464,6 +464,16 @@ public:
|
|||||||
* @return The server's address, as a URI String.
|
* @return The server's address, as a URI String.
|
||||||
*/
|
*/
|
||||||
string get_server_uri() const override { return serverURI_; }
|
string get_server_uri() const override { return serverURI_; }
|
||||||
|
/**
|
||||||
|
* Gets the MQTT version used by the client.
|
||||||
|
* @return The MQTT version used by the client
|
||||||
|
* @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
|
||||||
|
* that fails, fall back to 3.1
|
||||||
|
* @li MQTTVERSION_3_1 (3) = only try version 3.1
|
||||||
|
* @li MQTTVERSION_3_1_1 (4) = only try version 3.1.1
|
||||||
|
* @li MQTTVERSION_5 (5) = only try version 5
|
||||||
|
*/
|
||||||
|
int mqtt_version() const noexcept { return mqttVersion_; }
|
||||||
/**
|
/**
|
||||||
* Determines if this client is currently connected to the server.
|
* Determines if this client is currently connected to the server.
|
||||||
* @return true if connected, false otherwise.
|
* @return true if connected, false otherwise.
|
||||||
|
@ -68,17 +68,17 @@ class client : private callback
|
|||||||
// Most are launched in a separate thread, for convenience, except
|
// Most are launched in a separate thread, for convenience, except
|
||||||
// message_arrived, for performance.
|
// message_arrived, for performance.
|
||||||
void connected(const string& cause) override {
|
void connected(const string& cause) override {
|
||||||
std::async(std::launch::async, &callback::connected, userCallback_, cause);
|
std::async(std::launch::async, &callback::connected, userCallback_, cause).wait();
|
||||||
}
|
}
|
||||||
void connection_lost(const string& cause) override {
|
void connection_lost(const string& cause) override {
|
||||||
std::async(std::launch::async,
|
std::async(std::launch::async,
|
||||||
&callback::connection_lost, userCallback_, cause);
|
&callback::connection_lost, userCallback_, cause).wait();
|
||||||
}
|
}
|
||||||
void message_arrived(const_message_ptr msg) override {
|
void message_arrived(const_message_ptr msg) override {
|
||||||
userCallback_->message_arrived(msg);
|
userCallback_->message_arrived(msg);
|
||||||
}
|
}
|
||||||
void delivery_complete(delivery_token_ptr tok) override {
|
void delivery_complete(delivery_token_ptr tok) override {
|
||||||
std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok);
|
std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok).wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Non-copyable */
|
/** Non-copyable */
|
||||||
@ -231,7 +231,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
virtual topic get_topic(const string& top, int qos=message::DFLT_QOS,
|
virtual topic get_topic(const string& top, int qos=message::DFLT_QOS,
|
||||||
bool retained=message::DFLT_RETAINED) {
|
bool retained=message::DFLT_RETAINED) {
|
||||||
return topic(cli_, top);
|
return topic(cli_, top, qos, retained);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Determines if this client is currently connected to the server.
|
* Determines if this client is currently connected to the server.
|
||||||
|
@ -49,6 +49,9 @@ class connect_options
|
|||||||
/** The default C struct */
|
/** The default C struct */
|
||||||
static const MQTTAsync_connectOptions DFLT_C_STRUCT;
|
static const MQTTAsync_connectOptions DFLT_C_STRUCT;
|
||||||
|
|
||||||
|
/** The default C struct for MQTT v5 */
|
||||||
|
static const MQTTAsync_connectOptions DFLT_C_STRUCT5;
|
||||||
|
|
||||||
/** The underlying C connection options */
|
/** The underlying C connection options */
|
||||||
MQTTAsync_connectOptions opts_;
|
MQTTAsync_connectOptions opts_;
|
||||||
|
|
||||||
@ -114,14 +117,20 @@ public:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new object using the default values.
|
* Constructs a new object using the default values.
|
||||||
|
*
|
||||||
|
* @param ver The MQTT protocol version.
|
||||||
*/
|
*/
|
||||||
connect_options();
|
explicit connect_options(int ver=MQTTVERSION_DEFAULT);
|
||||||
/**
|
/**
|
||||||
* Constructs a new object using the specified user name and password.
|
* Constructs a new object using the specified user name and password.
|
||||||
* @param userName The name of the user for connecting to the server
|
* @param userName The name of the user for connecting to the server
|
||||||
* @param password The password for connecting to the server
|
* @param password The password for connecting to the server
|
||||||
|
* @param ver The MQTT protocol version.
|
||||||
*/
|
*/
|
||||||
connect_options(string_ref userName, binary_ref password);
|
connect_options(
|
||||||
|
string_ref userName, binary_ref password,
|
||||||
|
int ver=MQTTVERSION_DEFAULT
|
||||||
|
);
|
||||||
/**
|
/**
|
||||||
* Copy constructor.
|
* Copy constructor.
|
||||||
* @param opt Another object to copy.
|
* @param opt Another object to copy.
|
||||||
@ -145,7 +154,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Expose the underlying C struct for the unit tests.
|
* Expose the underlying C struct for the unit tests.
|
||||||
*/
|
*/
|
||||||
#if defined(UNIT_TESTS)
|
#if defined(UNIT_TESTS)
|
||||||
const MQTTAsync_connectOptions& c_struct() const { return opts_; }
|
const MQTTAsync_connectOptions& c_struct() const { return opts_; }
|
||||||
#endif
|
#endif
|
||||||
/**
|
/**
|
||||||
@ -229,10 +238,16 @@ public:
|
|||||||
void set_ssl(ssl_options&& ssl);
|
void set_ssl(ssl_options&& ssl);
|
||||||
/**
|
/**
|
||||||
* Returns whether the server should remember state for the client
|
* Returns whether the server should remember state for the client
|
||||||
* across reconnects.
|
* across reconnects. This only applies to MQTT v3.x connections.
|
||||||
* @return @em true if requesting a clean session, @em false if not.
|
* @return @em true if requesting a clean session, @em false if not.
|
||||||
*/
|
*/
|
||||||
bool is_clean_session() const { return to_bool(opts_.cleansession); }
|
bool is_clean_session() const { return to_bool(opts_.cleansession); }
|
||||||
|
/**
|
||||||
|
* Returns whether the server should remember state for the client
|
||||||
|
* across reconnects. This only applies to MQTT v5 connections.
|
||||||
|
* @return @em true if requesting a clean start, @em false if not.
|
||||||
|
*/
|
||||||
|
bool is_clean_start() const { return to_bool(opts_.cleanstart); }
|
||||||
/**
|
/**
|
||||||
* Gets the token used as the callback context.
|
* Gets the token used as the callback context.
|
||||||
* @return The delivery token used as the callback context.
|
* @return The delivery token used as the callback context.
|
||||||
@ -278,21 +293,35 @@ public:
|
|||||||
std::chrono::seconds get_max_retry_interval() const {
|
std::chrono::seconds get_max_retry_interval() const {
|
||||||
return std::chrono::seconds(opts_.maxRetryInterval);
|
return std::chrono::seconds(opts_.maxRetryInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets whether the server should remember state for the client across
|
* Sets whether the server should remember state for the client across
|
||||||
* reconnects. (MQTT v3.x only)
|
* reconnects. (MQTT v3.x only)
|
||||||
* @param cleanSession @em true if the server should remember state for
|
*
|
||||||
* the client across reconnects, @em false
|
* This will only take effect if the version is _already_ set to v3.x
|
||||||
* othherwise.
|
* (not v5).
|
||||||
|
*
|
||||||
|
* @param clean @em true if the server should remember state for the
|
||||||
|
* client across reconnects, @em false otherwise.
|
||||||
*/
|
*/
|
||||||
void set_clean_session(bool cleanSession) {
|
void set_clean_session(bool cleanSession);
|
||||||
opts_.cleansession = to_int(cleanSession);
|
/**
|
||||||
}
|
* Sets whether the server should remember state for the client across
|
||||||
|
* reconnects. (MQTT v5 only)
|
||||||
|
*
|
||||||
|
* If a persistent session is desired (turning this off), then the app
|
||||||
|
* should also set the `Session Expiry Interval` property, and add that
|
||||||
|
* to the connect options.
|
||||||
|
*
|
||||||
|
* This will only take effect if the MQTT version is set to v5
|
||||||
|
*
|
||||||
|
* @param clean @em true if the server should remember state for the
|
||||||
|
* client across reconnects, @em false otherwise.
|
||||||
|
*/
|
||||||
|
void set_clean_start(bool cleanStart);
|
||||||
/**
|
/**
|
||||||
* Sets the "keep alive" interval.
|
* Sets the "keep alive" interval.
|
||||||
* This is the maximum time that should pass without communications
|
* This is the maximum time that should pass without communications
|
||||||
* between client and server. If no massages pass in this time, the
|
* between client and server. If no messages pass in this time, the
|
||||||
* client will ping the broker.
|
* client will ping the broker.
|
||||||
* @param keepAliveInterval The keep alive interval in seconds.
|
* @param keepAliveInterval The keep alive interval in seconds.
|
||||||
*/
|
*/
|
||||||
@ -302,7 +331,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Sets the "keep alive" interval with a chrono duration.
|
* Sets the "keep alive" interval with a chrono duration.
|
||||||
* This is the maximum time that should pass without communications
|
* This is the maximum time that should pass without communications
|
||||||
* between client and server. If no massages pass in this time, the
|
* between client and server. If no messages pass in this time, the
|
||||||
* client will ping the broker.
|
* client will ping the broker.
|
||||||
* @param interval The keep alive interval.
|
* @param interval The keep alive interval.
|
||||||
*/
|
*/
|
||||||
@ -428,21 +457,6 @@ public:
|
|||||||
set_automatic_reconnect((int) to_seconds_count(minRetryInterval),
|
set_automatic_reconnect((int) to_seconds_count(minRetryInterval),
|
||||||
(int) to_seconds_count(maxRetryInterval));
|
(int) to_seconds_count(maxRetryInterval));
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Determines if the 'clean start' flag is set for the connect.
|
|
||||||
* @return @em true if the 'clean start' flag is set for the connect, @em
|
|
||||||
* false if not.
|
|
||||||
*/
|
|
||||||
bool is_clean_start() const {
|
|
||||||
return to_bool(opts_.cleanstart);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Sets the 'clean start' flag for the connection.
|
|
||||||
* @param cleanStart Whether to set the 'clean start' flag for the connect.
|
|
||||||
*/
|
|
||||||
void set_clean_start(bool cleanStart) {
|
|
||||||
opts_.cleanstart = to_int(cleanStart);
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* Gets the connect properties.
|
* Gets the connect properties.
|
||||||
* @return A const reference to the properties for the connect.
|
* @return A const reference to the properties for the connect.
|
||||||
@ -454,18 +468,12 @@ public:
|
|||||||
* Sets the properties for the connect.
|
* Sets the properties for the connect.
|
||||||
* @param props The properties to place into the message.
|
* @param props The properties to place into the message.
|
||||||
*/
|
*/
|
||||||
void set_properties(const properties& props) {
|
void set_properties(const properties& props);
|
||||||
props_ = props;
|
|
||||||
opts_.connectProperties = const_cast<MQTTProperties*>(&props_.c_struct());
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* Moves the properties for the connect.
|
* Moves the properties for the connect.
|
||||||
* @param props The properties to move into the connect object.
|
* @param props The properties to move into the connect object.
|
||||||
*/
|
*/
|
||||||
void set_properties(properties&& props) {
|
void set_properties(properties&& props);
|
||||||
props_ = std::move(props);
|
|
||||||
opts_.connectProperties = const_cast<MQTTProperties*>(&props_.c_struct());
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* Gets the HTTP headers
|
* Gets the HTTP headers
|
||||||
* @return A const reference to the HTTP headers name/value collection.
|
* @return A const reference to the HTTP headers name/value collection.
|
||||||
@ -562,12 +570,12 @@ public:
|
|||||||
connect_data();
|
connect_data();
|
||||||
/**
|
/**
|
||||||
* Creates connection data with a user name, but no password.
|
* Creates connection data with a user name, but no password.
|
||||||
* @param userName The user name fopr reconnecting to the MQTT broker.
|
* @param userName The user name for reconnecting to the MQTT broker.
|
||||||
*/
|
*/
|
||||||
explicit connect_data(string_ref userName);
|
explicit connect_data(string_ref userName);
|
||||||
/**
|
/**
|
||||||
* Creates connection data with a user name and password.
|
* Creates connection data with a user name and password.
|
||||||
* @param userName The user name fopr reconnecting to the MQTT broker.
|
* @param userName The user name for reconnecting to the MQTT broker.
|
||||||
* @param password The password for connecting to the MQTT broker.
|
* @param password The password for connecting to the MQTT broker.
|
||||||
*/
|
*/
|
||||||
connect_data(string_ref userName, binary_ref password);
|
connect_data(string_ref userName, binary_ref password);
|
||||||
@ -619,12 +627,12 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Default constructor.
|
* Default constructor.
|
||||||
*/
|
*/
|
||||||
connect_options_builder() {}
|
explicit connect_options_builder(int ver=MQTTVERSION_DEFAULT) : opts_(ver) {}
|
||||||
/**
|
/**
|
||||||
* Sets whether the server should remember state for the client across
|
* Sets whether the server should remember state for the client across
|
||||||
* reconnects. (MQTT v3.x only)
|
* reconnects. (MQTT v3.x only)
|
||||||
* @param on @em true if the server should remember state for the client
|
* @param on @em true if the server should remember state for the client
|
||||||
* across reconnects, @em false othherwise.
|
* across reconnects, @em false otherwise.
|
||||||
*/
|
*/
|
||||||
auto clean_session(bool on=true) -> self& {
|
auto clean_session(bool on=true) -> self& {
|
||||||
opts_.set_clean_session(on);
|
opts_.set_clean_session(on);
|
||||||
@ -633,7 +641,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Sets the "keep alive" interval with a chrono duration.
|
* Sets the "keep alive" interval with a chrono duration.
|
||||||
* This is the maximum time that should pass without communications
|
* This is the maximum time that should pass without communications
|
||||||
* between client and server. If no massages pass in this time, the
|
* between client and server. If no messages pass in this time, the
|
||||||
* client will ping the broker.
|
* client will ping the broker.
|
||||||
* @param interval The keep alive interval.
|
* @param interval The keep alive interval.
|
||||||
*/
|
*/
|
||||||
@ -747,7 +755,7 @@ public:
|
|||||||
* This will also set other connect options to legal values dependent on
|
* This will also set other connect options to legal values dependent on
|
||||||
* the selected version.
|
* the selected version.
|
||||||
*
|
*
|
||||||
* @param ver The MQTT version to use for the connection:
|
* @param ver The MQTT protocol version to use for the connection:
|
||||||
* @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
|
* @li MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if
|
||||||
* that fails, fall back to 3.1
|
* that fails, fall back to 3.1
|
||||||
* @li MQTTVERSION_3_1 (3) = only try version 3.1
|
* @li MQTTVERSION_3_1 (3) = only try version 3.1
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2020-2023 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -57,7 +57,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
create_options() : opts_(DFLT_C_STRUCT) {}
|
create_options() : opts_(DFLT_C_STRUCT) {}
|
||||||
/**
|
/**
|
||||||
* Default create optionsfor the specified version of MQTT.
|
* Default create options for the specified version of MQTT.
|
||||||
* @param mqttVersion The MQTT version used to create the client.
|
* @param mqttVersion The MQTT version used to create the client.
|
||||||
*/
|
*/
|
||||||
explicit create_options(int mqttVersion);
|
explicit create_options(int mqttVersion);
|
||||||
@ -112,7 +112,7 @@ public:
|
|||||||
* Sets the MQTT version used to create the client.
|
* Sets the MQTT version used to create the client.
|
||||||
* @param ver The MQTT version used to create the client.
|
* @param ver The MQTT version used to create the client.
|
||||||
*/
|
*/
|
||||||
void set_mqtt_verison(int ver) { opts_.MQTTVersion = ver; }
|
void set_mqtt_version(int ver) { opts_.MQTTVersion = ver; }
|
||||||
/**
|
/**
|
||||||
* Whether the oldest messages are deleted when the output buffer is
|
* Whether the oldest messages are deleted when the output buffer is
|
||||||
* full.
|
* full.
|
||||||
@ -160,7 +160,7 @@ public:
|
|||||||
return to_bool(opts_.persistQoS0);
|
return to_bool(opts_.persistQoS0);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Determeine whether to persist QoS 0 messages.
|
* Determine whether to persist QoS 0 messages.
|
||||||
*
|
*
|
||||||
* @param on @em true if QoS 0 messages are persisted, @em false if not.
|
* @param on @em true if QoS 0 messages are persisted, @em false if not.
|
||||||
*/
|
*/
|
||||||
@ -235,10 +235,10 @@ public:
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Determines whether to restore persisted messsages or clear the
|
* Determines whether to restore persisted messages or clear the
|
||||||
* persistence store. (Defaults true)
|
* persistence store. (Defaults true)
|
||||||
*
|
*
|
||||||
* @param on @em true to retore persisted messages, @em false to clear
|
* @param on @em true to restore persisted messages, @em false to clear
|
||||||
* the persistence store.
|
* the persistence store.
|
||||||
* @return A reference to this object.
|
* @return A reference to this object.
|
||||||
*/
|
*/
|
||||||
|
@ -339,7 +339,7 @@ public:
|
|||||||
* received at the published QoS. Messages published at a
|
* received at the published QoS. Messages published at a
|
||||||
* higher quality of service will be received using the QoS
|
* higher quality of service will be received using the QoS
|
||||||
* specified on the subscribe.
|
* specified on the subscribe.
|
||||||
* @param opts A collection of subscription optsions (one for each
|
* @param opts A collection of subscription options (one for each
|
||||||
* topic)
|
* topic)
|
||||||
* @param props The MQTT v5 properties.
|
* @param props The MQTT v5 properties.
|
||||||
* @return token used to track and wait for the subscribe to complete.
|
* @return token used to track and wait for the subscribe to complete.
|
||||||
@ -362,7 +362,7 @@ public:
|
|||||||
* callback. Use @em nullptr if not required.
|
* callback. Use @em nullptr if not required.
|
||||||
* @param callback listener that will be notified when subscribe has
|
* @param callback listener that will be notified when subscribe has
|
||||||
* completed
|
* completed
|
||||||
* @param opts A collection of subscription optsions (one for each
|
* @param opts A collection of subscription options (one for each
|
||||||
* topic)
|
* topic)
|
||||||
* @param props The MQTT v5 properties.
|
* @param props The MQTT v5 properties.
|
||||||
* @return token used to track and wait for the subscribe to complete.
|
* @return token used to track and wait for the subscribe to complete.
|
||||||
|
@ -43,10 +43,10 @@ namespace mqtt {
|
|||||||
* The topic and payload buffers are kept as references to const data, so
|
* The topic and payload buffers are kept as references to const data, so
|
||||||
* they can be reassigned as needed, but the buffers can not be updated
|
* they can be reassigned as needed, but the buffers can not be updated
|
||||||
* in-place. Normally they would be created externally then copied or moved
|
* in-place. Normally they would be created externally then copied or moved
|
||||||
* into the message. The library to transport the messages never touchec the
|
* into the message. The library to transport the messages never touches the
|
||||||
* payloads or topics.
|
* payloads or topics.
|
||||||
*
|
*
|
||||||
* This also means that message objects are farily cheap to copy, since they
|
* This also means that message objects are fairly cheap to copy, since they
|
||||||
* don't copy the payloads. They simply copy the reference to the buffers.
|
* don't copy the payloads. They simply copy the reference to the buffers.
|
||||||
* It is safe to pass these buffer references across threads since all
|
* It is safe to pass these buffer references across threads since all
|
||||||
* references promise not to update the contents of the buffer.
|
* references promise not to update the contents of the buffer.
|
||||||
|
@ -167,8 +167,8 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the value from the property as the specitied type.
|
* Extracts the value from the property as the specified type.
|
||||||
* @return The value from the property as the specitied type.
|
* @return The value from the property as the specified type.
|
||||||
*/
|
*/
|
||||||
template <typename T>
|
template <typename T>
|
||||||
inline T get(const property&) { throw bad_cast(); }
|
inline T get(const property&) { throw bad_cast(); }
|
||||||
@ -402,7 +402,7 @@ public:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieves a single value from a property list for when there may be
|
* Retrieves a single value from a property list for when there may be
|
||||||
* multiple identical propert ID's.
|
* multiple identical property ID's.
|
||||||
* @tparam T The type of the value to retrieve
|
* @tparam T The type of the value to retrieve
|
||||||
* @param props The property list
|
* @param props The property list
|
||||||
* @param propid The property ID code for the desired value.
|
* @param propid The property ID code for the desired value.
|
||||||
|
@ -28,7 +28,7 @@ class token_test;
|
|||||||
* functionality currently required by the library.
|
* functionality currently required by the library.
|
||||||
*
|
*
|
||||||
* Note, too, in the C lib, this became a place to add MQTT v5 options for
|
* Note, too, in the C lib, this became a place to add MQTT v5 options for
|
||||||
* the outgoing calls without breaking the API, so is also refered to as the
|
* the outgoing calls without breaking the API, so is also referred to as the
|
||||||
* "call options".
|
* "call options".
|
||||||
*/
|
*/
|
||||||
class response_options
|
class response_options
|
||||||
@ -54,13 +54,13 @@ class response_options
|
|||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Create an empty response object.
|
* Create an empty response object.
|
||||||
* @param mqttVersion The MQTT versoion for the response.
|
* @param mqttVersion The MQTT version for the response.
|
||||||
*/
|
*/
|
||||||
explicit response_options(int mqttVersion=MQTTVERSION_DEFAULT);
|
explicit response_options(int mqttVersion=MQTTVERSION_DEFAULT);
|
||||||
/**
|
/**
|
||||||
* Creates a response object with the specified callbacks.
|
* Creates a response object with the specified callbacks.
|
||||||
* @param tok A token to be used as the context.
|
* @param tok A token to be used as the context.
|
||||||
* @param mqttVersion The MQTT versoion for the response.
|
* @param mqttVersion The MQTT version for the response.
|
||||||
*/
|
*/
|
||||||
response_options(const token_ptr& tok, int mqttVersion=MQTTVERSION_DEFAULT);
|
response_options(const token_ptr& tok, int mqttVersion=MQTTVERSION_DEFAULT);
|
||||||
/**
|
/**
|
||||||
|
@ -190,7 +190,7 @@ class unsubscribe_response : public server_response
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsubscribe_response(MQTTAsync_successData* rsp) {}
|
unsubscribe_response(MQTTAsync_successData* /*rsp*/) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
|
@ -273,7 +273,7 @@ public:
|
|||||||
* cipher list format, please see the OpenSSL
|
* cipher list format, please see the OpenSSL
|
||||||
* on-line documentation:
|
* on-line documentation:
|
||||||
* http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
|
* http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
|
||||||
* If this setting is ommitted, its default
|
* If this setting is omitted, its default
|
||||||
* value will be "ALL", that is, all the
|
* value will be "ALL", that is, all the
|
||||||
* cipher suites -excluding those offering no
|
* cipher suites -excluding those offering no
|
||||||
* encryption- will be considered. This
|
* encryption- will be considered. This
|
||||||
@ -284,10 +284,10 @@ public:
|
|||||||
void set_enabled_cipher_suites(const string& enabledCipherSuites);
|
void set_enabled_cipher_suites(const string& enabledCipherSuites);
|
||||||
/**
|
/**
|
||||||
* Enables or disables verification of the server certificate.
|
* Enables or disables verification of the server certificate.
|
||||||
* @param enablServerCertAuth enable/disable verification of the server
|
* @param enableServerCertAuth enable/disable verification of the server
|
||||||
* certificate
|
* certificate
|
||||||
*/
|
*/
|
||||||
void set_enable_server_cert_auth(bool enablServerCertAuth);
|
void set_enable_server_cert_auth(bool enableServerCertAuth);
|
||||||
/**
|
/**
|
||||||
* Gets the requested SSL/TLS version.
|
* Gets the requested SSL/TLS version.
|
||||||
* @return The requested SSL/TLS version.
|
* @return The requested SSL/TLS version.
|
||||||
@ -350,7 +350,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
std::vector<string> get_alpn_protos() const;
|
std::vector<string> get_alpn_protos() const;
|
||||||
/**
|
/**
|
||||||
* Sets the list of supported ALPN protolols.
|
* Sets the list of supported ALPN protocols.
|
||||||
* See:
|
* See:
|
||||||
* https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html
|
* https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html
|
||||||
* @param protos The list of ALPN protocols to be negotiated.
|
* @param protos The list of ALPN protocols to be negotiated.
|
||||||
@ -430,7 +430,7 @@ public:
|
|||||||
* explanation of the cipher list format, please see the
|
* explanation of the cipher list format, please see the
|
||||||
* OpenSSL on-line documentation:
|
* OpenSSL on-line documentation:
|
||||||
* http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
|
* http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
|
||||||
* If this setting is ommitted, its default value will be
|
* If this setting is omitted, its default value will be
|
||||||
* "ALL", that is, all the cipher suites -excluding those
|
* "ALL", that is, all the cipher suites -excluding those
|
||||||
* offering no encryption- will be considered. This setting
|
* offering no encryption- will be considered. This setting
|
||||||
* can be used to set an SSL anonymous connection (empty
|
* can be used to set an SSL anonymous connection (empty
|
||||||
|
@ -51,7 +51,7 @@ class string_collection
|
|||||||
*/
|
*/
|
||||||
collection_type coll_;
|
collection_type coll_;
|
||||||
/**
|
/**
|
||||||
* A colleciton of pointers to NUL-terminated C strings for the topics.
|
* A collection of pointers to NUL-terminated C strings for the topics.
|
||||||
* This is what is required by the Paho C library, and thus the lifetime
|
* This is what is required by the Paho C library, and thus the lifetime
|
||||||
* of the pointers will remain consistent with the lifetime of the
|
* of the pointers will remain consistent with the lifetime of the
|
||||||
* object. The value is kept consistent with the current topics and
|
* object. The value is kept consistent with the current topics and
|
||||||
@ -268,7 +268,7 @@ public:
|
|||||||
/** The type of the string/string pair of values */
|
/** The type of the string/string pair of values */
|
||||||
using value_type = collection_type::value_type;
|
using value_type = collection_type::value_type;
|
||||||
/**
|
/**
|
||||||
* Default construtor for an empty collection.
|
* Default constructor for an empty collection.
|
||||||
*/
|
*/
|
||||||
name_value_collection() =default;
|
name_value_collection() =default;
|
||||||
/**
|
/**
|
||||||
|
@ -135,7 +135,7 @@ public:
|
|||||||
opts_.retainAsPublished = on ? 1 : 0;
|
opts_.retainAsPublished = on ? 1 : 0;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Gets the "retasin handling" option.
|
* Gets the "retain handling" option.
|
||||||
* @return When to send retained messages:
|
* @return When to send retained messages:
|
||||||
* @li (SEND_RETAINED_ON_SUBSCRIBE, 0) At the time of the subscribe
|
* @li (SEND_RETAINED_ON_SUBSCRIBE, 0) At the time of the subscribe
|
||||||
* @li (SEND_RETAINED_ON_NEW, 1) Only at the time of a new
|
* @li (SEND_RETAINED_ON_NEW, 1) Only at the time of a new
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2017-2021 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -39,14 +39,15 @@ namespace mqtt {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* A thread-safe queue for inter-thread communication.
|
* A thread-safe queue for inter-thread communication.
|
||||||
* This is a lockinq queue with blocking operations. The get() operations
|
*
|
||||||
|
* This is a locking queue with blocking operations. The get() operations
|
||||||
* can always block on an empty queue, but have variations for non-blocking
|
* can always block on an empty queue, but have variations for non-blocking
|
||||||
* (try_get) and bounded-time blocking (try_get_for, try_get_until).
|
* (try_get) and bounded-time blocking (try_get_for, try_get_until).
|
||||||
* @par
|
* @par
|
||||||
* The default queue has a capacity that is unbounded in the practical
|
* The default queue has a capacity that is unbounded in the practical
|
||||||
* sense, limited by available memory. In this mode the object will not
|
* sense, limited by available memory. In this mode the object will not
|
||||||
* block when placing values into the queue. A capacity can bet set with the
|
* block when placing values into the queue. A capacity can bet set with the
|
||||||
* construtor or, at any time later by calling the @ref capacity(size_type)
|
* constructor or, at any time later by calling the @ref capacity(size_type)
|
||||||
* method. Using this latter method, the capacity can be set to an amount
|
* method. Using this latter method, the capacity can be set to an amount
|
||||||
* smaller than the current size of the queue. In that case all put's to the
|
* smaller than the current size of the queue. In that case all put's to the
|
||||||
* queue will block until the number of items are removed from the queue to
|
* queue will block until the number of items are removed from the queue to
|
||||||
@ -127,7 +128,7 @@ public:
|
|||||||
* Sets the capacity of the queue.
|
* Sets the capacity of the queue.
|
||||||
* Note that the capacity can be set to a value smaller than the current
|
* Note that the capacity can be set to a value smaller than the current
|
||||||
* size of the queue. In that event, all calls to put() will block until
|
* size of the queue. In that event, all calls to put() will block until
|
||||||
* a suffucuent number
|
* a sufficient number
|
||||||
*/
|
*/
|
||||||
void capacity(size_type cap) {
|
void capacity(size_type cap) {
|
||||||
guard g(lock_);
|
guard g(lock_);
|
||||||
@ -149,14 +150,11 @@ public:
|
|||||||
*/
|
*/
|
||||||
void put(value_type val) {
|
void put(value_type val) {
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
if (que_.size() >= cap_)
|
notFullCond_.wait(g, [this]{return que_.size() < cap_;});
|
||||||
notFullCond_.wait(g, [this]{return que_.size() < cap_;});
|
|
||||||
bool wasEmpty = que_.empty();
|
|
||||||
que_.emplace(std::move(val));
|
que_.emplace(std::move(val));
|
||||||
if (wasEmpty) {
|
g.unlock();
|
||||||
g.unlock();
|
notEmptyCond_.notify_one();
|
||||||
notEmptyCond_.notify_one();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Non-blocking attempt to place an item into the queue.
|
* Non-blocking attempt to place an item into the queue.
|
||||||
@ -166,14 +164,12 @@ public:
|
|||||||
*/
|
*/
|
||||||
bool try_put(value_type val) {
|
bool try_put(value_type val) {
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
size_type n = que_.size();
|
if (que_.size() >= cap_)
|
||||||
if (n >= cap_)
|
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
que_.emplace(std::move(val));
|
que_.emplace(std::move(val));
|
||||||
if (n == 0) {
|
g.unlock();
|
||||||
g.unlock();
|
notEmptyCond_.notify_one();
|
||||||
notEmptyCond_.notify_one();
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -186,16 +182,14 @@ public:
|
|||||||
* timeout occurred.
|
* timeout occurred.
|
||||||
*/
|
*/
|
||||||
template <typename Rep, class Period>
|
template <typename Rep, class Period>
|
||||||
bool try_put_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
|
bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
if (que_.size() >= cap_ && !notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;}))
|
if (!notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;}))
|
||||||
return false;
|
return false;
|
||||||
bool wasEmpty = que_.empty();
|
|
||||||
que_.emplace(std::move(val));
|
que_.emplace(std::move(val));
|
||||||
if (wasEmpty) {
|
g.unlock();
|
||||||
g.unlock();
|
notEmptyCond_.notify_one();
|
||||||
notEmptyCond_.notify_one();
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -209,16 +203,14 @@ public:
|
|||||||
* timeout occurred.
|
* timeout occurred.
|
||||||
*/
|
*/
|
||||||
template <class Clock, class Duration>
|
template <class Clock, class Duration>
|
||||||
bool try_put_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
|
bool try_put_until(value_type val, const std::chrono::time_point<Clock,Duration>& absTime) {
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
if (que_.size() >= cap_ && !notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;}))
|
if (!notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;}))
|
||||||
return false;
|
return false;
|
||||||
bool wasEmpty = que_.empty();
|
|
||||||
que_.emplace(std::move(val));
|
que_.emplace(std::move(val));
|
||||||
if (wasEmpty) {
|
g.unlock();
|
||||||
g.unlock();
|
notEmptyCond_.notify_one();
|
||||||
notEmptyCond_.notify_one();
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -228,15 +220,16 @@ public:
|
|||||||
* @param val Pointer to a variable to receive the value.
|
* @param val Pointer to a variable to receive the value.
|
||||||
*/
|
*/
|
||||||
void get(value_type* val) {
|
void get(value_type* val) {
|
||||||
|
if (!val)
|
||||||
|
return;
|
||||||
|
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
if (que_.empty())
|
notEmptyCond_.wait(g, [this]{return !que_.empty();});
|
||||||
notEmptyCond_.wait(g, [this]{return !que_.empty();});
|
|
||||||
*val = std::move(que_.front());
|
*val = std::move(que_.front());
|
||||||
que_.pop();
|
que_.pop();
|
||||||
if (que_.size() == cap_-1) {
|
g.unlock();
|
||||||
g.unlock();
|
notFullCond_.notify_one();
|
||||||
notFullCond_.notify_one();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Retrieve a value from the queue.
|
* Retrieve a value from the queue.
|
||||||
@ -246,14 +239,12 @@ public:
|
|||||||
*/
|
*/
|
||||||
value_type get() {
|
value_type get() {
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
if (que_.empty())
|
notEmptyCond_.wait(g, [this]{return !que_.empty();});
|
||||||
notEmptyCond_.wait(g, [this]{return !que_.empty();});
|
|
||||||
value_type val = std::move(que_.front());
|
value_type val = std::move(que_.front());
|
||||||
que_.pop();
|
que_.pop();
|
||||||
if (que_.size() == cap_-1) {
|
g.unlock();
|
||||||
g.unlock();
|
notFullCond_.notify_one();
|
||||||
notFullCond_.notify_one();
|
|
||||||
}
|
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -265,21 +256,23 @@ public:
|
|||||||
* the queue is empty.
|
* the queue is empty.
|
||||||
*/
|
*/
|
||||||
bool try_get(value_type* val) {
|
bool try_get(value_type* val) {
|
||||||
|
if (!val)
|
||||||
|
return false;
|
||||||
|
|
||||||
unique_guard g(lock_);
|
unique_guard g(lock_);
|
||||||
if (que_.empty())
|
if (que_.empty())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
*val = std::move(que_.front());
|
*val = std::move(que_.front());
|
||||||
que_.pop();
|
que_.pop();
|
||||||
if (que_.size() == cap_-1) {
|
g.unlock();
|
||||||
g.unlock();
|
notFullCond_.notify_one();
|
||||||
notFullCond_.notify_one();
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Attempt to remove an item from the queue for a bounded amout of time.
|
* Attempt to remove an item from the queue for a bounded amount of time.
|
||||||
* This will retrieve the next item from the queue. If the queue is
|
* This will retrieve the next item from the queue. If the queue is
|
||||||
* empty, it will wait the specified amout of time for an item to arive
|
* empty, it will wait the specified amount of time for an item to arrive
|
||||||
* before timing out.
|
* before timing out.
|
||||||
* @param val Pointer to a variable to receive the value.
|
* @param val Pointer to a variable to receive the value.
|
||||||
* @param relTime The amount of time to wait until timing out.
|
* @param relTime The amount of time to wait until timing out.
|
||||||
@ -288,21 +281,23 @@ public:
|
|||||||
*/
|
*/
|
||||||
template <typename Rep, class Period>
|
template <typename Rep, class Period>
|
||||||
bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
|
bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
|
||||||
unique_guard g(lock_);
|
if (!val)
|
||||||
if (que_.empty() && !notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();}))
|
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
unique_guard g(lock_);
|
||||||
|
if (!notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();}))
|
||||||
|
return false;
|
||||||
|
|
||||||
*val = std::move(que_.front());
|
*val = std::move(que_.front());
|
||||||
que_.pop();
|
que_.pop();
|
||||||
if (que_.size() == cap_-1) {
|
g.unlock();
|
||||||
g.unlock();
|
notFullCond_.notify_one();
|
||||||
notFullCond_.notify_one();
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Attempt to remove an item from the queue for a bounded amout of time.
|
* Attempt to remove an item from the queue for a bounded amount of time.
|
||||||
* This will retrieve the next item from the queue. If the queue is
|
* This will retrieve the next item from the queue. If the queue is
|
||||||
* empty, it will wait until the specified time for an item to arive
|
* empty, it will wait until the specified time for an item to arrive
|
||||||
* before timing out.
|
* before timing out.
|
||||||
* @param val Pointer to a variable to receive the value.
|
* @param val Pointer to a variable to receive the value.
|
||||||
* @param absTime The absolute time to wait to before timing out.
|
* @param absTime The absolute time to wait to before timing out.
|
||||||
@ -311,15 +306,17 @@ public:
|
|||||||
*/
|
*/
|
||||||
template <class Clock, class Duration>
|
template <class Clock, class Duration>
|
||||||
bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
|
bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
|
||||||
unique_guard g(lock_);
|
if (!val)
|
||||||
if (que_.empty() && !notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();}))
|
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
unique_guard g(lock_);
|
||||||
|
if (!notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();}))
|
||||||
|
return false;
|
||||||
|
|
||||||
*val = std::move(que_.front());
|
*val = std::move(que_.front());
|
||||||
que_.pop();
|
que_.pop();
|
||||||
if (que_.size() == cap_-1) {
|
g.unlock();
|
||||||
g.unlock();
|
notFullCond_.notify_one();
|
||||||
notFullCond_.notify_one();
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -107,7 +107,7 @@ private:
|
|||||||
/** Whether the action has yet to complete */
|
/** Whether the action has yet to complete */
|
||||||
bool complete_;
|
bool complete_;
|
||||||
|
|
||||||
/** MQTT v5 propeties */
|
/** MQTT v5 properties */
|
||||||
//properties props_;
|
//properties props_;
|
||||||
/** Connection response (null if not available) */
|
/** Connection response (null if not available) */
|
||||||
std::unique_ptr<connect_response> connRsp_;
|
std::unique_ptr<connect_response> connRsp_;
|
||||||
@ -213,7 +213,7 @@ public:
|
|||||||
* Constructs a token object.
|
* Constructs a token object.
|
||||||
* @param typ The type of request that the token is tracking.
|
* @param typ The type of request that the token is tracking.
|
||||||
* @param cli The client that created the token.
|
* @param cli The client that created the token.
|
||||||
* @param topic The topic assiciated with the token
|
* @param topic The topic associated with the token
|
||||||
*/
|
*/
|
||||||
token(Type typ, iasync_client& cli, const string& topic)
|
token(Type typ, iasync_client& cli, const string& topic)
|
||||||
: token(typ, cli, string_collection::create(topic)) {}
|
: token(typ, cli, string_collection::create(topic)) {}
|
||||||
@ -221,7 +221,7 @@ public:
|
|||||||
* Constructs a token object.
|
* Constructs a token object.
|
||||||
* @param typ The type of request that the token is tracking.
|
* @param typ The type of request that the token is tracking.
|
||||||
* @param cli The client that created the token.
|
* @param cli The client that created the token.
|
||||||
* @param topic The topic assiciated with the token
|
* @param topic The topic associated with the token
|
||||||
* @param userContext optional object used to pass context to the
|
* @param userContext optional object used to pass context to the
|
||||||
* callback. Use @em nullptr if not required.
|
* callback. Use @em nullptr if not required.
|
||||||
* @param cb callback listener that will be notified when subscribe has
|
* @param cb callback listener that will be notified when subscribe has
|
||||||
@ -287,7 +287,7 @@ public:
|
|||||||
* Constructs a token object.
|
* Constructs a token object.
|
||||||
* @param typ The type of request that the token is tracking.
|
* @param typ The type of request that the token is tracking.
|
||||||
* @param cli The client that created the token.
|
* @param cli The client that created the token.
|
||||||
* @param topic The topic assiciated with the token
|
* @param topic The topic associated with the token
|
||||||
*/
|
*/
|
||||||
static ptr_t create(Type typ, iasync_client& cli, const string& topic) {
|
static ptr_t create(Type typ, iasync_client& cli, const string& topic) {
|
||||||
return std::make_shared<token>(typ, cli, topic);
|
return std::make_shared<token>(typ, cli, topic);
|
||||||
@ -296,7 +296,7 @@ public:
|
|||||||
* Constructs a token object.
|
* Constructs a token object.
|
||||||
* @param typ The type of request that the token is tracking.
|
* @param typ The type of request that the token is tracking.
|
||||||
* @param cli The client that created the token.
|
* @param cli The client that created the token.
|
||||||
* @param topic The topic assiciated with the token
|
* @param topic The topic associated with the token
|
||||||
* @param userContext optional object used to pass context to the
|
* @param userContext optional object used to pass context to the
|
||||||
* callback. Use @em nullptr if not required.
|
* callback. Use @em nullptr if not required.
|
||||||
* @param cb callback listener that will be notified when subscribe has
|
* @param cb callback listener that will be notified when subscribe has
|
||||||
@ -405,7 +405,7 @@ public:
|
|||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Sets the number of results expected.
|
* Sets the number of results expected.
|
||||||
* This is only required for subecribe_many() with < MQTTv5
|
* This is only required for subscribe many() with < MQTTv5
|
||||||
* @param n The number of results expected.
|
* @param n The number of results expected.
|
||||||
*/
|
*/
|
||||||
void set_num_expected(size_t n) { nExpected_ = n; }
|
void set_num_expected(size_t n) { nExpected_ = n; }
|
||||||
@ -479,7 +479,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Gets the response from a connect operation.
|
* Gets the response from a connect operation.
|
||||||
* This returns the result of the completed operation. If the
|
* This returns the result of the completed operation. If the
|
||||||
* operaton is not yet complete this will block until the result
|
* operation is not yet complete this will block until the result
|
||||||
* is available.
|
* is available.
|
||||||
* @return The result of the operation.
|
* @return The result of the operation.
|
||||||
*/
|
*/
|
||||||
@ -487,7 +487,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Gets the response from a connect operation.
|
* Gets the response from a connect operation.
|
||||||
* This returns the result of the completed operation. If the
|
* This returns the result of the completed operation. If the
|
||||||
* operaton is not yet complete this will block until the result
|
* operation is not yet complete this will block until the result
|
||||||
* is available.
|
* is available.
|
||||||
* @return The result of the operation.
|
* @return The result of the operation.
|
||||||
*/
|
*/
|
||||||
@ -495,7 +495,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Gets the response from a connect operation.
|
* Gets the response from a connect operation.
|
||||||
* This returns the result of the completed operation. If the
|
* This returns the result of the completed operation. If the
|
||||||
* operaton is not yet complete this will block until the result
|
* operation is not yet complete this will block until the result
|
||||||
* is available.
|
* is available.
|
||||||
* @return The result of the operation.
|
* @return The result of the operation.
|
||||||
*/
|
*/
|
||||||
|
@ -48,7 +48,7 @@ class topic
|
|||||||
string name_;
|
string name_;
|
||||||
/** The default QoS */
|
/** The default QoS */
|
||||||
int qos_;
|
int qos_;
|
||||||
/** The default retined flag */
|
/** The default retained flag */
|
||||||
bool retained_;
|
bool retained_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -90,6 +90,13 @@ public:
|
|||||||
* @return The name of the topic.
|
* @return The name of the topic.
|
||||||
*/
|
*/
|
||||||
const string& get_name() const { return name_; }
|
const string& get_name() const { return name_; }
|
||||||
|
/**
|
||||||
|
* Splits a topic string into individual fields.
|
||||||
|
*
|
||||||
|
* @param topic A slash-delimited MQTT topic string.
|
||||||
|
* @return A vector containing the fields of the topic.
|
||||||
|
*/
|
||||||
|
static std::vector<std::string> split(const std::string& topic);
|
||||||
/**
|
/**
|
||||||
* Gets the default quality of service for this topic.
|
* Gets the default quality of service for this topic.
|
||||||
* @return The default quality of service for this topic.
|
* @return The default quality of service for this topic.
|
||||||
@ -172,6 +179,60 @@ using topic_ptr = topic::ptr_t ;
|
|||||||
/** A smart/shared pointer to a const topic object. */
|
/** A smart/shared pointer to a const topic object. */
|
||||||
using const_topic_ptr = topic::const_ptr_t ;
|
using const_topic_ptr = topic::const_ptr_t ;
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Topic Filter
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An MQTT topic filter.
|
||||||
|
*
|
||||||
|
* This is a multi-field string, delimited by forward slashes, '/', in which
|
||||||
|
* fields can contain the wildcards:
|
||||||
|
*
|
||||||
|
* '+' - Matches a single field
|
||||||
|
* '#' - Matches all subsequent fields (must be last field in filter)
|
||||||
|
*
|
||||||
|
* It can be used to match against specific topics.
|
||||||
|
*/
|
||||||
|
class topic_filter
|
||||||
|
{
|
||||||
|
/** We store the filter as a vector of the individual fields. */
|
||||||
|
std::vector<string> fields_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Creates a new topic filter.
|
||||||
|
*
|
||||||
|
* @param filter A string MQTT topic filter. This is a slash ('/')
|
||||||
|
* delimited topic string that can contain wildcards
|
||||||
|
* '+' and '#'.
|
||||||
|
*/
|
||||||
|
explicit topic_filter(const string& filter);
|
||||||
|
/**
|
||||||
|
* Determines if the specified topic filter contains any wildcards.
|
||||||
|
*
|
||||||
|
* @param filter The topic filter string to check for wildcards.
|
||||||
|
* @return @em true if any of the fields contain a wildcard, @em false
|
||||||
|
* if not.
|
||||||
|
*/
|
||||||
|
static bool has_wildcards(const string& topic);
|
||||||
|
/**
|
||||||
|
* Determines if this topic filter contains any wildcards.
|
||||||
|
*
|
||||||
|
* @return @em true if any of the fields contain a wildcard, @em false
|
||||||
|
* if not.
|
||||||
|
*/
|
||||||
|
bool has_wildcards() const;
|
||||||
|
/**
|
||||||
|
* Determine if the topic matches this filter.
|
||||||
|
*
|
||||||
|
* @param topic An MQTT topic. It should not contain wildcards.
|
||||||
|
* @return @em true of the topic matches this filter, @em false
|
||||||
|
* otherwise.
|
||||||
|
*/
|
||||||
|
bool matches(const string& topic) const;
|
||||||
|
};
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
// end namespace mqtt
|
||||||
}
|
}
|
||||||
|
442
src/mqtt/topic_matcher.h
Normal file
442
src/mqtt/topic_matcher.h
Normal file
@ -0,0 +1,442 @@
|
|||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @file topic_matcher.h
|
||||||
|
/// Declaration of MQTT topic_matcher class
|
||||||
|
/// @date April 23, 2022
|
||||||
|
/// @author Frank Pagliughi
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Copyright (c) 2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
|
*
|
||||||
|
* All rights reserved. This program and the accompanying materials
|
||||||
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||||
|
*
|
||||||
|
* The Eclipse Public License is available at
|
||||||
|
* http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
* and the Eclipse Distribution License is available at
|
||||||
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||||
|
*
|
||||||
|
* Contributors:
|
||||||
|
* Frank Pagliughi - initial implementation and documentation
|
||||||
|
*******************************************************************************/
|
||||||
|
|
||||||
|
#ifndef __mqtt_topic_matcher_h
|
||||||
|
#define __mqtt_topic_matcher_h
|
||||||
|
|
||||||
|
#include "mqtt/types.h"
|
||||||
|
#include "mqtt/topic.h"
|
||||||
|
#include <vector>
|
||||||
|
#include <map>
|
||||||
|
#include <forward_list>
|
||||||
|
#include <initializer_list>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
// The make_unique<>() template functions from the original std proposal:
|
||||||
|
// https://isocpp.org/files/papers/N3656.txt
|
||||||
|
|
||||||
|
#if __cplusplus < 201402L
|
||||||
|
namespace std {
|
||||||
|
template<class T> struct _Unique_if {
|
||||||
|
typedef unique_ptr<T> _Single_object;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<class T> struct _Unique_if<T[]> {
|
||||||
|
typedef unique_ptr<T[]> _Unknown_bound;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<class T, size_t N> struct _Unique_if<T[N]> {
|
||||||
|
typedef void _Known_bound;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<class T, class... Args>
|
||||||
|
typename _Unique_if<T>::_Single_object
|
||||||
|
make_unique(Args&&... args) {
|
||||||
|
return unique_ptr<T>(new T(std::forward<Args>(args)...));
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class T>
|
||||||
|
typename _Unique_if<T>::_Unknown_bound
|
||||||
|
make_unique(size_t n) {
|
||||||
|
typedef typename remove_extent<T>::type U;
|
||||||
|
return unique_ptr<T>(new U[n]());
|
||||||
|
}
|
||||||
|
|
||||||
|
template<class T, class... Args>
|
||||||
|
typename _Unique_if<T>::_Known_bound
|
||||||
|
make_unique(Args&&...) = delete;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
namespace mqtt {
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This can be used to get an iterator to all items that have a filter that
|
||||||
|
* matches a topic. To test against a single filter, see
|
||||||
|
* [`TopicFilter`](crate::TopicFilter). This collection is more commonly
|
||||||
|
* used when there are a number of filters and each needs to be associated
|
||||||
|
* with a particular action or piece of data. Note, though, that a single
|
||||||
|
* incoming topic could match against several items in the collection. For
|
||||||
|
* example, the topic:
|
||||||
|
* data/temperature/engine
|
||||||
|
*
|
||||||
|
* Could match against the filters:
|
||||||
|
* data/temperature/# data/+/engine
|
||||||
|
*
|
||||||
|
* Thus, the collection gives an iterator for the items matching a topic.
|
||||||
|
*
|
||||||
|
* A common use for this would be to store callbacks to process incoming
|
||||||
|
* messages based on topics.
|
||||||
|
*
|
||||||
|
* This code was adapted from the Eclipse Python `MQTTMatcher` class:
|
||||||
|
*
|
||||||
|
<https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/matcher.py>
|
||||||
|
*
|
||||||
|
* which use a prefix tree (trie) to store the values.
|
||||||
|
*
|
||||||
|
* For example, if you had a `topic_mapper<int>` and you inserted:
|
||||||
|
* insert({"some/random/topic", 42})
|
||||||
|
* insert({"some/#", 99})
|
||||||
|
* insert({"some/+/topic", 33})
|
||||||
|
*
|
||||||
|
* The collection would be built like:
|
||||||
|
* "some" -> <null>
|
||||||
|
* "random" -> <null>
|
||||||
|
* "topic" -> <42>
|
||||||
|
* "#" -> <99>
|
||||||
|
* "+" -> <null>
|
||||||
|
* "topic" -> <33>
|
||||||
|
*/
|
||||||
|
template <typename T>
|
||||||
|
class topic_matcher
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using key_type = string;
|
||||||
|
using mapped_type = T;
|
||||||
|
using value_type = std::pair<key_type, mapped_type>;
|
||||||
|
using reference = value_type;
|
||||||
|
using const_reference = const value_type&;
|
||||||
|
|
||||||
|
private:
|
||||||
|
/**
|
||||||
|
* The nodes of the collection.
|
||||||
|
*/
|
||||||
|
struct node
|
||||||
|
{
|
||||||
|
using ptr_t = std::unique_ptr<node>;
|
||||||
|
using map_t = std::map<string, ptr_t>;
|
||||||
|
|
||||||
|
/** The value that matches the topic at this node, if any */
|
||||||
|
std::unique_ptr<value_type> content;
|
||||||
|
/** Child nodes mapped by the next field of the topic */
|
||||||
|
map_t children;
|
||||||
|
|
||||||
|
static ptr_t create() {
|
||||||
|
return std::make_unique<node>();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
using node_ptr = typename node::ptr_t;
|
||||||
|
using node_map = typename node::map_t;
|
||||||
|
|
||||||
|
/** The root node of the collection */
|
||||||
|
node_ptr root_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
class iterator {
|
||||||
|
/**
|
||||||
|
* Information about a node to search.
|
||||||
|
*/
|
||||||
|
struct search_node {
|
||||||
|
/** The current node being searched. null means end. */
|
||||||
|
node* node_;
|
||||||
|
/** The fields of the topic still to be searched. */
|
||||||
|
std::forward_list<string> syms_;
|
||||||
|
|
||||||
|
search_node() : node_(nullptr) {}
|
||||||
|
search_node(node* nd, const std::forward_list<string>& sy)
|
||||||
|
: node_(nd), syms_(sy) {}
|
||||||
|
search_node(node* nd, std::forward_list<string>&& sy)
|
||||||
|
: node_(nd), syms_(std::move(sy)) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/** The last-found value */
|
||||||
|
value_type* pval_;
|
||||||
|
/** The current search node */
|
||||||
|
search_node snode_;
|
||||||
|
/** The nodes still to be checked */
|
||||||
|
std::vector<search_node> nodes_;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move the next iterator to the next value, or to end(), if none
|
||||||
|
* left.
|
||||||
|
*
|
||||||
|
* This will keep recursing until it finds a matching node that
|
||||||
|
* contains a value or it reaches the end.
|
||||||
|
*/
|
||||||
|
void next() {
|
||||||
|
pval_ = nullptr;
|
||||||
|
|
||||||
|
// Can't move if we're already at the end
|
||||||
|
if (!snode_.node_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (snode_.syms_.empty()) {
|
||||||
|
pval_ = snode_.node_->content.get();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
typename node_map::iterator child;
|
||||||
|
auto map_end = snode_.node_->children.end();
|
||||||
|
auto sym = snode_.syms_.front();
|
||||||
|
|
||||||
|
if ((child = snode_.node_->children.find(sym)) != map_end) {
|
||||||
|
auto syms = snode_.syms_;
|
||||||
|
syms.pop_front();
|
||||||
|
nodes_.push_back({child->second.get(), std::move(syms)});
|
||||||
|
}
|
||||||
|
if ((child = snode_.node_->children.find("+")) != map_end) {
|
||||||
|
auto syms = snode_.syms_;
|
||||||
|
syms.pop_front();
|
||||||
|
nodes_.push_back({child->second.get(), std::move(syms)});
|
||||||
|
}
|
||||||
|
if ((child = snode_.node_->children.find("#")) != map_end) {
|
||||||
|
pval_ = child->second->content.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!nodes_.empty()) {
|
||||||
|
// TODO: List pop_front()?
|
||||||
|
snode_ = nodes_.back();
|
||||||
|
nodes_.pop_back();
|
||||||
|
if (!pval_) {
|
||||||
|
// Recurse
|
||||||
|
return this->next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
snode_ = search_node();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
friend class topic_matcher;
|
||||||
|
|
||||||
|
iterator() : pval_(nullptr) {}
|
||||||
|
iterator(value_type* pval) : pval_(pval) {}
|
||||||
|
iterator(node* root, const string& topic) : pval_(nullptr) {
|
||||||
|
auto v = topic::split(topic);
|
||||||
|
std::forward_list<string> syms(v.begin(), v.end());
|
||||||
|
snode_ = search_node(root, std::move(syms));
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Gets a reference to the current value.
|
||||||
|
* @return A reference to the current value.
|
||||||
|
*/
|
||||||
|
reference operator*() noexcept {
|
||||||
|
return *pval_;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets a const reference to the current value.
|
||||||
|
* @return A const reference to the current value.
|
||||||
|
*/
|
||||||
|
const_reference operator*() const noexcept {
|
||||||
|
return *pval_;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Get a pointer to the current value.
|
||||||
|
* @return A pointer to the current value.
|
||||||
|
*/
|
||||||
|
value_type* operator->() noexcept {
|
||||||
|
return pval_;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Get a const pointer to the current value.
|
||||||
|
* @return A const pointer to the current value.
|
||||||
|
*/
|
||||||
|
const value_type* operator->() const noexcept {
|
||||||
|
return pval_;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Postfix increment operator.
|
||||||
|
* @return An iterator pointing to the previous matching item.
|
||||||
|
*/
|
||||||
|
iterator operator++(int) noexcept {
|
||||||
|
auto tmp = *this;
|
||||||
|
this->next();
|
||||||
|
return tmp;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Prefix increment operator.
|
||||||
|
* @return An iterator pointing to the next matching item.
|
||||||
|
*/
|
||||||
|
iterator& operator++() noexcept {
|
||||||
|
this->next();
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Compares two iterators to see if they don't refer to the same
|
||||||
|
* node.
|
||||||
|
*
|
||||||
|
* @param other The other iterator to compare against this one.
|
||||||
|
* @return @em true if they don't match, @em false if they do
|
||||||
|
*/
|
||||||
|
bool operator!=(const iterator& other) const noexcept {
|
||||||
|
// TODO: Is this sufficient in the long run?
|
||||||
|
return pval_ != other.pval_ || snode_.node_ != other.snode_.node_;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A const iterator.
|
||||||
|
*/
|
||||||
|
class const_iterator : public iterator {
|
||||||
|
using base = iterator;
|
||||||
|
|
||||||
|
friend class topic_matcher;
|
||||||
|
const_iterator(iterator it) : base(it) {}
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Gets a const reference to the current value.
|
||||||
|
* @return A const reference to the current value.
|
||||||
|
*/
|
||||||
|
const_reference operator*() const noexcept {
|
||||||
|
return base::operator*();
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Get a const pointer to the current value.
|
||||||
|
* @return A const pointer to the current value.
|
||||||
|
*/
|
||||||
|
const value_type* operator->() const noexcept {
|
||||||
|
return base::operator->();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates new, empty collection.
|
||||||
|
*/
|
||||||
|
topic_matcher()
|
||||||
|
: root_(node::create()) {}
|
||||||
|
/**
|
||||||
|
* Creates a new collection with a list of key/value pairs.
|
||||||
|
*
|
||||||
|
* This can be used to create a connection from a table of entries, as
|
||||||
|
* key/value pairs, like:
|
||||||
|
*
|
||||||
|
* topic_matcher<int> matcher {
|
||||||
|
* { "#", -1 },
|
||||||
|
* { "some/random/topic", 42 },
|
||||||
|
* { "some/#", 99 }
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* @param lst The list of key/value pairs to populate the collection.
|
||||||
|
*/
|
||||||
|
topic_matcher(std::initializer_list<value_type> lst)
|
||||||
|
: root_(node::create()) {
|
||||||
|
for (const auto& v : lst) {
|
||||||
|
insert(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Inserts a new key/value pair into the collection.
|
||||||
|
* @param val The value to place in the collection.
|
||||||
|
*/
|
||||||
|
void insert(value_type&& val) {
|
||||||
|
auto nd = root_.get();
|
||||||
|
auto fields = topic::split(val.first);
|
||||||
|
|
||||||
|
for (auto& field : fields) {
|
||||||
|
auto it = nd->children.find(field);
|
||||||
|
if (it == nd->children.end()) {
|
||||||
|
nd->children[field] = node::create();
|
||||||
|
it = nd->children.find(field);
|
||||||
|
}
|
||||||
|
nd = it->second.get();
|
||||||
|
}
|
||||||
|
nd->content = std::make_unique<value_type>(std::move(val));
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Inserts a new value into the collection.
|
||||||
|
* @param key The topic/filter entry
|
||||||
|
* @param val The value to associate with that entry.
|
||||||
|
*/
|
||||||
|
void insert(const value_type& val) {
|
||||||
|
value_type v { val };
|
||||||
|
this->insert(std::move(v));
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets a pointer to the value at the requested key.
|
||||||
|
* @param key The topic/filter entry to find.
|
||||||
|
* @return An iterator to the value if found, @em end() if not found.
|
||||||
|
*/
|
||||||
|
iterator find(const key_type& key) {
|
||||||
|
auto nd = root_.get();
|
||||||
|
auto fields = topic::split(key);
|
||||||
|
|
||||||
|
for (auto& field : fields) {
|
||||||
|
auto it = nd->children.find(field);
|
||||||
|
if (it == nd->children.end())
|
||||||
|
return end();
|
||||||
|
|
||||||
|
nd = it->second.get();
|
||||||
|
}
|
||||||
|
return iterator{ nd->content.get() };
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets a const pointer to the value at the requested key.
|
||||||
|
* @param key The topic/filter entry to find.
|
||||||
|
* @return A const pointer to the value if found, @em nullptr if not
|
||||||
|
* found.
|
||||||
|
*/
|
||||||
|
const_iterator find(const key_type& key) const {
|
||||||
|
return const_cast<topic_matcher*>(this)->find(key);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets an iterator that can find the matches to the topic.
|
||||||
|
* @param topic The topic to search for matches.
|
||||||
|
* @return An iterator that can find the matches to the topic.
|
||||||
|
*/
|
||||||
|
iterator matches(const string& topic) {
|
||||||
|
return iterator(root_.get(), topic);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets a const iterator that can find the matches to the topic.
|
||||||
|
* @param topic The topic to search for matches.
|
||||||
|
* @return A const iterator that can find the matches to the topic.
|
||||||
|
*/
|
||||||
|
const_iterator matches(const string& topic) const {
|
||||||
|
return iterator(root_.get(), topic);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets an iterator for the end of the collection.
|
||||||
|
*
|
||||||
|
* This simply returns an empty/null iterator which we can use to signal
|
||||||
|
* the end of the collection.
|
||||||
|
*
|
||||||
|
* @return An empty/null iterator indicating the end of the collection.
|
||||||
|
*/
|
||||||
|
const_iterator end() const noexcept {
|
||||||
|
return iterator {};
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Gets an iterator for the end of the collection.
|
||||||
|
*
|
||||||
|
* This simply returns an empty/null iterator which we can use to signal
|
||||||
|
* the end of the collection.
|
||||||
|
*
|
||||||
|
* @return An empty/null iterator indicating the end of the collection.
|
||||||
|
*/
|
||||||
|
const_iterator cend() const noexcept {
|
||||||
|
return iterator {};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// end namespace mqtt
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // __mqtt_topic_matcher_h
|
||||||
|
|
@ -21,7 +21,7 @@
|
|||||||
// mqttpp_chat <user> <group>
|
// mqttpp_chat <user> <group>
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2019-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -68,27 +68,24 @@ int main(int argc, char* argv[])
|
|||||||
chatGroup { argv[2] },
|
chatGroup { argv[2] },
|
||||||
chatTopic { "chat/"+chatGroup };
|
chatTopic { "chat/"+chatGroup };
|
||||||
|
|
||||||
|
mqtt::async_client cli(SERVER_ADDRESS, "",
|
||||||
|
mqtt::create_options(MQTTVERSION_5));
|
||||||
|
|
||||||
// LWT message is broadcast to other users if out connection is lost
|
// LWT message is broadcast to other users if out connection is lost
|
||||||
|
|
||||||
auto lwt = mqtt::message(chatTopic, "<<<"+chatUser+" was disconnected>>>", QOS, false);
|
auto lwt = mqtt::message(chatTopic, "<<<"+chatUser+" was disconnected>>>", QOS, false);
|
||||||
|
|
||||||
// Set up the connect options
|
// Set up the connect options
|
||||||
|
|
||||||
mqtt::properties connectProperties{
|
|
||||||
{mqtt::property::SESSION_EXPIRY_INTERVAL, 604800}
|
|
||||||
};
|
|
||||||
|
|
||||||
auto connOpts = mqtt::connect_options_builder()
|
auto connOpts = mqtt::connect_options_builder()
|
||||||
.mqtt_version(MQTTVERSION_5)
|
.properties({
|
||||||
.properties(connectProperties)
|
{mqtt::property::SESSION_EXPIRY_INTERVAL, 604800}
|
||||||
.clean_start(true)
|
})
|
||||||
|
.clean_start(false)
|
||||||
.will(std::move(lwt))
|
.will(std::move(lwt))
|
||||||
.keep_alive_interval(std::chrono::seconds(20))
|
.keep_alive_interval(std::chrono::seconds(20))
|
||||||
.finalize();
|
.finalize();
|
||||||
|
|
||||||
mqtt::async_client cli(SERVER_ADDRESS, "",
|
|
||||||
mqtt::create_options(MQTTVERSION_5));
|
|
||||||
|
|
||||||
// Set a callback for connection lost.
|
// Set a callback for connection lost.
|
||||||
// This just exits the app.
|
// This just exits the app.
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2019-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -59,19 +59,15 @@ int main(int argc, char* argv[])
|
|||||||
constexpr int QOS = 1;
|
constexpr int QOS = 1;
|
||||||
const string REQ_TOPIC_HDR { "requests/math/" };
|
const string REQ_TOPIC_HDR { "requests/math/" };
|
||||||
|
|
||||||
|
// Create a client using MQTT v5
|
||||||
mqtt::create_options createOpts(MQTTVERSION_5);
|
mqtt::create_options createOpts(MQTTVERSION_5);
|
||||||
mqtt::async_client cli(SERVER_ADDRESS, "", createOpts);
|
mqtt::async_client cli(SERVER_ADDRESS, "", createOpts);
|
||||||
|
|
||||||
auto connOpts = mqtt::connect_options_builder()
|
|
||||||
.mqtt_version(MQTTVERSION_5)
|
|
||||||
.clean_start()
|
|
||||||
.finalize();
|
|
||||||
|
|
||||||
cli.start_consuming();
|
cli.start_consuming();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cout << "Connecting..." << flush;
|
cout << "Connecting..." << flush;
|
||||||
mqtt::token_ptr tok = cli.connect(connOpts);
|
mqtt::token_ptr tok = cli.connect(); //connOpts);
|
||||||
auto connRsp = tok->get_connect_response();
|
auto connRsp = tok->get_connect_response();
|
||||||
cout << "OK (" << connRsp.get_server_uri() << ")" << endl;
|
cout << "OK (" << connRsp.get_server_uri() << ")" << endl;
|
||||||
|
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2019 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2019-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -95,10 +95,9 @@ int main(int argc, char* argv[])
|
|||||||
mqtt::client cli(SERVER_ADDRESS, CLIENT_ID, createOpts);
|
mqtt::client cli(SERVER_ADDRESS, CLIENT_ID, createOpts);
|
||||||
|
|
||||||
auto connOpts = mqtt::connect_options_builder()
|
auto connOpts = mqtt::connect_options_builder()
|
||||||
.mqtt_version(MQTTVERSION_5)
|
.keep_alive_interval(seconds(20))
|
||||||
.keep_alive_interval(seconds(20))
|
.clean_start()
|
||||||
.clean_start(true)
|
.finalize();
|
||||||
.finalize();
|
|
||||||
|
|
||||||
const vector<string> TOPICS { "requests/math", "requests/math/#" };
|
const vector<string> TOPICS { "requests/math", "requests/math/#" };
|
||||||
const vector<int> QOS { 1, 1 };
|
const vector<int> QOS { 1, 1 };
|
||||||
|
@ -190,7 +190,7 @@ void token::on_failure(MQTTAsync_failureData* rsp)
|
|||||||
complete_ = true;
|
complete_ = true;
|
||||||
g.unlock();
|
g.unlock();
|
||||||
|
|
||||||
// Note: callback always completes before the obect is signaled.
|
// Note: callback always completes before the object is signaled.
|
||||||
if (listener)
|
if (listener)
|
||||||
listener->on_failure(*this);
|
listener->on_failure(*this);
|
||||||
cond_.notify_all();
|
cond_.notify_all();
|
||||||
@ -219,7 +219,7 @@ void token::on_failure5(MQTTAsync_failureData5* rsp)
|
|||||||
complete_ = true;
|
complete_ = true;
|
||||||
g.unlock();
|
g.unlock();
|
||||||
|
|
||||||
// Note: callback always completes before the obect is signaled.
|
// Note: callback always completes before the object is signaled.
|
||||||
if (listener)
|
if (listener)
|
||||||
listener->on_failure(*this);
|
listener->on_failure(*this);
|
||||||
cond_.notify_all();
|
cond_.notify_all();
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
// topic.cpp
|
// topic.cpp
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2013-2016 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2013-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -22,6 +22,30 @@
|
|||||||
namespace mqtt {
|
namespace mqtt {
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// topic
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// This is just a string split around '/'
|
||||||
|
std::vector<string> topic::split(const string& s)
|
||||||
|
{
|
||||||
|
std::vector<std::string> v;
|
||||||
|
|
||||||
|
if (s.empty())
|
||||||
|
return v;
|
||||||
|
|
||||||
|
const auto delim = '/';
|
||||||
|
string::size_type startPos = 0, pos;
|
||||||
|
|
||||||
|
do {
|
||||||
|
pos = s.find(delim, startPos);
|
||||||
|
auto n = (pos == string::npos) ? pos : (pos - startPos);
|
||||||
|
v.push_back(s.substr(startPos, n));
|
||||||
|
startPos = pos + 1;
|
||||||
|
}
|
||||||
|
while (pos != string::npos);
|
||||||
|
|
||||||
|
return v;
|
||||||
|
}
|
||||||
|
|
||||||
delivery_token_ptr topic::publish(const void* payload, size_t n)
|
delivery_token_ptr topic::publish(const void* payload, size_t n)
|
||||||
{
|
{
|
||||||
@ -49,6 +73,62 @@ token_ptr topic::subscribe(const subscribe_options& opts)
|
|||||||
return cli_.subscribe(name_, qos_, opts);
|
return cli_.subscribe(name_, qos_, opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// topic_filter
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
topic_filter::topic_filter(const string& filter)
|
||||||
|
: fields_(topic::split(filter))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool topic_filter::has_wildcards(const string& filter)
|
||||||
|
{
|
||||||
|
auto n = filter.size();
|
||||||
|
|
||||||
|
if (n == 0)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// A '#' should only be the last char, if present
|
||||||
|
if (filter[n-1] == '#')
|
||||||
|
return true;
|
||||||
|
|
||||||
|
return filter.find('+') != string::npos;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool topic_filter::has_wildcards() const {
|
||||||
|
for (auto& f : fields_) {
|
||||||
|
if (f == "+" || f == "#")
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if the topic matches this filter.
|
||||||
|
// OPTIMIZE: If the filter string doesn't contain any wildcards, then a
|
||||||
|
// match is a simple string comparison. We wouldn't need to split the filter
|
||||||
|
// or topic into fields.
|
||||||
|
bool topic_filter::matches(const string& topic) const
|
||||||
|
{
|
||||||
|
auto n = fields_.size();
|
||||||
|
auto topic_fields = topic::split(topic);
|
||||||
|
|
||||||
|
if (n > topic_fields.size()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t i=0; i<n; ++i) {
|
||||||
|
if (fields_[i] == "#") {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (fields_[i] != "+" && fields_[i] != topic_fields[i]) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
// end namespace mqtt
|
||||||
}
|
}
|
||||||
|
@ -41,8 +41,10 @@ add_executable(unit_tests unit_tests.cpp
|
|||||||
test_properties.cpp
|
test_properties.cpp
|
||||||
test_response_options.cpp
|
test_response_options.cpp
|
||||||
test_string_collection.cpp
|
test_string_collection.cpp
|
||||||
|
test_thread_queue.cpp
|
||||||
test_token.cpp
|
test_token.cpp
|
||||||
test_topic.cpp
|
test_topic.cpp
|
||||||
|
test_topic_matcher.cpp
|
||||||
test_will_options.cpp
|
test_will_options.cpp
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -184,6 +184,15 @@ TEST_CASE("connect_options copy ctor", "[options]")
|
|||||||
REQUIRE(HTTPS_PROXY == opts.get_https_proxy());
|
REQUIRE(HTTPS_PROXY == opts.get_https_proxy());
|
||||||
REQUIRE(opts.get_http_proxy().empty());
|
REQUIRE(opts.get_http_proxy().empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SECTION("properties") {
|
||||||
|
orgOpts.set_properties({{ mqtt::property::SESSION_EXPIRY_INTERVAL, 0 }});
|
||||||
|
|
||||||
|
mqtt::connect_options opts { orgOpts };
|
||||||
|
|
||||||
|
REQUIRE(opts.get_properties().contains(mqtt::property::SESSION_EXPIRY_INTERVAL));
|
||||||
|
REQUIRE(opts.c_struct().connectProperties == &opts.get_properties().c_struct());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
@ -193,30 +202,45 @@ TEST_CASE("connect_options copy ctor", "[options]")
|
|||||||
TEST_CASE("connect_options move_constructor", "[options]")
|
TEST_CASE("connect_options move_constructor", "[options]")
|
||||||
{
|
{
|
||||||
mqtt::connect_options orgOpts { USER, PASSWD };
|
mqtt::connect_options orgOpts { USER, PASSWD };
|
||||||
mqtt::connect_options opts { std::move(orgOpts) };
|
|
||||||
|
|
||||||
REQUIRE(USER == opts.get_user_name());
|
SECTION("simple options") {
|
||||||
REQUIRE(PASSWD == opts.get_password_str());
|
mqtt::connect_options opts { std::move(orgOpts) };
|
||||||
|
|
||||||
const auto& c_struct = opts.c_struct();
|
REQUIRE(USER == opts.get_user_name());
|
||||||
|
REQUIRE(PASSWD == opts.get_password_str());
|
||||||
|
|
||||||
REQUIRE(0 == memcmp(&c_struct.struct_id, CSIG, CSIG_LEN));
|
const auto& c_struct = opts.c_struct();
|
||||||
|
|
||||||
REQUIRE(0 == strcmp(USER.c_str(), c_struct.username));
|
REQUIRE(0 == memcmp(&c_struct.struct_id, CSIG, CSIG_LEN));
|
||||||
REQUIRE(c_struct.password == nullptr);
|
|
||||||
REQUIRE(PASSWD.size() == size_t(c_struct.binarypwd.len));
|
|
||||||
REQUIRE(0 == memcmp(PASSWD.data(), c_struct.binarypwd.data, PASSWD.size()));
|
|
||||||
|
|
||||||
// Make sure it's a true copy, not linked to the original
|
REQUIRE(0 == strcmp(USER.c_str(), c_struct.username));
|
||||||
orgOpts.set_user_name(EMPTY_STR);
|
REQUIRE(c_struct.password == nullptr);
|
||||||
orgOpts.set_password(EMPTY_STR);
|
REQUIRE(PASSWD.size() == size_t(c_struct.binarypwd.len));
|
||||||
|
REQUIRE(0 == memcmp(PASSWD.data(), c_struct.binarypwd.data, PASSWD.size()));
|
||||||
|
|
||||||
REQUIRE(USER == opts.get_user_name());
|
// Make sure it's a true copy, not linked to the original
|
||||||
REQUIRE(PASSWD == opts.get_password_str());
|
orgOpts.set_user_name(EMPTY_STR);
|
||||||
|
orgOpts.set_password(EMPTY_STR);
|
||||||
|
|
||||||
// Check that the original was moved
|
REQUIRE(USER == opts.get_user_name());
|
||||||
REQUIRE(EMPTY_STR == orgOpts.get_user_name());
|
REQUIRE(PASSWD == opts.get_password_str());
|
||||||
REQUIRE(EMPTY_STR == orgOpts.get_password_str());
|
|
||||||
|
// Check that the original was moved
|
||||||
|
REQUIRE(EMPTY_STR == orgOpts.get_user_name());
|
||||||
|
REQUIRE(EMPTY_STR == orgOpts.get_password_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("properties") {
|
||||||
|
orgOpts.set_properties({{ mqtt::property::SESSION_EXPIRY_INTERVAL, 0 }});
|
||||||
|
|
||||||
|
mqtt::connect_options opts { std::move(orgOpts) };
|
||||||
|
|
||||||
|
REQUIRE(opts.get_properties().contains(mqtt::property::SESSION_EXPIRY_INTERVAL));
|
||||||
|
REQUIRE(opts.c_struct().connectProperties == &opts.get_properties().c_struct());
|
||||||
|
|
||||||
|
// Check that the original was moved
|
||||||
|
REQUIRE(orgOpts.get_properties().empty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
@ -435,7 +459,7 @@ TEST_CASE("set_token", "[options]")
|
|||||||
REQUIRE(2*TIMEOUT_SEC == c_struct.connectTimeout);
|
REQUIRE(2*TIMEOUT_SEC == c_struct.connectTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
SECTION("set stervers") {
|
SECTION("set servers") {
|
||||||
opts.set_servers(URIs);
|
opts.set_servers(URIs);
|
||||||
|
|
||||||
REQUIRE(URIs.get() == opts.get_servers().get());
|
REQUIRE(URIs.get() == opts.get_servers().get());
|
||||||
|
@ -76,6 +76,69 @@ TEST_CASE("disconnect_options user constructor", "[options]")
|
|||||||
REQUIRE(tok == opts.get_token());
|
REQUIRE(tok == opts.get_token());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ----------------------------------------------------------------------
|
||||||
|
// Test the copy constructor
|
||||||
|
// ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
TEST_CASE("disconnect_options copy ctor", "[options]")
|
||||||
|
{
|
||||||
|
constexpr std::chrono::milliseconds TIMEOUT { 50 };
|
||||||
|
|
||||||
|
mqtt::disconnect_options orgOpts { TIMEOUT };
|
||||||
|
|
||||||
|
SECTION("simple options") {
|
||||||
|
mqtt::disconnect_options opts { orgOpts };
|
||||||
|
|
||||||
|
REQUIRE(TIMEOUT == opts.get_timeout());
|
||||||
|
|
||||||
|
REQUIRE(opts.get_properties().empty());
|
||||||
|
|
||||||
|
// Make sure it's a true copy, not linked to the original
|
||||||
|
orgOpts.set_timeout(0);
|
||||||
|
REQUIRE(TIMEOUT == opts.get_timeout());
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("properties") {
|
||||||
|
orgOpts.set_properties({{ mqtt::property::SESSION_EXPIRY_INTERVAL, 0 }});
|
||||||
|
|
||||||
|
mqtt::disconnect_options opts { orgOpts };
|
||||||
|
|
||||||
|
REQUIRE(opts.get_properties().contains(mqtt::property::SESSION_EXPIRY_INTERVAL));
|
||||||
|
REQUIRE(1 == opts.c_struct().properties.count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----------------------------------------------------------------------
|
||||||
|
// Test the move constructor
|
||||||
|
// ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
TEST_CASE("disconnect_options move_constructor", "[options]")
|
||||||
|
{
|
||||||
|
constexpr std::chrono::milliseconds TIMEOUT { 50 };
|
||||||
|
|
||||||
|
mqtt::disconnect_options orgOpts { TIMEOUT };
|
||||||
|
|
||||||
|
SECTION("simple options") {
|
||||||
|
mqtt::disconnect_options opts { std::move(orgOpts) };
|
||||||
|
|
||||||
|
REQUIRE(TIMEOUT == opts.get_timeout());
|
||||||
|
|
||||||
|
REQUIRE(opts.get_properties().empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("properties") {
|
||||||
|
orgOpts.set_properties({{ mqtt::property::SESSION_EXPIRY_INTERVAL, 0 }});
|
||||||
|
|
||||||
|
mqtt::disconnect_options opts { std::move(orgOpts) };
|
||||||
|
|
||||||
|
REQUIRE(opts.get_properties().contains(mqtt::property::SESSION_EXPIRY_INTERVAL));
|
||||||
|
REQUIRE(1 == opts.c_struct().properties.count);
|
||||||
|
|
||||||
|
// Check that the original was moved
|
||||||
|
REQUIRE(orgOpts.get_properties().empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
// Test set timeout
|
// Test set timeout
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
@ -99,7 +162,7 @@ TEST_CASE("disconnect_options set timeout", "[options]")
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
// Test set contect token
|
// Test set connect token
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
|
|
||||||
TEST_CASE("disconnect_options set token", "[options]")
|
TEST_CASE("disconnect_options set token", "[options]")
|
||||||
|
89
test/unit/test_thread_queue.cpp
Normal file
89
test/unit/test_thread_queue.cpp
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
// test_thread_queue.cpp
|
||||||
|
//
|
||||||
|
// Unit tests for the thread_queue class in the Paho MQTT C++ library.
|
||||||
|
//
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Copyright (c) 2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
|
*
|
||||||
|
* All rights reserved. This program and the accompanying materials
|
||||||
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||||
|
*
|
||||||
|
* The Eclipse Public License is available at
|
||||||
|
* http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
* and the Eclipse Distribution License is available at
|
||||||
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||||
|
*
|
||||||
|
* Contributors:
|
||||||
|
* Frank Pagliughi - Initial implementation
|
||||||
|
*******************************************************************************/
|
||||||
|
|
||||||
|
#define UNIT_TESTS
|
||||||
|
|
||||||
|
#include "catch2/catch.hpp"
|
||||||
|
#include "mqtt/types.h"
|
||||||
|
#include "mqtt/thread_queue.h"
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
#include <future>
|
||||||
|
#include <chrono>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
using namespace mqtt;
|
||||||
|
using namespace std::chrono;
|
||||||
|
|
||||||
|
TEST_CASE("que put/get", "[thread_queue]")
|
||||||
|
{
|
||||||
|
thread_queue<int> que;
|
||||||
|
|
||||||
|
que.put(1);
|
||||||
|
que.put(2);
|
||||||
|
REQUIRE(que.get() == 1);
|
||||||
|
|
||||||
|
que.put(3);
|
||||||
|
REQUIRE(que.get() == 2);
|
||||||
|
REQUIRE(que.get() == 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("que mt put/get", "[thread_queue]")
|
||||||
|
{
|
||||||
|
thread_queue<string> que;
|
||||||
|
const size_t N = 1000000;
|
||||||
|
const size_t N_THR = 2;
|
||||||
|
|
||||||
|
auto producer = [&que]() {
|
||||||
|
string s;
|
||||||
|
for (size_t i=0; i<512; ++i)
|
||||||
|
s.push_back('a' + i%26);
|
||||||
|
|
||||||
|
for (size_t i=0; i<N; ++i)
|
||||||
|
que.put(s);
|
||||||
|
};
|
||||||
|
|
||||||
|
auto consumer = [&que]() {
|
||||||
|
string s;
|
||||||
|
bool ok = true;
|
||||||
|
for (size_t i=0; i<N && ok; ++i) {
|
||||||
|
ok = que.try_get_for(&s, seconds{1});
|
||||||
|
}
|
||||||
|
return ok;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::thread> producers;
|
||||||
|
std::vector<std::future<bool>> consumers;
|
||||||
|
|
||||||
|
for (size_t i=0; i<N_THR; ++i)
|
||||||
|
producers.push_back(std::thread(producer));
|
||||||
|
|
||||||
|
for (size_t i=0; i<N_THR; ++i)
|
||||||
|
consumers.push_back(std::async(consumer));
|
||||||
|
|
||||||
|
for (size_t i=0; i<N_THR; ++i)
|
||||||
|
producers[i].join();
|
||||||
|
|
||||||
|
for (size_t i=0; i<N_THR; ++i) {
|
||||||
|
REQUIRE(consumers[i].get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,7 +4,7 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2020-2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v1.0
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
@ -102,6 +102,16 @@ TEST_CASE("get/set", "[topic]")
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("split", "[topic]")
|
||||||
|
{
|
||||||
|
auto v = topic::split(TOPIC);
|
||||||
|
|
||||||
|
REQUIRE(3 == v.size());
|
||||||
|
REQUIRE("my" == v[0]);
|
||||||
|
REQUIRE("topic" == v[1]);
|
||||||
|
REQUIRE("name" == v[2]);
|
||||||
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
// Publish
|
// Publish
|
||||||
// ----------------------------------------------------------------------
|
// ----------------------------------------------------------------------
|
||||||
@ -178,3 +188,47 @@ TEST_CASE("publish full binary", "[topic]")
|
|||||||
REQUIRE(RETAINED == msg->is_retained());
|
REQUIRE(RETAINED == msg->is_retained());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// topic_filter
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
TEST_CASE("has_wildcards", "[topic_filter]")
|
||||||
|
{
|
||||||
|
REQUIRE(!topic_filter::has_wildcards(TOPIC));
|
||||||
|
|
||||||
|
REQUIRE(topic_filter::has_wildcards("some/wild/+/topic"));
|
||||||
|
REQUIRE(topic_filter::has_wildcards("some/multi/wild/#"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("matches", "[topic_filter]")
|
||||||
|
{
|
||||||
|
SECTION("no_wildcards") {
|
||||||
|
topic_filter filt { TOPIC };
|
||||||
|
|
||||||
|
REQUIRE(filt.matches(TOPIC));
|
||||||
|
REQUIRE(!filt.matches("some/other/topic"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test single-level wildcard, '+'
|
||||||
|
SECTION("single_wildcard") {
|
||||||
|
topic_filter filt { "my/+/name" };
|
||||||
|
|
||||||
|
REQUIRE(filt.matches("my/topic/name"));
|
||||||
|
REQUIRE(filt.matches("my/other/name"));
|
||||||
|
REQUIRE(!filt.matches("my/other/id"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test multi-level wildcard, '#'
|
||||||
|
SECTION("multi_wildcard") {
|
||||||
|
topic_filter filt { "my/topic/#" };
|
||||||
|
|
||||||
|
REQUIRE(filt.matches("my/topic/name"));
|
||||||
|
REQUIRE(filt.matches("my/topic/id"));
|
||||||
|
REQUIRE(filt.matches("my/topic/name/and/id"));
|
||||||
|
|
||||||
|
REQUIRE(!filt.matches("my/other/name"));
|
||||||
|
REQUIRE(!filt.matches("my/other/id"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
64
test/unit/test_topic_matcher.cpp
Normal file
64
test/unit/test_topic_matcher.cpp
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
// test_topic_matcher.cpp
|
||||||
|
//
|
||||||
|
// Unit tests for the topic_matcher class in the Paho MQTT C++ library.
|
||||||
|
//
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
* Copyright (c) 2022 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
|
*
|
||||||
|
* All rights reserved. This program and the accompanying materials
|
||||||
|
* are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||||
|
*
|
||||||
|
* The Eclipse Public License is available at
|
||||||
|
* http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
* and the Eclipse Distribution License is available at
|
||||||
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||||
|
*
|
||||||
|
*******************************************************************************/
|
||||||
|
|
||||||
|
#define UNIT_TESTS
|
||||||
|
|
||||||
|
#include "catch2/catch.hpp"
|
||||||
|
#include "mqtt/topic_matcher.h"
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
using namespace mqtt;
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
TEST_CASE("insert/get", "[topic_matcher]")
|
||||||
|
{
|
||||||
|
topic_matcher<int> matcher;
|
||||||
|
|
||||||
|
matcher.insert({"some/random/topic", 42});
|
||||||
|
|
||||||
|
auto it = matcher.find("some/random/topic");
|
||||||
|
|
||||||
|
REQUIRE(it != matcher.end());
|
||||||
|
REQUIRE(it->first == "some/random/topic");
|
||||||
|
REQUIRE(it->second == 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("matcher matches", "[topic_matcher]")
|
||||||
|
{
|
||||||
|
topic_matcher<int> matcher {
|
||||||
|
{ "some/random/topic", 42 },
|
||||||
|
{ "some/#", 99 },
|
||||||
|
{ "some/other/topic", 55 },
|
||||||
|
{ "some/+/topic", 33 }
|
||||||
|
};
|
||||||
|
|
||||||
|
auto it = matcher.matches("some/random/topic");
|
||||||
|
|
||||||
|
for ( ; it != matcher.end(); ++it) {
|
||||||
|
std::cout << "Matcher got: '" << it->first << "' -> " << it->second << std::endl;
|
||||||
|
bool ok = (
|
||||||
|
(it->first == "some/random/topic" && it->second == 42) ||
|
||||||
|
(it->first == "some/#" && it->second == 99) ||
|
||||||
|
(it-> first == "some/+/topic" && it->second == 33)
|
||||||
|
);
|
||||||
|
REQUIRE(ok);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user