Skip to content

Commit

Permalink
chore: Adding a mpsc intrusive queue based on Vyukov's design.
Browse files Browse the repository at this point in the history
Refactor ReadStringObj to accept a destination argument.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Dec 16, 2022
1 parent 08803e6 commit 1b1b88a
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 66 deletions.
93 changes: 93 additions & 0 deletions src/core/mpsc_intrusive_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <atomic>
#include <cstddef>

// TODO: to move to helio

namespace dfly {
namespace detail {

// a MPSC queue where multiple threads push and a single thread pops.
//
// Requires global functions for T:
//
// T* MPSC_intrusive_load_next(const T& src)
// void MPSC_intrusive_store_next(T* next, T* dest);
// based on the design from here:
// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
template <typename T> class MPSCIntrusiveQueue {
private:
static constexpr size_t cache_alignment = 64;
static constexpr size_t cacheline_length = 64;

alignas(cache_alignment) std::aligned_storage<sizeof(T), alignof(T)>::type storage_{};
T* dummy_;
alignas(cache_alignment) std::atomic<T*> head_;
alignas(cache_alignment) T* tail_;
char pad_[cacheline_length];

public:
MPSCIntrusiveQueue()
: dummy_{reinterpret_cast<T*>(std::addressof(storage_))}, head_{dummy_}, tail_{dummy_} {
MPSC_intrusive_store_next(dummy_, nullptr);
}

MPSCIntrusiveQueue(MPSCIntrusiveQueue const&) = delete;
MPSCIntrusiveQueue& operator=(MPSCIntrusiveQueue const&) = delete;

void Push(T* ctx) noexcept {
// ctx becomes a new head.
MPSC_intrusive_store_next(ctx, nullptr);
T* prev = head_.exchange(ctx, std::memory_order_acq_rel);
MPSC_intrusive_store_next(prev, ctx);
}

T* Pop() noexcept;
};

template <typename T> T* MPSCIntrusiveQueue<T>::Pop() noexcept {
T* tail = tail_;

// tail->next_.load(std::memory_order_acquire);
T* next = MPSC_intrusive_load_next(*tail);
if (dummy_ == tail) {
if (nullptr == next) {
// empty
return nullptr;
}
tail_ = next;
tail = next;
next = MPSC_intrusive_load_next(*next);
}

if (nullptr != next) {
// non-empty
tail_ = next;
return tail;
}

T* head = head_.load(std::memory_order_acquire);
if (tail != head) {
// non-empty, retry is in order: we are in the middle of push.
return nullptr;
}

Push(dummy_);

next = MPSC_intrusive_load_next(*tail);
if (nullptr != next) {
tail_ = next;
return tail;
}

// non-empty, retry is in order: we are still adding.
return nullptr;
}

} // namespace detail
} // namespace dfly
10 changes: 6 additions & 4 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,15 @@ class RdbRestoreValue : protected RdbLoaderBase {
std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view payload) {
InMemSource source(payload);
src_ = &source;
if (auto type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) {
io::Result<OpaqueObj> io_res = ReadObj(type_id.value()); // load the type from the input stream
if (!io_res) {
if (io::Result<uint8_t> type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) {
OpaqueObj obj;
error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream
if (ec) {
LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value();
return std::nullopt;
}
return std::optional<OpaqueObj>(std::move(io_res.value()));

return std::optional<OpaqueObj>(std::move(obj));
} else {
LOG(ERROR) << "failed to load type id from the input stream or type id is invalid";
return std::nullopt;
Expand Down
Loading

0 comments on commit 1b1b88a

Please sign in to comment.