diff --git a/include/mqtt/async_client.h b/include/mqtt/async_client.h index 5fb7587..2a6e6a4 100644 --- a/include/mqtt/async_client.h +++ b/include/mqtt/async_client.h @@ -800,6 +800,20 @@ public: bool consumer_done() noexcept override { return !que_ || que_->done(); } + /** + * Gets the number of events available for immediate consumption. + * Note that this retrieves the number of "raw" events, not messages, + * e.g. may include a connected_event which is not returned by try_consume_message(). + * When polling the queue from multiple threads, prefer using try_consume_event(), + * as the event count may change between checking the size and actual retrieval. + * @return the number of events in the queue. + */ + std::size_t consumer_events_available() const override { + if (que_) + return que_->size(); + else + return 0; + } /** * Read the next message from the queue. * This blocks until a new message arrives. diff --git a/include/mqtt/iasync_client.h b/include/mqtt/iasync_client.h index b76086c..e8c72ce 100644 --- a/include/mqtt/iasync_client.h +++ b/include/mqtt/iasync_client.h @@ -473,6 +473,15 @@ public: virtual bool consumer_done() noexcept { return false; } + /** + * Gets the number of events available for immediate consumption. + * Note that this retrieves the number of "raw" events, not messages, + * e.g. may include a connected_event which is not returned by try_consume_message(). + * When polling the queue from multiple threads, prefer using try_consume_event(), + * as the event count may change between checking the size and actual retrieval. + * @return the number of events in the queue. + */ + virtual std::size_t consumer_events_available() const { return 0; } /** * Read the next message from the queue. * This blocks until a new message arrives. diff --git a/test/unit/test_async_client.cpp b/test/unit/test_async_client.cpp index 41ee42c..cbbdf29 100644 --- a/test/unit/test_async_client.cpp +++ b/test/unit/test_async_client.cpp @@ -1006,3 +1006,23 @@ TEST_CASE("async_client consumer timeout", "[client]") cli.start_consuming(); cli.try_consume_message_until(std::chrono::steady_clock::now()); } + +TEST_CASE("async_client consumer events available", "[client]") +{ + async_client cli{GOOD_SERVER_URI, CLIENT_ID}; + cli.start_consuming(); + REQUIRE(0 == cli.consumer_events_available()); + + token_ptr conn_tok{cli.connect()}; + REQUIRE(conn_tok); + conn_tok->wait(); + REQUIRE(cli.is_connected()); + // expect connected_event to be in the queue now + REQUIRE(1 == cli.consumer_events_available()); + event e; + REQUIRE(cli.try_consume_event(&e)); + REQUIRE(0 == cli.consumer_events_available()); + + cli.stop_consuming(); + cli.disconnect()->wait(); +}