From 1b1b88a6b01d13f4dced03efe8e044cf0ece84cc Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 15 Dec 2022 21:31:34 +0200 Subject: [PATCH] chore: Adding a mpsc intrusive queue based on Vyukov's design. Refactor ReadStringObj to accept a destination argument. Signed-off-by: Roman Gershman --- src/core/mpsc_intrusive_queue.h | 93 +++++++++++++++++++ src/server/generic_family.cc | 10 +- src/server/rdb_load.cc | 157 ++++++++++++++++++++------------ src/server/rdb_load.h | 4 +- 4 files changed, 198 insertions(+), 66 deletions(-) create mode 100644 src/core/mpsc_intrusive_queue.h diff --git a/src/core/mpsc_intrusive_queue.h b/src/core/mpsc_intrusive_queue.h new file mode 100644 index 000000000000..330d9c8f9974 --- /dev/null +++ b/src/core/mpsc_intrusive_queue.h @@ -0,0 +1,93 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include + +// TODO: to move to helio + +namespace dfly { +namespace detail { + +// a MPSC queue where multiple threads push and a single thread pops. +// +// Requires global functions for T: +// +// T* MPSC_intrusive_load_next(const T& src) +// void MPSC_intrusive_store_next(T* next, T* dest); +// based on the design from here: +// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue +template class MPSCIntrusiveQueue { + private: + static constexpr size_t cache_alignment = 64; + static constexpr size_t cacheline_length = 64; + + alignas(cache_alignment) std::aligned_storage::type storage_{}; + T* dummy_; + alignas(cache_alignment) std::atomic head_; + alignas(cache_alignment) T* tail_; + char pad_[cacheline_length]; + + public: + MPSCIntrusiveQueue() + : dummy_{reinterpret_cast(std::addressof(storage_))}, head_{dummy_}, tail_{dummy_} { + MPSC_intrusive_store_next(dummy_, nullptr); + } + + MPSCIntrusiveQueue(MPSCIntrusiveQueue const&) = delete; + MPSCIntrusiveQueue& operator=(MPSCIntrusiveQueue const&) = delete; + + void Push(T* ctx) noexcept { + // ctx becomes a new head. + MPSC_intrusive_store_next(ctx, nullptr); + T* prev = head_.exchange(ctx, std::memory_order_acq_rel); + MPSC_intrusive_store_next(prev, ctx); + } + + T* Pop() noexcept; +}; + +template T* MPSCIntrusiveQueue::Pop() noexcept { + T* tail = tail_; + + // tail->next_.load(std::memory_order_acquire); + T* next = MPSC_intrusive_load_next(*tail); + if (dummy_ == tail) { + if (nullptr == next) { + // empty + return nullptr; + } + tail_ = next; + tail = next; + next = MPSC_intrusive_load_next(*next); + } + + if (nullptr != next) { + // non-empty + tail_ = next; + return tail; + } + + T* head = head_.load(std::memory_order_acquire); + if (tail != head) { + // non-empty, retry is in order: we are in the middle of push. + return nullptr; + } + + Push(dummy_); + + next = MPSC_intrusive_load_next(*tail); + if (nullptr != next) { + tail_ = next; + return tail; + } + + // non-empty, retry is in order: we are still adding. + return nullptr; +} + +} // namespace detail +} // namespace dfly diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index e03ad2493037..5cfba87e186c 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -134,13 +134,15 @@ class RdbRestoreValue : protected RdbLoaderBase { std::optional RdbRestoreValue::Parse(std::string_view payload) { InMemSource source(payload); src_ = &source; - if (auto type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) { - io::Result io_res = ReadObj(type_id.value()); // load the type from the input stream - if (!io_res) { + if (io::Result type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) { + OpaqueObj obj; + error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream + if (ec) { LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value(); return std::nullopt; } - return std::optional(std::move(io_res.value())); + + return std::optional(std::move(obj)); } else { LOG(ERROR) << "failed to load type id from the input stream or type id is invalid"; return std::nullopt; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 56e3bdb7f9b0..2b28ab0a8cbd 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1183,69 +1183,90 @@ auto RdbLoaderBase::ReadKey() -> io::Result { return FetchGenericString(); } -auto RdbLoaderBase::ReadObj(int rdbtype) -> io::Result { +error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) { + io::Result iores; + switch (rdbtype) { case RDB_TYPE_STRING: { + dest->rdb_type = RDB_TYPE_STRING; + /* Read string value */ - auto fetch = ReadStringObj(); - if (!fetch) - return make_unexpected(fetch.error()); - return OpaqueObj{std::move(*fetch), RDB_TYPE_STRING}; + return ReadStringObj(&dest->obj); } case RDB_TYPE_SET: - return ReadSet(); + iores = ReadSet(); + break; case RDB_TYPE_SET_INTSET: - return ReadIntSet(); + iores = ReadIntSet(); + break; case RDB_TYPE_HASH_ZIPLIST: - return ReadHZiplist(); + iores = ReadHZiplist(); + break; case RDB_TYPE_HASH: - return ReadHMap(); + iores = ReadHMap(); + break; case RDB_TYPE_ZSET: case RDB_TYPE_ZSET_2: - return ReadZSet(rdbtype); + iores = ReadZSet(rdbtype); + break; case RDB_TYPE_ZSET_ZIPLIST: - return ReadZSetZL(); + iores = ReadZSetZL(); + break; case RDB_TYPE_LIST_QUICKLIST: case RDB_TYPE_LIST_QUICKLIST_2: - return ReadListQuicklist(rdbtype); + iores = ReadListQuicklist(rdbtype); + break; case RDB_TYPE_STREAM_LISTPACKS: - return ReadStreams(); + iores = ReadStreams(); break; - } + break; + default: + LOG(ERROR) << "Unsupported rdb type " << rdbtype; - LOG(ERROR) << "Unsupported rdb type " << rdbtype; + return RdbError(errc::invalid_encoding); + } - return Unexpected(errc::invalid_encoding); + if (!iores) + return iores.error(); + *dest = std::move(*iores); + return error_code{}; } -auto RdbLoaderBase::ReadStringObj() -> io::Result { +error_code RdbLoaderBase::ReadStringObj(RdbVariant* dest) { bool isencoded; size_t len; - SET_OR_UNEXPECT(LoadLen(&isencoded), len); + SET_OR_RETURN(LoadLen(&isencoded), len); if (isencoded) { switch (len) { case RDB_ENC_INT8: case RDB_ENC_INT16: - case RDB_ENC_INT32: - return ReadIntObj(len); - case RDB_ENC_LZF: - return ReadLzf(); + case RDB_ENC_INT32: { + io::Result io_int = ReadIntObj(len); + if (!io_int) + return io_int.error(); + dest->emplace(*io_int); + return error_code{}; + } + case RDB_ENC_LZF: { + io::Result lzf = ReadLzf(); + if (!lzf) + return lzf.error(); + + dest->emplace(std::move(lzf.value())); + return error_code{}; + } default: LOG(ERROR) << "Unknown RDB string encoding " << len; - return Unexpected(errc::rdb_file_corrupted); + return RdbError(errc::rdb_file_corrupted); } } - base::PODArray blob; + auto& blob = dest->emplace>(); blob.resize(len); error_code ec = FetchBuf(len, blob.data()); - if (ec) { - return make_unexpected(ec); - } - - return blob; + return ec; } io::Result RdbLoaderBase::ReadIntObj(int enctype) { @@ -1295,24 +1316,24 @@ auto RdbLoaderBase::ReadSet() -> io::Result { unique_ptr res(new LoadTrace); res->arr.resize(len); for (size_t i = 0; i < len; i++) { - io::Result fetch = ReadStringObj(); - if (!fetch) { - return make_unexpected(fetch.error()); + error_code ec = ReadStringObj(&res->arr[i].rdb_var); + if (ec) { + return make_unexpected(ec); } - res->arr[i].rdb_var = std::move(fetch.value()); } return OpaqueObj{std::move(res), RDB_TYPE_SET}; } auto RdbLoaderBase::ReadIntSet() -> io::Result { - io::Result fetch = ReadStringObj(); - if (!fetch) { - return make_unexpected(fetch.error()); + RdbVariant obj; + error_code ec = ReadStringObj(&obj); + if (ec) { + return make_unexpected(ec); } - const LzfString* lzf = get_if(&fetch.value()); - const base::PODArray* arr = get_if>(&fetch.value()); + const LzfString* lzf = get_if(&obj); + const base::PODArray* arr = get_if>(&obj); if (lzf) { if (lzf->uncompressed_len == 0 || lzf->compressed_blob.empty()) @@ -1324,12 +1345,14 @@ auto RdbLoaderBase::ReadIntSet() -> io::Result { return Unexpected(errc::rdb_file_corrupted); } - return OpaqueObj{std::move(*fetch), RDB_TYPE_SET_INTSET}; + return OpaqueObj{std::move(obj), RDB_TYPE_SET_INTSET}; } auto RdbLoaderBase::ReadHZiplist() -> io::Result { RdbVariant str_obj; - SET_OR_UNEXPECT(ReadStringObj(), str_obj); + error_code ec = ReadStringObj(&str_obj); + if (ec) + return make_unexpected(ec); if (StrLen(str_obj) == 0) { return Unexpected(errc::rdb_file_corrupted); @@ -1349,7 +1372,9 @@ auto RdbLoaderBase::ReadHMap() -> io::Result { load_trace->arr.resize(len * 2); for (size_t i = 0; i < load_trace->arr.size(); ++i) { - SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var); + error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var); + if (ec) + return make_unexpected(ec); } return OpaqueObj{std::move(load_trace), RDB_TYPE_HASH}; @@ -1369,7 +1394,9 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result { double score; for (size_t i = 0; i < load_trace->arr.size(); ++i) { - SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var); + error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var); + if (ec) + return make_unexpected(ec); if (rdbtype == RDB_TYPE_ZSET_2) { SET_OR_UNEXPECT(FetchBinaryDouble(), score); } else { @@ -1388,7 +1415,9 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result { auto RdbLoaderBase::ReadZSetZL() -> io::Result { RdbVariant str_obj; - SET_OR_UNEXPECT(ReadStringObj(), str_obj); + error_code ec = ReadStringObj(&str_obj); + if (ec) + return make_unexpected(ec); if (StrLen(str_obj) == 0) { return Unexpected(errc::rdb_file_corrupted); @@ -1420,7 +1449,10 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result { } RdbVariant var; - SET_OR_UNEXPECT(ReadStringObj(), var); + error_code ec = ReadStringObj(&var); + if (ec) + return make_unexpected(ec); + if (StrLen(var) == 0) { return Unexpected(errc::rdb_file_corrupted); } @@ -1438,21 +1470,24 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { unique_ptr load_trace(new LoadTrace); load_trace->arr.resize(listpacks * 2); + error_code ec; for (size_t i = 0; i < listpacks; ++i) { /* Get the master ID, the one we'll use as key of the radix tree * node: the entries inside the listpack itself are delta-encoded * relatively to this ID. */ RdbVariant stream_id, blob; - SET_OR_UNEXPECT(ReadStringObj(), stream_id); - + ec = ReadStringObj(&stream_id); + if (ec) + return make_unexpected(ec); if (StrLen(stream_id) != sizeof(streamID)) { LOG(ERROR) << "Stream node key entry is not the size of a stream ID"; return Unexpected(errc::rdb_file_corrupted); } - SET_OR_UNEXPECT(ReadStringObj(), blob); - + ec = ReadStringObj(&blob); + if (ec) + return make_unexpected(ec); if (StrLen(blob) == 0) { LOG(ERROR) << "Stream listpacks loading failed"; return Unexpected(errc::rdb_file_corrupted); @@ -1484,7 +1519,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { // sds cgname; RdbVariant cgname; - SET_OR_UNEXPECT(ReadStringObj(), cgname); + ec = ReadStringObj(&cgname); + if (ec) + return make_unexpected(ec); cgroup.name = std::move(cgname); SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms); @@ -1529,7 +1566,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { for (size_t j = 0; j < consumers_num; ++j) { auto& consumer = cgroup.cons_arr[j]; - SET_OR_UNEXPECT(ReadStringObj(), consumer.name); + ec = ReadStringObj(&consumer.name); + if (ec) + return make_unexpected(ec); SET_OR_UNEXPECT(FetchInt(), consumer.seen_time); @@ -1788,10 +1827,10 @@ error_code RdbLoader::Load(io::Source* src) { std::error_code RdbLoaderBase::EnsureRead(size_t min_sz) { // In the flow of reading compressed data, we store the uncompressed data to in uncompressed - // buffer. When parsing entries we call ensure read with 9 bytes to read the length of key/value. - // If the key/value is very small (less than 9 bytes) the remainded data in uncompressed buffer - // might contain less than 9 bytes. We need to make sure that we dont read from sink to the - // uncompressed buffer and therefor in this flow we return here. + // buffer. When parsing entries we call ensure read with 9 bytes to read the length of + // key/value. If the key/value is very small (less than 9 bytes) the remainded data in + // uncompressed buffer might contain less than 9 bytes. We need to make sure that we dont read + // from sink to the uncompressed buffer and therefor in this flow we return here. if (mem_buf_ != &origin_mem_buf_) return std::error_code{}; if (mem_buf_->InputLen() >= min_sz) @@ -2033,15 +2072,13 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { // We free key in LoadItemsBuffer. SET_OR_RETURN(ReadKey(), key); - io::Result io_res = ReadObj(type); + error_code ec = ReadObj(type, &val); - if (!io_res) { - VLOG(1) << "ReadObj error " << io_res.error() << " for key " << key; - return io_res.error(); + if (ec) { + VLOG(1) << "ReadObj error " << ec << " for key " << key; + return ec; } - val = std::move(io_res.value()); - /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 2acf46ea011c..beebb26a30b2 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -114,8 +114,8 @@ class RdbLoaderBase { ::io::Result ReadKey(); - ::io::Result ReadObj(int rdbtype); - ::io::Result ReadStringObj(); + std::error_code ReadObj(int rdbtype, OpaqueObj* dest); + std::error_code ReadStringObj(RdbVariant* rdb_variant); ::io::Result ReadIntObj(int encoding); ::io::Result ReadLzf();