feat(server): memory defrag support - unit tests added to verify #448
feat(server): memory defrag support - mi_option changed #448

feat(server): active fragmentation - spell check issue #448

feat(server): active memory defrag task #448

Signed-off-by: Boaz Sade <[email protected]>
boazsade committed Dec 1, 2022
1 parent cd40bd7 commit b0836e1
Showing 9 changed files with 193 additions and 45 deletions.
35 changes: 31 additions & 4 deletions src/core/compact_object.cc
@@ -39,6 +39,7 @@ using absl::GetFlag;
namespace {

constexpr XXH64_hash_t kHashSeed = 24061983;
constexpr size_t kAlignSize = 8u;

// Approximation, since it does not account for listpacks.
size_t QlMAllocSize(quicklist* ql) {
@@ -217,7 +218,7 @@ struct TL {

thread_local TL tl;

constexpr bool kUseSmallStrings = true;
constexpr bool kUseSmallStrings = false;

/// TODO: Ascii encoding becomes slow for large blobs. We should factor it out into a separate
/// file and implement with SIMD instructions.
@@ -380,6 +381,18 @@ void RobjWrapper::SetString(string_view s, pmr::memory_resource* mr) {
}
}

bool RobjWrapper::Reallocate(std::pmr::memory_resource* mr) {
if (type_ == OBJ_STRING) {
// Currently only for strings.
void* old_ptr = inner_obj_;
inner_obj_ = mr->allocate(sz_, kAlignSize);
memcpy(inner_obj_, old_ptr, sz_);
mr->deallocate(old_ptr, 0, kAlignSize);
return true;
}
return false;
}

void RobjWrapper::Init(unsigned type, unsigned encoding, void* inner) {
type_ = type;
encoding_ = encoding;
@@ -398,13 +411,13 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, pmr::memory_
desired += SDS_MAX_PREALLOC;
}

void* newp = mr->allocate(desired, 8);
void* newp = mr->allocate(desired, kAlignSize);
if (sz_) {
memcpy(newp, inner_obj_, sz_);
}

if (current_cap) {
mr->deallocate(inner_obj_, current_cap, 8);
mr->deallocate(inner_obj_, current_cap, kAlignSize);
}
inner_obj_ = newp;
}
@@ -659,7 +672,7 @@ robj* CompactObj::AsRObj() const {
res->type = u_.r_obj.type();

if (res->type == OBJ_SET) {
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
}

if (res->type == OBJ_HASH) {
@@ -850,6 +863,20 @@ string_view CompactObj::GetSlice(string* scratch) const {
return string_view{};
}

bool CompactObj::DefragIfNeeded(float ratio) {
// Supported only for strings - but not externals or small strings - since they use different
// allocations; otherwise we may crash. See the HasAllocated function for the other conditions.
const bool is_mimalloc_allocated =
(HasAllocated() && taglen_ != SMALL_TAG && ObjType() == OBJ_STRING);
if (is_mimalloc_allocated) {
// keep this on its own line in case we ever need to debug it
if (zmalloc_page_is_underutilized(RObjPtr(), ratio)) {
return u_.r_obj.Reallocate(tl.local_mr);
}
}
return false;
}

bool CompactObj::HasAllocated() const {
if (IsRef() || taglen_ == INT_TAG || IsInline() || taglen_ == EXTERNAL_TAG ||
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
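Reallocate above is the heart of the defrag move: an allocate-copy-free over the pmr resource. A minimal standalone sketch of the same pattern (Relocate is a hypothetical helper for illustration, not the committed API; note the committed code passes 0 as the deallocation size, while this sketch passes the real size):

#include <cstddef>
#include <cstring>
#include <memory_resource>

// Sketch: move a `size`-byte payload to a fresh block so that the old,
// underutilized page can drain and eventually be returned to the OS.
// Uses the same 8-byte alignment as RobjWrapper::Reallocate above.
void* Relocate(std::pmr::memory_resource* mr, void* old_ptr, std::size_t size) {
  constexpr std::size_t kAlignSize = 8u;
  void* fresh = mr->allocate(size, kAlignSize);  // ideally lands on a fuller page
  std::memcpy(fresh, old_ptr, size);             // payload preserved byte-for-byte
  mr->deallocate(old_ptr, size, kAlignSize);     // old block freed; its page may empty out
  return fresh;
}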
6 changes: 5 additions & 1 deletion src/core/compact_object.h
@@ -16,7 +16,7 @@ typedef struct redisObject robj;
namespace dfly {

constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingListPack = 3;

@@ -52,6 +52,8 @@ class RobjWrapper {
return std::string_view{reinterpret_cast<char*>(inner_obj_), sz_};
}

bool Reallocate(std::pmr::memory_resource* mr);

private:
size_t InnerObjMallocUsed() const;
void MakeInnerRoom(size_t current_cap, size_t desired, std::pmr::memory_resource* mr);
@@ -208,6 +210,8 @@ class CompactObj {
return mask_ & IO_PENDING;
}

bool DefragIfNeeded(float ratio);

void SetIoPending(bool b) {
if (b) {
mask_ |= IO_PENDING;
38 changes: 20 additions & 18 deletions src/redis/zmalloc.h
@@ -38,7 +38,9 @@
#define __zm_str(s) #s

#if defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#define ZMALLOC_LIB \
("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr( \
JEMALLOC_VERSION_BUGFIX))
#include <jemalloc/jemalloc.h>
#if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
#define HAVE_MALLOC_SIZE 1
@@ -82,32 +84,32 @@
#define HAVE_DEFRAG
#endif

void *zmalloc(size_t size);
void *zcalloc(size_t size);
void *zrealloc(void *ptr, size_t size);
void *ztrymalloc(size_t size);
void *ztrycalloc(size_t size);
void *ztryrealloc(void *ptr, size_t size);
void zfree(void *ptr);
void* zmalloc(size_t size);
void* zcalloc(size_t size);
void* zrealloc(void* ptr, size_t size);
void* ztrymalloc(size_t size);
void* ztrycalloc(size_t size);
void* ztryrealloc(void* ptr, size_t size);
void zfree(void* ptr);

size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
size_t znallocx(size_t size); // Equivalent to nallocx for jemalloc or mi_good_size for mimalloc.
void zfree_size(void* ptr, size_t size); // equivalent to sdallocx or mi_free_size

void *zmalloc_usable(size_t size, size_t *usable);
void *zcalloc_usable(size_t size, size_t *usable);
void *zrealloc_usable(void *ptr, size_t size, size_t *usable);
void *ztrymalloc_usable(size_t size, size_t *usable);
void *ztrycalloc_usable(size_t size, size_t *usable);
void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable);
void* zmalloc_usable(size_t size, size_t* usable);
void* zcalloc_usable(size_t size, size_t* usable);
void* zrealloc_usable(void* ptr, size_t size, size_t* usable);
void* ztrymalloc_usable(size_t size, size_t* usable);
void* ztrycalloc_usable(size_t size, size_t* usable);
void* ztryrealloc_usable(void* ptr, size_t size, size_t* usable);

// size_t zmalloc_used_memory(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
size_t zmalloc_get_rss(void);
int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident);
int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* resident);
void set_jemalloc_bg_thread(int enable);
int jemalloc_purge();
size_t zmalloc_get_private_dirty(long pid);
size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_smap_bytes_by_field(char* field, long pid);
size_t zmalloc_get_memory_size(void);
size_t zmalloc_usable_size(const void* p);

@@ -116,7 +118,7 @@ size_t zmalloc_usable_size(const void* p);
* This uses the current local thread heap.
* return 0 if not, 1 if underutilized
*/
int zmalloc_page_is_underutilized(void *ptr, float ratio);
int zmalloc_page_is_underutilized(void* ptr, float ratio);
// roman: void zlibc_free(void *ptr);

void init_zmalloc_threadlocal(void* heap);
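zmalloc_page_is_underutilized is the cheap half of the defrag protocol: callers test first and relocate only on a hit. A hedged usage sketch (the include path and the 0.8 ratio are illustrative; compare FLAGS_mem_utilization_threshold in engine_shard_set.cc below):

#include "redis/zmalloc.h"  // assumed include path within the tree

// Sketch: ask the thread-local heap whether the page backing `ptr` is mostly
// empty. The exact ratio semantics belong to the allocator; per the comment
// above, the call returns 1 when the page is underutilized.
bool ShouldMove(void* ptr) {
  return zmalloc_page_is_underutilized(ptr, 0.8f) == 1;
}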
1 change: 1 addition & 0 deletions src/server/dfly_main.cc
@@ -341,6 +341,7 @@ Usage: dragonfly [FLAGS]
}
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
mi_option_set(mi_option_decommit_delay, 0);

base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver);
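Zeroing mi_option_decommit_delay is what makes a successful defrag visible from outside the process: once a page drains, mimalloc decommits it immediately instead of caching it for reuse, so RSS actually drops. A hedged sketch of the same call with a read-back added for illustration (mi_option_get is part of mimalloc's public options API; the option name matches the mimalloc series in use here):

#include <mimalloc.h>

int main() {
  // decommit freed pages immediately rather than after a delay
  mi_option_set(mi_option_decommit_delay, 0);
  // read-back for illustration: confirm the setting took effect
  return mi_option_get(mi_option_decommit_delay) == 0 ? 0 : 1;
}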
65 changes: 58 additions & 7 deletions src/server/dragonfly_test.cc
@@ -758,21 +758,72 @@ TEST_F(DflyEngineTest, Bug496) {
TEST_F(DefragDflyEngineTest, TestDefragOption) {
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
EXPECT_GE(max_memory_limit, 100'000);
const int NUMBER_OF_KEYS = 184000;
max_memory_limit = 300'000; // control the memory size so we don't need too many keys
constexpr int kNumberOfKeys = 100'000; // this fills the memory
constexpr int kKeySize = 137;
const uint64_t kMinExpectedDefragCount = 80;

std::vector<std::string> keys2delete;
keys2delete.push_back("del");

// Generate a list of keys that will be deleted.
// The keys we delete all have the form "key-name:1<other digits>".
// This is because we populate keys in this format, but we don't want
// to delete all keys - only a scattered subset, so we delete those that start with 1.
constexpr int kFactor = 10;
int kMaxNumKeysToDelete = 10'000;
int current_step = kFactor;
for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
for (int j = i - 1 + current_step; i < current_step; i++, j++) {
keys2delete.push_back("key-name:" + std::to_string(j));
}
}

std::vector<std::string_view> keys(keys2delete.begin(), keys2delete.end());

RespExpr resp = Run({"DEBUG", "POPULATE", std::to_string(NUMBER_OF_KEYS), "key-name", "130"});
RespExpr resp = Run(
{"DEBUG", "POPULATE", std::to_string(kNumberOfKeys), "key-name", std::to_string(kKeySize)});
ASSERT_EQ(resp, "OK");
resp = Run({"DBSIZE"});
EXPECT_THAT(resp, IntArg(NUMBER_OF_KEYS));
EXPECT_THAT(resp, IntArg(kNumberOfKeys));

shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and it should not be null!
EXPECT_EQ(shard->GetDefragStats().success_count, 0);
// we are not running stats yet
EXPECT_EQ(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), NUMBER_OF_KEYS);
// we are expecting to have at least one try by now
EXPECT_GT(shard->GetDefragStats().tries, 0);
EXPECT_GT(GetMallocCurrentCommitted(), kMaxNumKeysToDelete);
});

ArgSlice delete_cmd(keys);
auto r = CheckedInt(delete_cmd);
// the first element is the DEL command itself, so the size is one less
ASSERT_EQ(r, keys2delete.size() - 1);

// At this point we need to check whether the task ran and whether it actually did something
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and it should not be null!
int tries = 10;
uint64_t defrag_count = base->AwaitBrief([&]() {
// This acts as a condition to await on, so that the scan runs and completes at least
// once. Since there is no option to sleep (it would just block all fibers), we are
// creating a busy-wait fiber here that polls the stats to see whether the defrag task ran
// and whether it did something.
auto stats = shard->GetDefragStats();
while (tries > 0 && stats.tries < 3) {
stats = shard->GetDefragStats();
if (stats.success_count > 10) {
return stats.success_count;
}
--tries;
this_fiber::sleep_for(220ms);
}
return stats.success_count;
});

EXPECT_GT(defrag_count, kMinExpectedDefragCount);
});
}

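The deletion loop above is easier to see with a simpler sketch of its intent (not line-for-line equivalent to the committed loop): drop roughly the keys whose numeric suffix starts with the digit 1, a scattered sample that leaves holes across many mimalloc pages:

#include <string>
#include <vector>

// Sketch: collect "key-name:N" for every N below max_keys whose decimal
// representation starts with '1' - the same scattered pattern the test aims for.
std::vector<std::string> KeysToDelete(int max_keys) {
  std::vector<std::string> out;
  for (int n = 1; n < max_keys; ++n) {
    if (std::to_string(n).front() == '1')
      out.push_back("key-name:" + std::to_string(n));
  }
  return out;
}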
72 changes: 59 additions & 13 deletions src/server/engine_shard_set.cc
@@ -51,7 +51,8 @@ using absl::GetFlag;

namespace {

constexpr DbIndex DEFAULT_DB_INDEX = 0;
constexpr DbIndex kDefaultDbIndex = 0;
constexpr uint64_t kCursorDoneState = 0u;

vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init

@@ -80,7 +81,7 @@ bool EngineShard::DefragTaskState::IsRequired() {
const uint64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);

if (cursor > 0) {
if (cursor > kCursorDoneState) {
return true;
}

@@ -100,34 +101,79 @@

// for now this does nothing
bool EngineShard::DoDefrag() {
// TODO - Impl!!
return defrag_state_.cursor > 0;
// --------------------------------------------------------------------------
// NOTE: This task runs with exclusive access to the shard.
// i.e. - since we use a shared-nothing architecture here, and all accesses
// are done using fibers, this fiber runs only when no other fiber in the
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------

constexpr size_t kCursorCountRelation = 50;
const float threshold = GetFlag(FLAGS_mem_utilization_threshold);

auto& slice = db_slice();
DCHECK(slice.IsDbValid(kDefaultDbIndex));
auto [prime_table, expire_table] = slice.GetTables(kDefaultDbIndex);
// otherwise we will end up with a negative, invalid segment id - this crashed in CI
CHECK(prime_table->depth() < 40)
<< "the depth for the prime table is invalid: " << prime_table->depth();
PrimeTable::Cursor cur = defrag_state_.cursor;
const std::size_t max_iteration = slice.DbSize(kDefaultDbIndex) / kCursorCountRelation;
uint64_t defrag_count = 0;
unsigned iteration_count = 0;
do {
cur = prime_table->Traverse(cur, [&](PrimeIterator it) {
// for each value, check whether we should move it because it
// sits on an underutilized page of memory, and if so, do it.
bool did = it->second.DefragIfNeeded(threshold);
if (did) {
defrag_count++;
}
});
iteration_count++;
} while (iteration_count < max_iteration && cur);

defrag_state_.cursor = cur.value();
if (defrag_count > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defragged " << defrag_count
<< " times";
} else {
VLOG(1) << "shard " << slice.shard_id() << ": ran the defrag " << iteration_count
<< " times out of a maximum of " << max_iteration << ", with cursor at "
<< defrag_state_.cursor << ", but no locations to defrag were found";
}
defrag_state_.stats.success_count += defrag_count;
defrag_state_.stats.tries++;
return defrag_state_.cursor > kCursorDoneState;
}

void EngineShard::DefragTaskState::Init() {
cursor = 0u;
cursor = kCursorDoneState;
stats.Init();
}

// the memory defragmentation task works as follows:
// 1. Check if memory usage is high enough
// 2. Check if the diff between committed and used memory is high enough
// 3. Check if we have memory changes (to ensure that we are not running endlessly). - TODO
// 4. if all the above pass -> run on the shard and try to defragment memory by re-allocating
// values
// if the cursor for this signals that we are not done, schedule the task to run at high
// priority; otherwise lower the task priority so that it does not use the CPU when not required
// 4. if all the above pass -> scan this shard and try to find values whose pointers we can move
// off underutilized pages
// if the cursor returned from the scan is not in the done state, schedule the task to run at
// high priority.
// otherwise lower the task priority so that it does not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;

const auto shard_id = db_slice().shard_id();
bool required_state = defrag_state_.IsRequired();
if (required_state) {
if (defrag_state_.IsRequired()) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
// by default we just want to not get in the way..
return 0u;

return kRunAtLowPriority;
}

EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
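DefragTask's return value doubles as its next idle priority: kOnIdleMaxLevel keeps the task hot while a scan is mid-flight, and kRunAtLowPriority parks it. A minimal sketch of that resumable, budgeted-scan shape (Cursor and ScanSlice are illustrative stand-ins, not Dragonfly types):

#include <cstdint>

struct Cursor {
  uint64_t pos = 0;  // 0 doubles as the "done" state, as with kCursorDoneState above
};

// One idle tick: advance at most `budget` steps, then report whether work
// remains so the caller can choose the next idle priority.
bool ScanSlice(Cursor& cur, uint64_t end, uint64_t budget) {
  for (uint64_t steps = 0; steps < budget; ++steps) {
    ++cur.pos;           // visit one bucket (actual defrag work elided)
    if (cur.pos >= end) {
      cur.pos = 0;       // wrapped around: scan complete
      return false;      // caller returns kRunAtLowPriority
    }
  }
  return true;           // more work: caller returns kOnIdleMaxLevel
}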
5 changes: 5 additions & 0 deletions src/server/engine_shard_set.h
@@ -44,6 +44,11 @@ class EngineShard {
struct DefragStats {
uint64_t success_count = 0; // how many objects were moved
uint64_t tries = 0; // how many times we tried

void Init() {
success_count = 0u;
tries = 0u;
}
};

// EngineShard() is private down below.
