mirror of
https://github.com/eclipse/paho.mqtt.cpp.git
synced 2025-05-09 19:31:22 +08:00
Converted the consumer message queue to an event queue.
This commit is contained in:
parent
5357a282d7
commit
43e8ef67a3
@ -282,8 +282,10 @@ int main(int argc, char* argv[])
|
|||||||
{
|
{
|
||||||
string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI;
|
string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI;
|
||||||
|
|
||||||
|
// Create a persistence object
|
||||||
encoded_file_persistence persist{PERSIST_KEY};
|
encoded_file_persistence persist{PERSIST_KEY};
|
||||||
|
|
||||||
|
// Create a client to use the persistence.
|
||||||
mqtt::async_client cli(serverURI, CLIENT_ID, MAX_BUFFERED_MSGS, &persist);
|
mqtt::async_client cli(serverURI, CLIENT_ID, MAX_BUFFERED_MSGS, &persist);
|
||||||
|
|
||||||
auto connOpts = mqtt::connect_options_builder()
|
auto connOpts = mqtt::connect_options_builder()
|
||||||
|
@ -36,6 +36,7 @@
|
|||||||
#include "mqtt/callback.h"
|
#include "mqtt/callback.h"
|
||||||
#include "mqtt/create_options.h"
|
#include "mqtt/create_options.h"
|
||||||
#include "mqtt/delivery_token.h"
|
#include "mqtt/delivery_token.h"
|
||||||
|
#include "mqtt/event.h"
|
||||||
#include "mqtt/exception.h"
|
#include "mqtt/exception.h"
|
||||||
#include "mqtt/iaction_listener.h"
|
#include "mqtt/iaction_listener.h"
|
||||||
#include "mqtt/iasync_client.h"
|
#include "mqtt/iasync_client.h"
|
||||||
@ -110,8 +111,8 @@ class async_client : public virtual iasync_client
|
|||||||
public:
|
public:
|
||||||
/** Smart/shared pointer for an object of this class */
|
/** Smart/shared pointer for an object of this class */
|
||||||
using ptr_t = std::shared_ptr<async_client>;
|
using ptr_t = std::shared_ptr<async_client>;
|
||||||
/** Type for a thread-safe queue to consume messages synchronously */
|
/** Type for a thread-safe queue to consume events synchronously */
|
||||||
using consumer_queue_type = std::unique_ptr<thread_queue<const_message_ptr>>;
|
using consumer_queue_type = std::unique_ptr<thread_queue<event_type>>;
|
||||||
|
|
||||||
/** Handler type for registering an individual message callback */
|
/** Handler type for registering an individual message callback */
|
||||||
using message_handler = std::function<void(const_message_ptr)>;
|
using message_handler = std::function<void(const_message_ptr)>;
|
||||||
@ -761,14 +762,14 @@ public:
|
|||||||
* This blocks until a new message arrives.
|
* This blocks until a new message arrives.
|
||||||
* @return The message and topic.
|
* @return The message and topic.
|
||||||
*/
|
*/
|
||||||
const_message_ptr consume_message() override { return que_->get(); }
|
const_message_ptr consume_message() override;
|
||||||
/**
|
/**
|
||||||
* Try to read the next message from the queue without blocking.
|
* Try to read the next message from the queue without blocking.
|
||||||
* @param msg Pointer to the value to receive the message
|
* @param msg Pointer to the value to receive the message
|
||||||
* @return @em true is a message was read, @em false if no message was
|
* @return @em true is a message was read, @em false if no message was
|
||||||
* available.
|
* available.
|
||||||
*/
|
*/
|
||||||
bool try_consume_message(const_message_ptr* msg) override { return que_->try_get(msg); }
|
bool try_consume_message(const_message_ptr* msg);
|
||||||
/**
|
/**
|
||||||
* Waits a limited time for a message to arrive.
|
* Waits a limited time for a message to arrive.
|
||||||
* @param msg Pointer to the value to receive the message
|
* @param msg Pointer to the value to receive the message
|
||||||
@ -780,7 +781,15 @@ public:
|
|||||||
bool try_consume_message_for(
|
bool try_consume_message_for(
|
||||||
const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
|
const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
|
||||||
) {
|
) {
|
||||||
return que_->try_get_for(msg, relTime);
|
event_type evt;
|
||||||
|
if (!que_->try_get_for(&evt, relTime))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (const auto* pval = std::get_if<message_arrived_event>(&evt))
|
||||||
|
*msg = std::move(pval->msg);
|
||||||
|
else
|
||||||
|
*msg = const_message_ptr{};
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Waits a limited time for a message to arrive.
|
* Waits a limited time for a message to arrive.
|
||||||
@ -793,7 +802,7 @@ public:
|
|||||||
const std::chrono::duration<Rep, Period>& relTime
|
const std::chrono::duration<Rep, Period>& relTime
|
||||||
) {
|
) {
|
||||||
const_message_ptr msg;
|
const_message_ptr msg;
|
||||||
que_->try_get_for(&msg, relTime);
|
this->try_consume_message_for(&msg, relTime);
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -807,7 +816,15 @@ public:
|
|||||||
bool try_consume_message_until(
|
bool try_consume_message_until(
|
||||||
const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
|
const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
|
||||||
) {
|
) {
|
||||||
return que_->try_get_until(msg, absTime);
|
event_type evt;
|
||||||
|
if (!que_->try_get_until(&evt, absTime))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (const auto* pval = std::get_if<message_arrived_event>(&evt))
|
||||||
|
*msg = std::move(pval->msg);
|
||||||
|
else
|
||||||
|
*msg = const_message_ptr{};
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Waits until a specific time for a message to appear.
|
* Waits until a specific time for a message to appear.
|
||||||
@ -819,9 +836,73 @@ public:
|
|||||||
const std::chrono::time_point<Clock, Duration>& absTime
|
const std::chrono::time_point<Clock, Duration>& absTime
|
||||||
) {
|
) {
|
||||||
const_message_ptr msg;
|
const_message_ptr msg;
|
||||||
que_->try_get_until(&msg, absTime);
|
this->try_consume_message_until(&msg, absTime);
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the next message from the queue.
|
||||||
|
* This blocks until a new message arrives.
|
||||||
|
* @return The message and topic.
|
||||||
|
*/
|
||||||
|
event_type consume_event() override { return que_->get(); }
|
||||||
|
/**
|
||||||
|
* Try to read the next message from the queue without blocking.
|
||||||
|
* @param msg Pointer to the value to receive the message
|
||||||
|
* @return @em true is a message was read, @em false if no message was
|
||||||
|
* available.
|
||||||
|
*/
|
||||||
|
bool try_consume_event(event_type* evt) override { return que_->try_get(evt); }
|
||||||
|
/**
|
||||||
|
* Waits a limited time for a message to arrive.
|
||||||
|
* @param msg Pointer to the value to receive the message
|
||||||
|
* @param relTime The maximum amount of time to wait for a message.
|
||||||
|
* @return @em true if a message was read, @em false if a timeout
|
||||||
|
* occurred.
|
||||||
|
*/
|
||||||
|
template <typename Rep, class Period>
|
||||||
|
bool try_consume_event_for(
|
||||||
|
event_type* evt, const std::chrono::duration<Rep, Period>& relTime
|
||||||
|
) {
|
||||||
|
return que_->try_get_for(evt, relTime);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Waits a limited time for a message to arrive.
|
||||||
|
* @param relTime The maximum amount of time to wait for a message.
|
||||||
|
* @return A shared pointer to the message that was received. It will be
|
||||||
|
* empty on timeout.
|
||||||
|
*/
|
||||||
|
template <typename Rep, class Period>
|
||||||
|
event_type try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
|
||||||
|
event_type evt;
|
||||||
|
que_->try_get_for(&evt, relTime);
|
||||||
|
return evt;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Waits until a specific time for a message to appear.
|
||||||
|
* @param msg Pointer to the value to receive the message
|
||||||
|
* @param absTime The time point to wait until, before timing out.
|
||||||
|
* @return @em true if a message was read, @em false if a timeout
|
||||||
|
* occurred.
|
||||||
|
*/
|
||||||
|
template <class Clock, class Duration>
|
||||||
|
bool try_consume_event_until(
|
||||||
|
event_type* evt, const std::chrono::time_point<Clock, Duration>& absTime
|
||||||
|
) {
|
||||||
|
return que_->try_get_until(evt, absTime);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Waits until a specific time for a message to appear.
|
||||||
|
* @param absTime The time point to wait until, before timing out.
|
||||||
|
* @return The message, if read, an empty pointer if not.
|
||||||
|
*/
|
||||||
|
template <class Clock, class Duration>
|
||||||
|
event_type try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime
|
||||||
|
) {
|
||||||
|
event_type evt;
|
||||||
|
que_->try_get_until(&evt, absTime);
|
||||||
|
return evt;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Smart/shared pointer to an asynchronous MQTT client object */
|
/** Smart/shared pointer to an asynchronous MQTT client object */
|
||||||
|
@ -81,7 +81,6 @@ using callback_ptr = callback::ptr_t;
|
|||||||
using const_callback_ptr = callback::const_ptr_t;
|
using const_callback_ptr = callback::const_ptr_t;
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
|
||||||
} // namespace mqtt
|
} // namespace mqtt
|
||||||
|
|
||||||
#endif // __mqtt_callback_h
|
#endif // __mqtt_callback_h
|
||||||
|
@ -34,7 +34,7 @@ namespace mqtt {
|
|||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
/** An empty type that can be used as a `persistent_type` variant opiton. */
|
/** An empty type that can be used as a `persistent_type` variant option. */
|
||||||
struct no_persistence
|
struct no_persistence
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
|
@ -258,4 +258,4 @@ public:
|
|||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
} // namespace mqtt
|
} // namespace mqtt
|
||||||
|
|
||||||
#endif // __mqtt_token_h
|
#endif // __mqtt_exception_h
|
||||||
|
@ -77,7 +77,6 @@ using iaction_listener_ptr = iaction_listener::ptr_t;
|
|||||||
using const_iaction_listener_ptr = iaction_listener::const_ptr_t;
|
using const_iaction_listener_ptr = iaction_listener::const_ptr_t;
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
|
||||||
} // namespace mqtt
|
} // namespace mqtt
|
||||||
|
|
||||||
#endif // __mqtt_iaction_listener_h
|
#endif // __mqtt_iaction_listener_h
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
#include "mqtt/connect_options.h"
|
#include "mqtt/connect_options.h"
|
||||||
#include "mqtt/delivery_token.h"
|
#include "mqtt/delivery_token.h"
|
||||||
#include "mqtt/disconnect_options.h"
|
#include "mqtt/disconnect_options.h"
|
||||||
|
#include "mqtt/event.h"
|
||||||
#include "mqtt/exception.h"
|
#include "mqtt/exception.h"
|
||||||
#include "mqtt/iaction_listener.h"
|
#include "mqtt/iaction_listener.h"
|
||||||
#include "mqtt/iclient_persistence.h"
|
#include "mqtt/iclient_persistence.h"
|
||||||
@ -461,6 +462,19 @@ public:
|
|||||||
* available.
|
* available.
|
||||||
*/
|
*/
|
||||||
virtual bool try_consume_message(const_message_ptr* msg) = 0;
|
virtual bool try_consume_message(const_message_ptr* msg) = 0;
|
||||||
|
/**
|
||||||
|
* Read the next event from the queue.
|
||||||
|
* This blocks until a new message arrives.
|
||||||
|
* @return The message and topic.
|
||||||
|
*/
|
||||||
|
virtual event_type consume_event() = 0;
|
||||||
|
/**
|
||||||
|
* Try to read the next message from the queue without blocking.
|
||||||
|
* @param msg Pointer to the value to receive the message
|
||||||
|
* @return @em true is a message was read, @em false if no message was
|
||||||
|
* available.
|
||||||
|
*/
|
||||||
|
virtual bool try_consume_event(event_type* evt) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -96,21 +96,30 @@ async_client::~async_client() { MQTTAsync_destroy(&cli_); }
|
|||||||
// connect token then calls any registered callbacks.
|
// connect token then calls any registered callbacks.
|
||||||
void async_client::on_connected(void* context, char* cause)
|
void async_client::on_connected(void* context, char* cause)
|
||||||
{
|
{
|
||||||
if (context) {
|
if (!context)
|
||||||
async_client* cli = static_cast<async_client*>(context);
|
return;
|
||||||
|
|
||||||
|
async_client* cli = static_cast<async_client*>(context);
|
||||||
|
|
||||||
|
auto tok = cli->connTok_;
|
||||||
|
if (tok)
|
||||||
|
tok->on_success(nullptr);
|
||||||
|
|
||||||
|
callback* cb = cli->userCallback_;
|
||||||
|
auto& connHandler = cli->connHandler_;
|
||||||
|
auto& que = cli->que_;
|
||||||
|
|
||||||
|
if (cb || connHandler || que) {
|
||||||
string cause_str = cause ? string{cause} : string{};
|
string cause_str = cause ? string{cause} : string{};
|
||||||
|
|
||||||
auto tok = cli->connTok_;
|
|
||||||
if (tok)
|
|
||||||
tok->on_success(nullptr);
|
|
||||||
|
|
||||||
callback* cb = cli->userCallback_;
|
|
||||||
if (cb)
|
if (cb)
|
||||||
cb->connected(cause_str);
|
cb->connected(cause_str);
|
||||||
|
|
||||||
auto& connHandler = cli->connHandler_;
|
|
||||||
if (connHandler)
|
if (connHandler)
|
||||||
connHandler(cause_str);
|
connHandler(cause_str);
|
||||||
|
|
||||||
|
if (que)
|
||||||
|
que->put(connected_event{cause_str});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,21 +131,26 @@ void async_client::on_connected(void* context, char* cause)
|
|||||||
// connection.
|
// connection.
|
||||||
void async_client::on_connection_lost(void* context, char* cause)
|
void async_client::on_connection_lost(void* context, char* cause)
|
||||||
{
|
{
|
||||||
if (context) {
|
if (!context)
|
||||||
async_client* cli = static_cast<async_client*>(context);
|
return;
|
||||||
|
|
||||||
|
async_client* cli = static_cast<async_client*>(context);
|
||||||
|
|
||||||
|
callback* cb = cli->userCallback_;
|
||||||
|
auto& connLostHandler = cli->connLostHandler_;
|
||||||
|
auto& que = cli->que_;
|
||||||
|
|
||||||
|
if (cb || connLostHandler || que) {
|
||||||
string cause_str = cause ? string(cause) : string();
|
string cause_str = cause ? string(cause) : string();
|
||||||
|
|
||||||
callback* cb = cli->userCallback_;
|
|
||||||
if (cb)
|
if (cb)
|
||||||
cb->connection_lost(cause_str);
|
cb->connection_lost(cause_str);
|
||||||
|
|
||||||
auto& connLostHandler = cli->connLostHandler_;
|
|
||||||
if (connLostHandler)
|
if (connLostHandler)
|
||||||
connLostHandler(cause_str);
|
connLostHandler(cause_str);
|
||||||
|
|
||||||
consumer_queue_type& que = cli->que_;
|
|
||||||
if (que)
|
if (que)
|
||||||
que->put(const_message_ptr{});
|
que->put(connection_lost_event{cause_str});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,18 +160,22 @@ void async_client::on_disconnected(
|
|||||||
void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
|
void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if (context) {
|
if (!context)
|
||||||
async_client* cli = static_cast<async_client*>(context);
|
return;
|
||||||
|
|
||||||
auto& disconnectedHandler = cli->disconnectedHandler_;
|
async_client* cli = static_cast<async_client*>(context);
|
||||||
if (disconnectedHandler) {
|
|
||||||
properties props(*cprops);
|
auto& disconnectedHandler = cli->disconnectedHandler_;
|
||||||
|
auto& que = cli->que_;
|
||||||
|
|
||||||
|
if (disconnectedHandler || que) {
|
||||||
|
properties props(*cprops);
|
||||||
|
|
||||||
|
if (disconnectedHandler)
|
||||||
disconnectedHandler(props, ReasonCode(reasonCode));
|
disconnectedHandler(props, ReasonCode(reasonCode));
|
||||||
}
|
|
||||||
|
|
||||||
consumer_queue_type& que = cli->que_;
|
|
||||||
if (que)
|
if (que)
|
||||||
que->put(const_message_ptr{});
|
que->put(disconnected_event{std::move(props), ReasonCode(reasonCode)});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,34 +186,32 @@ int async_client::on_message_arrived(
|
|||||||
void* context, char* topicName, int topicLen, MQTTAsync_message* msg
|
void* context, char* topicName, int topicLen, MQTTAsync_message* msg
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
if (context) {
|
if (!context)
|
||||||
async_client* cli = static_cast<async_client*>(context);
|
return to_int(true);
|
||||||
callback* cb = cli->userCallback_;
|
|
||||||
consumer_queue_type& que = cli->que_;
|
|
||||||
message_handler& msgHandler = cli->msgHandler_;
|
|
||||||
|
|
||||||
if (cb || que || msgHandler) {
|
async_client* cli = static_cast<async_client*>(context);
|
||||||
size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen);
|
callback* cb = cli->userCallback_;
|
||||||
|
auto& que = cli->que_;
|
||||||
|
auto& msgHandler = cli->msgHandler_;
|
||||||
|
|
||||||
string topic{topicName, len};
|
if (cb || que || msgHandler) {
|
||||||
auto m = message::create(std::move(topic), *msg);
|
size_t len = (topicLen == 0) ? strlen(topicName) : size_t(topicLen);
|
||||||
|
|
||||||
if (msgHandler)
|
string topic{topicName, len};
|
||||||
msgHandler(m);
|
auto m = message::create(std::move(topic), *msg);
|
||||||
|
|
||||||
if (cb)
|
if (msgHandler)
|
||||||
cb->message_arrived(m);
|
msgHandler(m);
|
||||||
|
|
||||||
if (que)
|
if (cb)
|
||||||
que->put(m);
|
cb->message_arrived(m);
|
||||||
}
|
|
||||||
|
if (que)
|
||||||
|
que->put(message_arrived_event{m});
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTAsync_freeMessage(&msg);
|
MQTTAsync_freeMessage(&msg);
|
||||||
MQTTAsync_free(topicName);
|
MQTTAsync_free(topicName);
|
||||||
|
|
||||||
// TODO: Should the user code determine the return value?
|
|
||||||
// The Java version does doesn't seem to...
|
|
||||||
return to_int(true);
|
return to_int(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -834,7 +850,7 @@ void async_client::start_consuming()
|
|||||||
// TODO: Should we replace user callback?
|
// TODO: Should we replace user callback?
|
||||||
// userCallback_ = nullptr;
|
// userCallback_ = nullptr;
|
||||||
|
|
||||||
que_.reset(new thread_queue<const_message_ptr>);
|
que_.reset(new thread_queue<event_type>);
|
||||||
|
|
||||||
int rc = MQTTAsync_setCallbacks(
|
int rc = MQTTAsync_setCallbacks(
|
||||||
cli_, this, &async_client::on_connection_lost, &async_client::on_message_arrived,
|
cli_, this, &async_client::on_connection_lost, &async_client::on_message_arrived,
|
||||||
@ -857,6 +873,26 @@ void async_client::stop_consuming()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const_message_ptr async_client::consume_message()
|
||||||
|
{
|
||||||
|
auto evt = que_->get();
|
||||||
|
if (const auto* pval = std::get_if<message_arrived_event>(&evt))
|
||||||
|
return pval->msg;
|
||||||
|
return const_message_ptr{};
|
||||||
|
}
|
||||||
|
|
||||||
|
bool async_client::try_consume_message(const_message_ptr* msg)
|
||||||
|
{
|
||||||
|
event_type evt;
|
||||||
|
if (!que_->try_get(&evt))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (const auto* pval = std::get_if<message_arrived_event>(&evt))
|
||||||
|
*msg = std::move(pval->msg);
|
||||||
|
else
|
||||||
|
*msg = const_message_ptr{};
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
|
||||||
} // namespace mqtt
|
} // namespace mqtt
|
||||||
|
@ -239,6 +239,9 @@ public:
|
|||||||
const_message_ptr consume_message() override { return const_message_ptr{}; }
|
const_message_ptr consume_message() override { return const_message_ptr{}; }
|
||||||
|
|
||||||
bool try_consume_message(const_message_ptr*) override { return false; }
|
bool try_consume_message(const_message_ptr*) override { return false; }
|
||||||
|
|
||||||
|
event_type consume_event() { return message_arrived_event{const_message_ptr{}}; }
|
||||||
|
bool try_consume_event(event_type* evt) override { return false; }
|
||||||
};
|
};
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
Loading…
x
Reference in New Issue
Block a user