diff --git a/examples/data_publish.cpp b/examples/data_publish.cpp index 6f194c7..e3fd501 100644 --- a/examples/data_publish.cpp +++ b/examples/data_publish.cpp @@ -282,8 +282,10 @@ int main(int argc, char* argv[]) { string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI; + // Create a persistence object encoded_file_persistence persist{PERSIST_KEY}; + // Create a client to use the persistence. mqtt::async_client cli(serverURI, CLIENT_ID, MAX_BUFFERED_MSGS, &persist); auto connOpts = mqtt::connect_options_builder() diff --git a/include/mqtt/async_client.h b/include/mqtt/async_client.h index 44d0c88..78445c9 100644 --- a/include/mqtt/async_client.h +++ b/include/mqtt/async_client.h @@ -36,6 +36,7 @@ #include "mqtt/callback.h" #include "mqtt/create_options.h" #include "mqtt/delivery_token.h" +#include "mqtt/event.h" #include "mqtt/exception.h" #include "mqtt/iaction_listener.h" #include "mqtt/iasync_client.h" @@ -110,8 +111,8 @@ class async_client : public virtual iasync_client public: /** Smart/shared pointer for an object of this class */ using ptr_t = std::shared_ptr; - /** Type for a thread-safe queue to consume messages synchronously */ - using consumer_queue_type = std::unique_ptr>; + /** Type for a thread-safe queue to consume events synchronously */ + using consumer_queue_type = std::unique_ptr>; /** Handler type for registering an individual message callback */ using message_handler = std::function; @@ -761,14 +762,14 @@ public: * This blocks until a new message arrives. * @return The message and topic. */ - const_message_ptr consume_message() override { return que_->get(); } + const_message_ptr consume_message() override; /** * Try to read the next message from the queue without blocking. * @param msg Pointer to the value to receive the message * @return @em true is a message was read, @em false if no message was * available. */ - bool try_consume_message(const_message_ptr* msg) override { return que_->try_get(msg); } + bool try_consume_message(const_message_ptr* msg); /** * Waits a limited time for a message to arrive. * @param msg Pointer to the value to receive the message @@ -780,7 +781,15 @@ public: bool try_consume_message_for( const_message_ptr* msg, const std::chrono::duration& relTime ) { - return que_->try_get_for(msg, relTime); + event_type evt; + if (!que_->try_get_for(&evt, relTime)) + return false; + + if (const auto* pval = std::get_if(&evt)) + *msg = std::move(pval->msg); + else + *msg = const_message_ptr{}; + return true; } /** * Waits a limited time for a message to arrive. @@ -793,7 +802,7 @@ public: const std::chrono::duration& relTime ) { const_message_ptr msg; - que_->try_get_for(&msg, relTime); + this->try_consume_message_for(&msg, relTime); return msg; } /** @@ -807,7 +816,15 @@ public: bool try_consume_message_until( const_message_ptr* msg, const std::chrono::time_point& absTime ) { - return que_->try_get_until(msg, absTime); + event_type evt; + if (!que_->try_get_until(&evt, absTime)) + return false; + + if (const auto* pval = std::get_if(&evt)) + *msg = std::move(pval->msg); + else + *msg = const_message_ptr{}; + return true; } /** * Waits until a specific time for a message to appear. @@ -819,9 +836,73 @@ public: const std::chrono::time_point& absTime ) { const_message_ptr msg; - que_->try_get_until(&msg, absTime); + this->try_consume_message_until(&msg, absTime); return msg; } + + /** + * Read the next message from the queue. + * This blocks until a new message arrives. + * @return The message and topic. + */ + event_type consume_event() override { return que_->get(); } + /** + * Try to read the next message from the queue without blocking. + * @param msg Pointer to the value to receive the message + * @return @em true is a message was read, @em false if no message was + * available. + */ + bool try_consume_event(event_type* evt) override { return que_->try_get(evt); } + /** + * Waits a limited time for a message to arrive. + * @param msg Pointer to the value to receive the message + * @param relTime The maximum amount of time to wait for a message. + * @return @em true if a message was read, @em false if a timeout + * occurred. + */ + template + bool try_consume_event_for( + event_type* evt, const std::chrono::duration& relTime + ) { + return que_->try_get_for(evt, relTime); + } + /** + * Waits a limited time for a message to arrive. + * @param relTime The maximum amount of time to wait for a message. + * @return A shared pointer to the message that was received. It will be + * empty on timeout. + */ + template + event_type try_consume_event_for(const std::chrono::duration& relTime) { + event_type evt; + que_->try_get_for(&evt, relTime); + return evt; + } + /** + * Waits until a specific time for a message to appear. + * @param msg Pointer to the value to receive the message + * @param absTime The time point to wait until, before timing out. + * @return @em true if a message was read, @em false if a timeout + * occurred. + */ + template + bool try_consume_event_until( + event_type* evt, const std::chrono::time_point& absTime + ) { + return que_->try_get_until(evt, absTime); + } + /** + * Waits until a specific time for a message to appear. + * @param absTime The time point to wait until, before timing out. + * @return The message, if read, an empty pointer if not. + */ + template + event_type try_consume_event_until(const std::chrono::time_point& absTime + ) { + event_type evt; + que_->try_get_until(&evt, absTime); + return evt; + } }; /** Smart/shared pointer to an asynchronous MQTT client object */ diff --git a/include/mqtt/callback.h b/include/mqtt/callback.h index 5198bce..6fef83a 100644 --- a/include/mqtt/callback.h +++ b/include/mqtt/callback.h @@ -81,7 +81,6 @@ using callback_ptr = callback::ptr_t; using const_callback_ptr = callback::const_ptr_t; ///////////////////////////////////////////////////////////////////////////// -// end namespace mqtt } // namespace mqtt #endif // __mqtt_callback_h diff --git a/include/mqtt/create_options.h b/include/mqtt/create_options.h index bdd1e5c..7dfa0f0 100644 --- a/include/mqtt/create_options.h +++ b/include/mqtt/create_options.h @@ -34,7 +34,7 @@ namespace mqtt { ///////////////////////////////////////////////////////////////////////////// -/** An empty type that can be used as a `persistent_type` variant opiton. */ +/** An empty type that can be used as a `persistent_type` variant option. */ struct no_persistence { }; diff --git a/include/mqtt/exception.h b/include/mqtt/exception.h index 2616331..6c27918 100644 --- a/include/mqtt/exception.h +++ b/include/mqtt/exception.h @@ -258,4 +258,4 @@ public: ///////////////////////////////////////////////////////////////////////////// } // namespace mqtt -#endif // __mqtt_token_h +#endif // __mqtt_exception_h diff --git a/include/mqtt/iaction_listener.h b/include/mqtt/iaction_listener.h index be46e8c..fabba32 100644 --- a/include/mqtt/iaction_listener.h +++ b/include/mqtt/iaction_listener.h @@ -77,7 +77,6 @@ using iaction_listener_ptr = iaction_listener::ptr_t; using const_iaction_listener_ptr = iaction_listener::const_ptr_t; ///////////////////////////////////////////////////////////////////////////// -// end namespace mqtt } // namespace mqtt #endif // __mqtt_iaction_listener_h diff --git a/include/mqtt/iasync_client.h b/include/mqtt/iasync_client.h index a028068..0225974 100644 --- a/include/mqtt/iasync_client.h +++ b/include/mqtt/iasync_client.h @@ -30,6 +30,7 @@ #include "mqtt/connect_options.h" #include "mqtt/delivery_token.h" #include "mqtt/disconnect_options.h" +#include "mqtt/event.h" #include "mqtt/exception.h" #include "mqtt/iaction_listener.h" #include "mqtt/iclient_persistence.h" @@ -461,6 +462,19 @@ public: * available. */ virtual bool try_consume_message(const_message_ptr* msg) = 0; + /** + * Read the next event from the queue. + * This blocks until a new message arrives. + * @return The message and topic. + */ + virtual event_type consume_event() = 0; + /** + * Try to read the next message from the queue without blocking. + * @param msg Pointer to the value to receive the message + * @return @em true is a message was read, @em false if no message was + * available. + */ + virtual bool try_consume_event(event_type* evt) = 0; }; ///////////////////////////////////////////////////////////////////////////// diff --git a/src/async_client.cpp b/src/async_client.cpp index 5b785b9..0979a5c 100644 --- a/src/async_client.cpp +++ b/src/async_client.cpp @@ -96,21 +96,30 @@ async_client::~async_client() { MQTTAsync_destroy(&cli_); } // connect token then calls any registered callbacks. void async_client::on_connected(void* context, char* cause) { - if (context) { - async_client* cli = static_cast(context); + if (!context) + return; + + async_client* cli = static_cast(context); + + auto tok = cli->connTok_; + if (tok) + tok->on_success(nullptr); + + callback* cb = cli->userCallback_; + auto& connHandler = cli->connHandler_; + auto& que = cli->que_; + + if (cb || connHandler || que) { string cause_str = cause ? string{cause} : string{}; - auto tok = cli->connTok_; - if (tok) - tok->on_success(nullptr); - - callback* cb = cli->userCallback_; if (cb) cb->connected(cause_str); - auto& connHandler = cli->connHandler_; if (connHandler) connHandler(cause_str); + + if (que) + que->put(connected_event{cause_str}); } } @@ -122,21 +131,26 @@ void async_client::on_connected(void* context, char* cause) // connection. void async_client::on_connection_lost(void* context, char* cause) { - if (context) { - async_client* cli = static_cast(context); + if (!context) + return; + + async_client* cli = static_cast(context); + + callback* cb = cli->userCallback_; + auto& connLostHandler = cli->connLostHandler_; + auto& que = cli->que_; + + if (cb || connLostHandler || que) { string cause_str = cause ? string(cause) : string(); - callback* cb = cli->userCallback_; if (cb) cb->connection_lost(cause_str); - auto& connLostHandler = cli->connLostHandler_; if (connLostHandler) connLostHandler(cause_str); - consumer_queue_type& que = cli->que_; if (que) - que->put(const_message_ptr{}); + que->put(connection_lost_event{cause_str}); } } @@ -146,18 +160,22 @@ void async_client::on_disconnected( void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode ) { - if (context) { - async_client* cli = static_cast(context); + if (!context) + return; - auto& disconnectedHandler = cli->disconnectedHandler_; - if (disconnectedHandler) { - properties props(*cprops); + async_client* cli = static_cast(context); + + auto& disconnectedHandler = cli->disconnectedHandler_; + auto& que = cli->que_; + + if (disconnectedHandler || que) { + properties props(*cprops); + + if (disconnectedHandler) disconnectedHandler(props, ReasonCode(reasonCode)); - } - consumer_queue_type& que = cli->que_; if (que) - que->put(const_message_ptr{}); + que->put(disconnected_event{std::move(props), ReasonCode(reasonCode)}); } } @@ -168,34 +186,32 @@ int async_client::on_message_arrived( void* context, char* topicName, int topicLen, MQTTAsync_message* msg ) { - if (context) { - async_client* cli = static_cast(context); - callback* cb = cli->userCallback_; - consumer_queue_type& que = cli->que_; - message_handler& msgHandler = cli->msgHandler_; + if (!context) + return to_int(true); - if (cb || que || msgHandler) { - size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen); + async_client* cli = static_cast(context); + callback* cb = cli->userCallback_; + auto& que = cli->que_; + auto& msgHandler = cli->msgHandler_; - string topic{topicName, len}; - auto m = message::create(std::move(topic), *msg); + if (cb || que || msgHandler) { + size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen); - if (msgHandler) - msgHandler(m); + string topic{topicName, len}; + auto m = message::create(std::move(topic), *msg); - if (cb) - cb->message_arrived(m); + if (msgHandler) + msgHandler(m); - if (que) - que->put(m); - } + if (cb) + cb->message_arrived(m); + + if (que) + que->put(message_arrived_event{m}); } MQTTAsync_freeMessage(&msg); MQTTAsync_free(topicName); - - // TODO: Should the user code determine the return value? - // The Java version does doesn't seem to... return to_int(true); } @@ -834,7 +850,7 @@ void async_client::start_consuming() // TODO: Should we replace user callback? // userCallback_ = nullptr; - que_.reset(new thread_queue); + que_.reset(new thread_queue); int rc = MQTTAsync_setCallbacks( cli_, this, &async_client::on_connection_lost, &async_client::on_message_arrived, @@ -857,6 +873,26 @@ void async_client::stop_consuming() } } +const_message_ptr async_client::consume_message() +{ + auto evt = que_->get(); + if (const auto* pval = std::get_if(&evt)) + return pval->msg; + return const_message_ptr{}; +} + +bool async_client::try_consume_message(const_message_ptr* msg) +{ + event_type evt; + if (!que_->try_get(&evt)) + return false; + + if (const auto* pval = std::get_if(&evt)) + *msg = std::move(pval->msg); + else + *msg = const_message_ptr{}; + return true; +} + ///////////////////////////////////////////////////////////////////////////// -// end namespace mqtt } // namespace mqtt diff --git a/test/unit/mock_async_client.h b/test/unit/mock_async_client.h index 71b22e6..27e470c 100644 --- a/test/unit/mock_async_client.h +++ b/test/unit/mock_async_client.h @@ -239,6 +239,9 @@ public: const_message_ptr consume_message() override { return const_message_ptr{}; } bool try_consume_message(const_message_ptr*) override { return false; } + + event_type consume_event() { return message_arrived_event{const_message_ptr{}}; } + bool try_consume_event(event_type* evt) override { return false; } }; /////////////////////////////////////////////////////////////////////////////