Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): memory defrag support - unit tests added to verify #448 #523

Merged
merged 1 commit into from
Dec 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using absl::GetFlag;
namespace {

constexpr XXH64_hash_t kHashSeed = 24061983;
constexpr size_t kAlignSize = 8u;

// Approximation since does not account for listpacks.
size_t QlMAllocSize(quicklist* ql) {
Expand Down Expand Up @@ -217,7 +218,7 @@ struct TL {

thread_local TL tl;

constexpr bool kUseSmallStrings = true;
constexpr bool kUseSmallStrings = false;

/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
/// file and implement with SIMD instructions.
Expand Down Expand Up @@ -380,6 +381,23 @@ void RobjWrapper::SetString(string_view s, pmr::memory_resource* mr) {
}
}

// Relocates the inner buffer of a string object when the memory page it lives
// on is reported as underutilized for the given `ratio`.
// Returns false for non-string objects and for strings whose page is not
// underutilized; otherwise returns the result of Reallocate().
bool RobjWrapper::DefragIfNeeded(float ratio) {
  // Only string objects are candidates for relocation.
  if (type() != OBJ_STRING) {
    return false;
  }

  // The page holding the buffer is utilized well enough - nothing to do.
  if (!zmalloc_page_is_underutilized(inner_obj(), ratio)) {
    return false;
  }

  // Move the buffer using the thread-local memory resource.
  return Reallocate(tl.local_mr);
}

// Moves the inner buffer into a block newly obtained from `mr` and releases
// the previous block through the same resource. Always returns true.
bool RobjWrapper::Reallocate(std::pmr::memory_resource* mr) {
void* old_ptr = inner_obj_;
// The new block is allocated while the old one is still live, and only sz_
// bytes are requested for it.
inner_obj_ = mr->allocate(sz_, kAlignSize);
memcpy(inner_obj_, old_ptr, sz_);
// NOTE(review): the size passed here is 0, not the size the old block was
// allocated with. std::pmr::memory_resource::deallocate expects the original
// size; presumably the resource in use ignores it - confirm.
mr->deallocate(old_ptr, 0, kAlignSize);
return true;
}

void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) {
type_ = type;
encoding_ = encoding;
Expand All @@ -398,13 +416,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
desired += SDS_MAX_PREALLOC;
}

void* newp = mr->allocate(desired, 8);
void* newp = mr->allocate(desired, kAlignSize);
if (sz_) {
memcpy(newp, inner_obj_, sz_);
}

if (current_cap) {
mr->deallocate(inner_obj_, current_cap, 8);
mr->deallocate(inner_obj_, current_cap, kAlignSize);
}
inner_obj_ = newp;
}
Expand Down Expand Up @@ -659,7 +677,7 @@ robj* CompactObj::AsRObj() const {
res->type = u_.r_obj.type();

if (res->type == OBJ_SET) {
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
}

if (res->type == OBJ_HASH) {
Expand Down Expand Up @@ -850,6 +868,28 @@ string_view CompactObj::GetSlice(string* scratch) const {
return string_view{};
}

// Tries to defragment the value held by this object.
// `ratio` is forwarded to the wrapped RobjWrapper. Returns the wrapper's
// result for ROBJ_TAG objects that have an inner object, false otherwise.
bool CompactObj::DefragIfNeeded(float ratio) {
  // Currently only ROBJ_TAG objects are supported for this operation.
  // SMALL_TAG (TODO - support this later), INT_TAG (not relevant),
  // EXTERNAL_TAG and inline strings are never moved.
  if (taglen_ != ROBJ_TAG) {
    return false;
  }

  // Nothing to relocate when there is no inner object.
  if (u_.r_obj.inner_obj() == nullptr) {
    return false;
  }

  return u_.r_obj.DefragIfNeeded(ratio);
}

bool CompactObj::HasAllocated() const {
if (IsRef() || taglen_ == INT_TAG || IsInline() || taglen_ == EXTERNAL_TAG ||
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
Expand Down
7 changes: 6 additions & 1 deletion src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ typedef struct redisObject robj;
namespace dfly {

constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingListPack = 3;

Expand Down Expand Up @@ -52,7 +52,10 @@ class RobjWrapper {
return std::string_view{reinterpret_cast<char*>(inner_obj_), sz_};
}

bool DefragIfNeeded(float ratio);

private:
bool Reallocate(std::pmr::memory_resource* mr);
size_t InnerObjMallocUsed() const;
void MakeInnerRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr);

Expand Down Expand Up @@ -208,6 +211,8 @@ class CompactObj {
return mask_ & IO_PENDING;
}

bool DefragIfNeeded(float ratio);

void SetIoPending(bool b) {
if (b) {
mask_ |= IO_PENDING;
Expand Down
2 changes: 1 addition & 1 deletion src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) {
bool found = HasUnderutilizedMemory(ptrs, kUnderUtilizedRatio);
ASSERT_FALSE(found);
DeallocateAtRandom(kRandomStep, &ptrs);
// TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size);

// This is another case, where we are filling the "gaps" by doing re-allocations
// in this case, since we are not setting all the values back it should still have
// places that are not used. Plus since we are not looking at the first page
Expand Down
1 change: 1 addition & 0 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ Usage: dragonfly [FLAGS]
}
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
mi_option_set(mi_option_decommit_delay, 0);

base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);
Expand Down
59 changes: 52 additions & 7 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,21 +758,66 @@ TEST_F(DflyEngineTest, Bug496) {
TEST_F(DefragDflyEngineTest, TestDefragOption) {
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
EXPECT_GE(max_memory_limit, 100'000);
const int NUMBER_OF_KEYS = 184000;
max_memory_limit = 300'000; // control memory size so no need for too many keys
constexpr int kNumberOfKeys = 100'000; // this fill the memory
constexpr int kKeySize = 137;
constexpr int kMaxDefragTriesForTests = 10;

std::vector<std::string> keys2delete;
keys2delete.push_back("del");

// Generate a list of keys that would be deleted
// The keys that we will delete are all of the form "key-name:1<other digits>".
// This is because we populate keys that have this format, but we don't want
// to delete all keys, only some of them, so we delete those that start with 1
constexpr int kFactor = 10;
int kMaxNumKeysToDelete = 10'000;
int current_step = kFactor;
for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
for (; i < current_step; i++) {
int j = i - 1 + current_step;
keys2delete.push_back("key-name:" + std::to_string(j));
}
}

std::vector<std::string_view> keys(keys2delete.begin(), keys2delete.end());

RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
RespExpr resp = Run(
{"DEBUG", "POPULATE", std::to_string(kNumberOfKeys), "key-name", std::to_string(kKeySize)});
ASSERT_EQ(resp, "OK");
resp = Run({"DBSIZE"});
EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));
EXPECT_THAT(resp, IntArg(kNumberOfKeys));

shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms);
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
// we are not running stats yet
EXPECT_EQ(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
// we are expecting to have at least one try by now
EXPECT_GT(shard->GetDefragStats().tries, 0);
});

ArgSlice delete_cmd(keys);
auto r = CheckedInt(delete_cmd);
// the first element in this is the command del so size is one less
ASSERT_EQ(r, keys2delete.size() - 1);

// At this point we need to check whether the task ran and whether it did something
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
// a "busy wait" to ensure that memory defragmentation was successful:
// the task ran and did its work
auto stats = shard->GetDefragStats();
for (int i = 0; i < kMaxDefragTriesForTests; i++) {
stats = shard->GetDefragStats();
if (stats.success_count > 0) {
break;
}
this_fiber::sleep_for(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.success_count, 0);
});
}

Expand Down
77 changes: 57 additions & 20 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;
constexpr DbIndex kDefaultDbIndex = 0;
constexpr uint64_t kCursorDoneState = 0u;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

Expand Down Expand Up @@ -80,7 +81,7 @@ bool EngineShard::DefragTaskState::IsRequired() {
const uint64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

if (cursor > 0) {
if (cursor > kCursorDoneState) {
return true;
}

Expand All @@ -100,34 +101,73 @@ bool EngineShard::DefragTaskState::IsRequired() {

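// Traverses the prime table of the default db and asks each value to defrag itself if needed.
// Returns true when the traversal cursor has not reached its done state yet.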
bool EngineShard::DoDefrag() {
// TODO - Impl!!
return defrag_state_.cursor > 0;
}

void EngineShard::DefragTaskState::Init() {
cursor = 0u;
// --------------------------------------------------------------------------
// NOTE: This task is running with exclusive access to the shard.
// i.e. - since we use a shared-nothing architecture here, and all accesses
// are done using fibers, this fiber runs only when no other fiber in the
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------

constexpr size_t kMaxTraverses = 50;
const float threshold = GetFlag(FLAGS_mem_utilization_threshold);

auto& slice = db_slice();
DCHECK(slice.IsDbValid(kDefaultDbIndex));
auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex);
PrimeTable::Cursor cur = defrag_state_.cursor;
uint64_t defrag_count = 0;
unsigned traverses_count = 0;

do {
cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
// for each value check whether we should move it because it
// sits on an underutilized page of memory, and if so, do it.
bool did = it->second.DefragIfNeeded(threshold);
if (did) {
defrag_count++;
}
});
traverses_count++;
} while (traverses_count < kMaxTraverses && cur);

defrag_state_.cursor = cur.value();
if (defrag_count > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << defrag_count
<< " times, did it in " << traverses_count << " cursor is at the "
<< (defrag_state_.cursor == 0 ? "end" : "in progress");
} else {
VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
<< " times out of maximum " << kMaxTraverses << ", with cursor at "
<< (defrag_state_.cursor == 0 ? "end" : "in progress")
<< " but no location for defrag were found";
}
defrag_state_.stats.success_count += defrag_count;
defrag_state_.stats.tries++;
return defrag_state_.cursor > kCursorDoneState;
}

// the memory defragmentation task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between committed and used memory is high enough
// 3. Check if we have memory changes (to ensure that we are not running endlessly). - TODO
// 4. if all the above pass -> run on the shard and try to defragmented memory by re-allocating
// values
// if the cursor for this is signal that we are not done, schedule the task to run at high
// priority otherwise lower the task priority so that it would not use the CPU when not required
// 4. if all the above pass -> scan this shard and try to move values that reside on
// underutilized pages
// if the cursor returned from the scan is not in the done state, schedule the task to run at high
// priority;
// otherwise lower the task priority so that it does not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;

const auto shard_id = db_slice().shard_id();
bool required_state = defrag_state_.IsRequired();
if (required_state) {
if (defrag_state_.IsRequired()) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
// by default we just want to not get in the way..
return 0u;

return kRunAtLowPriority;
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
Expand All @@ -150,7 +190,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*

db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
// start the defragmented task here
defrag_state_.Init();
defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); });
}

Expand All @@ -170,9 +209,7 @@ void EngineShard::Shutdown() {
ProactorBase::me()->CancelPeriodic(periodic_task_);
}

if (defrag_task_ != 0) {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
}

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
Expand Down
2 changes: 0 additions & 2 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ class EngineShard {
uint64_t cursor = 0u;
DefragStats stats;

void Init();

// check the current threshold and return true if
// we need to do the defragmentation
bool IsRequired();
Expand Down
11 changes: 10 additions & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to

namespace dfly {

std::ostream& operator<<(std::ostream& os, ArgSlice& list) {
os << "[";
if (!list.empty()) {
std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
os << (*(list.end() - 1));
}
return os << "]";
}

extern unsigned kInitSegmentLog;

using MP = MemcacheParser;
Expand Down Expand Up @@ -267,7 +276,7 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_list<std::stri
return conn->SplitLines();
}

int64_t BaseFamilyTest::CheckedInt(std::initializer_list<std::string_view> list) {
int64_t BaseFamilyTest::CheckedInt(ArgSlice list) {
RespExpr resp = Run(list);
if (resp.type == RespExpr::INT64) {
return get<int64_t>(resp.u);
Expand Down
5 changes: 4 additions & 1 deletion src/server/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ class BaseFamilyTest : public ::testing::Test {
MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key = std::string_view{});
MCResponse GetMC(MemcacheParser::CmdType cmd_type, std::initializer_list<std::string_view> list);

int64_t CheckedInt(std::initializer_list<std::string_view> list);
int64_t CheckedInt(std::initializer_list<std::string_view> list) {
return CheckedInt(ArgSlice{list.begin(), list.size()});
}
int64_t CheckedInt(ArgSlice list);

bool IsLocked(DbIndex db_index, std::string_view key) const;
ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;
Expand Down