From b0836e1fd8a6a4e1991343513427f1c20ef5f5b2 Mon Sep 17 00:00:00 2001
From: Boaz Sade
Date: Tue, 22 Nov 2022 08:21:00 +0200
Subject: [PATCH] feat(server): memory defrag support - unit tests added to
 verify #448

feat(server): memory defrag support - mi_option changed #448

feat(server): active fragmentation - spell check issue #448

feat(server): active memory defrag task #448

Signed-off-by: Boaz Sade
---
 src/core/compact_object.cc     | 35 +++++++++++++++--
 src/core/compact_object.h      |  6 ++-
 src/redis/zmalloc.h            | 38 +++++++++---------
 src/server/dfly_main.cc        |  1 +
 src/server/dragonfly_test.cc   | 65 ++++++++++++++++++++++++++----
 src/server/engine_shard_set.cc | 72 ++++++++++++++++++++++++++++------
 src/server/engine_shard_set.h  |  5 +++
 src/server/test_utils.cc       | 11 +++++-
 src/server/test_utils.h        |  5 ++-
 9 files changed, 193 insertions(+), 45 deletions(-)

diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc
index f48095c4cfa7..b2e4aa471174 100644
--- a/src/core/compact_object.cc
+++ b/src/core/compact_object.cc
@@ -39,6 +39,7 @@ using absl::GetFlag;
 namespace {
 
 constexpr XXH64_hash_t kHashSeed = 24061983;
+constexpr size_t kAlignSize = 8u;
 
 // Approximation since does not account for listpacks.
 size_t QlMAllocSize(quicklist* ql) {
@@ -217,7 +218,7 @@ struct TL {
 
 thread_local TL tl;
 
-constexpr bool kUseSmallStrings = true;
+constexpr bool kUseSmallStrings = false;
 
 /// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
 /// file and implement with SIMD instructions.
@@ -380,6 +381,18 @@ void RobjWrapper::SetString(string_view s, pmr::memory_resource* mr) {
   }
 }
 
+bool RobjWrapper::Reallocate(std::pmr::memory_resource* mr) {
+  if (type_ == OBJ_STRING) {
+    // Currently only for strings.
+    void* old_ptr = inner_obj_;
+    inner_obj_ = mr->allocate(sz_, kAlignSize);
+    memcpy(inner_obj_, old_ptr, sz_);
+    mr->deallocate(old_ptr, 0, kAlignSize);
+    return true;
+  }
+  return false;
+}
+
 void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) {
   type_ = type;
   encoding_ = encoding;
@@ -398,13 +411,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
     desired += SDS_MAX_PREALLOC;
   }
 
-  void* newp = mr->allocate(desired, 8);
+  void* newp = mr->allocate(desired, kAlignSize);
   if (sz_) {
     memcpy(newp, inner_obj_, sz_);
   }
 
   if (current_cap) {
-    mr->deallocate(inner_obj_, current_cap, 8);
+    mr->deallocate(inner_obj_, current_cap, kAlignSize);
   }
   inner_obj_ = newp;
 }
@@ -659,7 +672,7 @@ robj* CompactObj::AsRObj() const {
   res->type = u_.r_obj.type();
 
   if (res->type == OBJ_SET) {
-    LOG(FATAL) << "Should not call AsRObj for type " << res->type;
+    LOG(FATAL) << "Should not call AsRObj for type " << res->type;
   }
 
   if (res->type == OBJ_HASH) {
@@ -850,6 +863,20 @@ string_view CompactObj::GetSlice(string* scratch) const {
   return string_view{};
 }
 
+bool CompactObj::DefragIfNeeded(float ratio) {
+  // Supported only for strings - not for externals or small strings, which use
+  // different allocations - otherwise we may crash.
+  // See the HasAllocated function for the other conditions being tested.
+  const bool is_mimalloc_allocated =
+      (HasAllocated() && taglen_ != SMALL_TAG && ObjType() == OBJ_STRING);
+  if (is_mimalloc_allocated) {
+    // leave this on its own line in case we ever need to debug it
+    if (zmalloc_page_is_underutilized(RObjPtr(), ratio)) {
+      return u_.r_obj.Reallocate(tl.local_mr);
+    }
+  }
+  return false;
+}
+
 bool CompactObj::HasAllocated() const {
   if (IsRef() || taglen_ == INT_TAG || IsInline() || taglen_ == EXTERNAL_TAG ||
       (taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
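The defrag primitive above is just "allocate a fresh block, copy, free the old one" against the thread-local memory resource; moving a value off a sparse page lets the allocator eventually reclaim that page. A minimal standalone sketch of that move using std::pmr directly - the function name and the assumption that the logical size equals the allocated size are illustrative, not Dragonfly's API (the patch itself passes 0 as the deallocation size because the original capacity is not tracked):

#include <cstring>
#include <memory_resource>

constexpr size_t kAlignSize = 8u;  // same alignment the patch uses

// Moves `size` bytes at `ptr` into a freshly allocated block from `mr`,
// releases the old block, and returns the new location.
void* ReallocateBlock(void* ptr, size_t size, std::pmr::memory_resource* mr) {
  void* fresh = mr->allocate(size, kAlignSize);
  memcpy(fresh, ptr, size);
  mr->deallocate(ptr, size, kAlignSize);
  return fresh;
}

int main() {
  std::pmr::memory_resource* mr = std::pmr::new_delete_resource();
  void* p = mr->allocate(16, kAlignSize);
  memset(p, 'x', 16);
  p = ReallocateBlock(p, 16, mr);  // the value now lives at a fresh location
  mr->deallocate(p, 16, kAlignSize);
}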
diff --git a/src/core/compact_object.h b/src/core/compact_object.h
index 6f56a16432ca..96eeb9e71b49 100644
--- a/src/core/compact_object.h
+++ b/src/core/compact_object.h
@@ -16,7 +16,7 @@ typedef struct redisObject robj;
 namespace dfly {
 
 constexpr unsigned kEncodingIntSet = 0;
-constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
+constexpr unsigned kEncodingStrMap = 1;   // for set/map encodings of strings
 constexpr unsigned kEncodingStrMap2 = 2;  // for set/map encodings of strings using DenseSet
 constexpr unsigned kEncodingListPack = 3;
@@ -52,6 +52,8 @@ class RobjWrapper {
     return std::string_view{reinterpret_cast<char*>(inner_obj_), sz_};
   }
 
+  bool Reallocate(std::pmr::memory_resource* mr);
+
  private:
   size_t InnerObjMallocUsed() const;
   void MakeInnerRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr);
@@ -208,6 +210,8 @@ class CompactObj {
     return mask_ & IO_PENDING;
   }
 
+  bool DefragIfNeeded(float ratio);
+
   void SetIoPending(bool b) {
     if (b) {
       mask_ |= IO_PENDING;
diff --git a/src/redis/zmalloc.h b/src/redis/zmalloc.h
index 01a87691860d..62eb074337c0 100644
--- a/src/redis/zmalloc.h
+++ b/src/redis/zmalloc.h
@@ -38,7 +38,9 @@
 #define __zm_str(s) #s
 
 #if defined(USE_JEMALLOC)
-#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
+#define ZMALLOC_LIB \
+  ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr( \
+      JEMALLOC_VERSION_BUGFIX))
 #include <jemalloc/jemalloc.h>
 #if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
 #define HAVE_MALLOC_SIZE 1
@@ -82,32 +84,32 @@
 #define HAVE_DEFRAG
 #endif
 
-void *zmalloc(size_t size);
-void *zcalloc(size_t size);
-void *zrealloc(void *ptr, size_t size);
-void *ztrymalloc(size_t size);
-void *ztrycalloc(size_t size);
-void *ztryrealloc(void *ptr, size_t size);
-void zfree(void *ptr);
+void* zmalloc(size_t size);
+void* zcalloc(size_t size);
+void* zrealloc(void* ptr, size_t size);
+void* ztrymalloc(size_t size);
+void* ztrycalloc(size_t size);
+void* ztryrealloc(void* ptr, size_t size);
+void zfree(void* ptr);
 
-size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
+size_t znallocx(size_t size);  // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
 void zfree_size(void* ptr, size_t size);  // equivalent to sdallocx or mi_free_size
 
-void *zmalloc_usable(size_t size, size_t *usable);
-void *zcalloc_usable(size_t size, size_t *usable);
-void *zrealloc_usable(void *ptr, size_t size, size_t *usable);
-void *ztrymalloc_usable(size_t size, size_t *usable);
-void *ztrycalloc_usable(size_t size, size_t *usable);
-void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable);
+void* zmalloc_usable(size_t size, size_t* usable);
+void* zcalloc_usable(size_t size, size_t* usable);
+void* zrealloc_usable(void* ptr, size_t size, size_t* usable);
+void* ztrymalloc_usable(size_t size, size_t* usable);
+void* ztrycalloc_usable(size_t size, size_t* usable);
+void* ztryrealloc_usable(void* ptr, size_t size, size_t* usable);
 
 // size_t zmalloc_used_memory(void);
 void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
 size_t zmalloc_get_rss(void);
 
-int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident);
+int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* resident);
 void set_jemalloc_bg_thread(int enable);
 int jemalloc_purge();
 size_t zmalloc_get_private_dirty(long pid);
-size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
+size_t zmalloc_get_smap_bytes_by_field(char* field, long pid);
 size_t zmalloc_get_memory_size(void);
 size_t zmalloc_usable_size(const void* p);
 
@@ -116,7 +118,7 @@ size_t zmalloc_usable_size(const void* p);
  * This uses the current local thread heap.
 * return 0 if not, 1 if underutilized
 */
-int zmalloc_page_is_underutilized(void *ptr, float ratio);
+int zmalloc_page_is_underutilized(void* ptr, float ratio);
 // roman: void zlibc_free(void *ptr);
 
 void init_zmalloc_threadlocal(void* heap);
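zmalloc_page_is_underutilized is the allocator-side half of DefragIfNeeded: it reports whether the heap page backing a pointer has fallen below the given utilization ratio. A usage sketch of the contract these declarations imply - illustrative glue, not code from the patch, and the include path is an assumption:

#include <cstring>

#include "redis/zmalloc.h"  // assumed include path for the header above

// Move a value off an underutilized page using only the declared API.
void* MaybeDefrag(void* ptr, size_t size, float ratio) {
  if (!zmalloc_page_is_underutilized(ptr, ratio))
    return ptr;                 // the backing page is busy enough; leave the value alone
  void* fresh = zmalloc(size);  // the new block comes from the current thread-local heap
  memcpy(fresh, ptr, size);
  zfree(ptr);
  return fresh;
}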
diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc
index ae91cbfd217c..09bab7c59e26 100644
--- a/src/server/dfly_main.cc
+++ b/src/server/dfly_main.cc
@@ -341,6 +341,7 @@ Usage: dragonfly [FLAGS]
   }
   mi_option_enable(mi_option_show_errors);
   mi_option_set(mi_option_max_warnings, 0);
+  mi_option_set(mi_option_decommit_delay, 0);
 
   base::sys::KernelVersion kver;
   base::sys::GetKernelVersion(&kver);
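This one-liner matters for observability: as I read mimalloc's option semantics, a decommit delay of 0 makes freed OS pages decommit immediately rather than being cached, so a successful defrag pass shows up as lower committed memory right away. The startup sequence isolated as a sketch - the rationale comments are the editor's reading, not the patch's:

#include <mimalloc.h>

void ConfigureAllocatorForDefrag() {
  mi_option_enable(mi_option_show_errors);     // surface hard allocator errors
  mi_option_set(mi_option_max_warnings, 0);    // but suppress warning spam
  mi_option_set(mi_option_decommit_delay, 0);  // decommit freed pages immediately
}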
diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc
index 0f4ff745b766..6c5d6216a69d 100644
--- a/src/server/dragonfly_test.cc
+++ b/src/server/dragonfly_test.cc
@@ -758,21 +758,72 @@ TEST_F(DflyEngineTest, Bug496) {
 
 TEST_F(DefragDflyEngineTest, TestDefragOption) {
   // Fill data into dragonfly and then check if we have
   // any locations in memory to defrag. See issue #448 for details about this.
-  EXPECT_GE(max_memory_limit, 100'000);
-  const int NUMBER_OF_KEYS = 184000;
+  max_memory_limit = 300'000;  // control the memory size so we don't need too many keys
+  constexpr int kNumberOfKeys = 100'000;  // this fills the memory
+  constexpr int kKeySize = 137;
+  const uint64_t kMinExpectedDefragCount = 80;
+
+  std::vector<std::string> keys2delete;
+  keys2delete.push_back("del");
+
+  // Generate the list of keys to delete.
+  // We populate keys named "key-name:N", but we don't want to delete all of
+  // them - only a scattered subset - so we delete those whose numeric suffix
+  // starts with the digit 1.
+  constexpr int kFactor = 10;
+  int kMaxNumKeysToDelete = 10'000;
+  int current_step = kFactor;
+  for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
+    for (int j = i - 1 + current_step; i < current_step; i++, j++) {
+      keys2delete.push_back("key-name:" + std::to_string(j));
+    }
+  }
+
+  std::vector<std::string_view> keys(keys2delete.begin(), keys2delete.end());
 
-  RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
+  RespExpr resp = Run(
+      {"DEBUG", "POPULATE", std::to_string(kNumberOfKeys), "key-name", std::to_string(kKeySize)});
   ASSERT_EQ(resp, "OK");
   resp = Run({"DBSIZE"});
-  EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));
+  EXPECT_THAT(resp, IntArg(kNumberOfKeys));
 
   shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
     EngineShard* shard = EngineShard::tlocal();
     ASSERT_FALSE(shard == nullptr);  // we have only one shard, and it should not be null!
     EXPECT_EQ(shard->GetDefragStats().success_count, 0);
-    // we are not running stats yet
-    EXPECT_EQ(shard->GetDefragStats().tries, 0);
-    EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
+    // we expect to have at least one try by now
+    EXPECT_GT(shard->GetDefragStats().tries, 0);
+    EXPECT_GT(GetMallocCurrentCommitted(), kMaxNumKeysToDelete);
   });
+
+  ArgSlice delete_cmd(keys);
+  auto r = CheckedInt(delete_cmd);
+  // the first element is the DEL command itself, so the expected count is one less
+  ASSERT_EQ(r, keys2delete.size() - 1);
+
+  // At this point, check whether the defrag task ran and whether it actually did something.
+  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
+    EngineShard* shard = EngineShard::tlocal();
+    ASSERT_FALSE(shard == nullptr);  // we have only one shard, and it should not be null!
+    int tries = 10;
+    uint64_t defrag_count = base->AwaitBrief([&]() {
+      // This is the condition we await on: the scan should run to completion at
+      // least once. Since we have no option to sleep here (it would just block all
+      // fibers), we busy-wait, polling the stats to see whether the defrag task
+      // ran and whether it did something.
+      auto stats = shard->GetDefragStats();
+      while (tries > 0 && stats.tries < 3) {
+        stats = shard->GetDefragStats();
+        if (stats.success_count > 10) {
+          return stats.success_count;
+        }
+        --tries;
+        this_fiber::sleep_for(220ms);
+      }
+      return stats.success_count;
+    });
+
+    EXPECT_GT(defrag_count, kMinExpectedDefragCount);
+  });
 }
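The nested deletion loop is terse, so here is the same index arithmetic re-run in isolation - a hypothetical standalone check with the test's constants. It produces 9,999 suffixes whose decimal form starts with '1': 10..18, 109..198, 1099..1998, 10999..19998.

#include <iostream>
#include <string>
#include <vector>

int main() {
  constexpr int kFactor = 10;
  constexpr int kMaxNumKeysToDelete = 10'000;
  std::vector<std::string> keys;
  int current_step = kFactor;
  // Identical loop shape to the test above: every generated suffix j falls in
  // [current_step, 2 * current_step), i.e. its leading decimal digit is 1.
  for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
    for (int j = i - 1 + current_step; i < current_step; i++, j++) {
      keys.push_back("key-name:" + std::to_string(j));
    }
  }
  std::cout << keys.size() << " keys: " << keys.front() << " .. " << keys.back() << "\n";
}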
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 2c702f8309b6..bc80e1d1c6ca 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -51,7 +51,8 @@ using absl::GetFlag;
 
 namespace {
 
-constexpr DbIndex DEFAULT_DB_INDEX = 0;
+constexpr DbIndex kDefaultDbIndex = 0;
+constexpr uint64_t kCursorDoneState = 0u;
 
 vector<EngineShardSet::CachedStats> cached_stats;  // initialized in EngineShardSet::Init
@@ -80,7 +81,7 @@ bool EngineShard::DefragTaskState::IsRequired() {
   const uint64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
   const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);
 
-  if (cursor > 0) {
+  if (cursor > kCursorDoneState) {
     return true;
   }
@@ -100,34 +101,79 @@ bool EngineShard::DefragTaskState::IsRequired() {
 
-// for now this does nothing
 bool EngineShard::DoDefrag() {
-  // TODO - Impl!!
-  return defrag_state_.cursor > 0;
+  // --------------------------------------------------------------------------
+  // NOTE: This task runs with exclusive access to the shard.
+  // i.e. - since we use shared-nothing access here, and all accesses are done
+  // through fibers, this fiber runs only when no other fiber in the context of
+  // the controlling thread will access this shard!
+  // --------------------------------------------------------------------------
+
+  constexpr size_t kCursorCountRelation = 50;
+  const float threshold = GetFlag(FLAGS_mem_utilization_threshold);
+
+  auto& slice = db_slice();
+  DCHECK(slice.IsDbValid(kDefaultDbIndex));
+  auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex);
+  // otherwise we would end up with a negative, invalid segment id - this crashed on CI
+  CHECK(prime_table->depth() < 40)
+      << "the depth for the prime table is invalid: " << prime_table->depth();
+  PrimeTable::Cursor cur = defrag_state_.cursor;
+  const std::size_t max_iterations = slice.DbSize(kDefaultDbIndex) / kCursorCountRelation;
+  uint64_t defrag_count = 0;
+  unsigned iteration_count = 0;
+  do {
+    cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
+      // for each value, check whether we should move it because it sits on an
+      // underutilized page of memory, and if so, do it.
+      bool did = it->second.DefragIfNeeded(threshold);
+      if (did) {
+        defrag_count++;
+      }
+    });
+    iteration_count++;
+  } while (iteration_count < max_iterations && cur);
+
+  defrag_state_.cursor = cur.value();
+  if (defrag_count > 0) {
+    VLOG(1) << "shard " << slice.shard_id() << ": successfully defragged " << defrag_count
+            << " objects";
+  } else {
+    VLOG(1) << "shard " << slice.shard_id() << ": ran the defrag scan " << iteration_count
+            << " iterations out of a maximum of " << max_iterations << ", with the cursor at "
+            << defrag_state_.cursor << ", but found no locations to defrag";
+  }
+  defrag_state_.stats.success_count += defrag_count;
+  defrag_state_.stats.tries++;
+  return defrag_state_.cursor > kCursorDoneState;
 }
 
 void EngineShard::DefragTaskState::Init() {
-  cursor = 0u;
+  cursor = kCursorDoneState;
+  stats.Init();
 }
 
 // the memory defragmentation task works as follows:
 // 1. Check if memory usage is high enough
 // 2. Check if the diff between committed and used memory is high enough
 // 3. Check if we have memory changes (to ensure that we are not running endlessly). - TODO
-// 4. if all the above pass -> run on the shard and try to defragmented memory by re-allocating
-// values
-// if the cursor for this is signal that we are not done, schedule the task to run at high
-// priority otherwise lower the task priority so that it would not use the CPU when not required
+// 4. if all the above pass -> scan this shard and try to move values that sit on
+//    underutilized pages of memory
+// If the cursor returned from the scan is not in the done state, schedule the task to run at
+// high priority; otherwise lower the task priority so that it does not use the CPU when not
+// required.
 uint32_t EngineShard::DefragTask() {
+  constexpr uint32_t kRunAtLowPriority = 0u;
+
   const auto shard_id = db_slice().shard_id();
-  bool required_state = defrag_state_.IsRequired();
-  if (required_state) {
+
+  if (defrag_state_.IsRequired()) {
     VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
     if (DoDefrag()) {
       // we didn't finish the scan
       return util::ProactorBase::kOnIdleMaxLevel;
     }
   }
-  // by default we just want to not get in the way..
-  return 0u;
+
+  return kRunAtLowPriority;
 }
 
 EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
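DoDefrag's loop shape - traverse a bounded number of table segments per call, persist the cursor, and report "more to do" - pairs with DefragTask's return value to form a cooperative incremental scan. A reduced model of the pattern under stated assumptions: ToyTable is a stand-in for PrimeTable, and the index-as-cursor encoding (0 meaning both "fresh" and "done") mirrors the real code.

#include <algorithm>
#include <cstdint>
#include <vector>

// Toy stand-in for the cursor-driven table scan: one Traverse call visits a
// fixed-size segment and returns the next cursor, 0 meaning the scan wrapped.
struct ToyTable {
  std::vector<int> entries;
  static constexpr size_t kSegment = 64;

  template <typename Cb> uint64_t Traverse(uint64_t cursor, Cb cb) {
    size_t end = std::min(entries.size(), static_cast<size_t>(cursor) + kSegment);
    for (size_t i = cursor; i < end; ++i) cb(entries[i]);
    return end == entries.size() ? 0 : end;
  }
};

// One bounded pass, mirroring DoDefrag: visit at most max_iterations segments,
// persist the cursor, and tell the caller whether to reschedule at high priority.
template <typename Cb>
bool BoundedPass(ToyTable& table, uint64_t& cursor, size_t max_iterations, Cb cb) {
  size_t n = 0;
  do {
    cursor = table.Traverse(cursor, cb);
  } while (++n < max_iterations && cursor != 0);
  return cursor != 0;  // true => the scan is unfinished; run again soon
}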
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index 77a3fb9215c3..aca15efcaa5a 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -44,6 +44,11 @@ class EngineShard {
   struct DefragStats {
     uint64_t success_count = 0;  // how many objects were moved
     uint64_t tries = 0;          // how many times we tried
+
+    void Init() {
+      success_count = 0u;
+      tries = 0u;
+    }
   };
 
   // EngineShard() is private down below.
diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 02364dc4b97e..c0de0bdae76f 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -26,6 +26,15 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to
 
 namespace dfly {
 
+std::ostream& operator<<(std::ostream& os, ArgSlice& list) {
+  os << "[";
+  if (!list.empty()) {
+    std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
+    os << (*(list.end() - 1));
+  }
+  return os << "]";
+}
+
 extern unsigned kInitSegmentLog;
 
 using MP = MemcacheParser;
@@ -267,7 +276,7 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_list<std::string_view>
   return resp->SplitLines();
 }
 
-int64_t BaseFamilyTest::CheckedInt(std::initializer_list<std::string_view> list) {
+int64_t BaseFamilyTest::CheckedInt(ArgSlice list) {
   RespExpr resp = Run(list);
   if (resp.type == RespExpr::INT64) {
     return get<int64_t>(resp.u);
diff --git a/src/server/test_utils.h b/src/server/test_utils.h
index d4dc2957284c..975307992f76 100644
--- a/src/server/test_utils.h
+++ b/src/server/test_utils.h
@@ -56,7 +56,10 @@ class BaseFamilyTest : public ::testing::Test {
   MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key = std::string_view{});
   MCResponse GetMC(MemcacheParser::CmdType cmd_type, std::initializer_list<std::string_view> list);
 
-  int64_t CheckedInt(std::initializer_list<std::string_view> list);
+  int64_t CheckedInt(std::initializer_list<std::string_view> list) {
+    return CheckedInt(ArgSlice{list.begin(), list.size()});
+  }
+  int64_t CheckedInt(ArgSlice list);
 
   bool IsLocked(DbIndex db_index, std::string_view key) const;
   ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;
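The overload split keeps existing call sites compiling while letting tests (like the defrag test above) pass runtime-built argument vectors. A usage sketch inside a BaseFamilyTest-derived test body, assuming ArgSlice is a span of string_views as used throughout the codebase:

std::vector<std::string_view> del_args = {"del", "key-name:10", "key-name:11"};
int64_t removed = CheckedInt(ArgSlice{del_args});   // runtime-sized command vector
int64_t counter = CheckedInt({"incr", "counter"});  // literal lists forward via the inline shim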