1
0
mirror of https://github.com/eclipse/paho.mqtt.cpp.git synced 2025-05-09 19:31:22 +08:00

Initial commit

This commit is contained in:
Frank Pagliughi 2013-09-03 16:21:24 +01:00 committed by Ian Craggs
commit 62fce2e106
35 changed files with 6448 additions and 0 deletions

136
Makefile Normal file
View File

@ -0,0 +1,136 @@
# Make include file for the mqttpp library
#
# This will compile all the .cpp files in the directory into the library.
# Any files to be excluded should be placed in the variable SRC_IGNORE, like:
# SRC_IGNORE = this.c that.cpp
#
MODULE = mqttpp
# ----- Tools -----
ifndef VERBOSE
QUIET := @
endif
# ----- Directories -----
LIB_DIR ?= lib
OBJ_DIR ?= obj
INC_DIR ?= .
PAHO_C_LIB ?= /home/fmp/static/opensrc/mqtt/paho/org.eclipse.paho.mqtt.c
INC_DIRS += $(INC_DIR) $(PAHO_C_LIB)/src
_MK_OBJ_DIR := $(shell mkdir -p $(OBJ_DIR))
_MK_LIB_DIR := $(shell mkdir -p $(LIB_DIR))
# ----- Definitions for the shared library -----
LIB_BASE ?= $(MODULE)
LIB_MAJOR ?= 0
LIB_MINOR ?= 1
LIB_LINK = lib$(LIB_BASE).so
LIB_MAJOR_LINK = $(LIB_LINK).$(LIB_MAJOR)
LIB = $(LIB_MAJOR_LINK).$(LIB_MINOR)
TGT = $(LIB_DIR)/$(LIB)
# ----- Sources -----
SRCS += $(wildcard *.cpp)
ifdef SRC_IGNORE
SRCS := $(filter-out $(SRC_IGNORE),$(SRCS))
endif
OBJS := $(addprefix $(OBJ_DIR)/,$(SRCS:.cpp=.o))
DEPS := $(OBJS:.o=.dep)
# ----- Compiler flags, etc -----
CXXFLAGS += -std=c++0x
CPPFLAGS += -Wall -fPIC
ifdef DEBUG
DEFS += DEBUG
CPPFLAGS += -g -O0
else
DEFS += _NDEBUG
CPPFLAGS += -O2 -Wno-unused-result -Werror
endif
CPPFLAGS += $(addprefix -D,$(DEFS)) $(addprefix -I,$(INC_DIRS))
LIB_DEPS += c stdc++ pthread
LIB_DEP_FLAGS += $(addprefix -l,$(LIB_DEPS))
LDFLAGS := -g -shared -Wl,-soname,$(LIB_MAJOR_LINK) -L$(LIB_DIR)
# ----- Compiler directives -----
$(OBJ_DIR)/%.o: %.cpp
@echo $(CXX) $<
$(QUIET) $(COMPILE.cpp) $(OUTPUT_OPTION) $<
# ----- Build targets -----
.PHONY: all
all: $(TGT) $(LIB_DIR)/$(LIB_LINK) $(LIB_DIR)/$(LIB_MAJOR_LINK)
# This might work for a static library
#$(TGT): $(OBJS)
# @echo Creating library: $@
# $(QUIET) $(AR) $(ARFLAGS) $@ $^
$(TGT): $(OBJS)
@echo Creating library: $@
$(QUIET) $(CC) $(LDFLAGS) -o $@ $^ $(LIB_DEP_FLAGS)
$(LIB_DIR)/$(LIB_LINK): $(LIB_DIR)/$(LIB_MAJOR_LINK)
$(QUIET) cd $(LIB_DIR) ; $(RM) $(LIB_LINK) ; ln -s $(LIB_MAJOR_LINK) $(LIB_LINK)
$(LIB_DIR)/$(LIB_MAJOR_LINK): $(TGT)
$(QUIET) cd $(LIB_DIR) ; $(RM) $(LIB_MAJOR_LINK) ; ln -s $(LIB) $(LIB_MAJOR_LINK)
.PHONY: dump
dump:
@echo LIB=$(LIB)
@echo TGT=$(TGT)
@echo LIB_DIR=$(LIB_DIR)
@echo OBJ_DIR=$(OBJ_DIR)
@echo CFLAGS=$(CFLAGS)
@echo CPPFLAGS=$(CPPFLAGS)
@echo CXX=$(CXX)
@echo COMPILE.cpp=$(COMPILE.cpp)
@echo SRCS=$(SRCS)
@echo OBJS=$(OBJS)
@echo DEPS:$(DEPS)
@echo LIB_DEPS=$(LIB_DEPS)
.PHONY: clean
clean:
$(QUIET) $(RM) $(TGT) $(LIB_DIR)/$(LIB_LINK) $(LIB_DIR)/$(LIB_MAJOR_LINK) \
$(OBJS)
.PHONY: distclean
distclean: clean
$(QUIET) rm -rf $(OBJ_DIR) $(LIB_DIR)
# ----- Header dependencies -----
MKG := $(findstring $(MAKECMDGOALS),"clean distclean dump")
ifeq "$(MKG)" ""
-include $(DEPS)
endif
$(OBJ_DIR)/%.dep: %.cpp
@echo DEP $<
$(QUIET) $(CXX) -M $(CPPFLAGS) $(CXXFLAGS) $< > $@.$$$$; \
sed 's,\($*\)\.o[ :]*,$$(OBJ_DIR)/\1.o $@ : ,g' < $@.$$$$ > $@; \
$(RM) $@.$$$$

25
README.md Normal file
View File

@ -0,0 +1,25 @@
The API organization and documentation were adapted from the Paho Java library
by Dave Locke.
Copyright (c) 2012, IBM Corp
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
which accompanies this distribution, and is available at
http://www.eclipse.org/legal/epl-v10.html
-----------
This code requires the Paho C library by Ian Craggs
Copyright (c) 2013 IBM Corp.
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.

0
doc/.include Normal file
View File

1781
doc/Doxyfile Normal file

File diff suppressed because it is too large Load Diff

15
edl-v10 Normal file
View File

@ -0,0 +1,15 @@
Eclipse Distribution License - v 1.0
Copyright (c) 2007, Eclipse Foundation, Inc. and its licensors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
Neither the name of the Eclipse Foundation, Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

70
epl-v10 Normal file
View File

@ -0,0 +1,70 @@
Eclipse Public License - v 1.0
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
1. DEFINITIONS
"Contribution" means:
a) in the case of the initial Contributor, the initial code and documentation distributed under this Agreement, and
b) in the case of each subsequent Contributor:
i) changes to the Program, and
ii) additions to the Program;
where such changes and/or additions to the Program originate from and are distributed by that particular Contributor. A Contribution 'originates' from a Contributor if it was added to the Program by such Contributor itself or anyone acting on such Contributor's behalf. Contributions do not include additions to the Program which: (i) are separate modules of software distributed in conjunction with the Program under their own license agreement, and (ii) are not derivative works of the Program.
"Contributor" means any person or entity that distributes the Program.
"Licensed Patents" mean patent claims licensable by a Contributor which are necessarily infringed by the use or sale of its Contribution alone or when combined with the Program.
"Program" means the Contributions distributed in accordance with this Agreement.
"Recipient" means anyone who receives the Program under this Agreement, including all Contributors.
2. GRANT OF RIGHTS
a) Subject to the terms of this Agreement, each Contributor hereby grants Recipient a non-exclusive, worldwide, royalty-free copyright license to reproduce, prepare derivative works of, publicly display, publicly perform, distribute and sublicense the Contribution of such Contributor, if any, and such derivative works, in source code and object code form.
b) Subject to the terms of this Agreement, each Contributor hereby grants Recipient a non-exclusive, worldwide, royalty-free patent license under Licensed Patents to make, use, sell, offer to sell, import and otherwise transfer the Contribution of such Contributor, if any, in source code and object code form. This patent license shall apply to the combination of the Contribution and the Program if, at the time the Contribution is added by the Contributor, such addition of the Contribution causes such combination to be covered by the Licensed Patents. The patent license shall not apply to any other combinations which include the Contribution. No hardware per se is licensed hereunder.
c) Recipient understands that although each Contributor grants the licenses to its Contributions set forth herein, no assurances are provided by any Contributor that the Program does not infringe the patent or other intellectual property rights of any other entity. Each Contributor disclaims any liability to Recipient for claims brought by any other entity based on infringement of intellectual property rights or otherwise. As a condition to exercising the rights and licenses granted hereunder, each Recipient hereby assumes sole responsibility to secure any other intellectual property rights needed, if any. For example, if a third party patent license is required to allow Recipient to distribute the Program, it is Recipient's responsibility to acquire that license before distributing the Program.
d) Each Contributor represents that to its knowledge it has sufficient copyright rights in its Contribution, if any, to grant the copyright license set forth in this Agreement.
3. REQUIREMENTS
A Contributor may choose to distribute the Program in object code form under its own license agreement, provided that:
a) it complies with the terms and conditions of this Agreement; and
b) its license agreement:
i) effectively disclaims on behalf of all Contributors all warranties and conditions, express and implied, including warranties or conditions of title and non-infringement, and implied warranties or conditions of merchantability and fitness for a particular purpose;
ii) effectively excludes on behalf of all Contributors all liability for damages, including direct, indirect, special, incidental and consequential damages, such as lost profits;
iii) states that any provisions which differ from this Agreement are offered by that Contributor alone and not by any other party; and
iv) states that source code for the Program is available from such Contributor, and informs licensees how to obtain it in a reasonable manner on or through a medium customarily used for software exchange.
When the Program is made available in source code form:
a) it must be made available under this Agreement; and
b) a copy of this Agreement must be included with each copy of the Program.
Contributors may not remove or alter any copyright notices contained within the Program.
Each Contributor must identify itself as the originator of its Contribution, if any, in a manner that reasonably allows subsequent Recipients to identify the originator of the Contribution.
4. COMMERCIAL DISTRIBUTION
Commercial distributors of software may accept certain responsibilities with respect to end users, business partners and the like. While this license is intended to facilitate the commercial use of the Program, the Contributor who includes the Program in a commercial product offering should do so in a manner which does not create potential liability for other Contributors. Therefore, if a Contributor includes the Program in a commercial product offering, such Contributor ("Commercial Contributor") hereby agrees to defend and indemnify every other Contributor ("Indemnified Contributor") against any losses, damages and costs (collectively "Losses") arising from claims, lawsuits and other legal actions brought by a third party against the Indemnified Contributor to the extent caused by the acts or omissions of such Commercial Contributor in connection with its distribution of the Program in a commercial product offering. The obligations in this section do not apply to any claims or Losses relating to any actual or alleged intellectual property infringement. In order to qualify, an Indemnified Contributor must: a) promptly notify the Commercial Contributor in writing of such claim, and b) allow the Commercial Contributor to control, and cooperate with the Commercial Contributor in, the defense and any related settlement negotiations. The Indemnified Contributor may participate in any such claim at its own expense.
For example, a Contributor might include the Program in a commercial product offering, Product X. That Contributor is then a Commercial Contributor. If that Commercial Contributor then makes performance claims, or offers warranties related to Product X, those performance claims and warranties are such Commercial Contributor's responsibility alone. Under this section, the Commercial Contributor would have to defend claims against the other Contributors related to those performance claims and warranties, and if a court requires any other Contributor to pay any damages as a result, the Commercial Contributor must pay those damages.
5. NO WARRANTY
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely responsible for determining the appropriateness of using and distributing the Program and assumes all risks associated with its exercise of rights under this Agreement , including but not limited to the risks and costs of program errors, compliance with applicable laws, damage to or loss of data, programs or equipment, and unavailability or interruption of operations.
6. DISCLAIMER OF LIABILITY
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
7. GENERAL
If any provision of this Agreement is invalid or unenforceable under applicable law, it shall not affect the validity or enforceability of the remainder of the terms of this Agreement, and without further action by the parties hereto, such provision shall be reformed to the minimum extent necessary to make such provision valid and enforceable.
If Recipient institutes patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Program itself (excluding combinations of the Program with other software or hardware) infringes such Recipient's patent(s), then such Recipient's rights granted under Section 2(b) shall terminate as of the date such litigation is filed.
All Recipient's rights under this Agreement shall terminate if it fails to comply with any of the material terms or conditions of this Agreement and does not cure such failure in a reasonable period of time after becoming aware of such noncompliance. If all Recipient's rights under this Agreement terminate, Recipient agrees to cease use and distribution of the Program as soon as reasonably practicable. However, Recipient's obligations under this Agreement and any licenses granted by Recipient relating to the Program shall continue and survive.
Everyone is permitted to copy and distribute copies of this Agreement, but in order to avoid inconsistency the Agreement is copyrighted and may only be modified in the following manner. The Agreement Steward reserves the right to publish new versions (including revisions) of this Agreement from time to time. No one other than the Agreement Steward has the right to modify this Agreement. The Eclipse Foundation is the initial Agreement Steward. The Eclipse Foundation may assign the responsibility to serve as the Agreement Steward to a suitable separate entity. Each new version of the Agreement will be given a distinguishing version number. The Program (including Contributions) may always be distributed subject to the version of the Agreement under which it was received. In addition, after a new version of the Agreement is published, Contributor may elect to distribute the Program (including its Contributions) under the new version. Except as expressly stated in Sections 2(a) and 2(b) above, Recipient receives no rights or licenses to the intellectual property of any Contributor under this Agreement, whether expressly, by implication, estoppel or otherwise. All rights in the Program not expressly granted under this Agreement are reserved.
This Agreement is governed by the laws of the State of New York and the intellectual property laws of the United States of America. No party to this Agreement will bring a legal action under this Agreement more than one year after the cause of action arose. Each party waives its rights to a jury trial in any resulting litigation.

108
notice.html Normal file
View File

@ -0,0 +1,108 @@
<?xml version="1.0" encoding="ISO-8859-1" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1" />
<title>Eclipse Foundation Software User Agreement</title>
</head>
<body lang="EN-US">
<h2>Eclipse Foundation Software User Agreement</h2>
<p>February 1, 2011</p>
<h3>Usage Of Content</h3>
<p>THE ECLIPSE FOUNDATION MAKES AVAILABLE SOFTWARE, DOCUMENTATION, INFORMATION AND/OR OTHER MATERIALS FOR OPEN SOURCE PROJECTS
(COLLECTIVELY &quot;CONTENT&quot;). USE OF THE CONTENT IS GOVERNED BY THE TERMS AND CONDITIONS OF THIS AGREEMENT AND/OR THE TERMS AND
CONDITIONS OF LICENSE AGREEMENTS OR NOTICES INDICATED OR REFERENCED BELOW. BY USING THE CONTENT, YOU AGREE THAT YOUR USE
OF THE CONTENT IS GOVERNED BY THIS AGREEMENT AND/OR THE TERMS AND CONDITIONS OF ANY APPLICABLE LICENSE AGREEMENTS OR
NOTICES INDICATED OR REFERENCED BELOW. IF YOU DO NOT AGREE TO THE TERMS AND CONDITIONS OF THIS AGREEMENT AND THE TERMS AND
CONDITIONS OF ANY APPLICABLE LICENSE AGREEMENTS OR NOTICES INDICATED OR REFERENCED BELOW, THEN YOU MAY NOT USE THE CONTENT.</p>
<h3>Applicable Licenses</h3>
<p>Unless otherwise indicated, all Content made available by the Eclipse Foundation is provided to you under the terms and conditions of the Eclipse Public License Version 1.0
(&quot;EPL&quot;). A copy of the EPL is provided with this Content and is also available at <a href="http://www.eclipse.org/legal/epl-v10.html">http://www.eclipse.org/legal/epl-v10.html</a>.
For purposes of the EPL, &quot;Program&quot; will mean the Content.</p>
<p>Content includes, but is not limited to, source code, object code, documentation and other files maintained in the Eclipse Foundation source code
repository (&quot;Repository&quot;) in software modules (&quot;Modules&quot;) and made available as downloadable archives (&quot;Downloads&quot;).</p>
<ul>
<li>Content may be structured and packaged into modules to facilitate delivering, extending, and upgrading the Content. Typical modules may include plug-ins (&quot;Plug-ins&quot;), plug-in fragments (&quot;Fragments&quot;), and features (&quot;Features&quot;).</li>
<li>Each Plug-in or Fragment may be packaged as a sub-directory or JAR (Java&trade; ARchive) in a directory named &quot;plugins&quot;.</li>
<li>A Feature is a bundle of one or more Plug-ins and/or Fragments and associated material. Each Feature may be packaged as a sub-directory in a directory named &quot;features&quot;. Within a Feature, files named &quot;feature.xml&quot; may contain a list of the names and version numbers of the Plug-ins
and/or Fragments associated with that Feature.</li>
<li>Features may also include other Features (&quot;Included Features&quot;). Within a Feature, files named &quot;feature.xml&quot; may contain a list of the names and version numbers of Included Features.</li>
</ul>
<p>The terms and conditions governing Plug-ins and Fragments should be contained in files named &quot;about.html&quot; (&quot;Abouts&quot;). The terms and conditions governing Features and
Included Features should be contained in files named &quot;license.html&quot; (&quot;Feature Licenses&quot;). Abouts and Feature Licenses may be located in any directory of a Download or Module
including, but not limited to the following locations:</p>
<ul>
<li>The top-level (root) directory</li>
<li>Plug-in and Fragment directories</li>
<li>Inside Plug-ins and Fragments packaged as JARs</li>
<li>Sub-directories of the directory named &quot;src&quot; of certain Plug-ins</li>
<li>Feature directories</li>
</ul>
<p>Note: if a Feature made available by the Eclipse Foundation is installed using the Provisioning Technology (as defined below), you must agree to a license (&quot;Feature Update License&quot;) during the
installation process. If the Feature contains Included Features, the Feature Update License should either provide you with the terms and conditions governing the Included Features or
inform you where you can locate them. Feature Update Licenses may be found in the &quot;license&quot; property of files named &quot;feature.properties&quot; found within a Feature.
Such Abouts, Feature Licenses, and Feature Update Licenses contain the terms and conditions (or references to such terms and conditions) that govern your use of the associated Content in
that directory.</p>
<p>THE ABOUTS, FEATURE LICENSES, AND FEATURE UPDATE LICENSES MAY REFER TO THE EPL OR OTHER LICENSE AGREEMENTS, NOTICES OR TERMS AND CONDITIONS. SOME OF THESE
OTHER LICENSE AGREEMENTS MAY INCLUDE (BUT ARE NOT LIMITED TO):</p>
<ul>
<li>Eclipse Distribution License Version 1.0 (available at <a href="http://www.eclipse.org/licenses/edl-v10.html">http://www.eclipse.org/licenses/edl-v1.0.html</a>)</li>
<li>Common Public License Version 1.0 (available at <a href="http://www.eclipse.org/legal/cpl-v10.html">http://www.eclipse.org/legal/cpl-v10.html</a>)</li>
<li>Apache Software License 1.1 (available at <a href="http://www.apache.org/licenses/LICENSE">http://www.apache.org/licenses/LICENSE</a>)</li>
<li>Apache Software License 2.0 (available at <a href="http://www.apache.org/licenses/LICENSE-2.0">http://www.apache.org/licenses/LICENSE-2.0</a>)</li>
<li>Metro Link Public License 1.00 (available at <a href="http://www.opengroup.org/openmotif/supporters/metrolink/license.html">http://www.opengroup.org/openmotif/supporters/metrolink/license.html</a>)</li>
<li>Mozilla Public License Version 1.1 (available at <a href="http://www.mozilla.org/MPL/MPL-1.1.html">http://www.mozilla.org/MPL/MPL-1.1.html</a>)</li>
</ul>
<p>IT IS YOUR OBLIGATION TO READ AND ACCEPT ALL SUCH TERMS AND CONDITIONS PRIOR TO USE OF THE CONTENT. If no About, Feature License, or Feature Update License is provided, please
contact the Eclipse Foundation to determine what terms and conditions govern that particular Content.</p>
<h3>Use of Provisioning Technology</h3>
<p>The Eclipse Foundation makes available provisioning software, examples of which include, but are not limited to, p2 and the Eclipse
Update Manager (&quot;Provisioning Technology&quot;) for the purpose of allowing users to install software, documentation, information and/or
other materials (collectively &quot;Installable Software&quot;). This capability is provided with the intent of allowing such users to
install, extend and update Eclipse-based products. Information about packaging Installable Software is available at <a
href="http://eclipse.org/equinox/p2/repository_packaging.html">http://eclipse.org/equinox/p2/repository_packaging.html</a>
(&quot;Specification&quot;).</p>
<p>You may use Provisioning Technology to allow other parties to install Installable Software. You shall be responsible for enabling the
applicable license agreements relating to the Installable Software to be presented to, and accepted by, the users of the Provisioning Technology
in accordance with the Specification. By using Provisioning Technology in such a manner and making it available in accordance with the
Specification, you further acknowledge your agreement to, and the acquisition of all necessary rights to permit the following:</p>
<ol>
<li>A series of actions may occur (&quot;Provisioning Process&quot;) in which a user may execute the Provisioning Technology
on a machine (&quot;Target Machine&quot;) with the intent of installing, extending or updating the functionality of an Eclipse-based
product.</li>
<li>During the Provisioning Process, the Provisioning Technology may cause third party Installable Software or a portion thereof to be
accessed and copied to the Target Machine.</li>
<li>Pursuant to the Specification, you will provide to the user the terms and conditions that govern the use of the Installable
Software (&quot;Installable Software Agreement&quot;) and such Installable Software Agreement shall be accessed from the Target
Machine in accordance with the Specification. Such Installable Software Agreement must inform the user of the terms and conditions that govern
the Installable Software and must solicit acceptance by the end user in the manner prescribed in such Installable Software Agreement. Upon such
indication of agreement by the user, the provisioning Technology will complete installation of the Installable Software.</li>
</ol>
<h3>Cryptography</h3>
<p>Content may contain encryption software. The country in which you are currently may have restrictions on the import, possession, and use, and/or re-export to
another country, of encryption software. BEFORE using any encryption software, please check the country's laws, regulations and policies concerning the import,
possession, or use, and re-export of encryption software, to see if this is permitted.</p>
<p><small>Java and all Java-based trademarks are trademarks of Oracle Corporation in the United States, other countries, or both.</small></p>
</body>
</html>

664
src/async_client.cpp Normal file
View File

@ -0,0 +1,664 @@
//async_client.cpp
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/async_client.h"
#include "mqtt/token.h"
#include "mqtt/message.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <cstring>
#include <cstdio>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
async_client::async_client(const std::string& serverURI, const std::string& clientId)
: serverURI_(serverURI), clientId_(clientId),
persist_(nullptr), userCallback_(nullptr)
{
MQTTAsync_create(&cli_, const_cast<char*>(serverURI.c_str()),
const_cast<char*>(clientId.c_str()),
MQTTCLIENT_PERSISTENCE_DEFAULT, nullptr);
}
async_client::async_client(const std::string& serverURI, const std::string& clientId,
const std::string& persistDir)
: serverURI_(serverURI), clientId_(clientId),
persist_(nullptr), userCallback_(nullptr)
{
MQTTAsync_create(&cli_, const_cast<char*>(serverURI.c_str()),
const_cast<char*>(clientId.c_str()),
MQTTCLIENT_PERSISTENCE_DEFAULT,
const_cast<char*>(persistDir.c_str()));
}
async_client::async_client(const std::string& serverURI, const std::string& clientId,
iclient_persistence* persistence)
: serverURI_(serverURI), clientId_(clientId),
persist_(nullptr), userCallback_(nullptr)
{
if (!persistence) {
MQTTAsync_create(&cli_, const_cast<char*>(serverURI.c_str()),
const_cast<char*>(clientId.c_str()),
MQTTCLIENT_PERSISTENCE_NONE, nullptr);
}
else {
persist_ = new MQTTClient_persistence {
persistence,
&iclient_persistence::persistence_open,
&iclient_persistence::persistence_close,
&iclient_persistence::persistence_put,
&iclient_persistence::persistence_get,
&iclient_persistence::persistence_remove,
&iclient_persistence::persistence_keys,
&iclient_persistence::persistence_clear,
&iclient_persistence::persistence_containskey
};
MQTTAsync_create(&cli_, const_cast<char*>(serverURI.c_str()),
const_cast<char*>(clientId.c_str()),
MQTTCLIENT_PERSISTENCE_USER, persist_);
}
}
async_client::~async_client()
{
MQTTAsync_destroy(&cli_);
delete persist_;
}
// --------------------------------------------------------------------------
void async_client::on_connection_lost(void *context, char *cause)
{
if (context) {
async_client* m = static_cast<async_client*>(context);
callback* cb = m->get_callback();
if (cb)
cb->connection_lost(cause ? std::string(cause) : std::string());
}
}
int async_client::on_message_arrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* msg)
{
if (context) {
async_client* m = static_cast<async_client*>(context);
callback* cb = m->get_callback();
if (cb) {
std::string topic(topicName, topicName+topicLen);
message_ptr m = std::make_shared<message>(*msg);
cb->message_arrived(topic, m);
}
}
MQTTAsync_freeMessage(&msg);
MQTTAsync_free(topicName);
// TODO: Should the user code determine the return value?
// The Java version does doesn't seem to...
return (!0);
}
// Callback to indicate that a message was delivered to the server. It seems
// to only be called for a message with a QOS >= 1, but it happens before
// the on_success() call for the token. Thus we don't have the underlying
// MQTTAsync_token of the outgoing message at the time of this callback.
//
// So, all in all, this callback in it's current implementation seems rather
// redundant.
//
#if 0
void async_client::on_delivery_complete(void* context, MQTTAsync_token msgID)
{
if (context) {
async_client* m = static_cast<async_client*>(context);
callback* cb = m->get_callback();
if (cb) {
idelivery_token_ptr tok = m->get_pending_delivery_token(msgID);
cb->delivery_complete(tok);
}
}
}
#endif
// --------------------------------------------------------------------------
void async_client::add_token(itoken_ptr tok)
{
if (tok) {
guard g(lock_);
pendingTokens_.push_back(tok);
}
}
void async_client::add_token(idelivery_token_ptr tok)
{
if (tok) {
guard g(lock_);
pendingDeliveryTokens_.push_back(tok);
}
}
// Note that we uniquely identify a token by the address of its raw pointer,
// since the message ID is not unique.
void async_client::remove_token(itoken* tok)
{
if (!tok)
return;
guard g(lock_);
for (auto p=pendingDeliveryTokens_.begin();
p!=pendingDeliveryTokens_.end(); ++p) {
if (p->get() == tok) {
idelivery_token_ptr dtok = *p;
pendingDeliveryTokens_.erase(p);
// If there's a user callback registered, we can now call
// delivery_complete()
if (userCallback_) {
message_ptr msg = dtok->get_message();
if (msg && msg->get_qos() > 0) {
callback* cb = userCallback_;
g.unlock();
cb->delivery_complete(dtok);
}
}
return;
}
}
for (auto p=pendingTokens_.begin(); p!=pendingTokens_.end(); ++p) {
if (p->get() == tok) {
pendingTokens_.erase(p);
return;
}
}
}
std::vector<char*> async_client::alloc_topic_filters(
const topic_filter_collection& topicFilters)
{
std::vector<char*> filts;
for (const auto& t : topicFilters) {
char* filt = new char[t.size()+1];
std::strcpy(filt, t.c_str());
filts.push_back(filt);
}
return filts;
}
void async_client::free_topic_filters(std::vector<char*>& filts)
{
for (const auto& f : filts)
delete[] f;
}
// --------------------------------------------------------------------------
itoken_ptr async_client::connect() throw(exception, security_exception)
{
connect_options opts;
return connect(opts);
}
itoken_ptr async_client::connect(connect_options opts) throw(exception, security_exception)
{
token* ctok = new token(*this);
itoken_ptr tok = itoken_ptr(ctok);
add_token(tok);
opts.opts_.onSuccess = &token::on_success;
opts.opts_.onFailure = &token::on_failure;
opts.opts_.context = ctok;
int rc = MQTTAsync_connect(cli_, &opts.opts_);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::connect(connect_options opts, void* userContext,
iaction_listener& cb) throw(exception, security_exception)
{
token* ctok = new token(*this);
itoken_ptr tok = itoken_ptr(ctok);
add_token(tok);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
opts.opts_.onSuccess = &token::on_success;
opts.opts_.onFailure = &token::on_failure;
opts.opts_.context = ctok;
int rc = MQTTAsync_connect(cli_, &opts.opts_);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::connect(void* userContext, iaction_listener& cb)
throw(exception, security_exception)
{
connect_options opts;
opts.opts_.keepAliveInterval = 30;
opts.opts_.cleansession = 1;
return connect(opts, userContext, cb);
}
itoken_ptr async_client::disconnect(long timeout) throw(exception)
{
token* ctok = new token(*this);
itoken_ptr tok = itoken_ptr(ctok);
add_token(tok);
MQTTAsync_disconnectOptions disconnOpts( MQTTAsync_disconnectOptions_initializer );
// TODO: Check timeout range?
disconnOpts.timeout = int(timeout);
disconnOpts.onSuccess = &token::on_success;
disconnOpts.onFailure = &token::on_failure;
disconnOpts.context = ctok;
int rc = MQTTAsync_disconnect(cli_, &disconnOpts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::disconnect(long timeout, void* userContext, iaction_listener& cb)
throw(exception)
{
token* ctok = new token(*this);
itoken_ptr tok = itoken_ptr(ctok);
add_token(tok);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
MQTTAsync_disconnectOptions disconnOpts( MQTTAsync_disconnectOptions_initializer );
// TODO: Check timeout range?
disconnOpts.timeout = int(timeout);
disconnOpts.onSuccess = &token::on_success;
disconnOpts.onFailure = &token::on_failure;
disconnOpts.context = ctok;
int rc = MQTTAsync_disconnect(cli_, &disconnOpts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
idelivery_token_ptr async_client::get_pending_delivery_token(int msgID) const
{
if (msgID > 0) {
guard g(lock_);
for (const auto& t : pendingDeliveryTokens_) {
if (t->get_message_id() == msgID)
return t;
}
}
return idelivery_token_ptr();
}
std::vector<idelivery_token_ptr> async_client::get_pending_delivery_tokens() const
{
std::vector<idelivery_token_ptr> toks;
guard g(lock_);
for (const auto& t : pendingDeliveryTokens_)
toks.push_back(t);
return toks;
}
idelivery_token_ptr async_client::publish(const std::string& topic, const void* payload,
size_t n, int qos, bool retained)
throw(exception)
{
message_ptr msg = std::make_shared<message>(payload, n);
msg->set_qos(qos);
msg->set_retained(retained);
return publish(topic, msg);
}
idelivery_token_ptr async_client::publish(const std::string& topic,
const void* payload, size_t n,
int qos, bool retained, void* userContext,
iaction_listener& cb)
throw(exception)
{
message_ptr msg = std::make_shared<message>(payload, n);
msg->set_qos(qos);
msg->set_retained(retained);
return publish(topic, msg, userContext, cb);
}
idelivery_token_ptr async_client::publish(const std::string& topic, message_ptr msg)
throw(exception)
{
delivery_token* dtok = new delivery_token(*this, topic);
idelivery_token_ptr tok = idelivery_token_ptr(dtok);
dtok->set_message(msg);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &delivery_token::on_success;
opts.onFailure = &delivery_token::on_failure;
opts.context = dtok;
int rc = MQTTAsync_sendMessage(cli_, (char*) topic.c_str(), &(msg->msg_), &opts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
idelivery_token_ptr async_client::publish(const std::string& topic, message_ptr msg,
void* userContext, iaction_listener& cb)
throw(exception)
{
delivery_token* dtok = new delivery_token(*this, topic);
idelivery_token_ptr tok = idelivery_token_ptr(dtok);
dtok->set_message(msg);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &delivery_token::on_success;
opts.onFailure = &delivery_token::on_failure;
opts.context = dtok;
int rc = MQTTAsync_sendMessage(cli_, (char*) topic.c_str(), &(msg->msg_), &opts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
void async_client::set_callback(callback& cb) throw(exception)
{
guard g(lock_);
userCallback_ = &cb;
int rc = MQTTAsync_setCallbacks(cli_, this,
&async_client::on_connection_lost,
&async_client::on_message_arrived,
nullptr /*&async_client::on_delivery_complete*/);
if (rc != MQTTASYNC_SUCCESS)
throw exception(rc);
}
itoken_ptr async_client::subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos)
throw(std::invalid_argument,exception)
{
if (topicFilters.size() != qos.size())
throw std::invalid_argument("Collection sizes don't match");
std::vector<char*> filts = alloc_topic_filters(topicFilters);
token* stok = new token(*this, topicFilters);
itoken_ptr tok = itoken_ptr(stok);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_subscribeMany(cli_, (int) topicFilters.size(),
(char**) &filts[0], (int*) &qos[0], &opts);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos,
void* userContext, iaction_listener& cb)
throw(std::invalid_argument,exception)
{
if (topicFilters.size() != qos.size())
throw std::invalid_argument("Collection sizes don't match");
std::vector<char*> filts = alloc_topic_filters(topicFilters);
// No exceptions till C-strings are deleted!
token* stok = new token(*this, topicFilters);
itoken_ptr tok = itoken_ptr(stok);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_subscribeMany(cli_, (int) topicFilters.size(),
(char**) &filts[0], (int*) &qos[0], &opts);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::subscribe(const std::string& topicFilter, int qos)
throw(exception)
{
token* stok = new token(*this, topicFilter);
itoken_ptr tok = itoken_ptr(stok);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_subscribe(cli_, (char*) topicFilter.c_str(), qos, &opts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::subscribe(const std::string& topicFilter, int qos,
void* userContext, iaction_listener& cb)
throw(exception)
{
token* stok = new token(*this, topicFilter);
itoken_ptr tok = itoken_ptr(stok);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_subscribe(cli_, (char*) topicFilter.c_str(), qos, &opts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::unsubscribe(const std::string& topicFilter)
throw(exception)
{
token* stok = new token(*this, topicFilter);
itoken_ptr tok = itoken_ptr(stok);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_unsubscribe(cli_, (char*) topicFilter.c_str(), &opts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::unsubscribe(const topic_filter_collection& topicFilters)
throw(exception)
{
size_t n = topicFilters.size();
std::vector<char*> filts = alloc_topic_filters(topicFilters);
token* stok = new token(*this, topicFilters);
itoken_ptr tok = itoken_ptr(stok);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_unsubscribeMany(cli_, (int) n, (char**) &filts[0], &opts);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::unsubscribe(const topic_filter_collection& topicFilters,
void* userContext, iaction_listener& cb)
throw(exception)
{
size_t n = topicFilters.size();
std::vector<char*> filts = alloc_topic_filters(topicFilters);
token* stok = new token(*this, topicFilters);
itoken_ptr tok = itoken_ptr(stok);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_unsubscribeMany(cli_, (int) n, (char**) &filts[0], &opts);
free_topic_filters(filts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
itoken_ptr async_client::unsubscribe(const std::string& topicFilter,
void* userContext, iaction_listener& cb)
throw(exception)
{
token* stok = new token(*this, topicFilter);
itoken_ptr tok = itoken_ptr(stok);
tok->set_user_context(userContext);
tok->set_action_callback(cb);
add_token(tok);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &token::on_success;
opts.onFailure = &token::on_failure;
opts.context = stok;
int rc = MQTTAsync_unsubscribe(cli_, (char*) topicFilter.c_str(), &opts);
if (rc != MQTTASYNC_SUCCESS) {
remove_token(tok);
throw exception(rc);
}
return tok;
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}

173
src/client.cpp Normal file
View File

@ -0,0 +1,173 @@
// client.cpp
// Implementation of the client class for the mqtt C++ client library.
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/client.h"
namespace mqtt {
const int client::DFLT_QOS = 1;
/////////////////////////////////////////////////////////////////////////////
client::client(const std::string& serverURI, const std::string& clientId)
: cli_(serverURI, clientId), timeout_(-1)
{
}
client::client(const std::string& serverURI, const std::string& clientId,
const std::string& persistDir)
: cli_(serverURI, clientId, persistDir), timeout_(-1)
{
}
client::client(const std::string& serverURI, const std::string& clientId,
iclient_persistence* persistence)
: cli_(serverURI, clientId, persistence), timeout_(-1)
{
}
void client::close()
{
// TODO: What?
}
void client::connect()
{
cli_.connect()->wait_for_completion(timeout_);
}
void client::connect(connect_options opts)
{
cli_.connect(opts)->wait_for_completion(timeout_);
}
void client::disconnect()
{
cli_.disconnect()->wait_for_completion(timeout_);
}
void client::disconnect(long timeout)
{
cli_.disconnect(timeout)->wait_for_completion(timeout_);
}
//std::string client::generate_client_id()
//{
//}
std::string client::get_client_id() const
{
return cli_.get_client_id();
}
std::string client::get_server_uri() const
{
return cli_.get_server_uri();
}
//Debug getDebug()
//Return a debug object that can be used to help solve problems.
std::vector<idelivery_token_ptr> client::get_pending_delivery_tokens() const
{
return cli_.get_pending_delivery_tokens();
}
long client::get_time_to_wait() const
{
return timeout_;
}
topic client::get_topic(const std::string& top)
{
return topic(top, cli_);
}
bool client::is_connected() const
{
return cli_.is_connected();
}
void client::publish(const std::string& top, const void* payload, size_t n,
int qos, bool retained)
{
cli_.publish(top, payload, n, qos, retained)->wait_for_completion(timeout_);
}
void client::publish(const std::string& top, message_ptr msg)
{
cli_.publish(top, msg)->wait_for_completion(timeout_);
}
void client::set_callback(callback& cb)
{
cli_.set_callback(cb);
}
void client::set_time_to_wait(int timeout)
{
timeout_ = timeout;
}
void client::subscribe(const std::string& topicFilter)
{
cli_.subscribe(topicFilter, DFLT_QOS)->wait_for_completion(timeout_);
}
void client::subscribe(const topic_filter_collection& topicFilters)
{
qos_collection qos;
for (size_t i=0; i<topicFilters.size(); ++i)
qos.push_back(DFLT_QOS);
cli_.subscribe(topicFilters, qos)->wait_for_completion(timeout_);
}
void client::subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos)
{
cli_.subscribe(topicFilters, qos)->wait_for_completion(timeout_);
}
void client::subscribe(const std::string& topicFilter, int qos)
{
cli_.subscribe(topicFilter, qos)->wait_for_completion(timeout_);
}
void client::unsubscribe(const std::string& topicFilter)
{
cli_.unsubscribe(topicFilter)->wait_for_completion(timeout_);
}
void client::unsubscribe(const topic_filter_collection& topicFilters)
{
qos_collection qos;
for (size_t i=0; i<topicFilters.size(); ++i)
qos.push_back(DFLT_QOS);
cli_.subscribe(topicFilters, qos)->wait_for_completion(timeout_);
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}

226
src/iclient_persistence.cpp Normal file
View File

@ -0,0 +1,226 @@
// iclient_persistence.cpp
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/iclient_persistence.h"
#include <cstring>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
// This is an internal class for wrapping a buffer into a persistable type.
// Note that it does not copy the buffer or take possession of it, and thus
// is only useful for a subset of circumstances where the buffer is
// guaranteed to live longer than the wrapper object, and performance is
// important.
class persistence_wrapper : virtual public ipersistable
{
const uint8_t* hdr_;
const size_t hdrlen_;
const uint8_t* payload_;
const size_t payloadlen_;
public:
persistence_wrapper(const void* payload, size_t payloadlen)
: hdr_(nullptr), hdrlen_(0),
payload_(static_cast<const uint8_t*>(payload)), payloadlen_(payloadlen)
{}
persistence_wrapper(const void* hdr, size_t hdrlen,
const void* payload, size_t payloadlen)
: hdr_(static_cast<const uint8_t*>(hdr)), hdrlen_(hdrlen),
payload_(static_cast<const uint8_t*>(payload)), payloadlen_(payloadlen)
{}
virtual const uint8_t* get_header_bytes() const { return hdr_; }
virtual size_t get_header_length() const { return hdrlen_; }
virtual size_t get_header_offset() const { return 0; }
virtual const uint8_t* get_payload_bytes() const { return payload_; }
virtual size_t get_payload_length() const { return payloadlen_; }
virtual size_t get_payload_offset() const { return 0; }
virtual std::vector<uint8_t> get_header_byte_arr() const {
return std::vector<uint8_t>(hdr_, hdr_+hdrlen_);
}
virtual std::vector<uint8_t> get_payload_byte_arr() const {
return std::vector<uint8_t>(payload_, payload_+payloadlen_);
}
};
/////////////////////////////////////////////////////////////////////////////
// Functions to transition C persistence calls to the C++ persistence object.
// Upon the call to persistence_open(), the 'context' has the address of the
// C++ persistence object, which is reassigned to the 'handle'. Subsequent
// calls have the object address as the handle.
int iclient_persistence::persistence_open(void** handle, char* clientID,
char* serverURI, void* context)
{
try {
if (context) {
static_cast<iclient_persistence*>(context)->open(clientID, serverURI);
*handle = context;
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_close(void* handle)
{
try {
if (handle) {
static_cast<iclient_persistence*>(handle)->close();
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_put(void* handle, char* key, int bufcount,
char* buffers[], int buflens[])
{
try {
if (handle && bufcount > 0) {
ipersistable_ptr p;
if (bufcount == 1)
p = std::make_shared<persistence_wrapper>(buffers[0], buflens[0]);
else if (bufcount == 2)
p = std::make_shared<persistence_wrapper>(buffers[0], buflens[0],
buffers[1], buflens[1]);
else {
std::string buf;
for (int i=0; i<bufcount; ++i) {
if (buffers[i] && buflens[i] > 0)
buf.append(buffers[i], buflens[i]);
}
if (buf.empty()) // No data!
return MQTTCLIENT_PERSISTENCE_ERROR;
p = std::make_shared<persistence_wrapper>(&buf[0], buf.size());
}
static_cast<iclient_persistence*>(handle)->put(key, p);
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_get(void* handle, char* key,
char** buffer, int* buflen)
{
try {
if (handle) {
ipersistable_ptr p = static_cast<iclient_persistence*>(handle)->get(key);
size_t hdrlen = p->get_header_length(),
payloadlen = p->get_payload_length();
if (!p->get_header_bytes()) hdrlen = 0;
if (!p->get_payload_bytes()) payloadlen = 0;
// TODO: Check range
*buflen = (int) (hdrlen + payloadlen);
char* buf = (char*) malloc(*buflen);
std::memcpy(buf, p->get_header_bytes(), hdrlen);
std::memcpy(buf+hdrlen, p->get_payload_bytes(), payloadlen);
*buffer = buf;
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_remove(void* handle, char* key)
{
try {
if (handle) {
static_cast<iclient_persistence*>(handle)->remove(key);
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_keys(void* handle, char*** keys, int* nkeys)
{
try {
if (handle && keys && nkeys) {
std::vector<std::string> k(
static_cast<iclient_persistence*>(handle)->keys());
size_t n = k.size();
*nkeys = n; // TODO: Check range
if (n == 0)
*keys = nullptr;
else {
*keys = (char**) malloc(n*sizeof(char*));
for (size_t i=0; i<n; ++i) {
size_t len = k[i].size();
(*keys)[i] = (char*) malloc(len+1);
std::memcpy((*keys)[i], k[i].data(), len);
(*keys)[i][len] = '\0';
}
}
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_clear(void* handle)
{
try {
if (handle) {
static_cast<iclient_persistence*>(handle)->clear();
return 0;
}
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
int iclient_persistence::persistence_containskey(void* handle, char* key)
{
try {
if (handle &&
static_cast<iclient_persistence*>(handle)->contains_key(key))
return 0;
}
catch (...) {}
return MQTTCLIENT_PERSISTENCE_ERROR;
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}

133
src/message.cpp Normal file
View File

@ -0,0 +1,133 @@
// message.cpp
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/message.h"
#include <string>
#include <cstring>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
message::message() : msg_(MQTTAsync_message_initializer)
{
}
message::message(const void* payload, size_t len)
: msg_(MQTTAsync_message_initializer)
{
copy_payload(payload, len);
}
message::message(const std::string& payload)
: msg_(MQTTAsync_message_initializer)
{
copy_payload(payload.data(), payload.length());
}
message::message(const MQTTAsync_message& msg) : msg_(msg)
{
copy_payload(msg.payload, msg.payloadlen);
}
message::message(const message& other) : msg_(other.msg_)
{
copy_payload(other.msg_.payload, other.msg_.payloadlen);
}
message::message(message&& other) : msg_(other.msg_)
{
other.msg_.payload = nullptr;
other.msg_.payloadlen = 0;
}
message::~message()
{
clear_payload();
}
void message::copy_payload(const void* payload, size_t len)
{
if (!payload || len == 0) {
msg_.payload = nullptr;
msg_.payloadlen = 0;
}
else {
msg_.payloadlen = len;
msg_.payload = new char[len];
std::memcpy(msg_.payload, payload, len);
}
}
message& message::operator=(const message& rhs)
{
if (&rhs == this)
return *this;
delete[] (char*) msg_.payload;
msg_ = rhs.msg_;
copy_payload(rhs.msg_.payload, rhs.msg_.payloadlen);
return *this;
}
message& message::operator=(message&& rhs)
{
if (&rhs == this)
return *this;
delete[] (char*) msg_.payload;
msg_ = rhs.msg_;
rhs.msg_.payload = nullptr;
rhs.msg_.payloadlen = 0;
return *this;
}
void message::clear_payload()
{
delete[] (char*) msg_.payload;
msg_.payload = nullptr;
msg_.payloadlen = 0;
}
std::string message::get_payload() const
{
if (!msg_.payload || msg_.payloadlen == 0)
return std::string();
const char *p = (const char *) msg_.payload;
return std::string(p, p+msg_.payloadlen);
}
void message::set_payload(const void* payload, size_t len)
{
delete[] (char*) msg_.payload;
copy_payload(payload, len);
}
void message::set_payload(const std::string& payload)
{
set_payload(payload.data(), payload.length());
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}

650
src/mqtt/async_client.h Normal file
View File

@ -0,0 +1,650 @@
/////////////////////////////////////////////////////////////////////////////
/// @file async_client.h
/// Declaration of MQTT async_client class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_async_client_h
#define __mqtt_async_client_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/token.h"
#include "mqtt/delivery_token.h"
#include "mqtt/iclient_persistence.h"
#include "mqtt/iaction_listener.h"
#include "mqtt/connect_options.h"
#include "mqtt/exception.h"
#include "mqtt/message.h"
#include "mqtt/callback.h"
#include <string>
#include <vector>
#include <list>
#include <memory>
#include <stdexcept>
namespace mqtt {
const uint32_t VERSION = 0x00010000;
const std::string VERSION_STR("mqttpp v. 0.1"),
COPYRIGHT("Copyright (c) 2013 Frank Pagliughi");
/////////////////////////////////////////////////////////////////////////////
/**
* Enables an application to communicate with an MQTT server using
* non-blocking methods.
*
* It provides applications a simple programming interface to all features
* of the MQTT version 3.1 specification including:
*
* @li connect
* @li publish
* @li subscribe
* @li unsubscribe
* @li disconnect
*/
class iasync_client
{
friend class token;
virtual void remove_token(itoken* tok) =0;
public:
/** Type for a collection of filters */
typedef std::vector<std::string> topic_filter_collection;
/** Type for a collection of QOS values */
typedef std::vector<int> qos_collection;
/**
* Virtual destructor
*/
virtual ~iasync_client() {}
/**
* Connects to an MQTT server using the default options.
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect() throw(exception, security_exception) =0;
/**
* Connects to an MQTT server using the provided connect options.
* @param options
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect(connect_options options)
throw(exception, security_exception) =0;
/**
* Connects to an MQTT server using the specified options.
*
* @param options
*
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect(connect_options options, void* userContext,
iaction_listener& cb) throw(exception, security_exception) =0;
/**
*
* @param userContext
* @param callback
*
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect(void* userContext, iaction_listener& cb)
throw(exception, security_exception) =0;
/**
* Disconnects from the server.
* @return itoken_ptr
*/
virtual itoken_ptr disconnect() throw(exception) =0;
/**
* Disconnects from the server.
*
* @param quiesceTimeout
* @return itoken_ptr
*/
virtual itoken_ptr disconnect(long quiesceTimeout) throw(exception) =0;
/**
* Disconnects from the server.
*
* @param quiesceTimeout
* @param userContext
* @param callback
* @return itoken_ptr
*/
virtual itoken_ptr disconnect(long quiesceTimeout, void* userContext, iaction_listener& cb)
throw(exception) =0;
/**
* Disconnects from the server.
* @param userContext
* @param callback
* @return itoken_ptr
*/
virtual itoken_ptr disconnect(void* userContext, iaction_listener& cb)
throw(exception) =0;
/**
* Returns the delivery token for the specified message ID.
* @return idelivery_token
*/
virtual idelivery_token_ptr get_pending_delivery_token(int msgID) const =0;
/**
* Returns the delivery tokens for any outstanding publish operations.
* @return idelivery_token[]
*/
virtual std::vector<idelivery_token_ptr> get_pending_delivery_tokens() const =0;
/**
* Returns the client ID used by this client.
* @return std::string
*/
virtual std::string get_client_id() const =0;
/**
* Returns the address of the server used by this client.
*/
virtual std::string get_server_uri() const =0;
/**
* Determines if this client is currently connected to the server.
*/
virtual bool is_connected() const =0;
/**
* Publishes a message to a topic on the server
* @param topic
* @param payload
* @param qos
* @param retained
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic, const void* payload,
size_t n, int qos, bool retained)
throw(exception) =0;
/**
* Publishes a message to a topic on the server
* @param topic
* @param payload
* @param qos
* @param retained
* @param userContext
* @param cb
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic,
const void* payload, size_t n,
int qos, bool retained, void* userContext,
iaction_listener& cb) throw(exception) =0;
/**
* Publishes a message to a topic on the server Takes an Message
* message and delivers it to the server at the requested quality of
* service.
*
* @param topic
* @param message
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic, message_ptr msg)
throw(exception) =0;
/**
* Publishes a message to a topic on the server.
* @param topic
* @param message
* @param userContext
* @param callback
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic, message_ptr msg,
void* userContext, iaction_listener& cb)
throw(exception) =0;
/**
* Sets a callback listener to use for events that happen
* asynchronously.
* @param callback
*/
virtual void set_callback(callback& cb) throw(exception) =0;
/**
* Subscribe to multiple topics, each of which may include wildcards.
* @param topicFilters
* @param qos
*
* @return bool
*/
virtual itoken_ptr subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos)
throw(std::invalid_argument,exception) =0;
/**
* Subscribes to multiple topics, each of which may include wildcards.
* @param topicFilters
* @param qos
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos,
void* userContext, iaction_listener& callback)
throw(std::invalid_argument,exception) =0;
/**
* Subscribe to a topic, which may include wildcards.
* @param topicFilter
* @param qos
*
* @return bool
*/
virtual itoken_ptr subscribe(const std::string& topicFilter, int qos)
throw(exception) =0;
/**
* Subscribe to a topic, which may include wildcards.
* @param topicFilter
* @param qos
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr subscribe(const std::string& topicFilter, int qos,
void* userContext, iaction_listener& callback)
throw(exception) =0;
/**
* Requests the server unsubscribe the client from a topic.
* @param topicFilter
*
* @return bool
*/
virtual itoken_ptr unsubscribe(const std::string& topicFilter) throw(exception) =0;
/**
* Requests the server unsubscribe the client from one or more topics.
* @param string
* @return bool
*/
virtual itoken_ptr unsubscribe(const topic_filter_collection& topicFilters)
throw(exception) =0;
/**
* Requests the server unsubscribe the client from one or more topics.
* @param string
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr unsubscribe(const topic_filter_collection& topicFilters,
void* userContext, iaction_listener& callback)
throw(exception) =0;
/**
* Requests the server unsubscribe the client from a topics.
* @param topicFilter
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr unsubscribe(const std::string& topicFilter,
void* userContext, iaction_listener& callback)
throw(exception) =0;
};
/////////////////////////////////////////////////////////////////////////////
/**
* Lightweight client for talking to an MQTT server using non-blocking
* methods that allow an operation to run in the background.
*/
class async_client : public virtual iasync_client
{
public:
/** Pointer type for this object */
typedef std::shared_ptr<async_client> ptr_t;
private:
/** Lock guard type for this class */
typedef std::unique_lock<std::mutex> guard;
/** Object monitor mutex */
mutable std::mutex lock_;
/** The underlying C-lib client. */
MQTTAsync cli_;
/** The server URI string. */
std::string serverURI_;
/** The client ID string that we provided to the server. */
std::string clientId_;
/** A user persistence wrapper (if any) */
MQTTClient_persistence* persist_;
/** Callback supplied by the user (if any) */
callback* userCallback_;
/** A list of tokens that are in play */
std::list<itoken_ptr> pendingTokens_;
/** A list of delivery tokens that are in play */
std::list<idelivery_token_ptr> pendingDeliveryTokens_;
static void on_connection_lost(void *context, char *cause);
static int on_message_arrived(void* context, char* topicName, int topicLen,
MQTTAsync_message* msg);
static void on_delivery_complete(void* context, MQTTAsync_token tok);
/** Manage internal list of active tokens */
friend class token;
virtual void add_token(itoken_ptr tok);
virtual void add_token(idelivery_token_ptr tok);
virtual void remove_token(itoken* tok);
virtual void remove_token(itoken_ptr tok) { remove_token(tok.get()); }
void remove_token(idelivery_token_ptr tok) { remove_token(tok.get()); }
/** Memory management for C-style filter collections */
std::vector<char*> alloc_topic_filters(
const topic_filter_collection& topicFilters);
void free_topic_filters(std::vector<char*>& filts);
/**
* Convenience function to get user callback safely.
* @return callback*
*/
callback* get_callback() const {
guard g(lock_);
return userCallback_;
}
/** Non-copyable */
async_client() =delete;
async_client(const async_client&) =delete;
async_client& operator=(const async_client&) =delete;
public:
/**
* Create an async_client that can be used to communicate with an MQTT
* server.
* This uses file-based persistence in the current working directory.
* @param serverURI
* @param clientId
*/
async_client(const std::string& serverURI, const std::string& clientId);
/**
* Create an async_client that can be used to communicate with an MQTT
* server.
* This uses file-based persistence in the specified directory.
* @param serverURI
* @param clientId
* @param persistDir
*/
async_client(const std::string& serverURI, const std::string& clientId,
const std::string& persistDir);
/**
* Create an async_client that can be used to communicate with an MQTT
* server.
* This allows the caller to specify a user-defined persistance object,
* or use no persistence.
* @param serverURI
* @param clientId
* @param persistence The user persistence structure. If this is null,
* then no persistence is used.
*/
async_client(const std::string& serverURI, const std::string& clientId,
iclient_persistence* persistence);
/**
* Destructor
*/
~async_client();
/**
* Connects to an MQTT server using the default options.
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect() throw(exception, security_exception);
/**
* Connects to an MQTT server using the provided connect options.
* @param options
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect(connect_options options) throw(exception, security_exception);
/**
* Connects to an MQTT server using the specified options.
*
* @param options
*
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect(connect_options options, void* userContext,
iaction_listener& cb) throw(exception, security_exception);
/**
*
* @param userContext
* @param callback
*
* @return bool
* @throw exception
* @throw security_exception
*/
virtual itoken_ptr connect(void* userContext, iaction_listener& cb)
throw(exception, security_exception);
/**
* Disconnects from the server.
* @return itoken_ptr
*/
virtual itoken_ptr disconnect() throw(exception) { return disconnect(0L); }
/**
* Disconnects from the server.
*
* @param quiesceTimeout
* @return itoken_ptr
*/
virtual itoken_ptr disconnect(long quiesceTimeout) throw(exception);
/**
* Disconnects from the server.
*
* @param quiesceTimeout
* @param userContext
* @param callback
* @return itoken_ptr
*/
virtual itoken_ptr disconnect(long quiesceTimeout, void* userContext, iaction_listener& cb)
throw(exception);
/**
* Disconnects from the server.
* @param userContext
* @param callback
* @return itoken_ptr
*/
virtual itoken_ptr disconnect(void* userContext, iaction_listener& cb) throw(exception) {
return disconnect(0L, userContext, cb);
}
/**
* Returns the delivery token for the specified message ID.
* @return idelivery_token
*/
virtual idelivery_token_ptr get_pending_delivery_token(int msgID) const;
/**
* Returns the delivery tokens for any outstanding publish operations.
* @return idelivery_token[]
*/
virtual std::vector<idelivery_token_ptr> get_pending_delivery_tokens() const;
/**
* Returns the client ID used by this client.
* @return std::string
*/
virtual std::string get_client_id() const { return clientId_; }
/**
* Returns the address of the server used by this client.
*/
virtual std::string get_server_uri() const { return serverURI_; }
/**
* Determines if this client is currently connected to the server.
*/
virtual bool is_connected() const { return MQTTAsync_isConnected(cli_) != 0; }
/**
* Publishes a message to a topic on the server
* @param topic
* @param payload
* @param qos
* @param retained
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic, const void* payload,
size_t n, int qos, bool retained) throw(exception);
/**
* Publishes a message to a topic on the server
* @param topic
* @param payload
* @param qos
* @param retained
* @param userContext
* @param cb
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic,
const void* payload, size_t n,
int qos, bool retained, void* userContext,
iaction_listener& cb) throw(exception);
/**
* Publishes a message to a topic on the server Takes an Message
* message and delivers it to the server at the requested quality of
* service.
*
* @param topic
* @param message
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic, message_ptr msg)
throw(exception);
/**
* Publishes a message to a topic on the server.
* @param topic
* @param message
* @param userContext
* @param callback
*
* @return idelivery_token
*/
virtual idelivery_token_ptr publish(const std::string& topic, message_ptr msg,
void* userContext, iaction_listener& cb)
throw(exception);
/**
* Sets a callback listener to use for events that happen
* asynchronously.
* @param callback
*/
virtual void set_callback(callback& cb) throw(exception);
/**
* Subscribe to multiple topics, each of which may include wildcards.
* @param topicFilters
* @param qos
*
* @return bool
*/
virtual itoken_ptr subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos)
throw(std::invalid_argument,exception);
/**
* Subscribes to multiple topics, each of which may include wildcards.
* @param topicFilters
* @param qos
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos,
void* userContext, iaction_listener& callback)
throw(std::invalid_argument,exception);
/**
* Subscribe to a topic, which may include wildcards.
* @param topicFilter
* @param qos
*
* @return bool
*/
virtual itoken_ptr subscribe(const std::string& topicFilter, int qos)
throw(exception);
/**
* Subscribe to a topic, which may include wildcards.
* @param topicFilter
* @param qos
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr subscribe(const std::string& topicFilter, int qos,
void* userContext, iaction_listener& callback)
throw(exception);
/**
* Requests the server unsubscribe the client from a topic.
* @param topicFilter
*
* @return bool
*/
virtual itoken_ptr unsubscribe(const std::string& topicFilter) throw(exception);
/**
* Requests the server unsubscribe the client from one or more topics.
* @param string
* @return bool
*/
virtual itoken_ptr unsubscribe(const topic_filter_collection& topicFilters)
throw(exception);
/**
* Requests the server unsubscribe the client from one or more topics.
* @param string
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr unsubscribe(const topic_filter_collection& topicFilters,
void* userContext, iaction_listener& callback)
throw(exception);
/**
* Requests the server unsubscribe the client from a topics.
* @param topicFilter
* @param userContext
* @param callback
*
* @return bool
*/
virtual itoken_ptr unsubscribe(const std::string& topicFilter,
void* userContext, iaction_listener& callback)
throw(exception);
};
/**
* Shared pointer to an asynchronous MQTT client object.
*/
typedef async_client::ptr_t async_client_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_async_client_h

74
src/mqtt/callback.h Normal file
View File

@ -0,0 +1,74 @@
/////////////////////////////////////////////////////////////////////////////
/// @file callback.h
/// Declaration of MQTT callback class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_callback_h
#define __mqtt_callback_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/delivery_token.h"
#include <string>
#include <vector>
#include <memory>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism for tracking the completion of an asynchronous
* action.
*/
class callback
{
public:
typedef std::shared_ptr<callback> ptr_t;
/**
* This method is called when the connection to the server is lost.
* @param cause
*/
virtual void connection_lost(const std::string& cause) =0;
/**
* This method is called when a message arrives from the server.
* @param topic
* @param msg
*/
virtual void message_arrived(const std::string& topic, message_ptr msg) =0;
/**
* Called when delivery for a message has been completed, and all
* acknowledgments have been received.
* @param token
*/
virtual void delivery_complete(idelivery_token_ptr tok) =0;
};
typedef callback::ptr_t callback_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_callback_h

229
src/mqtt/client.h Normal file
View File

@ -0,0 +1,229 @@
/////////////////////////////////////////////////////////////////////////////
/// @file client.h
/// Declaration of MQTT client class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_client_h
#define __mqtt_client_h
//extern "C" {
// #include "MQTTClient.h"
// #include "MQTTClientPersistence.h"
//}
#include "async_client.h"
#include <string>
#include <memory>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Lightweight client for talking to an MQTT server using methods that block
* until an operation completes.
*/
class client
{
static const int DFLT_QOS;
//MQTTClient cli_;
/**
* The actual client
*/
async_client cli_;
/**
* The longest amount of time to wait for an operation (in milliseconds)
*/
int timeout_;
/** Non-copyable */
client() =delete;
client(const async_client&) =delete;
client& operator=(const async_client&) =delete;
public:
/** Smart pointer type for this object */
typedef std::shared_ptr<client> ptr_t;
/** Type for a collection of filters */
typedef async_client::topic_filter_collection topic_filter_collection;
/** Type for a collection of QOS values */
typedef async_client::qos_collection qos_collection;
/**
* Create a client that can be used to communicate with an MQTT server.
* This uses file-based persistence in the current working directory.
* @param serverURI
* @param clientId
*/
client(const std::string& serverURI, const std::string& clientId);
/**
* Create a client that can be used to communicate with an MQTT server.
* This uses file-based persistence in the specified directory.
* @param serverURI
* @param clientId
* @param persistDir
*/
client(const std::string& serverURI, const std::string& clientId,
const std::string& persistDir);
/**
* Create a client that can be used to communicate with an MQTT server.
* This allows the caller to specify a user-defined persistance object,
* or use no persistence.
* @param serverURI
* @param clientId
* @param persistence The user persistence structure. If this is null,
* then no persistence is used.
*/
client(const std::string& serverURI, const std::string& clientId,
iclient_persistence* persistence);
/**
* Close the client and releases all resource associated with the
* client.
*/
virtual void close();
/**
* Connects to an MQTT server using the default options.
*/
virtual void connect();
/**
* Connects to an MQTT server using the specified options.
* @param options
*/
virtual void connect(connect_options options);
/**
* Disconnects from the server.
*/
virtual void disconnect();
/**
* Disconnects from the server.
*/
virtual void disconnect(long quiesceTimeout);
/**
* Returns a randomly generated client identifier based on the current
* user's login name and the system time.
*/
//static std::string generateClientId();
/**
* Returns the client ID used by this client.
* @return std::string
*/
virtual std::string get_client_id() const;
//Debug getDebug()
//Return a debug object that can be used to help solve problems.
/**
* Returns the delivery tokens for any outstanding publish operations.
*/
virtual std::vector<idelivery_token_ptr> get_pending_delivery_tokens() const;
/**
* Returns the address of the server used by this client, as a URI.
* @return std::string
*/
virtual std::string get_server_uri() const;
/**
* Return the maximum time to wait for an action to complete.
* @return long
*/
virtual long get_time_to_wait() const;
/**
* Get a topic object which can be used to publish messages.
* @param tpc
* @return topic
*/
virtual topic get_topic(const std::string& tpc);
/**
* Determines if this client is currently connected to the server.
* @return bool
*/
virtual bool is_connected() const;
/**
* Publishes a message to a topic on the server and return once it is
* delivered.
* @param topic
* @param payload
* @param n
* @param qos
* @param retained
*/
virtual void publish(const std::string& top, const void* payload, size_t n,
int qos, bool retained);
/**
* Publishes a message to a topic on the server.
* @param tpc
* @param msg
*/
virtual void publish(const std::string& tpc, message_ptr msg);
/**
* Sets the callback listener to use for events that happen
* asynchronously.
* @param callback
*/
virtual void set_callback(callback& cb);
/**
* Set the maximum time to wait for an action to complete
* @param timeToWaitInMillis
*/
virtual void set_time_to_wait(int timeToWaitInMillis);
/**
* Subscribe to a topic, which may include wildcards using a QoS of 1.
* @param topicFilter
*/
virtual void subscribe(const std::string& topicFilter);
/**
* Subscribes to a one or more topics, which may include wildcards using
* a QoS of 1.
*/
virtual void subscribe(const topic_filter_collection& topicFilters);
/**
* Subscribes to multiple topics, each of which may include wildcards.
* @param string
*/
virtual void subscribe(const topic_filter_collection& topicFilters,
const qos_collection& qos);
/**
* Subscribe to a topic, which may include wildcards.
* @param topicFilter
* @param qos
*/
virtual void subscribe(const std::string& topicFilter, int qos);
/**
* Requests the server unsubscribe the client from a topic.
* @param topicFilter
*/
virtual void unsubscribe(const std::string& topicFilter);
/**
* Requests the server unsubscribe the client from one or more topics.
*/
virtual void unsubscribe(const topic_filter_collection& topicFilters);
};
typedef client::ptr_t client_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_client_h

199
src/mqtt/connect_options.h Normal file
View File

@ -0,0 +1,199 @@
/////////////////////////////////////////////////////////////////////////////
/// @file connect_options.h
/// Declaration of MQTT connect_options class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_connect_options_h
#define __mqtt_connect_options_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/message.h"
#include "mqtt/topic.h"
#include <string>
#include <vector>
#include <memory>
namespace mqtt {
class async_client;
/////////////////////////////////////////////////////////////////////////////
/**
* Holds the set of options that control how the client connects to a
* server.
*/
class connect_options
{
/** The underlying C connection options */
MQTTAsync_connectOptions opts_;
/** The client has special access */
friend class async_client;
public:
/**
* Smart/shared pointer to this class.
*/
typedef std::shared_ptr<connect_options> ptr_t;
/**
* Constructs a new MqttConnectOptions object using the default values.
*/
connect_options() : opts_( MQTTAsync_connectOptions_initializer ) {}
/**
* Returns the connection timeout value.
* @return int
*/
int get_connection_timeout() const;
//java.util.Properties getDebug()
/**
* Returns the "keep alive" interval.
* @return int
*/
int get_keep_alive_interval() const {
return opts_.keepAliveInterval;
}
/**
* Returns the password to use for the connection.
* @return std::string
*/
std::string get_password() const {
return std::string(opts_.password);
}
/**
* Returns the socket factory that will be used when connecting, or null
* if one has not been set.
*/
//javax.net.SocketFactory get_socket_factory();
/**
* Returns the SSL properties for the connection.
*/
//java.util.Properties get_ssl_properties();
/**
* Returns the user name to use for the connection.
* @return std::string
*/
std::string get_user_name() const {
return std::string(opts_.username);
}
/**
* Returns the topic to be used for last will and testament (LWT).
* @return std::string
*/
std::string get_will_destination() const;
/**
* Returns the message to be sent as last will and testament (LWT).
* @return MqttMessage
*/
message get_will_message() const;
/**
* Returns whether the server should remember state for the client
* across reconnects.
* @return bool
*/
bool is_clean_session() const { return opts_.cleansession != 0; }
/**
* Sets whether the server should remember state for the client across
* reconnects.
* @param cleanSession
*/
void set_clean_session(bool cleanSession) {
opts_.cleansession = (cleanSession) ? (!0) : 0;
}
/**
* Sets the connection timeout value.
* @param timeout
*/
void set_connection_timeout(int timeout) {
opts_.connectTimeout = timeout;
}
/**
* Sets the "keep alive" interval.
* @param keepAliveInterval
*/
void set_keep_alive_interval(int keepAliveInterval) {
opts_.keepAliveInterval = keepAliveInterval;
}
/**
* Sets the password to use for the connection.
*/
void set_password(const std::string& password);
/**
* Sets the SocketFactory to use.
*/
//void set_socket_factory(javax.net.SocketFactory socketFactory)
/**
* Sets the SSL properties for the connection.
*/
//void set_ssl_properties(java.util.Properties props);
/**
* Sets the user name to use for the connection.
* @param userName
*/
void set_user_name(const std::string& userName);
/**
* Sets the "Last Will and Testament" (LWT) for the connection.
* @param top
* @param payload
* @param n
* @param qos
* @param retained
*/
void set_will(const topic& top, void* payload, size_t n, int qos, bool retained) {
set_will(top.get_name(), payload, n, qos, retained);
}
/**
* Sets the "Last Will and Testament" (LWT) for the connection.
* @param top
* @param payload
* @param n
* @param qos
* @param retained
*/
void set_will(const std::string& top, void* payload, size_t n, int qos, bool retained);
/**
* Sets up the will information, based on the supplied parameters.
* @param top
* @param msg
* @param qos
* @param retained
*/
/*protected*/ void set_will(const std::string& top, message msg, int qos, bool retained);
std::string to_str() const;
};
/**
* Shared pointer to the connection options class.
*/
typedef connect_options::ptr_t connect_options_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_connect_options_h

109
src/mqtt/delivery_token.h Normal file
View File

@ -0,0 +1,109 @@
/////////////////////////////////////////////////////////////////////////////
/// @file delivery_token.h
/// Declaration of MQTT delivery_token class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_delivery_token_h
#define __mqtt_delivery_token_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/token.h"
#include "mqtt/message.h"
#include <memory>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism for tracking the delivery of a message.
*/
class idelivery_token : public virtual itoken
{
public:
typedef std::shared_ptr<idelivery_token> ptr_t;
/**
* Returns the message associated with this token.
* @return The message associated with this token.
*/
virtual message_ptr get_message() const =0;
};
typedef idelivery_token::ptr_t idelivery_token_ptr;
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism to track the delivery progress of a message.
* Used to track the the delivery progress of a message when a publish is
* executed in a non-blocking manner (run in the background) action.
*/
class delivery_token : public virtual idelivery_token,
public token
{
/** The message being tracked. */
message_ptr msg_;
/** Client has special access. */
friend class async_client;
/**
* Sets the message that this token correspn
* @param msg
*/
void set_message(message_ptr msg) { msg_ = msg; }
public:
/**
* Smart/shared pointer to this class.
*/
typedef std::shared_ptr<delivery_token> ptr_t;
delivery_token(iasync_client& cli) : token(cli) {}
delivery_token(iasync_client& cli, const std::string& topic) : token(cli, topic) {}
delivery_token(iasync_client& cli, const std::vector<std::string>& topics)
: token(cli, topics) {}
//delivery_token(const std::string& logContext);
/**
* Returns the message associated with this token.
* @return message
*/
virtual message_ptr get_message() const { return msg_; }
};
/**
* Shared pointer to a delivery_token.
*/
typedef delivery_token::ptr_t delivery_token_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_delivery_token_h

109
src/mqtt/exception.h Normal file
View File

@ -0,0 +1,109 @@
/////////////////////////////////////////////////////////////////////////////
/// @file exception.h
/// Declaration of MQTT exception class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_exception_h
#define __mqtt_exception_h
extern "C" {
#include "MQTTAsync.h"
}
#include <string>
#include <vector>
#include <memory>
#include <exception>
#include <stdexcept>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism for tracking the completion of an asynchronous
* action.
*/
class exception : public std::runtime_error
{
int code_;
public:
explicit exception(int reasonCode) : std::runtime_error("mqtt::exception"),
code_(reasonCode) {}
/**
* Returns the underlying cause of this exception, if available.
*/
//java.lang.Throwable getCause()
/**
* Returns the detail message for this exception.
*/
std::string get_message() const { return std::string(what()); }
/**
* Returns the reason code for this exception.
*/
int get_reason_code() const { return code_; }
/**
* Returns a String representation of this exception.
* @return std::tring
*/
std::string to_str() const { return std::string(what()); }
/**
* Returns an explanatory string for the exception.
* @return const char*
*/
virtual const char* what() const noexcept {
return std::exception::what();
}
};
/////////////////////////////////////////////////////////////////////////////
/**
* This exception is thrown by the implementor of the persistence interface
* if there is a problem reading or writing persistent data.
*/
class persistence_exception : public exception
{
public:
// TODO: Define "reason codes"
persistence_exception() : exception(MQTTCLIENT_PERSISTENCE_ERROR) {}
persistence_exception(int reasonCode) : exception(reasonCode) {}
};
/////////////////////////////////////////////////////////////////////////////
/**
* Thrown when a client is not authorized to perform an operation, or if
there is a problem with the security configuration.
*/
class security_exception : public exception
{
public:
security_exception(int reasonCode) : exception(reasonCode) {}
};
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_token_h

View File

@ -0,0 +1,83 @@
/////////////////////////////////////////////////////////////////////////////
/// @file iaction_listener.h
/// Declaration of MQTT iaction_listener class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_iaction_listener_h
#define __mqtt_iaction_listener_h
extern "C" {
#include "MQTTAsync.h"
}
#include <string>
#include <vector>
#include <memory>
namespace mqtt {
class itoken;
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism for tracking the completion of an asynchronous
* action.
*
* A listener is registered on a token and that token is associated with
* an action like connect or publish. When used with tokens on the
* async_client the listener will be called back on the MQTT client's
* thread. The listener will be informed if the action succeeds or fails. It
* is important that the listener returns control quickly otherwise the
* operation of the MQTT client will be stalled.
*/
class iaction_listener
{
public:
/**
* Shared pointer to this class.
*/
typedef std::shared_ptr<iaction_listener> ptr_t;
/**
* Virtual base destructor.
*/
virtual ~iaction_listener() {}
/**
* This method is invoked when an action fails.
* @param asyncActionToken
* @param exc
*/
virtual void on_failure(const itoken& asyncActionToken /*, java.lang.Throwable exc*/) =0;
/**
* This method is invoked when an action has completed successfully.
* @param asyncActionToken
*/
virtual void on_success(const itoken& asyncActionToken) =0;
};
typedef iaction_listener::ptr_t iaction_listener_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_iaction_listener_h

View File

@ -0,0 +1,133 @@
/////////////////////////////////////////////////////////////////////////////
/// @file iclient_persistence.h
/// Declaration of MQTT iclient_persistence interface
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_iclient_persistence_h
#define __mqtt_iclient_persistence_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/ipersistable.h"
#include <string>
#include <memory>
#include <vector>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Represents a persistent data store, used to store outbound and inbound
* messages while they are in flight, enabling delivery to the QoS
* specified. You can specify an implementation of this interface using
* client::client(string, string, iclient_persistence), which the
* client will use to persist QoS 1 and 2 messages.
*
* If the methods defined throw the MqttPersistenceException then the state
* of the data persisted should remain as prior to the method being called.
* For example, if put(string, persistable) throws an exception at any
* point then the data will be assumed to not be in the persistent store.
* Similarly if remove(string) throws an exception then the data will be
* assumed to still be held in the persistent store.
*
* It is up to the persistence interface to log any exceptions or error
* information which may be required when diagnosing a persistence failure.
*/
class iclient_persistence
{
friend class iasync_client;
public:
/** C-callbacks */
static int persistence_open(void** handle, char* clientID, char* serverURI, void* context);
static int persistence_close(void* handle);
static int persistence_put(void* handle, char* key, int bufcount, char* buffers[], int buflens[]);
static int persistence_get(void* handle, char* key, char** buffer, int* buflen);
static int persistence_remove(void* handle, char* key);
static int persistence_keys(void* handle, char*** keys, int* nkeys);
static int persistence_clear(void* handle);
static int persistence_containskey(void* handle, char* key);
public:
/**
* Smart/shared pointer to this class.
*/
typedef std::shared_ptr<iclient_persistence> ptr_t;
/**
* Virtual destructor.
*/
virtual ~iclient_persistence() {}
/**
* Initialise the persistent store.
*/
virtual void open(const std::string& clientId, const std::string& serverURI) =0;
/**
* Close the persistent store that was previously opened.
*/
virtual void close() =0;
/**
* Clears persistence, so that it no longer contains any persisted data.
*/
virtual void clear() =0;
/**
* Returns whether or not data is persisted using the specified key.
* @param key
* @return bool
*/
virtual bool contains_key(const std::string& key) =0;
/**
* Gets the specified data out of the persistent store.
* @param key
* @return persistable
*/
virtual ipersistable_ptr get(const std::string& key) const =0;
/**
* Returns an Enumeration over the keys in this persistent data store.
*/
virtual std::vector<std::string> keys() const =0;
/**
* Puts the specified data into the persistent store.
* @param key
* @param persistable
*/
virtual void put(const std::string& key, ipersistable_ptr persistable) =0;
/**
* Remove the data for the specified key.
* @param key
*/
virtual void remove(const std::string& key) =0;
};
/**
* Shared pointer to a persistence client.
*/
typedef std::shared_ptr<iclient_persistence> iclient_persistence_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_iclient_persistence_h

140
src/mqtt/ipersistable.h Normal file
View File

@ -0,0 +1,140 @@
/////////////////////////////////////////////////////////////////////////////
/// @file ipersistable.h
/// Declaration of MQTT ipersistable interface.
/// @date May 24, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_ipersistable_h
#define __mqtt_ipersistable_h
extern "C" {
#include "MQTTAsync.h"
}
#include <string>
#include <memory>
#include <vector>
#include <stdexcept>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* Represents an object used to pass data to be persisted across the
* MqttClientPersistence interface.
*
* When data is passed across the interface the header and payload are
* separated, so that unnecessary message copies may be avoided. For
* example, if a 10 MB payload was published it would be inefficient to
* create a byte array a few bytes larger than 10 MB and copy the MQTT
* message header and payload into a contiguous byte array.
*
* When the request to persist data is made a separate byte array and offset
* is passed for the header and payload. Only the data between offset and
* length need be persisted. So for example, a message to be persisted
* consists of a header byte array starting at offset 1 and length 4, plus a
* payload byte array starting at offset 30 and length 40000. There are
* three ways in which the persistence implementation may return data to the
* client on recovery:
*
* @li
* It could return the data as it was passed in originally, with the same
* byte arrays and offsets.
*
* @li
* It could safely just persist and return the bytes from the offset for the
* specified length. For example, return a header byte array with offset 0
* and length 4, plus a payload byte array with offset 0 and length 40000
*
* @li
* It could return the header and payload as a contiguous byte array with
* the header bytes preceeding the payload. The contiguous byte array should
* be set as the header byte array, with the payload byte array being null.
* For example, return a single byte array with offset 0 and length 40004.
* This is useful when recovering from a file where the header and payload
* could be written as a contiguous stream of bytes.
*/
class ipersistable
{
public:
/**
* Smart/shared pointer to this class.
*/
typedef std::shared_ptr<ipersistable> ptr_t;
/**
* Virtual destructor
*/
virtual ~ipersistable() {}
/**
* Returns the header bytes in an array.
* @return std::vector<uint8_t>
*/
virtual const uint8_t* get_header_bytes() const =0;
/**
* Returns the header bytes in an array.
* @return std::vector<uint8_t>
*/
virtual std::vector<uint8_t> get_header_byte_arr() const =0;
/**
* Returns the length of the header.
* @return int
*/
virtual size_t get_header_length() const =0;
/**
* Returns the offset of the header within the byte array returned by
* get_header_bytes().
* @return int
*/
virtual size_t get_header_offset() const =0;
/**
* Returns the payload bytes in an array.
* @return std::vector<uint8_t>
*/
virtual const uint8_t* get_payload_bytes() const =0;
/**
* Returns the payload bytes in an array.
* @return std::vector<uint8_t>
*/
virtual std::vector<uint8_t> get_payload_byte_arr() const =0;
/**
* Returns the length of the payload.
* @return int
*/
virtual size_t get_payload_length() const =0;
/**
* Returns the offset of the payload within the byte array returned by
* get_payload_bytes().
*
* @return int
*/
virtual size_t get_payload_offset() const =0;
};
typedef ipersistable::ptr_t ipersistable_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_ipersistable_h

183
src/mqtt/message.h Normal file
View File

@ -0,0 +1,183 @@
/////////////////////////////////////////////////////////////////////////////
/// @file message.h
/// Declaration of MQTT message class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_message_h
#define __mqtt_message_h
extern "C" {
#include "MQTTAsync.h"
}
#include <string>
#include <memory>
#include <stdexcept>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
/**
* An MQTT message holds the application payload and options specifying how
* the message is to be delivered The message includes a "payload" (the body
* of the message) represented as a byte array.
*/
class message
{
/** The underlying C message struct */
MQTTAsync_message msg_;
/** The client has special access. */
friend class async_client;
/**
* Set the dup flag in the underlying message
* @param dup
*/
void set_duplicate(bool dup) { msg_.dup = (dup) ? (!0) : 0; }
/**
* Copies the specified payload into this object.
* @param payload
* @param len
*/
void copy_payload(const void* payload, size_t len);
public:
/**
* Smart/shared pointer to this class.
*/
typedef std::shared_ptr<message> ptr_t;
/**
* Constructs a message with an empty payload, and all other values set
* to defaults.
*/
message();
/**
* Constructs a message with the specified array as a payload, and all
* other values set to defaults.
*/
message(const void* payload, size_t len);
/**
* Constructs a message with the specified string as a payload, and
* all other values set to defaults.
*/
message(const std::string& payload);
/**
* Constructs a message as a copy of the message structure.
*/
message(const MQTTAsync_message& msg);
/**
* Constructs a message as a copy of the other message.
*/
message(const message& other);
/**
* Moves the other message to this one.
*/
message(message&& other);
/**
* Destroys a message and frees all associated resources.
*/
~message();
/**
* Copies another message to this one.
* @param rhs The other message.
* @return A reference to this message.
*/
message& operator=(const message& rhs);
/**
* Moves another message to this one.
* @param rhs The other message.
* @return A reference to this message.
*/
message& operator=(message&& rhs);
/**
* Clears the payload, resetting it to be empty.
*/
void clear_payload();
/**
* Gets the payload
*/
std::string get_payload() const;
/**
* Returns the quality of service for this message.
* @return The quality of service for this message.
*/
int get_qos() const { return msg_.qos; }
/**
* Returns whether or not this message might be a duplicate of one which
* has already been received.
* @return bool
*/
bool is_duplicate() const { return msg_.dup != 0; }
/**
* Returns whether or not this message should be/was retained by the
* server.
* @return bool
*/
bool is_retained() const { return msg_.retained != 0; }
/**
* Sets the payload of this message to be the specified byte array.
*/
void set_payload(const void* payload, size_t n);
/**
* Sets the payload of this message to be the specified string.
*/
void set_payload(const std::string& payload);
/**
* Sets the quality of service for this message.
*
* @param qos
*/
void set_qos(int qos) throw(std::invalid_argument) {
validate_qos(qos);
msg_.qos = qos;
}
/**
* Whether or not the publish message should be retained by the
* messaging engine.
* @param retained
*/
void set_retained(bool retained) { msg_.retained = (retained) ? (!0) : 0; }
/**
* Returns a string representation of this messages payload.
* @return std::string
*/
std::string to_str() const { return get_payload(); }
/**
* Determines if the QOS value is a valid one.
* @param qos The QOS value.
* @throw std::invalid_argument If the qos value is invalid.
*/
static void validate_qos(int qos) throw(std::invalid_argument) {
if (qos < 0 || qos > 2)
throw std::invalid_argument("QOS invalid");
}
};
typedef message::ptr_t message_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_message_h

331
src/mqtt/token.h Normal file
View File

@ -0,0 +1,331 @@
/////////////////////////////////////////////////////////////////////////////
/// @file token.h
/// Declaration of MQTT token class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_token_h
#define __mqtt_token_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/iaction_listener.h"
#include "mqtt/exception.h"
#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
namespace mqtt {
class iasync_client;
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism for tracking the completion of an asynchronous task.
*/
class itoken
{
public:
typedef std::shared_ptr<itoken> ptr_t;
/**
* Virtual base destructor.
*/
virtual ~itoken() {}
/**
* Return the async listener for this token.
* @return iaction_listener
*/
virtual iaction_listener* get_action_callback() const =0;
/**
* Returns the MQTT client that is responsible for processing the
* asynchronous action.
* @return iasync_client
*/
virtual iasync_client* get_client() const =0;
/**
* Returns an exception providing more detail if an operation failed.
* @return Exception
*/
//virtual exception get_exception() =0;
/**
* Returns the message ID of the message that is associated with the
* token.
* @return int
*/
virtual int get_message_id() const =0;
/**
* Returns the topic string(s) for the action being tracked by this
* token.
* @return std::vector<std::string>
*/
virtual const std::vector<std::string>& get_topics() const =0;
/**
* Retrieve the context associated with an action.
* @return void*
*/
virtual void* get_user_context() const =0;
/**
* Returns whether or not the action has finished.
* @return bool
*/
virtual bool is_complete() const =0;
/**
* Register a listener to be notified when an action completes.
* @param listener
*/
virtual void set_action_callback(iaction_listener& listener) =0;
/**
* Store some context associated with an action.
* @param userContext
*/
virtual void set_user_context(void* userContext) =0;
/**
* Blocks the current thread until the action this token is associated
* with has completed.
*/
virtual void wait_for_completion() =0;
/**
* Blocks the current thread until the action this token is associated
* with has completed.
* @param timeout
*/
virtual void wait_for_completion(long timeout) =0;
};
typedef itoken::ptr_t itoken_ptr;
/////////////////////////////////////////////////////////////////////////////
/**
* Provides a mechanism for tracking the completion of an asynchronous
* action.
*/
class token : public virtual itoken
{
/** Lock guard type for this class. */
typedef std::unique_lock<std::mutex> guard;
/** Object monitor mutex. */
mutable std::mutex lock_;
/** Condition variable signals when the action completes */
mutable std::condition_variable cond_;
/** The underlying C token. Note that this is just an integer */
MQTTAsync_token tok_;
/** The topic string(s) for the action being tracked by this token */
std::vector<std::string> topics_;
/** The MQTT client that is processing this action */
iasync_client* cli_;
/** User supplied context */
void* userContext_;
/**
* User supplied listener.
* Note that the user listener fires after the action is marked
* complete, but before the token is signaled.
*/
iaction_listener* listener_;
/** Whether the action has yet to complete */
bool complete_;
/** The action success/failure code */
int rc_;
/** Client has special access for full initialization */
friend class async_client;
void set_topics(const std::string& top) {
topics_.clear();
topics_.push_back(top);
}
void set_topics(const std::vector<std::string>& top) {
topics_ = top;
}
/**
* C-style callback for success.
* This simply passes the call on to the proper token object for
* processing.
* @param tokObj The token object to process the call. Note that this is
* @em not the user-supplied context pointer. That is
* kept in the object itself.
* @param rsp The success response.
*/
static void on_success(void* tokObj, MQTTAsync_successData* rsp);
/**
* C-style callback for failure.
* This simply passes the call on to the proper token object for
* processing.
* @param tokObj The token object to process the call. Note that this is
* @em not the user-supplied context pointer. That is
* kept in the object itself.
* @param rsp The failure response.
*/
static void on_failure(void* tokObj, MQTTAsync_failureData* rsp);
/**
* Internal handler for the success callback.
* @param rsp The success response.
*/
void on_success(MQTTAsync_successData* rsp);
/**
* Internal handler for the failure callback.
* @param rsp The failure response.
*/
void on_failure(MQTTAsync_failureData* rsp);
public:
typedef std::shared_ptr<token> ptr_t;
/**
* Constructs a token object.
* @param cli
*/
token(iasync_client& cli);
/**
* Constructs a token object.
* @param tok
*/
token(iasync_client& cli, MQTTAsync_token tok);
/**
* Constructs a token object.
* @param cli
*/
token(iasync_client& cli, const std::string& topic);
/**
* Constructs a token object.
* @param cli
*/
token(iasync_client& cli, const std::vector<std::string>& topics);
//token(const std::string& logContext);
/**
* Return the async listener for this token.
* @return iaction_listener
*/
virtual iaction_listener* get_action_callback() const {
// TODO: Guard?
return listener_;
}
/**
* Returns the MQTT client that is responsible for processing the
* asynchronous action.
* @return iasync_client
*/
virtual iasync_client* get_client() const {
return (iasync_client*) cli_;
}
/**
* Returns an exception providing more detail if an operation failed.
* @return Exception
*/
//virtual exception get_exception();
/**
* Returns the message ID of the message that is associated with the
* token.
* @return int
*/
virtual int get_message_id() const { return int(tok_); }
/**
* Returns the topic string(s) for the action being tracked by this
* token.
*/
virtual const std::vector<std::string>& get_topics() const {
return topics_;
}
/**
* Retrieve the context associated with an action.
*/
virtual void* get_user_context() const {
guard g(lock_);
return userContext_;
}
/**
* Returns whether or not the action has finished.
* @return bool
*/
virtual bool is_complete() const { return complete_; }
/**
* Register a listener to be notified when an action completes.
* @param listener
*/
virtual void set_action_callback(iaction_listener& listener) {
guard g(lock_);
listener_ = &listener;
}
/**
* Store some context associated with an action.
* @param userContext
*/
virtual void set_user_context(void* userContext) {
guard g(lock_);
userContext_ = userContext;
}
/**
* Blocks the current thread until the action this token is associated
* with has completed.
*/
virtual void wait_for_completion();
/**
* Blocks the current thread until the action this token is associated
* with has completed.
* @param timeout The timeout (in milliseconds)
*/
virtual void wait_for_completion(long timeout);
/**
* Waits a relative amount of time for the action to complete.
* @param relTime The amount of time to wait for the event.
* @return @em true if the event gets signaled in the specified time,
* @em false on a timeout.
*/
template <class Rep, class Period>
bool wait_for_completion(const std::chrono::duration<Rep, Period>& relTime) {
wait_for_completion((long) std::chrono::duration_cast<std::chrono::milliseconds>(relTime).count());
return rc_ == 0;
}
/**
* Waits until an absolute time for the action to complete.
* @param absTime The absolute time to wait for the event.
* @return @em true if the event gets signaled in the specified time,
* @em false on a timeout.
*/
template <class Clock, class Duration>
bool wait_until_completion(const std::chrono::time_point<Clock, Duration>& absTime) {
guard g(lock_);
if (!cond_.wait_until(g, absTime, [this]{return complete_;}))
return false;
if (rc_ != MQTTASYNC_SUCCESS)
throw exception(rc_);
return true;
}
};
typedef token::ptr_t token_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_token_h

115
src/mqtt/topic.h Normal file
View File

@ -0,0 +1,115 @@
/////////////////////////////////////////////////////////////////////////////
/// @file topic.h
/// Declaration of MQTT topic class
/// @date May 1, 2013
/// @author Frank Pagliughi
/////////////////////////////////////////////////////////////////////////////
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#ifndef __mqtt_topic_h
#define __mqtt_topic_h
extern "C" {
#include "MQTTAsync.h"
}
#include "mqtt/delivery_token.h"
#include "mqtt/message.h"
#include <string>
#include <vector>
#include <memory>
namespace mqtt {
class async_client;
/////////////////////////////////////////////////////////////////////////////
/**
* Represents a topic destination, used for publish/subscribe messaging.
*/
class topic
{
/// The topic name
std::string name_;
/// The client to which this topic is connected
async_client* cli_;
public:
/**
* A smart/shared pointer to this class.
*/
typedef std::shared_ptr<topic> ptr_t;
/**
* Construct an MQTT topic destination for messages.
* @param name
* @param cli
*/
topic(const std::string& name, async_client& cli) : name_(name), cli_(&cli) {}
/**
* Returns the name of the queue or topic.
* @return std::string
*/
std::string get_name() const { return name_; }
/**
* Publishes a message on the topic.
* @param payload
* @param n
* @param qos
* @param retained
*
* @return delivery_token
*/
idelivery_token_ptr publish(const void* payload, size_t n, int qos, bool retained);
/**
* Publishes a message on the topic.
* @param payload
* @param qos
* @param retained
*
* @return delivery_token
*/
idelivery_token_ptr publish(const std::string& str, int qos, bool retained) {
return publish(str.data(), str.length(), qos, retained);
}
/**
* Publishes the specified message to this topic, but does not wait for
* delivery of the message to complete.
* @param message
* @return delivery_token
*/
idelivery_token_ptr publish(message_ptr msg);
/**
* Returns a string representation of this topic.
* @return std::string
*/
std::string to_str() const { return name_; }
};
/**
* A shared pointer to the topic class.
*/
typedef topic::ptr_t topic_ptr;
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}
#endif // __mqtt_topic_h

0
src/obj/async_client.dep Normal file
View File

0
src/obj/client.dep Normal file
View File

View File

0
src/obj/message.dep Normal file
View File

0
src/obj/token.dep Normal file
View File

0
src/obj/topic.dep Normal file
View File

36
src/samples/Makefile Normal file
View File

@ -0,0 +1,36 @@
# Makefile for the mqttpp sample applications
PAHO_C_LIB ?= /home/fmp/static/opensrc/mqtt/paho/org.eclipse.paho.mqtt.c
all: async_publish async_subscribe sync_publish
CXXFLAGS += -Wall -std=c++0x
CPPFLAGS += -I.. -I$(PAHO_C_LIB)/src
ifdef DEBUG
CPPFLAGS += -DDEBUG
CXXFLAGS += -g -O0
else
CPPFLAGS += -D_NDEBUG
CXXFLAGS += -O2
endif
LDLIBS += -L../lib -L$(PAHO_C_LIB)/src/linux_ia64 -lmqttpp -lmqttv3a
async_publish: async_publish.cpp
$(CXX) $(CPPFLAGS) $(CXXFLAGS) -o $@ $< $(LDLIBS)
async_subscribe: async_subscribe.cpp
$(CXX) $(CPPFLAGS) $(CXXFLAGS) -o $@ $< $(LDLIBS)
sync_publish: sync_publish.cpp
$(CXX) $(CPPFLAGS) $(CXXFLAGS) -o $@ $< $(LDLIBS)
.PHONY: clean
clean:
rm -f async_publish async_subscribe sync_publish
.PHONY: distclean
distclean: clean

View File

@ -0,0 +1,178 @@
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include <iostream>
#include <cstdlib>
#include <string>
#include <thread> // For sleep
#include <chrono>
#include <cstring>
#include "mqtt/async_client.h"
const std::string ADDRESS("tcp://localhost:1883");
const std::string CLIENTID("AsyncPublisher");
const std::string TOPIC("hello");
const char* PAYLOAD1 = "Hello World!";
const char* PAYLOAD2 = "Hi there!";
const char* PAYLOAD3 = "Is anyone listening?";
const char* PAYLOAD4 = "Someone is always listening.";
const int QOS = 1;
const long TIMEOUT = 10000L;
inline void sleep(int ms) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
/////////////////////////////////////////////////////////////////////////////
/**
* A callback class for use with the main MQTT client.
*/
class callback : public virtual mqtt::callback
{
public:
virtual void connection_lost(const std::string& cause) {
std::cout << "\nConnection lost" << std::endl;
if (!cause.empty())
std::cout << "\tcause: " << cause << std::endl;
}
// We're not subscribed to anything, so this should never be called.
virtual void message_arrived(const std::string& topic, mqtt::message_ptr msg) {}
virtual void delivery_complete(mqtt::idelivery_token_ptr tok) {
std::cout << "Delivery complete for token: "
<< (tok ? tok->get_message_id() : -1) << std::endl;
}
};
/////////////////////////////////////////////////////////////////////////////
/**
* A base action listener.
*/
class action_listener : public virtual mqtt::iaction_listener
{
protected:
virtual void on_failure(const mqtt::itoken& tok) {
std::cout << "\n\tListener: Failure on token: "
<< tok.get_message_id() << std::endl;
}
virtual void on_success(const mqtt::itoken& tok) {
std::cout << "\n\tListener: Success on token: "
<< tok.get_message_id() << std::endl;
}
};
/////////////////////////////////////////////////////////////////////////////
/**
* A derived action listener for publish events.
*/
class delivery_action_listener : public action_listener
{
bool done_;
virtual void on_failure(const mqtt::itoken& tok) {
action_listener::on_failure(tok);
done_ = true;
}
virtual void on_success(const mqtt::itoken& tok) {
action_listener::on_success(tok);
done_ = true;
}
public:
delivery_action_listener() : done_(false) {}
bool is_done() const { return done_; }
};
/////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
mqtt::async_client client(ADDRESS, CLIENTID);
callback cb;
client.set_callback(cb);
try {
mqtt::itoken_ptr conntok = client.connect();
std::cout << "Waiting for the connection..." << std::flush;
conntok->wait_for_completion();
std::cout << "OK" << std::endl;
// First use a message pointer.
std::cout << "Sending message..." << std::flush;
mqtt::message_ptr pubmsg = std::make_shared<mqtt::message>(PAYLOAD1);
pubmsg->set_qos(QOS);
client.publish(TOPIC, pubmsg)->wait_for_completion(TIMEOUT);
std::cout << "OK" << std::endl;
// Now try with itemized publish.
std::cout << "Sending next message..." << std::flush;
mqtt::idelivery_token_ptr pubtok;
pubtok = client.publish(TOPIC, PAYLOAD2, std::strlen(PAYLOAD2), QOS, false);
pubtok->wait_for_completion(TIMEOUT);
std::cout << "OK" << std::endl;
// Now try with a listener
std::cout << "Sending next message..." << std::flush;
action_listener listener;
pubmsg = std::make_shared<mqtt::message>(PAYLOAD3);
pubtok = client.publish(TOPIC, pubmsg, nullptr, listener);
pubtok->wait_for_completion();
std::cout << "OK" << std::endl;
// Finally try with a listener, but no token
std::cout << "Sending final message..." << std::flush;
delivery_action_listener deliveryListener;
pubmsg = std::make_shared<mqtt::message>(PAYLOAD4);
client.publish(TOPIC, pubmsg, nullptr, deliveryListener);
while (!deliveryListener.is_done()) {
sleep(100);
}
std::cout << "OK" << std::endl;
// Double check that there are no pending tokens
std::vector<mqtt::idelivery_token_ptr> toks = client.get_pending_delivery_tokens();
if (!toks.empty())
std::cout << "Error: There are pending delivery tokens!" << std::endl;
// Disconnect
std::cout << "Disconnecting..." << std::flush;
conntok = client.disconnect();
conntok->wait_for_completion();
std::cout << "OK" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
return 1;
}
return 0;
}

View File

@ -0,0 +1,163 @@
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include "mqtt/async_client.h"
const std::string ADDRESS("tcp://localhost:1883");
const std::string CLIENTID("AsyncSubcriber");
const std::string TOPIC("hello");
const int QOS = 1;
const long TIMEOUT = 10000L;
/////////////////////////////////////////////////////////////////////////////
class action_listener : public virtual mqtt::iaction_listener
{
std::string name_;
virtual void on_failure(const mqtt::itoken& tok) {
std::cout << name_ << " failure";
if (tok.get_message_id() != 0)
std::cout << " (token: " << tok.get_message_id() << ")" << std::endl;
std::cout << std::endl;
}
virtual void on_success(const mqtt::itoken& tok) {
std::cout << name_ << " success";
if (tok.get_message_id() != 0)
std::cout << " (token: " << tok.get_message_id() << ")" << std::endl;
if (!tok.get_topics().empty())
std::cout << "\ttoken topic: '" << tok.get_topics()[0] << "', ..." << std::endl;
std::cout << std::endl;
}
public:
action_listener(const std::string& name) : name_(name) {}
};
/////////////////////////////////////////////////////////////////////////////
class callback : public virtual mqtt::callback,
public virtual mqtt::iaction_listener
{
int nretry_;
mqtt::async_client& cli_;
action_listener& listener_;
void reconnect() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
try {
cli_.connect(connOpts, nullptr, *this);
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
exit(1);
}
}
// Re-connection failure
virtual void on_failure(const mqtt::itoken& tok) {
std::cout << "Reconnection failed." << std::endl;
if (++nretry_ > 5)
exit(1);
reconnect();
}
// Re-connection success
virtual void on_success(const mqtt::itoken& tok) {
std::cout << "Reconnection success" << std::endl;;
cli_.subscribe(TOPIC, QOS, nullptr, listener_);
}
virtual void connection_lost(const std::string& cause) {
std::cout << "\nConnection lost" << std::endl;
if (!cause.empty())
std::cout << "\tcause: " << cause << std::endl;
std::cout << "Reconnecting." << std::endl;
nretry_ = 0;
reconnect();
}
virtual void message_arrived(const std::string& topic, mqtt::message_ptr msg) {
std::cout << "Message arrived" << std::endl;
std::cout << "\ttopic: '" << topic << "'" << std::endl;
std::cout << "\t'" << msg->to_str() << "'\n" << std::endl;
}
virtual void delivery_complete(mqtt::idelivery_token_ptr token) {}
public:
callback(mqtt::async_client& cli, action_listener& listener)
: cli_(cli), listener_(listener) {}
};
/////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
mqtt::async_client client(ADDRESS, CLIENTID);
action_listener subListener("Subscription");
callback cb(client, subListener);
client.set_callback(cb);
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
try {
mqtt::itoken_ptr conntok = client.connect(connOpts);
std::cout << "Waiting for the connection..." << std::flush;
conntok->wait_for_completion();
std::cout << "OK" << std::endl;
std::cout << "Subscribing to topic " << TOPIC << "\n"
<< "for client " << CLIENTID
<< " using QoS" << QOS << "\n\n"
<< "Press Q<Enter> to quit\n" << std::endl;
client.subscribe(TOPIC, QOS, nullptr, subListener);
while (std::tolower(std::cin.get()) != 'q')
;
std::cout << "Disconnecting..." << std::flush;
conntok = client.disconnect();
conntok->wait_for_completion();
std::cout << "OK" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
return 1;
}
return 0;
}

View File

@ -0,0 +1,193 @@
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include <iostream>
#include <cstdlib>
#include <string>
#include <map>
#include <vector>
#include <cstring>
#include "mqtt/client.h"
#include "mqtt/ipersistable.h"
const std::string ADDRESS("tcp://localhost:1883");
const std::string CLIENTID("SyncPublisher");
const std::string TOPIC("hello");
const std::string PAYLOAD1("Hello World!");
const char* PAYLOAD2 = "Hi there!";
const char* PAYLOAD3 = "Is anyone listening?";
const int QOS = 1;
const int TIMEOUT = 10000;
/////////////////////////////////////////////////////////////////////////////
class sample_mem_persistence : virtual public mqtt::iclient_persistence
{
bool open_;
std::map<std::string, mqtt::ipersistable_ptr> store_;
public:
sample_mem_persistence() : open_(false) {}
// "Open" the store
virtual void open(const std::string& clientId, const std::string& serverURI) {
std::cout << "[Opening persistence for '" << clientId
<< "' at '" << serverURI << "']" << std::endl;
open_ = true;
}
// Close the persistent store that was previously opened.
virtual void close() {
std::cout << "[Closing persistence store.]" << std::endl;
open_ = false;
}
// Clears persistence, so that it no longer contains any persisted data.
virtual void clear() {
std::cout << "[Clearing persistence store.]" << std::endl;
store_.clear();
}
// Returns whether or not data is persisted using the specified key.
virtual bool contains_key(const std::string &key) {
return store_.find(key) != store_.end();
}
// Gets the specified data out of the persistent store.
virtual mqtt::ipersistable_ptr get(const std::string& key) const {
std::cout << "[Searching persistence for key '"
<< key << "']" << std::endl;
auto p = store_.find(key);
if (p == store_.end())
throw mqtt::persistence_exception();
std::cout << "[Found persistence data for key '"
<< key << "']" << std::endl;
return p->second;
}
/**
* Returns the keys in this persistent data store.
*/
virtual std::vector<std::string> keys() const {
std::vector<std::string> ks;
for (const auto& k : store_)
ks.push_back(k.first);
return ks;
}
// Puts the specified data into the persistent store.
virtual void put(const std::string& key, mqtt::ipersistable_ptr persistable) {
std::cout << "[Persisting data with key '"
<< key << "']" << std::endl;
store_[key] = persistable;
}
// Remove the data for the specified key.
virtual void remove(const std::string &key) {
std::cout << "[Persistence removing key '" << key << "']" << std::endl;
auto p = store_.find(key);
if (p == store_.end())
throw mqtt::persistence_exception();
store_.erase(p);
std::cout << "[Persistence key removed '" << key << "']" << std::endl;
}
};
/////////////////////////////////////////////////////////////////////////////
class callback : public virtual mqtt::callback
{
public:
virtual void connection_lost(const std::string& cause) {
std::cout << "\nConnection lost" << std::endl;
if (!cause.empty())
std::cout << "\tcause: " << cause << std::endl;
}
// We're not subscrived to anything, so this should never be called.
virtual void message_arrived(const std::string& topic, mqtt::message_ptr msg) {
}
virtual void delivery_complete(mqtt::idelivery_token_ptr tok) {
std::cout << "\n\t[Delivery complete for token: "
<< (tok ? tok->get_message_id() : -1) << "]" << std::endl;
}
};
// --------------------------------------------------------------------------
int main(int argc, char* argv[])
{
sample_mem_persistence persist;
mqtt::client client(ADDRESS, CLIENTID, &persist);
callback cb;
client.set_callback(cb);
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
try {
std::cout << "Connecting..." << std::flush;
client.connect(connOpts);
std::cout << "OK" << std::endl;
// First use a message pointer.
std::cout << "Sending message..." << std::flush;
mqtt::message_ptr pubmsg = std::make_shared<mqtt::message>(PAYLOAD1);
pubmsg->set_qos(QOS);
client.publish(TOPIC, pubmsg);
std::cout << "OK" << std::endl;
// Now try with itemized publish.
std::cout << "Sending next message..." << std::flush;
client.publish(TOPIC, PAYLOAD2, strlen(PAYLOAD2)+1, 0, false);
std::cout << "OK" << std::endl;
// Now try with a listener, but no token
std::cout << "Sending final message..." << std::flush;
pubmsg = std::make_shared<mqtt::message>(PAYLOAD3);
pubmsg->set_qos(QOS);
client.publish(TOPIC, pubmsg);
std::cout << "OK" << std::endl;
// Disconnect
std::cout << "Disconnecting..." << std::flush;
client.disconnect();
std::cout << "OK" << std::endl;
}
catch (const mqtt::persistence_exception& exc) {
std::cerr << "Persistence Error: " << exc.what() << " ["
<< exc.get_reason_code() << "]" << std::endl;
return 1;
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << " ["
<< exc.get_reason_code() << "]" << std::endl;
return 1;
}
return 0;
}

149
src/token.cpp Normal file
View File

@ -0,0 +1,149 @@
// token.cpp
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/token.h"
#include "mqtt/async_client.h"
#include <string>
#include <cstring>
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
// Failure callback from the C library.
// The 'context' is a raw pointer to the token object.
void token::on_failure(void* context, MQTTAsync_failureData* rsp)
{
if (context) {
token* tok = static_cast<token*>(context);
tok->on_failure(rsp);
tok->get_client()->remove_token(tok);
}
}
// Success callback from the C library.
// The 'context' is a raw pointer to the token object.
void token::on_success(void* context, MQTTAsync_successData* rsp)
{
if (context) {
token* tok = static_cast<token*>(context);
tok->on_success(rsp);
tok->get_client()->remove_token(tok);
}
}
void token::on_success(MQTTAsync_successData* rsp)
{
guard g(lock_);
iaction_listener* listener = listener_;
tok_ = (rsp) ? rsp->token : 0;
rc_ = MQTTASYNC_SUCCESS;
complete_ = true;
g.unlock();
// Note: callback always completes before the obect is signalled.
if (listener)
listener->on_success(*this);
cond_.notify_all();
}
void token::on_failure(MQTTAsync_failureData* rsp)
{
guard g(lock_);
iaction_listener* listener = listener_;
if (rsp) {
tok_ = rsp->token;
rc_ = rsp->code;
}
else {
tok_ = 0;
rc_ = -1;
}
complete_ = true;
g.unlock();
// Note: callback always completes before the obect is signalled.
if (listener)
listener->on_failure(*this);
cond_.notify_all();
}
// --------------------------------------------------------------------------
token::token(iasync_client& cli) : tok_(MQTTAsync_token(0)), cli_(&cli),
userContext_(nullptr), listener_(nullptr),
complete_(false), rc_(0)
{
}
token::token(iasync_client& cli, MQTTAsync_token tok) : tok_(tok), cli_(&cli),
userContext_(nullptr), listener_(nullptr),
complete_(false), rc_(0)
{
}
token::token(iasync_client& cli, const std::string& top)
: tok_(MQTTAsync_token(0)), cli_(&cli),
userContext_(nullptr), listener_(nullptr),
complete_(false), rc_(0)
{
topics_.push_back(top);
}
token::token(iasync_client& cli, const std::vector<std::string>& topics)
: tok_(MQTTAsync_token(0)), topics_(topics), cli_(&cli),
userContext_(nullptr), listener_(nullptr),
complete_(false), rc_(0)
{
}
//exception token::get_exception()
//{
//}
void token::wait_for_completion()
{
guard g(lock_);
cond_.wait(g, [this]{return complete_;});
if (rc_ != MQTTASYNC_SUCCESS)
throw exception(rc_);
}
void token::wait_for_completion(long timeout)
{
guard g(lock_);
if (timeout == 0) { // No wait. Are we done now?
if (!complete_)
throw exception(MQTTASYNC_FAILURE); // TODO: Get a timout error number
}
else if (timeout < 0) { // Wait forever
cond_.wait(g, [this]{return complete_;});
}
else {
if (!cond_.wait_for(g, std::chrono::milliseconds(timeout),
[this]{return complete_;}))
throw exception(MQTTASYNC_FAILURE); // TODO: Get a timout error number
}
if (rc_ != MQTTASYNC_SUCCESS)
throw exception(rc_);
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}

43
src/topic.cpp Normal file
View File

@ -0,0 +1,43 @@
// topic.cpp
/*******************************************************************************
* Copyright (c) 2013 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
#include "mqtt/topic.h"
#include "mqtt/async_client.h"
namespace mqtt {
/////////////////////////////////////////////////////////////////////////////
idelivery_token_ptr topic::publish(const void* payload, size_t n,
int qos, bool retained)
{
return cli_->publish(name_, payload, n, qos, retained);
}
idelivery_token_ptr topic::publish(message_ptr msg)
{
return cli_->publish(name_, msg);
}
/////////////////////////////////////////////////////////////////////////////
// end namespace mqtt
}