mirror of
https://github.com/eclipse/paho.mqtt.cpp.git
synced 2025-05-10 03:39:07 +08:00
#410 Added 'shutdown_event' and reworked consumer to prevent propagating exceptions on shutdown.
This commit is contained in:
parent
1e7d090229
commit
df82ee4378
@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
|||||||
- [#503](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/503) Fixed issue that generated docs were empty.
|
- [#503](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/503) Fixed issue that generated docs were empty.
|
||||||
- [#518](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/518) Add function for checking async consumer event queue size
|
- [#518](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/518) Add function for checking async consumer event queue size
|
||||||
- [#519](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/519) Fix potential deadlock in set_callback
|
- [#519](https://github.com/eclipse-paho/paho.mqtt.cpp/pull/519) Fix potential deadlock in set_callback
|
||||||
|
- [#524](https://github.com/eclipse-paho/paho.mqtt.cpp/issues/524) Fixed copy and move operations for 'subscribe_options'. Added unit tests.
|
||||||
|
|
||||||
|
|
||||||
## [Version 1.4.1](https://github.com/eclipse/paho.mqtt.cpp/compare/v1.4.0..v1.4.1) - (2024-07-09)
|
## [Version 1.4.1](https://github.com/eclipse/paho.mqtt.cpp/compare/v1.4.0..v1.4.1) - (2024-07-09)
|
||||||
|
@ -21,18 +21,20 @@
|
|||||||
// processing, perhaps based on the topics. It could be common, however, to
|
// processing, perhaps based on the topics. It could be common, however, to
|
||||||
// want to have multiple threads for publishing.
|
// want to have multiple threads for publishing.
|
||||||
//
|
//
|
||||||
// The sample demonstrates:
|
// This example demonstrates:
|
||||||
// - Creating a client and accessing it from a shared_ptr<>
|
// - Creating a client and sharing it across threads using a shared_ptr<>
|
||||||
// - Using one thread to receive incoming messages from the broker and
|
// - Using one thread to receive incoming messages from the broker and
|
||||||
// another thread to publish messages to it.
|
// another thread to publish messages to it.
|
||||||
// - Connecting to an MQTT server/broker.
|
// - Connecting to an MQTT server/broker.
|
||||||
// - Subscribing to a topic
|
// - Automatic reconnect
|
||||||
// - Using the asynchronous consumer
|
// - Publishing messages
|
||||||
// - Publishing messages.
|
// - Subscribing to multiple topics
|
||||||
|
// - Using the asynchronous message consumer
|
||||||
|
// - Signaling consumer from another thread
|
||||||
//
|
//
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Copyright (c) 2020-2023 Frank Pagliughi <fpagliughi@mindspring.com>
|
* Copyright (c) 2020-2025 Frank Pagliughi <fpagliughi@mindspring.com>
|
||||||
*
|
*
|
||||||
* All rights reserved. This program and the accompanying materials
|
* All rights reserved. This program and the accompanying materials
|
||||||
* are made available under the terms of the Eclipse Public License v2.0
|
* are made available under the terms of the Eclipse Public License v2.0
|
||||||
@ -61,8 +63,8 @@
|
|||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
|
|
||||||
const std::string DFLT_SERVER_ADDRESS("mqtt://localhost:1883");
|
const std::string DFLT_SERVER_ADDRESS{"mqtt://localhost:1883"};
|
||||||
const std::string CLIENT_ID("multithr_pub_sub_cpp");
|
const std::string CLIENT_ID{"multithr_pub_sub_cpp"};
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
@ -172,6 +174,10 @@ int main(int argc, char* argv[])
|
|||||||
auto rsp = cli->connect(connOpts)->get_connect_response();
|
auto rsp = cli->connect(connOpts)->get_connect_response();
|
||||||
cout << "OK\n" << endl;
|
cout << "OK\n" << endl;
|
||||||
|
|
||||||
|
cout << "Now start an application such as 'async_publish_time'\n"
|
||||||
|
<< "that publishes to a 'data/' topic...\n"
|
||||||
|
<< endl;
|
||||||
|
|
||||||
// Subscribe if this is a new session with the server
|
// Subscribe if this is a new session with the server
|
||||||
if (!rsp.is_session_present())
|
if (!rsp.is_session_present())
|
||||||
cli->subscribe(TOPICS, QOS);
|
cli->subscribe(TOPICS, QOS);
|
||||||
@ -180,13 +186,32 @@ int main(int argc, char* argv[])
|
|||||||
|
|
||||||
std::thread publisher(publisher_func, cli, counter);
|
std::thread publisher(publisher_func, cli, counter);
|
||||||
|
|
||||||
|
// Start another thread to shut us down after a minute
|
||||||
|
|
||||||
|
std::thread{[cli] {
|
||||||
|
this_thread::sleep_for(30s);
|
||||||
|
cout << "Signaling the consumer to stop." << endl;
|
||||||
|
cli->stop_consuming();
|
||||||
|
}}.detach();
|
||||||
|
|
||||||
// Consume messages in this thread
|
// Consume messages in this thread
|
||||||
|
|
||||||
|
// Remember that with the message consumer, we can't detect a
|
||||||
|
// reconnect We would need to register a connect callback or use the
|
||||||
|
// event consumer.
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
auto msg = cli->consume_message();
|
auto msg = cli->consume_message();
|
||||||
|
|
||||||
if (!msg)
|
if (!msg) {
|
||||||
|
// Exit if the consumer was shut down
|
||||||
|
if (cli->consumer_closed())
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Otherwise let auto-reconnect deal with it.
|
||||||
|
cout << "Disconnect detected. Attempting an auto-reconnect." << endl;
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (msg->get_topic() == "command" && msg->to_string() == "exit") {
|
if (msg->get_topic() == "command" && msg->to_string() == "exit") {
|
||||||
cout << "Exit command received" << endl;
|
cout << "Exit command received" << endl;
|
||||||
|
@ -826,8 +826,102 @@ public:
|
|||||||
return (que_) ? que_->size() : 0;
|
return (que_) ? que_->size() : 0;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Read the next message from the queue.
|
* Read the next client event from the queue.
|
||||||
* This blocks until a new message arrives.
|
* This blocks until a new message arrives.
|
||||||
|
* If the consumer queue is closed, this returns a shutdown event.
|
||||||
|
* @return The client event.
|
||||||
|
*/
|
||||||
|
event consume_event() override;
|
||||||
|
/**
|
||||||
|
* Try to read the next client event without blocking.
|
||||||
|
* @param evt Pointer to the value to receive the event
|
||||||
|
* @return @em true if an event was read, @em false if no
|
||||||
|
* event was available.
|
||||||
|
*/
|
||||||
|
bool try_consume_event(event* evt) override;
|
||||||
|
/**
|
||||||
|
* Waits a limited time for a client event to appear.
|
||||||
|
* @param evt Pointer to the value to receive the event.
|
||||||
|
* @param relTime The maximum amount of time to wait for an event.
|
||||||
|
* @return @em true if an event was read, @em false if a timeout
|
||||||
|
* occurred.
|
||||||
|
*/
|
||||||
|
template <typename Rep, class Period>
|
||||||
|
bool try_consume_event_for(
|
||||||
|
event* evt, const std::chrono::duration<Rep, Period>& relTime
|
||||||
|
) {
|
||||||
|
if (!que_)
|
||||||
|
throw mqtt::exception(-1, "Consumer not started");
|
||||||
|
|
||||||
|
try {
|
||||||
|
return que_->try_get_for(evt, relTime);
|
||||||
|
}
|
||||||
|
catch (queue_closed&) {
|
||||||
|
*evt = event{shutdown_event{}};
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Waits a limited time for a client event to arrive.
|
||||||
|
* @param relTime The maximum amount of time to wait for an event.
|
||||||
|
* @return The event that was received. It will contain empty message on
|
||||||
|
* timeout.
|
||||||
|
*/
|
||||||
|
template <typename Rep, class Period>
|
||||||
|
event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
|
||||||
|
event evt;
|
||||||
|
try {
|
||||||
|
que_->try_get_for(&evt, relTime);
|
||||||
|
}
|
||||||
|
catch (queue_closed&) {
|
||||||
|
evt = event{shutdown_event{}};
|
||||||
|
}
|
||||||
|
return evt;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Waits until a specific time for a client event to appear.
|
||||||
|
* @param evt Pointer to the value to receive the event.
|
||||||
|
* @param absTime The time point to wait until, before timing out.
|
||||||
|
* @return @em true if an event was recceived, @em false if a timeout
|
||||||
|
* occurred.
|
||||||
|
*/
|
||||||
|
template <class Clock, class Duration>
|
||||||
|
bool try_consume_event_until(
|
||||||
|
event* evt, const std::chrono::time_point<Clock, Duration>& absTime
|
||||||
|
) {
|
||||||
|
if (!que_)
|
||||||
|
throw mqtt::exception(-1, "Consumer not started");
|
||||||
|
|
||||||
|
try {
|
||||||
|
return que_->try_get_until(evt, absTime);
|
||||||
|
}
|
||||||
|
catch (queue_closed&) {
|
||||||
|
*evt = event{shutdown_event{}};
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Waits until a specific time for a client event to appear.
|
||||||
|
* @param absTime The time point to wait until, before timing out.
|
||||||
|
* @return The event that was received. It will contain empty message on
|
||||||
|
* timeout.
|
||||||
|
*/
|
||||||
|
template <class Clock, class Duration>
|
||||||
|
event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
|
||||||
|
) {
|
||||||
|
event evt;
|
||||||
|
try {
|
||||||
|
que_->try_get_until(&evt, absTime);
|
||||||
|
}
|
||||||
|
catch (queue_closed&) {
|
||||||
|
evt = event{shutdown_event{}};
|
||||||
|
}
|
||||||
|
return evt;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Read the next message from the queue.
|
||||||
|
* This blocks until a new message arrives or until a disconnect or
|
||||||
|
* shutdown occurs.
|
||||||
* @return The message and topic.
|
* @return The message and topic.
|
||||||
*/
|
*/
|
||||||
const_message_ptr consume_message() override;
|
const_message_ptr consume_message() override;
|
||||||
@ -855,7 +949,7 @@ public:
|
|||||||
event evt;
|
event evt;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!que_->try_get_for(&evt, relTime))
|
if (!try_consume_event_for(&evt, relTime))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (const auto* pval = evt.get_message_if()) {
|
if (const auto* pval = evt.get_message_if()) {
|
||||||
@ -901,7 +995,7 @@ public:
|
|||||||
event evt;
|
event evt;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!que_->try_get_until(&evt, absTime))
|
if (!try_consume_event_until(&evt, absTime))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (const auto* pval = evt.get_message_if()) {
|
if (const auto* pval = evt.get_message_if()) {
|
||||||
@ -930,76 +1024,6 @@ public:
|
|||||||
this->try_consume_message_until(&msg, absTime);
|
this->try_consume_message_until(&msg, absTime);
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Read the next message from the queue.
|
|
||||||
* This blocks until a new message arrives.
|
|
||||||
* @return The message and topic.
|
|
||||||
*/
|
|
||||||
event consume_event() override { return que_->get(); }
|
|
||||||
/**
|
|
||||||
* Try to read the next message from the queue without blocking.
|
|
||||||
* @param evt Pointer to the value to receive the event
|
|
||||||
* @return @em true if an event was read, @em false if no
|
|
||||||
* event was available.
|
|
||||||
*/
|
|
||||||
bool try_consume_event(event* evt) override { return que_->try_get(evt); }
|
|
||||||
/**
|
|
||||||
* Waits a limited time for a message to arrive.
|
|
||||||
* @param evt Pointer to the value to receive the event.
|
|
||||||
* @param relTime The maximum amount of time to wait for an event.
|
|
||||||
* @return @em true if an event was read, @em false if a timeout
|
|
||||||
* occurred.
|
|
||||||
*/
|
|
||||||
template <typename Rep, class Period>
|
|
||||||
bool try_consume_event_for(
|
|
||||||
event* evt, const std::chrono::duration<Rep, Period>& relTime
|
|
||||||
) {
|
|
||||||
if (!que_)
|
|
||||||
throw mqtt::exception(-1, "Consumer not started");
|
|
||||||
|
|
||||||
return que_->try_get_for(evt, relTime);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Waits a limited time for an event to arrive.
|
|
||||||
* @param relTime The maximum amount of time to wait for an event.
|
|
||||||
* @return The event that was received. It will contain empty message on
|
|
||||||
* timeout.
|
|
||||||
*/
|
|
||||||
template <typename Rep, class Period>
|
|
||||||
event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
|
|
||||||
event evt;
|
|
||||||
que_->try_get_for(&evt, relTime);
|
|
||||||
return evt;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Waits until a specific time for an event to appear.
|
|
||||||
* @param evt Pointer to the value to receive the event.
|
|
||||||
* @param absTime The time point to wait until, before timing out.
|
|
||||||
* @return @em true if an event was recceived, @em false if a timeout
|
|
||||||
* occurred.
|
|
||||||
*/
|
|
||||||
template <class Clock, class Duration>
|
|
||||||
bool try_consume_event_until(
|
|
||||||
event* evt, const std::chrono::time_point<Clock, Duration>& absTime
|
|
||||||
) {
|
|
||||||
if (!que_)
|
|
||||||
throw mqtt::exception(-1, "Consumer not started");
|
|
||||||
|
|
||||||
return que_->try_get_until(evt, absTime);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Waits until a specific time for an event to appear.
|
|
||||||
* @param absTime The time point to wait until, before timing out.
|
|
||||||
* @return The event that was received. It will contain empty message on
|
|
||||||
* timeout.
|
|
||||||
*/
|
|
||||||
template <class Clock, class Duration>
|
|
||||||
event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
|
|
||||||
) {
|
|
||||||
event evt;
|
|
||||||
que_->try_get_until(&evt, absTime);
|
|
||||||
return evt;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Smart/shared pointer to an asynchronous MQTT client object */
|
/** Smart/shared pointer to an asynchronous MQTT client object */
|
||||||
|
@ -53,6 +53,9 @@ struct disconnected_event
|
|||||||
ReasonCode reasonCode;
|
ReasonCode reasonCode;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** Event for when the consumer queue is shutdown from another thread */
|
||||||
|
struct shutdown_event { };
|
||||||
|
|
||||||
|
|
||||||
/* Event for when a message arrives is just a message pointer */
|
/* Event for when a message arrives is just a message pointer */
|
||||||
|
|
||||||
@ -83,7 +86,7 @@ class event
|
|||||||
public:
|
public:
|
||||||
/** The variant type for any possible event. */
|
/** The variant type for any possible event. */
|
||||||
using event_type = std::variant<
|
using event_type = std::variant<
|
||||||
const_message_ptr, connected_event, connection_lost_event, disconnected_event>;
|
const_message_ptr, connected_event, connection_lost_event, disconnected_event, shutdown_event>;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
event_type evt_{};
|
event_type evt_{};
|
||||||
@ -124,6 +127,11 @@ public:
|
|||||||
* @param evt A disconnected event.
|
* @param evt A disconnected event.
|
||||||
*/
|
*/
|
||||||
event(disconnected_event evt) : evt_{std::move(evt)} {}
|
event(disconnected_event evt) : evt_{std::move(evt)} {}
|
||||||
|
/**
|
||||||
|
* Constructs a 'shutdown' event.
|
||||||
|
* @param evt A shutdown event.
|
||||||
|
*/
|
||||||
|
event(shutdown_event evt) : evt_{std::move(evt)} {}
|
||||||
/**
|
/**
|
||||||
* Copy constructor.
|
* Copy constructor.
|
||||||
* @param evt The event to copy.
|
* @param evt The event to copy.
|
||||||
@ -196,13 +204,22 @@ public:
|
|||||||
return std::holds_alternative<disconnected_event>(evt_);
|
return std::holds_alternative<disconnected_event>(evt_);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Determines if this is any type of client disconnect.
|
* Determines if this event is an internal shutdown request.
|
||||||
|
* @return @em true if this event is a shutdown request, @em false
|
||||||
|
* otherwise.
|
||||||
|
*/
|
||||||
|
bool is_shutdown() const {
|
||||||
|
return std::holds_alternative<disconnected_event>(evt_);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Determines if this is any type of client disconnect or shutdown.
|
||||||
* @return @em true if this event is any type of client disconnect such
|
* @return @em true if this event is any type of client disconnect such
|
||||||
* as a 'connection lost' or 'disconnected' event.
|
* as a 'connection lost', 'disconnected', or shutdown event.
|
||||||
*/
|
*/
|
||||||
bool is_any_disconnect() const {
|
bool is_any_disconnect() const {
|
||||||
return std::holds_alternative<connection_lost_event>(evt_)
|
return std::holds_alternative<connection_lost_event>(evt_)
|
||||||
|| std::holds_alternative<disconnected_event>(evt_);
|
|| std::holds_alternative<disconnected_event>(evt_)
|
||||||
|
|| std::holds_alternative<shutdown_event>(evt_);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Gets the message from the event, iff this is a message event.
|
* Gets the message from the event, iff this is a message event.
|
||||||
|
@ -879,6 +879,31 @@ void async_client::stop_consuming()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event async_client::consume_event()
|
||||||
|
{
|
||||||
|
event evt;
|
||||||
|
try {
|
||||||
|
evt = que_->get();
|
||||||
|
}
|
||||||
|
catch (queue_closed&) {
|
||||||
|
evt = event{shutdown_event{}};
|
||||||
|
}
|
||||||
|
return evt;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool async_client::try_consume_event(event* evt)
|
||||||
|
{
|
||||||
|
bool res = false;
|
||||||
|
try {
|
||||||
|
res = que_->try_get(evt);
|
||||||
|
}
|
||||||
|
catch (queue_closed&) {
|
||||||
|
*evt = event{shutdown_event{}};
|
||||||
|
res = true;
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
const_message_ptr async_client::consume_message()
|
const_message_ptr async_client::consume_message()
|
||||||
{
|
{
|
||||||
if (!que_)
|
if (!que_)
|
||||||
@ -887,7 +912,7 @@ const_message_ptr async_client::consume_message()
|
|||||||
// For backward compatibility we ignore the 'connected' events,
|
// For backward compatibility we ignore the 'connected' events,
|
||||||
// whereas disconnected/lost return an empty pointer.
|
// whereas disconnected/lost return an empty pointer.
|
||||||
while (true) {
|
while (true) {
|
||||||
auto evt = que_->get();
|
auto evt = consume_event();
|
||||||
|
|
||||||
if (const auto* pval = evt.get_message_if())
|
if (const auto* pval = evt.get_message_if())
|
||||||
return *pval;
|
return *pval;
|
||||||
@ -905,7 +930,7 @@ bool async_client::try_consume_message(const_message_ptr* msg)
|
|||||||
event evt;
|
event evt;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!que_->try_get(&evt))
|
if (!try_consume_event(&evt))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (const auto* pval = evt.get_message_if()) {
|
if (const auto* pval = evt.get_message_if()) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user