Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a multiple-producer single-consumer (MPSC) queue implementation #753

Merged
merged 5 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions production/common/tests/test_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,59 @@ TEST(common, queue)
ASSERT_EQ(0, value);
ASSERT_EQ(0, queue.size());
}

TEST(common, mpsc_queue)
{
mpsc_queue_t<int> queue;

int value = -1;

// Verify that the queue is empty.
queue.dequeue(value);
ASSERT_EQ(-1, value);
ASSERT_EQ(0, queue.size());

// Insert two values.
queue.enqueue(1);
queue.enqueue(2);
ASSERT_EQ(2, queue.size());

// Extract a value.
queue.dequeue(value);
ASSERT_EQ(1, value);
ASSERT_EQ(1, queue.size());

// Insert another value.
queue.enqueue(3);
ASSERT_EQ(2, queue.size());

// Extract the two values.
queue.dequeue(value);
ASSERT_EQ(2, value);
ASSERT_EQ(1, queue.size());

queue.dequeue(value);
ASSERT_EQ(3, value);
ASSERT_EQ(0, queue.size());

// Verify that the queue is empty again.
value = -1;
queue.dequeue(value);
ASSERT_EQ(-1, value);
ASSERT_EQ(0, queue.size());

// Insert one more value.
queue.enqueue(4);
ASSERT_EQ(1, queue.size());

// Extract the value.
queue.dequeue(value);
ASSERT_EQ(4, value);
ASSERT_EQ(0, queue.size());

// Verify that the queue is empty again.
value = 0;
queue.dequeue(value);
ASSERT_EQ(0, value);
ASSERT_EQ(0, queue.size());
}
2 changes: 1 addition & 1 deletion production/db/inc/core/record_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class record_list_t

record_range_t* m_record_ranges;

gaia::common::queue_t<gaia::db::gaia_locator_t> m_deletions_requested;
gaia::common::mpsc_queue_t<gaia::db::gaia_locator_t> m_deletions_requested;

// These booleans are used to ensure that only one operation can be attempted at a time.
std::atomic<bool> m_is_deletion_marking_in_progress;
Expand Down
50 changes: 50 additions & 0 deletions production/inc/gaia_internal/common/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ struct queue_element_t
queue_element_t(T value);
};

// This is a general-purpose queue implementation
// with synchronization performed using mutexes at node level.
template <class T>
class queue_t
{
Expand Down Expand Up @@ -72,6 +74,54 @@ class queue_t
bool dequeue_internal(T& value);
};

template <class T>
struct mpsc_queue_node_t
{
mpsc_queue_node_t* volatile next;
LaurentiuCristofor marked this conversation as resolved.
Show resolved Hide resolved
T value;

mpsc_queue_node_t();
mpsc_queue_node_t(T value);
};

// This implementation of a multiple-producer single-consumer (mpsc) queue
// is based on the implementation described at:
// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
template <class T>
class mpsc_queue_t
{
public:
mpsc_queue_t();
~mpsc_queue_t();

// Insert a value in the queue.
void enqueue(const T& value);

// Extract a value from the queue.
// If the queue is empty, the value will be left unset.
// The caller should properly initialize the value before this call,
// to be able to determine if it was actually set.
void dequeue(T& value);

inline bool is_empty() const;

inline size_t size() const;

protected:
void enqueue_internal(mpsc_queue_node_t<T>* node);
mpsc_queue_node_t<T>* dequeue_internal();

protected:
std::atomic<mpsc_queue_node_t<T>*> m_head;
mpsc_queue_node_t<T>* m_tail;

// The queue structure is guaranteed to never actually be empty
// by having it contain references to this "stub" node when it is logically empty.
mpsc_queue_node_t<T> m_stub;

std::atomic<size_t> m_size;
};

#include "queue.inc"

/*@}*/
Expand Down
137 changes: 137 additions & 0 deletions production/inc/gaia_internal/common/queue.inc
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,140 @@ size_t queue_t<T>::size() const
{
return m_size;
}

template <class T>
mpsc_queue_node_t<T>::mpsc_queue_node_t()
: value{}
{
}

template <class T>
mpsc_queue_node_t<T>::mpsc_queue_node_t(T value)
{
this->value = value;

this->next = nullptr;
}

template <class T>
mpsc_queue_t<T>::mpsc_queue_t()
{
m_head = &m_stub;
m_tail = &m_stub;
m_stub.next = nullptr;
m_size = 0;
}

template <class T>
mpsc_queue_t<T>::~mpsc_queue_t()
{
mpsc_queue_node_t<T>* node = nullptr;
while ((node = dequeue_internal()))
{
delete node;
}
}

template <class T>
void mpsc_queue_t<T>::enqueue_internal(mpsc_queue_node_t<T>* node)
{
// Set up node to be last node.
node->next = nullptr;

// Exchange head and link previous head to our node.
mpsc_queue_node_t<T>* previous = m_head.exchange(node);
previous->next = node;
}

template <class T>
mpsc_queue_node_t<T>* mpsc_queue_t<T>::dequeue_internal()
{
// This is a multiple-producer, single-consumer queue,
LaurentiuCristofor marked this conversation as resolved.
Show resolved Hide resolved
// and this is the consumer call,
// so only one thread is expected to be executing it.
mpsc_queue_node_t<T>* tail = m_tail;
mpsc_queue_node_t<T>* next = tail->next;

if (tail == &m_stub)
{
if (next == nullptr)
{
// Queue only containes the stub, so there is nothing to return.
return nullptr;
}

// There is a node after the stub.
// Update tail past the stub node and update tail/next.
m_tail = next;
tail = next;
next = next->next;
}

if (next)
{
// We have another node after this one, so we can just update the tail.
m_tail = next;

return tail;
}

// There is a single node in the queue, so we'll need to update head as well.
// Store its current value - other producers may update it.
mpsc_queue_node_t<T>* head = m_head;

// DEVNOTE: Original code returned nullptr if head != tail.
// But that can only happen if a new node was inserted, in which case
// we should be able to continue with the removing of the tail node.
if (tail == head)
{
// Push back the stub, so that we have one remaining entry in the queue.
enqueue_internal(&m_stub);
}

// Refresh the tail.next value.
next = tail->next;

// DEVNOTE: Original code had a check on next, but we've replaced it with an assert.
ASSERT_INVARIANT(next, "Next node was expected to be non-null at this point!");

// Remove tail node now.
m_tail = next;

return tail;
}

template <class T>
void mpsc_queue_t<T>::enqueue(const T& value)
{
mpsc_queue_node_t<T>* node = new mpsc_queue_node_t<T>(value);

enqueue_internal(node);

++m_size;
}

template <class T>
void mpsc_queue_t<T>::dequeue(T& value)
{
mpsc_queue_node_t<T>* node = dequeue_internal();

if (node)
{
value = node->value;
delete node;

--m_size;
}
}

template <class T>
bool mpsc_queue_t<T>::is_empty() const
{
return (size() == 0);
}

template <class T>
size_t mpsc_queue_t<T>::size() const
{
return m_size;
}
1 change: 1 addition & 0 deletions production/licenses/1024cores_data_structures/apache_2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
https://www.apache.org/licenses/LICENSE-2.0.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
https://www.1024cores.net/home/code-license

Code License

Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice, this list of

conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice, this list

of conditions and the following disclaimer in the documentation and/or other materials

provided with the distribution.

THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

The views and conclusions contained in the software and documentation are those of the authors and should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov.

If not stated otherwise, all non-source-code text and images on this site are provided under the terms of the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License. Source code is covered by the Simplified BSD License and by Apache License, Version 2.0. The opinions expressed on this site are my own and do not necessarily reflect the views of Google.
7 changes: 7 additions & 0 deletions production/licenses/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# licenses
This is a folder for collecting license notices for third-party code that is used in our production code.

Each specific third-party piece of code can have its specific licenses collected here in its own folder (folders should be named in ways that they clearly identify the code that they apply to).

Try to provide URLs for all licenses collected here. For standard licenses, nothing more than an URL is needed. For custom licenses, include the text of the custom license along the URL from which you picked it up. See existing entries for examples.