diff --git a/src/mqtt/thread_queue.h b/src/mqtt/thread_queue.h index 4cdd37d..f96286c 100644 --- a/src/mqtt/thread_queue.h +++ b/src/mqtt/thread_queue.h @@ -7,7 +7,7 @@ ///////////////////////////////////////////////////////////////////////////// /******************************************************************************* - * Copyright (c) 2017-2021 Frank Pagliughi + * Copyright (c) 2017-2022 Frank Pagliughi * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -39,6 +39,7 @@ namespace mqtt { /** * A thread-safe queue for inter-thread communication. + * * This is a lockinq queue with blocking operations. The get() operations * can always block on an empty queue, but have variations for non-blocking * (try_get) and bounded-time blocking (try_get_for, try_get_until). @@ -149,14 +150,11 @@ public: */ void put(value_type val) { unique_guard g(lock_); - if (que_.size() >= cap_) - notFullCond_.wait(g, [this]{return que_.size() < cap_;}); - bool wasEmpty = que_.empty(); + notFullCond_.wait(g, [this]{return que_.size() < cap_;}); + que_.emplace(std::move(val)); - if (wasEmpty) { - g.unlock(); - notEmptyCond_.notify_one(); - } + g.unlock(); + notEmptyCond_.notify_one(); } /** * Non-blocking attempt to place an item into the queue. @@ -166,14 +164,12 @@ public: */ bool try_put(value_type val) { unique_guard g(lock_); - size_type n = que_.size(); - if (n >= cap_) + if (que_.size() >= cap_) return false; + que_.emplace(std::move(val)); - if (n == 0) { - g.unlock(); - notEmptyCond_.notify_one(); - } + g.unlock(); + notEmptyCond_.notify_one(); return true; } /** @@ -186,16 +182,14 @@ public: * timeout occurred. */ template - bool try_put_for(value_type* val, const std::chrono::duration& relTime) { + bool try_put_for(value_type val, const std::chrono::duration& relTime) { unique_guard g(lock_); - if (que_.size() >= cap_ && !notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;})) + if (!notFullCond_.wait_for(g, relTime, [this]{return que_.size() < cap_;})) return false; - bool wasEmpty = que_.empty(); + que_.emplace(std::move(val)); - if (wasEmpty) { - g.unlock(); - notEmptyCond_.notify_one(); - } + g.unlock(); + notEmptyCond_.notify_one(); return true; } /** @@ -209,16 +203,14 @@ public: * timeout occurred. */ template - bool try_put_until(value_type* val, const std::chrono::time_point& absTime) { + bool try_put_until(value_type val, const std::chrono::time_point& absTime) { unique_guard g(lock_); - if (que_.size() >= cap_ && !notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;})) + if (!notFullCond_.wait_until(g, absTime, [this]{return que_.size() < cap_;})) return false; - bool wasEmpty = que_.empty(); + que_.emplace(std::move(val)); - if (wasEmpty) { - g.unlock(); - notEmptyCond_.notify_one(); - } + g.unlock(); + notEmptyCond_.notify_one(); return true; } /** @@ -228,15 +220,16 @@ public: * @param val Pointer to a variable to receive the value. */ void get(value_type* val) { + if (!val) + return; + unique_guard g(lock_); - if (que_.empty()) - notEmptyCond_.wait(g, [this]{return !que_.empty();}); + notEmptyCond_.wait(g, [this]{return !que_.empty();}); + *val = std::move(que_.front()); que_.pop(); - if (que_.size() == cap_-1) { - g.unlock(); - notFullCond_.notify_one(); - } + g.unlock(); + notFullCond_.notify_one(); } /** * Retrieve a value from the queue. @@ -246,14 +239,12 @@ public: */ value_type get() { unique_guard g(lock_); - if (que_.empty()) - notEmptyCond_.wait(g, [this]{return !que_.empty();}); + notEmptyCond_.wait(g, [this]{return !que_.empty();}); + value_type val = std::move(que_.front()); que_.pop(); - if (que_.size() == cap_-1) { - g.unlock(); - notFullCond_.notify_one(); - } + g.unlock(); + notFullCond_.notify_one(); return val; } /** @@ -265,15 +256,17 @@ public: * the queue is empty. */ bool try_get(value_type* val) { + if (!val) + return false; + unique_guard g(lock_); if (que_.empty()) return false; + *val = std::move(que_.front()); que_.pop(); - if (que_.size() == cap_-1) { - g.unlock(); - notFullCond_.notify_one(); - } + g.unlock(); + notFullCond_.notify_one(); return true; } /** @@ -288,15 +281,17 @@ public: */ template bool try_get_for(value_type* val, const std::chrono::duration& relTime) { - unique_guard g(lock_); - if (que_.empty() && !notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();})) + if (!val) return false; + + unique_guard g(lock_); + if (!notEmptyCond_.wait_for(g, relTime, [this]{return !que_.empty();})) + return false; + *val = std::move(que_.front()); que_.pop(); - if (que_.size() == cap_-1) { - g.unlock(); - notFullCond_.notify_one(); - } + g.unlock(); + notFullCond_.notify_one(); return true; } /** @@ -311,15 +306,17 @@ public: */ template bool try_get_until(value_type* val, const std::chrono::time_point& absTime) { - unique_guard g(lock_); - if (que_.empty() && !notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();})) + if (!val) return false; + + unique_guard g(lock_); + if (!notEmptyCond_.wait_until(g, absTime, [this]{return !que_.empty();})) + return false; + *val = std::move(que_.front()); que_.pop(); - if (que_.size() == cap_-1) { - g.unlock(); - notFullCond_.notify_one(); - } + g.unlock(); + notFullCond_.notify_one(); return true; } }; diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index d03f522..50332e9 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -41,6 +41,7 @@ add_executable(unit_tests unit_tests.cpp test_properties.cpp test_response_options.cpp test_string_collection.cpp + test_thread_queue.cpp test_token.cpp test_topic.cpp test_topic_matcher.cpp diff --git a/test/unit/test_thread_queue.cpp b/test/unit/test_thread_queue.cpp new file mode 100644 index 0000000..14ed375 --- /dev/null +++ b/test/unit/test_thread_queue.cpp @@ -0,0 +1,89 @@ +// test_thread_queue.cpp +// +// Unit tests for the thread_queue class in the Paho MQTT C++ library. +// + +/******************************************************************************* + * Copyright (c) 2022 Frank Pagliughi + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Frank Pagliughi - Initial implementation + *******************************************************************************/ + +#define UNIT_TESTS + +#include "catch2/catch.hpp" +#include "mqtt/types.h" +#include "mqtt/thread_queue.h" + +#include +#include +#include +#include + +using namespace mqtt; +using namespace std::chrono; + +TEST_CASE("que put/get", "[thread_queue]") +{ + thread_queue que; + + que.put(1); + que.put(2); + REQUIRE(que.get() == 1); + + que.put(3); + REQUIRE(que.get() == 2); + REQUIRE(que.get() == 3); +} + +TEST_CASE("que mt put/get", "[thread_queue]") +{ + thread_queue que; + const size_t N = 1000000; + const size_t N_THR = 2; + + auto producer = [&que]() { + string s; + for (size_t i=0; i<512; ++i) + s.push_back('a' + i%26); + + for (size_t i=0; i producers; + std::vector> consumers; + + for (size_t i=0; i