Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: run memory decommit after snapshot load/save #3828

Merged
merged 6 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/server/memory_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,8 @@ void MemoryCmd::Run(CmdArgList args) {
}

if (sub_cmd == "DECOMMIT") {
shard_set->pool()->AwaitBrief([](unsigned, auto* pb) {
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
ServerState::kGlibcmalloc);
});
shard_set->pool()->AwaitBrief(
[](unsigned, auto* pb) { ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); });
return cntx_->SendSimpleString("OK");
}

Expand Down
6 changes: 6 additions & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1991,6 +1991,12 @@ RdbLoader::~RdbLoader() {
break;
delete item;
}

// Decommit local memory.
// We create an RdbLoader for each thread, so each one will Decommit for itself after
// full sync ends (since we explicitly reset the RdbLoader).
auto* tlocal = ServerState::tlocal();
tlocal->DecommitMemory(ServerState::kAllMemory);
}

error_code RdbLoader::Load(io::Source* src) {
Expand Down
4 changes: 4 additions & 0 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,10 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
}

RdbSaver::~RdbSaver() {
// Decommit local memory.
// We create an RdbSaver for each thread, so each one will Decommit for itself.
auto* tlocal = ServerState::tlocal();
tlocal->DecommitMemory(ServerState::kAllMemory);
}

void RdbSaver::StartSnapshotInShard(bool stream_journal, const Cancellation* cll,
Expand Down
48 changes: 15 additions & 33 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class RdbTest : public BaseFamilyTest {
void SetUp();

io::FileSource GetSource(string name);

std::error_code LoadRdb(const string& filename) {
return pp_->at(0)->Await([&] {
io::FileSource fs = GetSource(filename);

RdbLoader loader(service_.get());
return loader.Load(&fs);
});
}
};

void RdbTest::SetUp() {
Expand Down Expand Up @@ -84,19 +93,12 @@ TEST_F(RdbTest, Crc) {
}

TEST_F(RdbTest, LoadEmpty) {
io::FileSource fs = GetSource("empty.rdb");
RdbLoader loader(NULL);
auto ec = loader.Load(&fs);
auto ec = LoadRdb("empty.rdb");
CHECK(!ec);
}

TEST_F(RdbTest, LoadSmall6) {
io::FileSource fs = GetSource("redis6_small.rdb");
RdbLoader loader{service_.get()};

// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
auto ec = LoadRdb("redis6_small.rdb");

ASSERT_FALSE(ec) << ec.message();

Expand Down Expand Up @@ -128,12 +130,7 @@ TEST_F(RdbTest, LoadSmall6) {
}

TEST_F(RdbTest, Stream) {
io::FileSource fs = GetSource("redis6_stream.rdb");
RdbLoader loader{service_.get()};

// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
auto ec = LoadRdb("redis6_stream.rdb");

ASSERT_FALSE(ec) << ec.message();

Expand Down Expand Up @@ -447,12 +444,7 @@ TEST_F(RdbTest, JsonTest) {
class HllRdbTest : public RdbTest, public testing::WithParamInterface<string> {};

TEST_P(HllRdbTest, Hll) {
io::FileSource fs = GetSource("hll.rdb");
RdbLoader loader{service_.get()};

// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
auto ec = LoadRdb("hll.rdb");

ASSERT_FALSE(ec) << ec.message();

Expand All @@ -478,12 +470,7 @@ TEST_F(RdbTest, LoadSmall7) {
// 2. A hashtable called my-hset encoded as RDB_TYPE_HASH_LISTPACK
// 3. A set called my-set encoded as RDB_TYPE_SET_LISTPACK
// 4. A zset called my-zset encoded as RDB_TYPE_ZSET_LISTPACK
io::FileSource fs = GetSource("redis7_small.rdb");
RdbLoader loader{service_.get()};

// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
auto ec = LoadRdb("redis7_small.rdb");

ASSERT_FALSE(ec) << ec.message();

Expand Down Expand Up @@ -520,12 +507,7 @@ TEST_F(RdbTest, RedisJson) {
// JSON.SET json-obj $
// '{"company":"DragonflyDB","product":"Dragonfly","website":"https://dragondlydb.io","years-active":[2021,2022,2023,2024,"and
// more!"]}'
io::FileSource fs = GetSource("redis_json.rdb");
RdbLoader loader{service_.get()};

// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
auto ec = LoadRdb("redis_json.rdb");

ASSERT_FALSE(ec) << ec.message();

Expand Down
3 changes: 3 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ void Replica::Stop() {
// so we can freely release resources (connections).
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
for (auto& flow : shard_flows_) {
flow.reset();
}
}

void Replica::Pause(bool pause) {
Expand Down
7 changes: 6 additions & 1 deletion src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,12 @@ class ServerState { // public struct - to allow initialization.
// Decommits 3 possible heaps according to the flags.
// For decommit_glibcmalloc the heap is global for the process, for others it's specific only
// for this thread.
enum { kDataHeap = 1, kBackingHeap = 2, kGlibcmalloc = 4 };
enum {
kDataHeap = 1,
kBackingHeap = 2,
kGlibcmalloc = 4,
kAllMemory = kDataHeap | kBackingHeap | kGlibcmalloc
};
void DecommitMemory(uint8_t flags);

// Exec descriptor frequency count for this thread.
Expand Down
Loading