1
0
mirror of https://github.com/eclipse/paho.mqtt.cpp.git synced 2025-05-09 03:11:23 +08:00

add function for checking async consumer event queue size

Signed-off-by: Sebastian Sams <sebastian.sams@bestsolution.at>
This commit is contained in:
Sebastian Sams 2024-10-07 06:55:04 +00:00
parent 6131a30530
commit 0fee432f02
3 changed files with 43 additions and 0 deletions

View File

@ -800,6 +800,20 @@ public:
bool consumer_done() noexcept override {
return !que_ || que_->done();
}
/**
* Gets the number of events available for immediate consumption.
* Note that this retrieves the number of "raw" events, not messages,
* e.g. may include a connected_event which is not returned by try_consume_message().
* When polling the queue from multiple threads, prefer using try_consume_event(),
* as the event count may change between checking the size and actual retrieval.
* @return the number of events in the queue.
*/
std::size_t consumer_events_available() const override {
if (que_)
return que_->size();
else
return 0;
}
/**
* Read the next message from the queue.
* This blocks until a new message arrives.

View File

@ -473,6 +473,15 @@ public:
virtual bool consumer_done() noexcept {
return false;
}
/**
* Gets the number of events available for immediate consumption.
* Note that this retrieves the number of "raw" events, not messages,
* e.g. may include a connected_event which is not returned by try_consume_message().
* When polling the queue from multiple threads, prefer using try_consume_event(),
* as the event count may change between checking the size and actual retrieval.
* @return the number of events in the queue.
*/
virtual std::size_t consumer_events_available() const { return 0; }
/**
* Read the next message from the queue.
* This blocks until a new message arrives.

View File

@ -1006,3 +1006,23 @@ TEST_CASE("async_client consumer timeout", "[client]")
cli.start_consuming();
cli.try_consume_message_until(std::chrono::steady_clock::now());
}
TEST_CASE("async_client consumer events available", "[client]")
{
async_client cli{GOOD_SERVER_URI, CLIENT_ID};
cli.start_consuming();
REQUIRE(0 == cli.consumer_events_available());
token_ptr conn_tok{cli.connect()};
REQUIRE(conn_tok);
conn_tok->wait();
REQUIRE(cli.is_connected());
// expect connected_event to be in the queue now
REQUIRE(1 == cli.consumer_events_available());
event e;
REQUIRE(cli.try_consume_event(&e));
REQUIRE(0 == cli.consumer_events_available());
cli.stop_consuming();
cli.disconnect()->wait();
}