diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index f48095c4cfa7..996e6a6d6e36 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -39,6 +39,7 @@ using absl::GetFlag; namespace { constexpr XXH64_hash_t kHashSeed = 24061983; +constexpr size_t kAlignSize = 8u; // Approximation since does not account for listpacks. size_t QlMAllocSize(quicklist* ql) { @@ -217,7 +218,7 @@ struct TL { thread_local TL tl; -constexpr bool kUseSmallStrings = true; +constexpr bool kUseSmallStrings = false; /// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate /// file and implement with SIMD instructions. @@ -380,6 +381,23 @@ void RobjWrapper::SetString(string_view s, pmr::memory_resource* mr) { } } +bool RobjWrapper::DefragIfNeeded(float ratio) { + if (type() == OBJ_STRING) { // only applicable to strings + if (zmalloc_page_is_underutilized(inner_obj(), ratio)) { + return Reallocate(tl.local_mr); + } + } + return false; +} + +bool RobjWrapper::Reallocate(std::pmr::memory_resource* mr) { + void* old_ptr = inner_obj_; + inner_obj_ = mr->allocate(sz_, kAlignSize); + memcpy(inner_obj_, old_ptr, sz_); + mr->deallocate(old_ptr, 0, kAlignSize); + return true; +} + void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) { type_ = type; encoding_ = encoding; @@ -398,13 +416,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_ desired += SDS_MAX_PREALLOC; } - void* newp = mr->allocate(desired, 8); + void* newp = mr->allocate(desired, kAlignSize); if (sz_) { memcpy(newp, inner_obj_, sz_); } if (current_cap) { - mr->deallocate(inner_obj_, current_cap, 8); + mr->deallocate(inner_obj_, current_cap, kAlignSize); } inner_obj_ = newp; } @@ -659,7 +677,7 @@ robj* CompactObj::AsRObj() const { res->type = u_.r_obj.type(); if (res->type == OBJ_SET) { - LOG(FATAL) << "Should not call AsRObj for type " << res->type; + LOG(FATAL) << "Should not call AsRObj for type " << res->type; } if (res->type == OBJ_HASH) { @@ -850,6 +868,28 @@ string_view CompactObj::GetSlice(string* scratch) const { return string_view{}; } +bool CompactObj::DefragIfNeeded(float ratio) { + switch (taglen_) { + case ROBJ_TAG: + // currently only these objet types are supported for this operation + if (u_.r_obj.inner_obj() != nullptr) { + return u_.r_obj.DefragIfNeeded(ratio); + } + return false; + case SMALL_TAG: + // TODO - support this later + return false; + case INT_TAG: + // this is not relevant in this case + return false; + case EXTERNAL_TAG: + return false; + default: + // This is the case when the object is at inline_str + return false; + } +} + bool CompactObj::HasAllocated() const { if (IsRef() || taglen_ == INT_TAG || IsInline() || taglen_ == EXTERNAL_TAG || (taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr)) diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 6f56a16432ca..087df9ee7e44 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -16,7 +16,7 @@ typedef struct redisObject robj; namespace dfly { constexpr unsigned kEncodingIntSet = 0; -constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings +constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet constexpr unsigned kEncodingListPack = 3; @@ -52,7 +52,10 @@ class RobjWrapper { return std::string_view{reinterpret_cast(inner_obj_), sz_}; } + bool DefragIfNeeded(float ratio); + private: + bool Reallocate(std::pmr::memory_resource* mr); size_t InnerObjMallocUsed() const; void MakeInnerRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr); @@ -208,6 +211,8 @@ class CompactObj { return mask_ & IO_PENDING; } + bool DefragIfNeeded(float ratio); + void SetIoPending(bool b) { if (b) { mask_ |= IO_PENDING; diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 8740959e9231..34d4c73cc2d0 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -342,7 +342,7 @@ TEST_F(CompactObjectTest, MimallocUnderutilzationWithRealloc) { bool found = HasUnderutilizedMemory(ptrs, kUnderUtilizedRatio); ASSERT_FALSE(found); DeallocateAtRandom(kRandomStep, &ptrs); - // TestMiMallocUnderutilized(ptrs, run_reallocation, allocation_size); + // This is another case, where we are filling the "gaps" by doing re-allocations // in this case, since we are not setting all the values back it should still have // places that are not used. Plus since we are not looking at the first page diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index ae91cbfd217c..09bab7c59e26 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -341,6 +341,7 @@ Usage: dragonfly [FLAGS] } mi_option_enable(mi_option_show_errors); mi_option_set(mi_option_max_warnings, 0); + mi_option_set(mi_option_decommit_delay, 0); base::sys::KernelVersion kver; base::sys::GetKernelVersion(&kver); diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 0f4ff745b766..924b3b53e8aa 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -758,21 +758,66 @@ TEST_F(DflyEngineTest, Bug496) { TEST_F(DefragDflyEngineTest, TestDefragOption) { // Fill data into dragonfly and then check if we have // any location in memory to defrag. See issue #448 for details about this. - EXPECT_GE(max_memory_limit, 100'000); - const int NUMBER_OF_KEYS = 184000; + max_memory_limit = 300'000; // control memory size so no need for too many keys + constexpr int kNumberOfKeys = 100'000; // this fill the memory + constexpr int kKeySize = 137; + constexpr int kMaxDefragTriesForTests = 10; + + std::vector keys2delete; + keys2delete.push_back("del"); + + // Generate a list of keys that would be deleted + // The keys that we will delete are all in the form of "key-name:1" + // This is because we are populating keys that has this format, but we don't want + // to delete all keys, only some random keys so we deleting those that start with 1 + constexpr int kFactor = 10; + int kMaxNumKeysToDelete = 10'000; + int current_step = kFactor; + for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) { + for (; i < current_step; i++) { + int j = i - 1 + current_step; + keys2delete.push_back("key-name:" + std::to_string(j)); + } + } + + std::vector keys(keys2delete.begin(), keys2delete.end()); - RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"}); + RespExpr resp = Run( + {"DEBUG", "POPULATE", std::to_string(kNumberOfKeys), "key-name", std::to_string(kKeySize)}); ASSERT_EQ(resp, "OK"); resp = Run({"DBSIZE"}); - EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS)); + EXPECT_THAT(resp, IntArg(kNumberOfKeys)); shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { EngineShard* shard = EngineShard::tlocal(); ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! + this_fiber::sleep_for(100ms); EXPECT_EQ(shard->GetDefragStats().success_count, 0); - // we are not running stats yet - EXPECT_EQ(shard->GetDefragStats().tries, 0); - EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS); + // we are expecting to have at least one try by now + EXPECT_GT(shard->GetDefragStats().tries, 0); + }); + + ArgSlice delete_cmd(keys); + auto r = CheckedInt(delete_cmd); + // the first element in this is the command del so size is one less + ASSERT_EQ(r, keys2delete.size() - 1); + + // At this point we need to see whether we did running the task and whether the task did something + shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { + EngineShard* shard = EngineShard::tlocal(); + ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! + // a "busy wait" to ensure that memory defragmentations was successful: + // the task ran and did it work + auto stats = shard->GetDefragStats(); + for (int i = 0; i < kMaxDefragTriesForTests; i++) { + stats = shard->GetDefragStats(); + if (stats.success_count > 0) { + break; + } + this_fiber::sleep_for(220ms); + } + // make sure that we successfully found places to defrag in memory + EXPECT_GT(stats.success_count, 0); }); } diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 2c702f8309b6..489998d2d2e6 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -51,7 +51,8 @@ using absl::GetFlag; namespace { -constexpr DbIndex DEFAULT_DB_INDEX = 0; +constexpr DbIndex kDefaultDbIndex = 0; +constexpr uint64_t kCursorDoneState = 0u; vector cached_stats; // initialized in EngineShardSet::Init @@ -80,7 +81,7 @@ bool EngineShard::DefragTaskState::IsRequired() { const uint64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold); const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold); - if (cursor > 0) { + if (cursor > kCursorDoneState) { return true; } @@ -100,34 +101,73 @@ bool EngineShard::DefragTaskState::IsRequired() { // for now this does nothing bool EngineShard::DoDefrag() { - // TODO - Impl!! - return defrag_state_.cursor > 0; -} - -void EngineShard::DefragTaskState::Init() { - cursor = 0u; + // -------------------------------------------------------------------------- + // NOTE: This task is running with exclusive access to the shard. + // i.e. - Since we are using shared noting access here, and all access + // are done using fibers, This fiber is run only when no other fiber in the + // context of the controlling thread will access this shard! + // -------------------------------------------------------------------------- + + constexpr size_t kMaxTraverses = 50; + const float threshold = GetFlag(FLAGS_mem_utilization_threshold); + + auto& slice = db_slice(); + DCHECK(slice.IsDbValid(kDefaultDbIndex)); + auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex); + PrimeTable::Cursor cur = defrag_state_.cursor; + uint64_t defrag_count = 0; + unsigned traverses_count = 0; + + do { + cur = prime_table->Traverse(cur, [&](PrimeIterator it) { + // for each value check whether we should move it because it + // seats on underutilized page of memory, and if so, do it. + bool did = it->second.DefragIfNeeded(threshold); + if (did) { + defrag_count++; + } + }); + traverses_count++; + } while (traverses_count < kMaxTraverses && cur); + + defrag_state_.cursor = cur.value(); + if (defrag_count > 0) { + VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << defrag_count + << " times, did it in " << traverses_count << " cursor is at the " + << (defrag_state_.cursor == 0 ? "end" : "in progress"); + } else { + VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count + << " times out of maximum " << kMaxTraverses << ", with cursor at " + << (defrag_state_.cursor == 0 ? "end" : "in progress") + << " but no location for defrag were found"; + } + defrag_state_.stats.success_count += defrag_count; + defrag_state_.stats.tries++; + return defrag_state_.cursor > kCursorDoneState; } // the memory defragmentation task is as follow: // 1. Check if memory usage is high enough // 2. Check if diff between commited and used memory is high enough // 3. Check if we have memory changes (to ensure that we not running endlessly). - TODO -// 4. if all the above pass -> run on the shard and try to defragmented memory by re-allocating -// values -// if the cursor for this is signal that we are not done, schedule the task to run at high -// priority otherwise lower the task priority so that it would not use the CPU when not required +// 4. if all the above pass -> scan this shard and try to find whether we can move pointer to +// underutilized pages values +// if the cursor returned from scan is not in done state, schedule the task to run at high +// priority. +// otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { + constexpr uint32_t kRunAtLowPriority = 0u; + const auto shard_id = db_slice().shard_id(); - bool required_state = defrag_state_.IsRequired(); - if (required_state) { + if (defrag_state_.IsRequired()) { VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor; if (DoDefrag()) { // we didn't finish the scan return util::ProactorBase::kOnIdleMaxLevel; } } - // by default we just want to not get in the way.. - return 0u; + + return kRunAtLowPriority; } EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) @@ -150,7 +190,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0); // start the defragmented task here - defrag_state_.Init(); defrag_task_ = pb->AddOnIdleTask([this]() { return this->DefragTask(); }); } @@ -170,9 +209,7 @@ void EngineShard::Shutdown() { ProactorBase::me()->CancelPeriodic(periodic_task_); } - if (defrag_task_ != 0) { - ProactorBase::me()->RemoveOnIdleTask(defrag_task_); - } + ProactorBase::me()->RemoveOnIdleTask(defrag_task_); } void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 77a3fb9215c3..e319d0aaafd2 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -158,8 +158,6 @@ class EngineShard { uint64_t cursor = 0u; DefragStats stats; - void Init(); - // check the current threshold and return true if // we need to do the de-fermentation bool IsRequired(); diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 02364dc4b97e..c0de0bdae76f 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -26,6 +26,15 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to namespace dfly { +std::ostream& operator<<(std::ostream& os, ArgSlice& list) { + os << "["; + if (!list.empty()) { + std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; }); + os << (*(list.end() - 1)); + } + return os << "]"; +} + extern unsigned kInitSegmentLog; using MP = MemcacheParser; @@ -267,7 +276,7 @@ auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_listSplitLines(); } -int64_t BaseFamilyTest::CheckedInt(std::initializer_list list) { +int64_t BaseFamilyTest::CheckedInt(ArgSlice list) { RespExpr resp = Run(list); if (resp.type == RespExpr::INT64) { return get(resp.u); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index d4dc2957284c..975307992f76 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -56,7 +56,10 @@ class BaseFamilyTest : public ::testing::Test { MCResponse RunMC(MemcacheParser::CmdType cmd_type, std::string_view key = std::string_view{}); MCResponse GetMC(MemcacheParser::CmdType cmd_type, std::initializer_list list); - int64_t CheckedInt(std::initializer_list list); + int64_t CheckedInt(std::initializer_list list) { + return CheckedInt(ArgSlice{list.begin(), list.size()}); + } + int64_t CheckedInt(ArgSlice list); bool IsLocked(DbIndex db_index, std::string_view key) const; ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;