mirror of
https://github.com/eclipse/paho.mqtt.cpp.git
synced 2025-05-09 19:31:22 +08:00
Removed persistence encoding, reworked persistence internals, and created a file persistence example.
This commit is contained in:
parent
c955131ee7
commit
6e1444155c
@ -35,22 +35,19 @@ namespace mqtt {
|
|||||||
// Constructors
|
// Constructors
|
||||||
|
|
||||||
async_client::async_client(const string& serverURI, const string& clientId,
|
async_client::async_client(const string& serverURI, const string& clientId,
|
||||||
const string& persistDir,
|
const string& persistDir)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: async_client(serverURI, clientId, 0, persistDir)
|
||||||
: async_client(serverURI, clientId, 0, persistDir, encoder)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
async_client::async_client(const string& serverURI, const string& clientId,
|
async_client::async_client(const string& serverURI, const string& clientId,
|
||||||
iclient_persistence* persistence /*=nullptr*/,
|
iclient_persistence* persistence /*=nullptr*/)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: async_client(serverURI, clientId, 0, persistence)
|
||||||
: async_client(serverURI, clientId, 0, persistence, encoder)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
async_client::async_client(const string& serverURI, const string& clientId,
|
async_client::async_client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages, const string& persistDir,
|
int maxBufferedMessages, const string& persistDir)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
|
||||||
: serverURI_(serverURI), clientId_(clientId),
|
: serverURI_(serverURI), clientId_(clientId),
|
||||||
mqttVersion_(MQTTVERSION_DEFAULT), userCallback_(nullptr)
|
mqttVersion_(MQTTVERSION_DEFAULT), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
@ -62,24 +59,20 @@ async_client::async_client(const string& serverURI, const string& clientId,
|
|||||||
&opts.opts_);
|
&opts.opts_);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
throw exception(rc);
|
throw exception(rc);
|
||||||
|
|
||||||
persistence_encoder(encoder);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async_client::async_client(const string& serverURI, const string& clientId,
|
async_client::async_client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages,
|
int maxBufferedMessages,
|
||||||
iclient_persistence* persistence /*=nullptr*/,
|
iclient_persistence* persistence /*=nullptr*/)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
|
||||||
: async_client(serverURI, clientId,
|
: async_client(serverURI, clientId,
|
||||||
create_options(MQTTVERSION_DEFAULT, maxBufferedMessages),
|
create_options(MQTTVERSION_DEFAULT, maxBufferedMessages),
|
||||||
persistence, encoder)
|
persistence)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
async_client::async_client(const string& serverURI, const string& clientId,
|
async_client::async_client(const string& serverURI, const string& clientId,
|
||||||
const create_options& opts,
|
const create_options& opts,
|
||||||
const string& persistDir,
|
const string& persistDir)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
|
||||||
: serverURI_(serverURI), clientId_(clientId),
|
: serverURI_(serverURI), clientId_(clientId),
|
||||||
mqttVersion_(opts.opts_.MQTTVersion), userCallback_(nullptr)
|
mqttVersion_(opts.opts_.MQTTVersion), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
@ -89,14 +82,11 @@ async_client::async_client(const string& serverURI, const string& clientId,
|
|||||||
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
|
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
throw exception(rc);
|
throw exception(rc);
|
||||||
|
|
||||||
persistence_encoder(encoder);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async_client::async_client(const string& serverURI, const string& clientId,
|
async_client::async_client(const string& serverURI, const string& clientId,
|
||||||
const create_options& opts,
|
const create_options& opts,
|
||||||
iclient_persistence* persistence /*=nullptr*/,
|
iclient_persistence* persistence /*=nullptr*/)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
|
||||||
: serverURI_(serverURI), clientId_(clientId),
|
: serverURI_(serverURI), clientId_(clientId),
|
||||||
mqttVersion_(opts.opts_.MQTTVersion), userCallback_(nullptr)
|
mqttVersion_(opts.opts_.MQTTVersion), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
@ -123,9 +113,6 @@ async_client::async_client(const string& serverURI, const string& clientId,
|
|||||||
rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
|
rc = MQTTAsync_createWithOptions(&cli_, serverURI.c_str(), clientId.c_str(),
|
||||||
MQTTCLIENT_PERSISTENCE_USER, persist_.get(),
|
MQTTCLIENT_PERSISTENCE_USER, persist_.get(),
|
||||||
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
|
const_cast<MQTTAsync_createOptions*>(&opts.opts_));
|
||||||
|
|
||||||
if (rc == 0)
|
|
||||||
persistence_encoder(encoder);
|
|
||||||
}
|
}
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
throw exception(rc);
|
throw exception(rc);
|
||||||
@ -364,14 +351,6 @@ void async_client::remove_token(token* tok)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void async_client::persistence_encoder(ipersistence_encoder* encoder)
|
|
||||||
{
|
|
||||||
if (encoder && cli_) {
|
|
||||||
MQTTAsync_setBeforePersistenceWrite(cli_, encoder, &ipersistence_encoder::before_write);
|
|
||||||
MQTTAsync_setAfterPersistenceRead(cli_, encoder, &ipersistence_encoder::after_read);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// --------------------------------------------------------------------------
|
// --------------------------------------------------------------------------
|
||||||
// Callback management
|
// Callback management
|
||||||
|
|
||||||
|
@ -32,43 +32,38 @@ const std::chrono::minutes client::DFLT_TIMEOUT = std::chrono::minutes(5);
|
|||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
client::client(const string& serverURI, const string& clientId,
|
client::client(const string& serverURI, const string& clientId,
|
||||||
iclient_persistence* persistence /*=nullptr*/,
|
iclient_persistence* persistence /*=nullptr*/)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: cli_(serverURI, clientId, persistence),
|
||||||
: cli_(serverURI, clientId, persistence, encoder),
|
|
||||||
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
client::client(const string& serverURI, const string& clientId,
|
client::client(const string& serverURI, const string& clientId,
|
||||||
const string& persistDir,
|
const string& persistDir)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: cli_(serverURI, clientId, persistDir),
|
||||||
: cli_(serverURI, clientId, persistDir, encoder),
|
|
||||||
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
client::client(const string& serverURI, const string& clientId,
|
client::client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages,
|
int maxBufferedMessages,
|
||||||
iclient_persistence* persistence /*=nullptr*/,
|
iclient_persistence* persistence /*=nullptr*/)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: cli_(serverURI, clientId, maxBufferedMessages, persistence),
|
||||||
: cli_(serverURI, clientId, maxBufferedMessages, persistence, encoder),
|
|
||||||
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
client::client(const string& serverURI, const string& clientId,
|
client::client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages, const string& persistDir,
|
int maxBufferedMessages, const string& persistDir)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: cli_(serverURI, clientId, maxBufferedMessages, persistDir),
|
||||||
: cli_(serverURI, clientId, maxBufferedMessages, persistDir, encoder),
|
|
||||||
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
client::client(const string& serverURI, const string& clientId,
|
client::client(const string& serverURI, const string& clientId,
|
||||||
const create_options& opts,
|
const create_options& opts,
|
||||||
iclient_persistence* persistence /*=nullptr*/,
|
iclient_persistence* persistence /*=nullptr*/)
|
||||||
ipersistence_encoder* encoder /*=nullptr*/)
|
: cli_(serverURI, clientId, opts, persistence),
|
||||||
: cli_(serverURI, clientId, opts, persistence, encoder),
|
|
||||||
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
timeout_(DFLT_TIMEOUT), userCallback_(nullptr)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@ -83,10 +83,10 @@ int iclient_persistence::persistence_get(void* handle, char* key,
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
if (handle && key && buffer && buflen) {
|
if (handle && key && buffer && buflen) {
|
||||||
auto sv = static_cast<iclient_persistence*>(handle)->get(key);
|
auto s = static_cast<iclient_persistence*>(handle)->get(key);
|
||||||
size_t n = sv.length();
|
size_t n = s.length();
|
||||||
*buffer = static_cast<char*>(MQTTAsync_malloc(n));
|
*buffer = static_cast<char*>(MQTTAsync_malloc(n));
|
||||||
memcpy(*buffer, sv.data(), n);
|
memcpy(*buffer, s.data(), n);
|
||||||
*buflen = int(n);
|
*buflen = int(n);
|
||||||
return MQTTASYNC_SUCCESS;
|
return MQTTASYNC_SUCCESS;
|
||||||
}
|
}
|
||||||
@ -113,10 +113,20 @@ int iclient_persistence::persistence_keys(void* handle, char*** keys, int* nkeys
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
if (handle && keys && nkeys) {
|
if (handle && keys && nkeys) {
|
||||||
auto& k = static_cast<iclient_persistence*>(handle)->keys();
|
auto k = static_cast<iclient_persistence*>(handle)->keys();
|
||||||
size_t n = k.size();
|
size_t n = k.size();
|
||||||
*nkeys = int(n);
|
*nkeys = int(n);
|
||||||
*keys = (n == 0) ? nullptr : const_cast<char**>(k.c_arr());
|
if (n == 0) {
|
||||||
|
*keys = nullptr;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
*keys = static_cast<char**>(MQTTAsync_malloc(n*sizeof(char*)));
|
||||||
|
for (size_t i=0; i<n; ++i) {
|
||||||
|
char* buf = static_cast<char*>(MQTTAsync_malloc(k[i].size()+1));
|
||||||
|
strcpy(buf, k[i].c_str());
|
||||||
|
(*keys)[i] = buf;
|
||||||
|
}
|
||||||
|
}
|
||||||
return MQTTASYNC_SUCCESS;
|
return MQTTASYNC_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -150,44 +160,6 @@ int iclient_persistence::persistence_containskey(void* handle, char* key)
|
|||||||
return MQTTCLIENT_PERSISTENCE_ERROR;
|
return MQTTCLIENT_PERSISTENCE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
|
||||||
// Encoder
|
|
||||||
|
|
||||||
// Callback before writing the data to persistence.
|
|
||||||
// It allows the application to encode/encrypt the data.
|
|
||||||
int ipersistence_encoder::before_write(void* context, int nbuf, char* bufs[], int buflens[])
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
if (context && nbuf > 0 && bufs && buflens) {
|
|
||||||
vector<size_t> lens;
|
|
||||||
for (int i=0; i<nbuf; ++i)
|
|
||||||
lens.push_back(size_t(buflens[i]));
|
|
||||||
static_cast<ipersistence_encoder*>(context)->encode(size_t(nbuf), bufs, &lens[0]);
|
|
||||||
for (int i=0; i<nbuf; ++i)
|
|
||||||
buflens[i] = int(lens[i]);
|
|
||||||
return MQTTASYNC_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (...) {}
|
|
||||||
return MQTTCLIENT_PERSISTENCE_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Callback after reading the data from persistence.
|
|
||||||
// It allows the application to decode/decrypt the data.
|
|
||||||
int ipersistence_encoder::after_read(void* context, char** buf, int* buflen)
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
if (context && buf && *buf && buflen && *buflen > 0) {
|
|
||||||
size_t len = *buflen;
|
|
||||||
static_cast<ipersistence_encoder*>(context)->decode(buf, &len);
|
|
||||||
*buflen = int(len);
|
|
||||||
return MQTTASYNC_SUCCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (...) {}
|
|
||||||
return MQTTCLIENT_PERSISTENCE_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
// end namespace mqtt
|
||||||
}
|
}
|
||||||
|
@ -159,9 +159,6 @@ private:
|
|||||||
throw exception(rc);
|
throw exception(rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Installs a persistence encoder/decoder */
|
|
||||||
void persistence_encoder(ipersistence_encoder* encoder);
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
@ -172,13 +169,10 @@ public:
|
|||||||
* @param clientId a client identifier that is unique on the server
|
* @param clientId a client identifier that is unique on the server
|
||||||
* being connected to
|
* being connected to
|
||||||
* @param persistDir The directory to use for persistence data
|
* @param persistDir The directory to use for persistence data
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @throw exception if an argument is invalid
|
* @throw exception if an argument is invalid
|
||||||
*/
|
*/
|
||||||
async_client(const string& serverURI, const string& clientId,
|
async_client(const string& serverURI, const string& clientId,
|
||||||
const string& persistDir,
|
const string& persistDir);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server.
|
* server.
|
||||||
@ -190,12 +184,10 @@ public:
|
|||||||
* being connected to
|
* being connected to
|
||||||
* @param persistence The user persistence structure. If this is null,
|
* @param persistence The user persistence structure. If this is null,
|
||||||
* then no persistence is used.
|
* then no persistence is used.
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @throw exception if an argument is invalid
|
* @throw exception if an argument is invalid
|
||||||
*/
|
*/
|
||||||
async_client(const string& serverURI, const string& clientId,
|
async_client(const string& serverURI, const string& clientId,
|
||||||
iclient_persistence* persistence=nullptr,
|
iclient_persistence* persistence=nullptr);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server, which allows for off-line message buffering.
|
* server, which allows for off-line message buffering.
|
||||||
@ -207,12 +199,10 @@ public:
|
|||||||
* @param maxBufferedMessages the maximum number of messages allowed to
|
* @param maxBufferedMessages the maximum number of messages allowed to
|
||||||
* be buffered while not connected
|
* be buffered while not connected
|
||||||
* @param persistDir The directory to use for persistence data
|
* @param persistDir The directory to use for persistence data
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @throw exception if an argument is invalid
|
* @throw exception if an argument is invalid
|
||||||
*/
|
*/
|
||||||
async_client(const string& serverURI, const string& clientId,
|
async_client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages, const string& persistDir,
|
int maxBufferedMessages, const string& persistDir);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server, which allows for off-line message buffering.
|
* server, which allows for off-line message buffering.
|
||||||
@ -226,13 +216,11 @@ public:
|
|||||||
* be buffered while not connected
|
* be buffered while not connected
|
||||||
* @param persistence The user persistence structure. If this is null,
|
* @param persistence The user persistence structure. If this is null,
|
||||||
* then no persistence is used.
|
* then no persistence is used.
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @throw exception if an argument is invalid
|
* @throw exception if an argument is invalid
|
||||||
*/
|
*/
|
||||||
async_client(const string& serverURI, const string& clientId,
|
async_client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages,
|
int maxBufferedMessages,
|
||||||
iclient_persistence* persistence=nullptr,
|
iclient_persistence* persistence=nullptr);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server, which allows for off-line message buffering.
|
* server, which allows for off-line message buffering.
|
||||||
@ -243,12 +231,10 @@ public:
|
|||||||
* being connected to
|
* being connected to
|
||||||
* @param opts The create options
|
* @param opts The create options
|
||||||
* @param persistDir The directory to use for persistence data
|
* @param persistDir The directory to use for persistence data
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @throw exception if an argument is invalid
|
* @throw exception if an argument is invalid
|
||||||
*/
|
*/
|
||||||
async_client(const string& serverURI, const string& clientId,
|
async_client(const string& serverURI, const string& clientId,
|
||||||
const create_options& opts, const string& persistDir,
|
const create_options& opts, const string& persistDir);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server, which allows for off-line message buffering.
|
* server, which allows for off-line message buffering.
|
||||||
@ -261,13 +247,11 @@ public:
|
|||||||
* @param opts The create options
|
* @param opts The create options
|
||||||
* @param persistence The user persistence structure. If this is null,
|
* @param persistence The user persistence structure. If this is null,
|
||||||
* then no persistence is used.
|
* then no persistence is used.
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @throw exception if an argument is invalid
|
* @throw exception if an argument is invalid
|
||||||
*/
|
*/
|
||||||
async_client(const string& serverURI, const string& clientId,
|
async_client(const string& serverURI, const string& clientId,
|
||||||
const create_options& opts,
|
const create_options& opts,
|
||||||
iclient_persistence* persistence=nullptr,
|
iclient_persistence* persistence=nullptr);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Destructor
|
* Destructor
|
||||||
*/
|
*/
|
||||||
|
@ -107,8 +107,7 @@ public:
|
|||||||
* then no persistence is used.
|
* then no persistence is used.
|
||||||
*/
|
*/
|
||||||
client(const string& serverURI, const string& clientId,
|
client(const string& serverURI, const string& clientId,
|
||||||
iclient_persistence* persistence=nullptr,
|
iclient_persistence* persistence=nullptr);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server.
|
* server.
|
||||||
@ -120,8 +119,7 @@ public:
|
|||||||
* @param persistDir The directory to use for persistence data
|
* @param persistDir The directory to use for persistence data
|
||||||
*/
|
*/
|
||||||
client(const string& serverURI, const string& clientId,
|
client(const string& serverURI, const string& clientId,
|
||||||
const string& persistDir,
|
const string& persistDir);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create a client that can be used to communicate with an MQTT server,
|
* Create a client that can be used to communicate with an MQTT server,
|
||||||
* which allows for off-line message buffering.
|
* which allows for off-line message buffering.
|
||||||
@ -138,8 +136,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
client(const string& serverURI, const string& clientId,
|
client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages,
|
int maxBufferedMessages,
|
||||||
iclient_persistence* persistence=nullptr,
|
iclient_persistence* persistence=nullptr);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create a client that can be used to communicate with an MQTT server,
|
* Create a client that can be used to communicate with an MQTT server,
|
||||||
* which allows for off-line message buffering.
|
* which allows for off-line message buffering.
|
||||||
@ -151,11 +148,9 @@ public:
|
|||||||
* @param maxBufferedMessages the maximum number of messages allowed to
|
* @param maxBufferedMessages the maximum number of messages allowed to
|
||||||
* be buffered while not connected
|
* be buffered while not connected
|
||||||
* @param persistDir The directory to use for persistence data
|
* @param persistDir The directory to use for persistence data
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
*/
|
*/
|
||||||
client(const string& serverURI, const string& clientId,
|
client(const string& serverURI, const string& clientId,
|
||||||
int maxBufferedMessages, const string& persistDir,
|
int maxBufferedMessages, const string& persistDir);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Create an async_client that can be used to communicate with an MQTT
|
* Create an async_client that can be used to communicate with an MQTT
|
||||||
* server, which allows for off-line message buffering.
|
* server, which allows for off-line message buffering.
|
||||||
@ -168,13 +163,10 @@ public:
|
|||||||
* @param opts The create options
|
* @param opts The create options
|
||||||
* @param persistence The user persistence structure. If this is null,
|
* @param persistence The user persistence structure. If this is null,
|
||||||
* then no persistence is used.
|
* then no persistence is used.
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
* @param encoder An object to encode and decode the persistence data.
|
|
||||||
*/
|
*/
|
||||||
client(const string& serverURI, const string& clientId,
|
client(const string& serverURI, const string& clientId,
|
||||||
const create_options& opts,
|
const create_options& opts,
|
||||||
iclient_persistence* persistence=nullptr,
|
iclient_persistence* persistence=nullptr);
|
||||||
ipersistence_encoder* encoder=nullptr);
|
|
||||||
/**
|
/**
|
||||||
* Virtual destructor
|
* Virtual destructor
|
||||||
*/
|
*/
|
||||||
|
@ -120,7 +120,7 @@ public:
|
|||||||
* Returns a collection of keys in this persistent data store.
|
* Returns a collection of keys in this persistent data store.
|
||||||
* @return A collection of strings representing the keys in the store.
|
* @return A collection of strings representing the keys in the store.
|
||||||
*/
|
*/
|
||||||
virtual const string_collection& keys() const =0;
|
virtual string_collection keys() const =0;
|
||||||
/**
|
/**
|
||||||
* Puts the specified data into the persistent store.
|
* Puts the specified data into the persistent store.
|
||||||
* @param key The key.
|
* @param key The key.
|
||||||
@ -132,7 +132,7 @@ public:
|
|||||||
* @param key The key
|
* @param key The key
|
||||||
* @return A const view of the data associated with the key.
|
* @return A const view of the data associated with the key.
|
||||||
*/
|
*/
|
||||||
virtual string_view get(const string& key) const =0;
|
virtual string get(const string& key) const =0;
|
||||||
/**
|
/**
|
||||||
* Remove the data for the specified key.
|
* Remove the data for the specified key.
|
||||||
* @param key The key
|
* @param key The key
|
||||||
@ -146,67 +146,6 @@ using iclient_persistence_ptr = iclient_persistence::ptr_t;
|
|||||||
/** Smart/shared pointer to a persistence client */
|
/** Smart/shared pointer to a persistence client */
|
||||||
using const_iclient_persistence_ptr = iclient_persistence::const_ptr_t;
|
using const_iclient_persistence_ptr = iclient_persistence::const_ptr_t;
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Interface for objects to encode and decode data going to and from the
|
|
||||||
* persistence store.
|
|
||||||
*
|
|
||||||
* This is typically used to encrypt the data before writing to
|
|
||||||
* persistence, and then decrypt it when reading it back from persistence.
|
|
||||||
*
|
|
||||||
* For optimized performance, the application can perform encoding in-place
|
|
||||||
* with each of the supplied buffers, if the resulting data fits. But, if
|
|
||||||
* not, it's left to the application to do its own memory management with
|
|
||||||
* @ref persistence_malloc() and @ref persistence_free().
|
|
||||||
*/
|
|
||||||
class ipersistence_encoder
|
|
||||||
{
|
|
||||||
friend class async_client;
|
|
||||||
|
|
||||||
/** Callbacks from the C library */
|
|
||||||
static int before_write(void* context, int bufcount, char* buffers[], int buflens[]);
|
|
||||||
static int after_read(void* context, char** buffer, int* buflen);
|
|
||||||
|
|
||||||
public:
|
|
||||||
/**
|
|
||||||
* Virtual destructor.
|
|
||||||
*/
|
|
||||||
virtual ~ipersistence_encoder() {}
|
|
||||||
/**
|
|
||||||
* Callback to let the application encode data before writing it to
|
|
||||||
* persistence.
|
|
||||||
*
|
|
||||||
* This is called just prior to writing the data to persistence.
|
|
||||||
*
|
|
||||||
* If the encoded data fits into each of the supplied buffers, the
|
|
||||||
* encoding can be done in place. If a buffer needs to grow, the
|
|
||||||
* application can call @ref persistence_malloc() to get a new buffer,
|
|
||||||
* and update the pointer. It then needs to deallocate the old buffer.
|
|
||||||
* In either case it should update the new size of the buffer.
|
|
||||||
*
|
|
||||||
* @param nbuf The number of buffers to encode.
|
|
||||||
* @param bufs The addresses of the data buffers to be encoded.
|
|
||||||
* @param lens The length of each buffer.
|
|
||||||
*/
|
|
||||||
virtual void encode(size_t nbuf, char* bufs[], size_t lens[]) =0;
|
|
||||||
/**
|
|
||||||
* Callback to let the application decode data after it is retrieved
|
|
||||||
* from persistence.
|
|
||||||
*
|
|
||||||
* If the decoded data fits into the supplied buffer, the decoding can
|
|
||||||
* be done in place. If the buffer needs to grow, the application can
|
|
||||||
* call @ref persistence_malloc() to get a new buffer, and update the
|
|
||||||
* pointer. It then needs to deallocate the old buffer. In either case
|
|
||||||
* it should update the new size of the buffer.
|
|
||||||
*
|
|
||||||
* @param pbuf Pointer to the data buffer to decoded.
|
|
||||||
* @param len Pointer to the length of the buffer.
|
|
||||||
*/
|
|
||||||
virtual void decode(char** pbuf, size_t* len) =0;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
// end namespace mqtt
|
// end namespace mqtt
|
||||||
}
|
}
|
||||||
|
@ -41,11 +41,6 @@
|
|||||||
* Frank Pagliughi - initial implementation and documentation
|
* Frank Pagliughi - initial implementation and documentation
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
|
||||||
// Don't worry about localtime() in this context
|
|
||||||
#if defined(_WIN32)
|
|
||||||
#define _CRT_SECURE_NO_WARNINGS
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
@ -56,6 +51,17 @@
|
|||||||
#include <ctime>
|
#include <ctime>
|
||||||
#include "mqtt/async_client.h"
|
#include "mqtt/async_client.h"
|
||||||
|
|
||||||
|
// Don't worry about localtime() in this context
|
||||||
|
#if defined(_WIN32)
|
||||||
|
#define _CRT_SECURE_NO_WARNINGS
|
||||||
|
#else
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <dirent.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <fstream>
|
||||||
|
#endif
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
|
|
||||||
@ -72,49 +78,136 @@ const int MAX_BUFFERED_MSGS = 120; // 120 * 5sec => 10min off-line buffering
|
|||||||
const string PERSIST_DIR { "data-persist" };
|
const string PERSIST_DIR { "data-persist" };
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Example of a simple, in-memory persistence class.
|
||||||
|
|
||||||
class persistence_encoder : virtual public mqtt::ipersistence_encoder
|
class encoded_file_persistence : virtual public mqtt::iclient_persistence
|
||||||
{
|
{
|
||||||
// XOR bit mask for data.
|
// The name of the store
|
||||||
uint16_t mask_;
|
// Used as the directory name
|
||||||
|
std::string name_;
|
||||||
|
|
||||||
/**
|
// Whether the store is open
|
||||||
* Callback to let the application encode data before writing it to
|
bool open_;
|
||||||
* persistence.
|
|
||||||
*/
|
|
||||||
void encode(size_t nbuf, char* bufs[], size_t lens[]) override {
|
|
||||||
for (size_t i=0; i<nbuf; ++i) {
|
|
||||||
auto sz = lens[i];
|
|
||||||
auto buf16 = static_cast<uint16_t*>(mqtt::persistence_malloc(sz*2));
|
|
||||||
|
|
||||||
for (size_t j=0; j<sz; ++j)
|
// A key for encoding the data
|
||||||
buf16[j] = uint16_t(bufs[i][j] ^ mask_);
|
std::string encodeKey_;
|
||||||
|
|
||||||
mqtt::persistence_free(bufs[i]);
|
|
||||||
bufs[i] = reinterpret_cast<char*>(buf16);
|
|
||||||
lens[i] = sz*2;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Callback to let the application decode data after it is retrieved
|
|
||||||
* from persistence.
|
|
||||||
*
|
|
||||||
* We do an in-place decode taking care with the overlapped data.
|
|
||||||
*/
|
|
||||||
void decode(char** pbuf, size_t* len) override {
|
|
||||||
cout << "Decoding buffer @: 0x" << pbuf << endl;
|
|
||||||
char* buf = *pbuf;
|
|
||||||
uint16_t* buf16 = reinterpret_cast<uint16_t*>(*pbuf);
|
|
||||||
size_t sz = *len / 2;
|
|
||||||
|
|
||||||
for (size_t i=0; i<sz; ++i)
|
|
||||||
buf[i] = char(buf16[i] ^ mask_);
|
|
||||||
|
|
||||||
*len = sz;
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
persistence_encoder() : mask_(0x0055) {}
|
encoded_file_persistence(const std::string& encodeKey)
|
||||||
|
: open_(false), encodeKey_(encodeKey) {}
|
||||||
|
|
||||||
|
// "Open" the store
|
||||||
|
void open(const std::string& clientId, const std::string& serverURI) override {
|
||||||
|
if (open_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (clientId.empty() || serverURI.empty())
|
||||||
|
throw mqtt::persistence_exception();
|
||||||
|
|
||||||
|
name_ = serverURI + "-" + clientId;
|
||||||
|
for (char& c : name_) {
|
||||||
|
if (c == ':')
|
||||||
|
c = '-';
|
||||||
|
}
|
||||||
|
|
||||||
|
mkdir(name_.c_str(), S_IRWXU | S_IRWXG);
|
||||||
|
open_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the persistent store that was previously opened.
|
||||||
|
void close() override {
|
||||||
|
if (!open_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
rmdir(name_.c_str());
|
||||||
|
open_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clears persistence, so that it no longer contains any persisted data.
|
||||||
|
void clear() override {
|
||||||
|
DIR* dir = opendir(name_.c_str());
|
||||||
|
if (!dir)
|
||||||
|
return;
|
||||||
|
|
||||||
|
dirent *next;
|
||||||
|
while ((next = readdir(dir)) != nullptr) {
|
||||||
|
auto fname = std::string(next->d_name);
|
||||||
|
if (fname == "." || fname == "..") continue;
|
||||||
|
std::string path = name_ + "/" + fname;
|
||||||
|
remove(path.c_str());
|
||||||
|
}
|
||||||
|
closedir(dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns whether or not data is persisted using the specified key.
|
||||||
|
bool contains_key(const std::string& key) override {
|
||||||
|
DIR* dir = opendir(name_.c_str());
|
||||||
|
if (!dir)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
dirent *next;
|
||||||
|
while ((next = readdir(dir)) != nullptr) {
|
||||||
|
if (std::string(next->d_name) == key) {
|
||||||
|
closedir(dir);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
closedir(dir);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the keys in this persistent data store.
|
||||||
|
mqtt::string_collection keys() const override {
|
||||||
|
mqtt::string_collection ks;
|
||||||
|
DIR* dir = opendir(name_.c_str());
|
||||||
|
if (!dir)
|
||||||
|
return ks;
|
||||||
|
|
||||||
|
dirent *next;
|
||||||
|
while ((next = readdir(dir)) != nullptr) {
|
||||||
|
auto fname = std::string(next->d_name);
|
||||||
|
if (fname == "." || fname == "..") continue;
|
||||||
|
ks.push_back(fname);
|
||||||
|
}
|
||||||
|
|
||||||
|
closedir(dir);
|
||||||
|
return ks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Puts the specified data into the persistent store.
|
||||||
|
void put(const std::string& key, const std::vector<mqtt::string_view>& bufs) override {
|
||||||
|
std::string path = name_ + "/" + key;
|
||||||
|
|
||||||
|
ofstream os(path, ios_base::binary);
|
||||||
|
if (!os)
|
||||||
|
throw mqtt::persistence_exception();
|
||||||
|
|
||||||
|
for (const auto& b : bufs)
|
||||||
|
os.write(b.data(), b.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gets the specified data out of the persistent store.
|
||||||
|
std::string get(const std::string& key) const override {
|
||||||
|
std::string path = name_ + "/" + key;
|
||||||
|
|
||||||
|
ifstream is(path, ios_base::ate|ios_base::binary);
|
||||||
|
if (!is)
|
||||||
|
throw mqtt::persistence_exception();
|
||||||
|
|
||||||
|
streamsize sz = is.tellg();
|
||||||
|
is.seekg(0);
|
||||||
|
std::string s(sz, '\0');
|
||||||
|
is.read(&s[0], sz);
|
||||||
|
if (is.gcount() < sz)
|
||||||
|
s.resize(is.gcount());
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the data for the specified key.
|
||||||
|
void remove(const std::string &key) override {
|
||||||
|
std::string path = name_ + "/" + key;
|
||||||
|
::remove(path.c_str());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
@ -123,9 +216,8 @@ int main(int argc, char* argv[])
|
|||||||
{
|
{
|
||||||
string address = (argc > 1) ? string(argv[1]) : DFLT_ADDRESS;
|
string address = (argc > 1) ? string(argv[1]) : DFLT_ADDRESS;
|
||||||
|
|
||||||
persistence_encoder encoder;
|
encoded_file_persistence persist("elephant");
|
||||||
mqtt::async_client cli(address, CLIENT_ID, MAX_BUFFERED_MSGS,
|
mqtt::async_client cli(address, CLIENT_ID, MAX_BUFFERED_MSGS, &persist);
|
||||||
PERSIST_DIR, &encoder);
|
|
||||||
|
|
||||||
auto connOpts = mqtt::connect_options_builder()
|
auto connOpts = mqtt::connect_options_builder()
|
||||||
.keep_alive_interval(MAX_BUFFERED_MSGS * PERIOD)
|
.keep_alive_interval(MAX_BUFFERED_MSGS * PERIOD)
|
||||||
@ -176,7 +268,7 @@ int main(int argc, char* argv[])
|
|||||||
tm += PERIOD;
|
tm += PERIOD;
|
||||||
|
|
||||||
// TODO: Get rid of this
|
// TODO: Get rid of this
|
||||||
break;
|
if (nsample == 5) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disconnect
|
// Disconnect
|
||||||
|
@ -86,9 +86,8 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Returns the keys in this persistent data store.
|
// Returns the keys in this persistent data store.
|
||||||
const mqtt::string_collection& keys() const override {
|
mqtt::string_collection keys() const override {
|
||||||
static mqtt::string_collection ks;
|
mqtt::string_collection ks;
|
||||||
ks.clear();
|
|
||||||
for (const auto& k : store_)
|
for (const auto& k : store_)
|
||||||
ks.push_back(k.first);
|
ks.push_back(k.first);
|
||||||
return ks;
|
return ks;
|
||||||
@ -105,7 +104,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Gets the specified data out of the persistent store.
|
// Gets the specified data out of the persistent store.
|
||||||
mqtt::string_view get(const std::string& key) const override {
|
std::string get(const std::string& key) const override {
|
||||||
std::cout << "[Searching persistence for key '"
|
std::cout << "[Searching persistence for key '"
|
||||||
<< key << "']" << std::endl;
|
<< key << "']" << std::endl;
|
||||||
auto p = store_.find(key);
|
auto p = store_.find(key);
|
||||||
@ -114,7 +113,7 @@ public:
|
|||||||
std::cout << "[Found persistence data for key '"
|
std::cout << "[Found persistence data for key '"
|
||||||
<< key << "']" << std::endl;
|
<< key << "']" << std::endl;
|
||||||
|
|
||||||
return mqtt::string_view(p->second);
|
return p->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the data for the specified key.
|
// Remove the data for the specified key.
|
||||||
|
@ -87,9 +87,8 @@ public:
|
|||||||
|
|
||||||
// Returns the keys in this persistent data store.
|
// Returns the keys in this persistent data store.
|
||||||
// This could be more efficient, but you get the point.
|
// This could be more efficient, but you get the point.
|
||||||
const mqtt::string_collection& keys() const override {
|
mqtt::string_collection keys() const override {
|
||||||
static mqtt::string_collection ks;
|
mqtt::string_collection ks;
|
||||||
ks.clear();
|
|
||||||
for (const auto& k : store_)
|
for (const auto& k : store_)
|
||||||
ks.push_back(k.first);
|
ks.push_back(k.first);
|
||||||
return ks;
|
return ks;
|
||||||
@ -104,11 +103,11 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Gets the specified data out of the persistent store.
|
// Gets the specified data out of the persistent store.
|
||||||
mqtt::string_view get(const std::string& key) const override {
|
std::string get(const std::string& key) const override {
|
||||||
auto p = store_.find(key);
|
auto p = store_.find(key);
|
||||||
if (p == store_.end())
|
if (p == store_.end())
|
||||||
throw mqtt::persistence_exception();
|
throw mqtt::persistence_exception();
|
||||||
return mqtt::string_view(p->second);
|
return p->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the data for the specified key.
|
// Remove the data for the specified key.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user