1
0
mirror of https://github.com/eclipse/paho.mqtt.cpp.git synced 2025-05-09 03:11:23 +08:00

Added consumer_clear()

This commit is contained in:
fpagliughi 2024-07-13 15:49:07 -04:00
parent 2e68d0faf2
commit e558946c1d
3 changed files with 36 additions and 1 deletions

View File

@ -747,18 +747,39 @@ public:
) override;
/**
* Start consuming messages.
*
* This initializes the client to receive messages through a queue that
* can be read synchronously.
*
* Normally this should be called _before_ connecting the client to the
* broker, in order to have the consumer queue in place in the event
* that the immediately starts sending messages (such as any retained
* messages) while the client is still in the context of the connect
* call.
*
* This _must_ also be called before calling any 'consume_message' or
* "'consume_event' methods.
*
* Internally, this just creates a thread-safe queue for `mqtt::event`
* objects, then hooks into the message and state-change callback to
* push events into the queue in the order received.
*/
void start_consuming() override;
/**
* Stop consuming messages.
*
* This shuts down the internal callback and closes the internal
* consumer queue. Any remaining messages and events can be read until
* the queue is emptied, but nothing further will be added to it.
* This will also wake up any thread waiting on the queue.
*/
void stop_consuming() override;
/**
* This clears the consumer queue, discarding any pending event.
*/
void clear_consumer() override {
if (que_) que_->clear();
}
/**
* Determines if the consumer queue has been closed.
* Once closed, any events in the queue can still be read, but no new
@ -884,7 +905,6 @@ public:
this->try_consume_message_until(&msg, absTime);
return msg;
}
/**
* Read the next message from the queue.
* This blocks until a new message arrives.

View File

@ -449,6 +449,10 @@ public:
* messages.
*/
virtual void stop_consuming() = 0;
/**
* This clears the consumer queue, discarding any pending event.
*/
virtual void clear_consumer() {}
/**
* Determines if the consumer queue has been closed.
* Once closed, any events in the queue can still be read, but no new

View File

@ -202,6 +202,17 @@ public:
guard g{lock_};
return is_done();
}
/**
* Clear the contents of the queue.
* This discards all items in the queue.
*/
void clear() {
unique_guard g{lock_};
while (!que_.empty())
que_.pop();
g.unlock();
notFullCond_.notify_all();
}
/**
* Put an item into the queue.
* If the queue is full, this will block the caller until items are