Skip to content

Commit

Permalink
chore: Pass Item* to shards instead of Item
Browse files Browse the repository at this point in the history
This should allow us a simpler Item management once we introduce an Item pool.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Dec 16, 2022
1 parent 1b1b88a commit 82b0f82
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/core/mpsc_intrusive_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ template <typename T> class MPSCIntrusiveQueue {
static constexpr size_t cache_alignment = 64;
static constexpr size_t cacheline_length = 64;

alignas(cache_alignment) std::aligned_storage<sizeof(T), alignof(T)>::type storage_{};
alignas(cache_alignment) typename std::aligned_storage<sizeof(T), alignof(T)>::type storage_{};
T* dummy_;
alignas(cache_alignment) std::atomic<T*> head_;
alignas(cache_alignment) T* tail_;
Expand Down
12 changes: 6 additions & 6 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,20 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view

bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice& db_slice,
DbIndex index, uint64_t expire_ms) {
auto value_to_load = Parse(data);
if (!value_to_load) {
auto opaque_res = Parse(data);
if (!opaque_res) {
return false;
}
Item item{
.key = std::string(key), .val = std::move(value_to_load.value()), .expire_ms = expire_ms};

PrimeValue pv;
if (auto ec = Visit(item, &pv); ec) {
if (auto ec = FromOpaque(*opaque_res, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to save data: " << ec;
return false;
}

DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms);
auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), expire_ms);
return added;
}

Expand Down
52 changes: 32 additions & 20 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,12 @@ RdbLoader::RdbLoader(ScriptMgr* script_mgr) : script_mgr_(script_mgr) {
}

RdbLoader::~RdbLoader() {
while (true) {
Item* item = item_queue_.Pop();
if (item == nullptr)
break;
delete item;
}
}

error_code RdbLoader::Load(io::Source* src) {
Expand Down Expand Up @@ -2023,40 +2029,43 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
if (out_buf.empty())
return;

ItemsBuf* ib = new ItemsBuf{std::move(out_buf)};
auto cb = [indx = this->cur_db_index_, ib, this] {
this->LoadItemsBuffer(indx, *ib);
delete ib;
auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
this->LoadItemsBuffer(indx, ib);
};

shard_set->Add(sid, std::move(cb));
}

std::error_code RdbLoaderBase::Visit(const Item& item, CompactObj* pv) {
OpaqueObjLoader visitor(item.val.rdb_type, pv);
std::visit(visitor, item.val.obj);
std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv);
std::visit(visitor, opaque.obj);

return visitor.ec();
}

void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
DbSlice& db_slice = EngineShard::tlocal()->db_slice();
DbContext db_cntx{.db_index = db_ind, .time_now_ms = GetCurrentTimeMs()};

for (const auto& item : ib) {
for (const auto* item : ib) {
PrimeValue pv;
if (ec_ = Visit(item, &pv); ec_) {
if (ec_ = FromOpaque(item->val, &pv); ec_) {
stop_early_ = true;
break;
}

if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
if (item->expire_ms > 0 && db_cntx.time_now_ms >= item->expire_ms)
continue;

auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms);
auto [it, added] = db_slice.AddOrUpdate(db_cntx, item->key, std::move(pv), item->expire_ms);
if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind;
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
}
}

for (auto* item : ib) {
item_queue_.Push(item);
}
}

void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
Expand All @@ -2066,16 +2075,19 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {

error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
/* Read key */
string key;
OpaqueObj val;
Item* item = item_queue_.Pop();

if (item == nullptr) {
item = new Item;
}

// We free key in LoadItemsBuffer.
SET_OR_RETURN(ReadKey(), key);
SET_OR_RETURN(ReadKey(), item->key);

error_code ec = ReadObj(type, &val);
error_code ec = ReadObj(type, &item->val);

if (ec) {
VLOG(1) << "ReadObj error " << ec << " for key " << key;
VLOG(1) << "ReadObj error " << ec << " for key " << item->key;
return ec;
}

Expand All @@ -2092,12 +2104,12 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
if (should_expire) {
// decrRefCount(val);
} else {
ShardId sid = Shard(key, shard_set->size());
uint64_t expire_at_ms = settings->expiretime;
ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;

auto& out_buf = shard_buf_[sid];

out_buf.emplace_back(Item{std::move(key), std::move(val), expire_at_ms});
out_buf.emplace_back(item);

constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {
Expand Down
28 changes: 20 additions & 8 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern "C" {

#include "base/io_buf.h"
#include "base/pod_array.h"
#include "core/mpsc_intrusive_queue.h"
#include "io/io.h"
#include "server/common.h"

Expand Down Expand Up @@ -87,20 +88,13 @@ class RdbLoaderBase {

class OpaqueObjLoader;

struct Item {
std::string key;
OpaqueObj val;
uint64_t expire_ms;
};
using ItemsBuf = std::vector<Item>;

::io::Result<uint8_t> FetchType() {
return FetchInt<uint8_t>();
}

template <typename T> io::Result<T> FetchInt();

std::error_code Visit(const Item& item, CompactObj* pv);
static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv);

io::Result<uint64_t> LoadLen(bool* is_encoded);
std::error_code FetchBuf(size_t size, void* dest);
Expand Down Expand Up @@ -184,6 +178,23 @@ class RdbLoader : protected RdbLoaderBase {
}

private:
struct Item {
std::string key;
OpaqueObj val;
uint64_t expire_ms;
std::atomic<Item*> next;

friend void MPSC_intrusive_store_next(Item* dest, Item* nxt) {
dest->next.store(nxt, std::memory_order_release);
}

friend Item* MPSC_intrusive_load_next(const Item& src) {
return src.next.load(std::memory_order_acquire);
}
};

using ItemsBuf = std::vector<Item*>;

struct ObjSettings;
std::error_code LoadKeyValPair(int type, ObjSettings* settings);
void ResizeDb(size_t key_num, size_t expire_num);
Expand All @@ -207,6 +218,7 @@ class RdbLoader : protected RdbLoaderBase {

// Callback when receiving RDB_OPCODE_FULLSYNC_END
std::function<void()> full_sync_cut_cb;
detail::MPSCIntrusiveQueue<Item> item_queue_;
};

} // namespace dfly

0 comments on commit 82b0f82

Please sign in to comment.