1
0
mirror of https://github.com/eclipse/paho.mqtt.cpp.git synced 2025-05-10 03:39:07 +08:00

Merge branch 'develop' of github.com:eclipse/paho.mqtt.cpp into develop

This commit is contained in:
fpagliughi 2023-01-30 17:27:10 -05:00
commit d4106b8b49
3 changed files with 143 additions and 56 deletions

View File

@ -7,7 +7,7 @@
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
/******************************************************************************* /*******************************************************************************
* Copyright (c) 2017-2021 Frank Pagliughi <fpagliughi@mindspring.com> * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
* *
* All rights reserved. This program and the accompanying materials * All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0 * are made available under the terms of the Eclipse Public License v1.0
@ -39,6 +39,7 @@ namespace mqtt {
/** /**
* A thread-safe queue for inter-thread communication. * A thread-safe queue for inter-thread communication.
*
* This is a lockinq queue with blocking operations. The get() operations * This is a lockinq queue with blocking operations. The get() operations
* can always block on an empty queue, but have variations for non-blocking * can always block on an empty queue, but have variations for non-blocking
* (try_get) and bounded-time blocking (try_get_for, try_get_until). * (try_get) and bounded-time blocking (try_get_for, try_get_until).
@ -149,15 +150,12 @@ public:
*/ */
void put(value_type val) { void put(value_type val) {
unique_guard g(lock_); unique_guard g(lock_);
if (que_.size() >= cap_)
notFullCond_.wait(g, [this]{return que_.size() < cap_;}); notFullCond_.wait(g, [this]{return que_.size() < cap_;});
bool wasEmpty = que_.empty();
que_.emplace(std::move(val)); que_.emplace(std::move(val));
if (wasEmpty) {
g.unlock(); g.unlock();
notEmptyCond_.notify_one(); notEmptyCond_.notify_one();
} }
}
/** /**
* Non-blocking attempt to place an item into the queue. * Non-blocking attempt to place an item into the queue.
* @param val The value to add to the queue. * @param val The value to add to the queue.
@ -166,14 +164,12 @@ public:
*/ */
bool try_put(value_type val) { bool try_put(value_type val) {
unique_guard g(lock_); unique_guard g(lock_);
size_type n = que_.size(); if (que_.size() >= cap_)
if (n >= cap_)
return false; return false;
que_.emplace(std::move(val)); que_.emplace(std::move(val));
if (n == 0) {
g.unlock(); g.unlock();
notEmptyCond_.notify_one(); notEmptyCond_.notify_one();
}
return true; return true;
} }
/** /**
@ -186,16 +182,14 @@ public:
* timeout occurred. * timeout occurred.
*/ */
template <typename Rep, class Period> template <typename Rep, class Period>
bool try_put_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) { bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
unique_guard g(lock_); unique_guard g(lock_);
if (que_.size() >= cap_ && !notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;})) if (!notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;}))
return false; return false;
bool wasEmpty = que_.empty();
que_.emplace(std::move(val)); que_.emplace(std::move(val));
if (wasEmpty) {
g.unlock(); g.unlock();
notEmptyCond_.notify_one(); notEmptyCond_.notify_one();
}
return true; return true;
} }
/** /**
@ -209,16 +203,14 @@ public:
* timeout occurred. * timeout occurred.
*/ */
template <class Clock, class Duration> template <class Clock, class Duration>
bool try_put_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) { bool try_put_until(value_type val, const std::chrono::time_point<Clock,Duration>& absTime) {
unique_guard g(lock_); unique_guard g(lock_);
if (que_.size() >= cap_ && !notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;})) if (!notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;}))
return false; return false;
bool wasEmpty = que_.empty();
que_.emplace(std::move(val)); que_.emplace(std::move(val));
if (wasEmpty) {
g.unlock(); g.unlock();
notEmptyCond_.notify_one(); notEmptyCond_.notify_one();
}
return true; return true;
} }
/** /**
@ -228,16 +220,17 @@ public:
* @param val Pointer to a variable to receive the value. * @param val Pointer to a variable to receive the value.
*/ */
void get(value_type* val) { void get(value_type* val) {
if (!val)
return;
unique_guard g(lock_); unique_guard g(lock_);
if (que_.empty())
notEmptyCond_.wait(g, [this]{return !que_.empty();}); notEmptyCond_.wait(g, [this]{return !que_.empty();});
*val = std::move(que_.front()); *val = std::move(que_.front());
que_.pop(); que_.pop();
if (que_.size() == cap_-1) {
g.unlock(); g.unlock();
notFullCond_.notify_one(); notFullCond_.notify_one();
} }
}
/** /**
* Retrieve a value from the queue. * Retrieve a value from the queue.
* If the queue is empty, this will block indefinitely until a value is * If the queue is empty, this will block indefinitely until a value is
@ -246,14 +239,12 @@ public:
*/ */
value_type get() { value_type get() {
unique_guard g(lock_); unique_guard g(lock_);
if (que_.empty())
notEmptyCond_.wait(g, [this]{return !que_.empty();}); notEmptyCond_.wait(g, [this]{return !que_.empty();});
value_type val = std::move(que_.front()); value_type val = std::move(que_.front());
que_.pop(); que_.pop();
if (que_.size() == cap_-1) {
g.unlock(); g.unlock();
notFullCond_.notify_one(); notFullCond_.notify_one();
}
return val; return val;
} }
/** /**
@ -265,15 +256,17 @@ public:
* the queue is empty. * the queue is empty.
*/ */
bool try_get(value_type* val) { bool try_get(value_type* val) {
if (!val)
return false;
unique_guard g(lock_); unique_guard g(lock_);
if (que_.empty()) if (que_.empty())
return false; return false;
*val = std::move(que_.front()); *val = std::move(que_.front());
que_.pop(); que_.pop();
if (que_.size() == cap_-1) {
g.unlock(); g.unlock();
notFullCond_.notify_one(); notFullCond_.notify_one();
}
return true; return true;
} }
/** /**
@ -288,15 +281,17 @@ public:
*/ */
template <typename Rep, class Period> template <typename Rep, class Period>
bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) { bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
unique_guard g(lock_); if (!val)
if (que_.empty() && !notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();}))
return false; return false;
unique_guard g(lock_);
if (!notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();}))
return false;
*val = std::move(que_.front()); *val = std::move(que_.front());
que_.pop(); que_.pop();
if (que_.size() == cap_-1) {
g.unlock(); g.unlock();
notFullCond_.notify_one(); notFullCond_.notify_one();
}
return true; return true;
} }
/** /**
@ -311,15 +306,17 @@ public:
*/ */
template <class Clock, class Duration> template <class Clock, class Duration>
bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) { bool try_get_until(value_type* val, const std::chrono::time_point<Clock,Duration>& absTime) {
unique_guard g(lock_); if (!val)
if (que_.empty() && !notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();}))
return false; return false;
unique_guard g(lock_);
if (!notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();}))
return false;
*val = std::move(que_.front()); *val = std::move(que_.front());
que_.pop(); que_.pop();
if (que_.size() == cap_-1) {
g.unlock(); g.unlock();
notFullCond_.notify_one(); notFullCond_.notify_one();
}
return true; return true;
} }
}; };

View File

@ -41,6 +41,7 @@ add_executable(unit_tests unit_tests.cpp
test_properties.cpp test_properties.cpp
test_response_options.cpp test_response_options.cpp
test_string_collection.cpp test_string_collection.cpp
test_thread_queue.cpp
test_token.cpp test_token.cpp
test_topic.cpp test_topic.cpp
test_topic_matcher.cpp test_topic_matcher.cpp

View File

@ -0,0 +1,89 @@
// test_thread_queue.cpp
//
// Unit tests for the thread_queue class in the Paho MQTT C++ library.
//
/*******************************************************************************
* Copyright (c) 2022 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - Initial implementation
*******************************************************************************/
#define UNIT_TESTS
#include "catch2/catch.hpp"
#include "mqtt/types.h"
#include "mqtt/thread_queue.h"
#include <thread>
#include <future>
#include <chrono>
#include <vector>
using namespace mqtt;
using namespace std::chrono;
TEST_CASE("que put/get", "[thread_queue]")
{
thread_queue<int> que;
que.put(1);
que.put(2);
REQUIRE(que.get() == 1);
que.put(3);
REQUIRE(que.get() == 2);
REQUIRE(que.get() == 3);
}
TEST_CASE("que mt put/get", "[thread_queue]")
{
thread_queue<string> que;
const size_t N = 1000000;
const size_t N_THR = 2;
auto producer = [&que]() {
string s;
for (size_t i=0; i<512; ++i)
s.push_back('a' + i%26);
for (size_t i=0; i<N; ++i)
que.put(s);
};
auto consumer = [&que]() {
string s;
bool ok = true;
for (size_t i=0; i<N && ok; ++i) {
ok = que.try_get_for(&s, seconds{1});
}
return ok;
};
std::vector<std::thread> producers;
std::vector<std::future<bool>> consumers;
for (size_t i=0; i<N_THR; ++i)
producers.push_back(std::thread(producer));
for (size_t i=0; i<N_THR; ++i)
consumers.push_back(std::async(consumer));
for (size_t i=0; i<N_THR; ++i)
producers[i].join();
for (size_t i=0; i<N_THR; ++i) {
REQUIRE(consumers[i].get());
}
}