From 29fd8fb57ece3180e408c8895e5966891c4ff3ec Mon Sep 17 00:00:00 2001 From: Laurentiu Cristofor Date: Tue, 22 Jun 2021 16:03:33 -0700 Subject: [PATCH 1/5] Add a multiple-producer single-consumer queue implementation --- production/common/tests/test_queue.cpp | 56 +++++++ production/inc/gaia_internal/common/queue.hpp | 50 +++++++ production/inc/gaia_internal/common/queue.inc | 137 ++++++++++++++++++ 3 files changed, 243 insertions(+) diff --git a/production/common/tests/test_queue.cpp b/production/common/tests/test_queue.cpp index a1a8ff129523..d3aea5324535 100644 --- a/production/common/tests/test_queue.cpp +++ b/production/common/tests/test_queue.cpp @@ -67,3 +67,59 @@ TEST(common, queue) ASSERT_EQ(0, value); ASSERT_EQ(0, queue.size()); } + +TEST(common, mpsc_queue) +{ + mpsc_queue_t queue; + + int value = -1; + + // Verify that the queue is empty. + queue.dequeue(value); + ASSERT_EQ(-1, value); + ASSERT_EQ(0, queue.size()); + + // Insert two values. + queue.enqueue(1); + queue.enqueue(2); + ASSERT_EQ(2, queue.size()); + + // Extract a value. + queue.dequeue(value); + ASSERT_EQ(1, value); + ASSERT_EQ(1, queue.size()); + + // Insert another value. + queue.enqueue(3); + ASSERT_EQ(2, queue.size()); + + // Extract the two values. + queue.dequeue(value); + ASSERT_EQ(2, value); + ASSERT_EQ(1, queue.size()); + + queue.dequeue(value); + ASSERT_EQ(3, value); + ASSERT_EQ(0, queue.size()); + + // Verify that the queue is empty again. + value = -1; + queue.dequeue(value); + ASSERT_EQ(-1, value); + ASSERT_EQ(0, queue.size()); + + // Insert one more value. + queue.enqueue(4); + ASSERT_EQ(1, queue.size()); + + // Extract the value. + queue.dequeue(value); + ASSERT_EQ(4, value); + ASSERT_EQ(0, queue.size()); + + // Verify that the queue is empty again. + value = 0; + queue.dequeue(value); + ASSERT_EQ(0, value); + ASSERT_EQ(0, queue.size()); +} diff --git a/production/inc/gaia_internal/common/queue.hpp b/production/inc/gaia_internal/common/queue.hpp index af34f0d77566..ec46b6503cc0 100644 --- a/production/inc/gaia_internal/common/queue.hpp +++ b/production/inc/gaia_internal/common/queue.hpp @@ -40,6 +40,8 @@ struct queue_element_t queue_element_t(T value); }; +// This is a general-purpose queue implementation +// with synchronization performed using mutexes at node level. template class queue_t { @@ -72,6 +74,54 @@ class queue_t bool dequeue_internal(T& value); }; +template +struct mpsc_queue_node_t +{ + mpsc_queue_node_t* volatile next; + T value; + + mpsc_queue_node_t(); + mpsc_queue_node_t(T value); +}; + +// This implementation of a multiple-producer single-consumer (mpsc) queue +// is based on the implementation described at: +// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue +template +class mpsc_queue_t +{ +public: + mpsc_queue_t(); + ~mpsc_queue_t(); + + // Insert a value in the queue. + void enqueue(const T& value); + + // Extract a value from the queue. + // If the queue is empty, the value will be left unset. + // The caller should properly initialize the value before this call, + // to be able to determine if it was actually set. + void dequeue(T& value); + + inline bool is_empty() const; + + inline size_t size() const; + +protected: + void enqueue_internal(mpsc_queue_node_t* node); + mpsc_queue_node_t* dequeue_internal(); + +protected: + std::atomic*> volatile m_head; + mpsc_queue_node_t* m_tail; + + // The queue structure is guaranteed to never actually be empty + // by having it contain references to this "stub" node when it is logically empty. + mpsc_queue_node_t m_stub; + + std::atomic m_size; +}; + #include "queue.inc" /*@}*/ diff --git a/production/inc/gaia_internal/common/queue.inc b/production/inc/gaia_internal/common/queue.inc index 29356919c9c7..93ffb1525cc1 100644 --- a/production/inc/gaia_internal/common/queue.inc +++ b/production/inc/gaia_internal/common/queue.inc @@ -174,3 +174,140 @@ size_t queue_t::size() const { return m_size; } + +template +mpsc_queue_node_t::mpsc_queue_node_t() + : value{} +{ +} + +template +mpsc_queue_node_t::mpsc_queue_node_t(T value) +{ + this->value = value; + + this->next = nullptr; +} + +template +mpsc_queue_t::mpsc_queue_t() +{ + m_head = &m_stub; + m_tail = &m_stub; + m_stub.next = nullptr; + m_size = 0; +} + +template +mpsc_queue_t::~mpsc_queue_t() +{ + mpsc_queue_node_t* node = nullptr; + while ((node = dequeue_internal())) + { + delete node; + } +} + +template +void mpsc_queue_t::enqueue_internal(mpsc_queue_node_t* node) +{ + // Set up node to be last node. + node->next = nullptr; + + // Exchange head and link previous head to our node. + mpsc_queue_node_t* previous = m_head.exchange(node); + previous->next = node; +} + +template +mpsc_queue_node_t* mpsc_queue_t::dequeue_internal() +{ + // This is a multiple-producer, single-consumer queue, + // and this is the consumer call, + // so only one thread is expected to be executing it. + mpsc_queue_node_t* tail = m_tail; + mpsc_queue_node_t* next = tail->next; + + if (tail == &m_stub) + { + if (next == nullptr) + { + // Queue only containes the stub, so there is nothing to return. + return nullptr; + } + + // There is a node after the stub. + // Update tail past the stub node and update tail/next. + m_tail = next; + tail = next; + next = next->next; + } + + if (next) + { + // We have another node after this one, so we can just update the tail. + m_tail = next; + + return tail; + } + + // There is a single node in the queue, so we'll need to update head as well. + // Store its current value - other producers may update it. + mpsc_queue_node_t* head = m_head; + + // DEVNOTE: Original code returned nullptr if head != tail. + // But that can only happen if a new node was inserted, in which case + // we should be able to continue with the removing of the tail node. + if (tail == head) + { + // Push back the stub, so that we have one remaining entry in the queue. + enqueue_internal(&m_stub); + } + + // Refresh the tail.next value. + next = tail->next; + + // DEVNOTE: Original code had a check on next, but we've replaced it with an assert. + ASSERT_INVARIANT(next, "Next node was expected to be non-null at this point!"); + + // Remove tail node now. + m_tail = next; + + return tail; +} + +template +void mpsc_queue_t::enqueue(const T& value) +{ + mpsc_queue_node_t* node = new mpsc_queue_node_t(value); + + enqueue_internal(node); + + ++m_size; +} + +template +void mpsc_queue_t::dequeue(T& value) +{ + mpsc_queue_node_t* node = dequeue_internal(); + + if (node) + { + value = node->value; + delete node; + + --m_size; + } +} + +template +bool mpsc_queue_t::is_empty() const +{ + return (size() == 0); +} + +template +size_t mpsc_queue_t::size() const +{ + return m_size; +} From fd48753c412908c407c79123e74e0a76e8a3f3d6 Mon Sep 17 00:00:00 2001 From: Laurentiu Cristofor Date: Tue, 22 Jun 2021 16:11:10 -0700 Subject: [PATCH 2/5] Use mpsc_queue_t instead of queue_t --- production/db/inc/core/record_list.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/production/db/inc/core/record_list.hpp b/production/db/inc/core/record_list.hpp index ca39abb3f529..ff14e4c913b1 100644 --- a/production/db/inc/core/record_list.hpp +++ b/production/db/inc/core/record_list.hpp @@ -187,7 +187,7 @@ class record_list_t record_range_t* m_record_ranges; - gaia::common::queue_t m_deletions_requested; + gaia::common::mpsc_queue_t m_deletions_requested; // These booleans are used to ensure that only one operation can be attempted at a time. std::atomic m_is_deletion_marking_in_progress; From 553c255aea77ad71dcf24bfe606af3b63c13fe3e Mon Sep 17 00:00:00 2001 From: Laurentiu Cristofor Date: Tue, 22 Jun 2021 16:53:16 -0700 Subject: [PATCH 3/5] Add mpsc_queue code licenses --- .../1024cores_data_structures/apache_2.txt | 1 + .../simplified_bsd_license.txt | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+) create mode 100644 production/licenses/1024cores_data_structures/apache_2.txt create mode 100644 production/licenses/1024cores_data_structures/simplified_bsd_license.txt diff --git a/production/licenses/1024cores_data_structures/apache_2.txt b/production/licenses/1024cores_data_structures/apache_2.txt new file mode 100644 index 000000000000..a221fab97492 --- /dev/null +++ b/production/licenses/1024cores_data_structures/apache_2.txt @@ -0,0 +1 @@ +https://www.apache.org/licenses/LICENSE-2.0.html diff --git a/production/licenses/1024cores_data_structures/simplified_bsd_license.txt b/production/licenses/1024cores_data_structures/simplified_bsd_license.txt new file mode 100644 index 000000000000..8d2f5b55c021 --- /dev/null +++ b/production/licenses/1024cores_data_structures/simplified_bsd_license.txt @@ -0,0 +1,23 @@ +https://www.1024cores.net/home/code-license + +Code License + +Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this list of + + conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, this list + + of conditions and the following disclaimer in the documentation and/or other materials + + provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +The views and conclusions contained in the software and documentation are those of the authors and should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov. + +If not stated otherwise, all non-source-code text and images on this site are provided under the terms of the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License. Source code is covered by the Simplified BSD License and by Apache License, Version 2.0. The opinions expressed on this site are my own and do not necessarily reflect the views of Google. From e20178682faad43ea9e19388f493cbbc681fc53d Mon Sep 17 00:00:00 2001 From: Laurentiu Cristofor Date: Tue, 22 Jun 2021 18:26:16 -0700 Subject: [PATCH 4/5] Add README file for new licenses folder --- production/licenses/README.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 production/licenses/README.md diff --git a/production/licenses/README.md b/production/licenses/README.md new file mode 100644 index 000000000000..09ceddb5507a --- /dev/null +++ b/production/licenses/README.md @@ -0,0 +1,7 @@ +# licenses +This is a folder for collecting license notices for third-party code that is used in our production code. + +Each specific third-party piece of code can have its specific licenses collected here in its own folder (folders should be named in ways that they clearly identify the code that they apply to). + +Try to provide URLs for all licenses collected here. For standard licenses, nothing more than an URL is needed. For custom licenses, include the text of the custom license along the URL from which you picked it up. See existing entries for examples. + From ee93691d1fd58f42d9db24de0e927d7d78974105 Mon Sep 17 00:00:00 2001 From: Laurentiu Cristofor Date: Wed, 23 Jun 2021 09:30:29 -0700 Subject: [PATCH 5/5] Remove volatile from atomic variable --- production/inc/gaia_internal/common/queue.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/production/inc/gaia_internal/common/queue.hpp b/production/inc/gaia_internal/common/queue.hpp index ec46b6503cc0..42c54d9e5a19 100644 --- a/production/inc/gaia_internal/common/queue.hpp +++ b/production/inc/gaia_internal/common/queue.hpp @@ -112,7 +112,7 @@ class mpsc_queue_t mpsc_queue_node_t* dequeue_internal(); protected: - std::atomic*> volatile m_head; + std::atomic*> m_head; mpsc_queue_node_t* m_tail; // The queue structure is guaranteed to never actually be empty