Skip to content

Commit

Permalink
chore(tiering): Introduce second chance replacement strategy
Browse files Browse the repository at this point in the history
Introduce hot/cold replacement strategy https://www.geeksforgeeks.org/second-chance-or-clock-page-replacement-policy/

Also, add protection against overrunning the memory budget.
Finally, cancel in-flight offloading requests for entries that were looked up.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jul 17, 2024
1 parent 4898b25 commit 59540d7
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 55 deletions.
10 changes: 5 additions & 5 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ class CompactObj {
}
}

bool HasIoPending() const {
bool DefragIfNeeded(float ratio);

// Reports whether the IO_PENDING bit is currently set in mask_.
bool HasStashPending() const {
  return (mask_ & IO_PENDING) != 0;
}

bool DefragIfNeeded(float ratio);

void SetIoPending(bool b) {
void SetStashPending(bool b) {
if (b) {
mask_ |= IO_PENDING;
} else {
Expand Down Expand Up @@ -451,7 +451,7 @@ class CompactObj {
//
static_assert(sizeof(u_) == 16, "");

mutable uint8_t mask_ = 0;
uint8_t mask_ = 0;

// We currently reserve 5 bits for tags and 3 bits for extending the mask (currently reserved).
uint8_t taglen_ = 0;
Expand Down
9 changes: 5 additions & 4 deletions src/server/acl/acl_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

#include "server/acl/acl_family.h"

#include "absl/container/flat_hash_map.h"
#include "absl/flags/internal/flag.h"
#include "absl/strings/ascii.h"
#include "absl/strings/str_cat.h"
#include <absl/container/flat_hash_map.h>
#include <absl/strings/ascii.h>
#include <absl/strings/str_cat.h>

#include "base/flags.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
Expand Down
1 change: 1 addition & 0 deletions src/server/acl/acl_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "server/conn_context.h"

ABSL_FLAG(size_t, acllog_max_len, 32,
"Specify the number of log entries. Logs are kept locally for each thread "
Expand Down
10 changes: 5 additions & 5 deletions src/server/acl/acl_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
#include <deque>
#include <string>

#include "base/flags.h"
#include "server/conn_context.h"
namespace dfly {

ABSL_DECLARE_FLAG(size_t, acllog_max_len);
class ConnectionContext;

namespace dfly::acl {
namespace acl {

class AclLog {
public:
Expand Down Expand Up @@ -49,4 +48,5 @@ class AclLog {
size_t total_entries_allowed_;
};

} // namespace dfly::acl
} // namespace acl
} // namespace dfly
9 changes: 5 additions & 4 deletions src/server/acl/validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@

#include <utility>

#include "facade/facade_types.h"
#include "server/acl/acl_log.h"
#include "server/command_registry.h"
#include "server/conn_context.h"

namespace dfly::acl {

class AclKeys;

std::pair<bool, AclLog::Reason> IsUserAllowedToInvokeCommandGeneric(
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, CmdArgList tail_args,
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, facade::CmdArgList tail_args,
const CommandId& id);

bool IsUserAllowedToInvokeCommand(const ConnectionContext& cntx, const CommandId& id,
CmdArgList tail_args);

facade::CmdArgList tail_args);
} // namespace dfly::acl
6 changes: 4 additions & 2 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

#include "server/cluster/incoming_slot_migration.h"

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>

#include "base/flags.h"
#include "base/logging.h"
#include "cluster_utility.h"
#include "server/error.h"
Expand Down
26 changes: 18 additions & 8 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,18 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
fetched_items_.insert(res.it->first.AsRef());
}

// If the value has a pending stash, cancel it before any modifications are applied.
// Rationale: we either look it up for reads - and then it's hot, or alternatively,
// we follow up with modifications during mutation operations, and in that case storing on disk
// does not make much sense.
if (res.it->second.HasStashPending()) {
owner_->tiered_storage()->CancelStash(cntx.db_index, key, &res.it->second);
}

// Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness
// attribute of the entry in case of value overrides.
res.it->first.SetTouched(true);

db.top_keys.Touch(key);

std::move(update_stats_on_miss).Cancel();
Expand All @@ -491,6 +503,11 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (cluster::IsClusterEnabled()) {
db.slots_stats[cluster::KeySlot(key)].total_reads++;
}
if (res.it->second.IsExternal()) {
events_.ram_misses++;
} else {
events_.ram_hits++;
}
break;
}
return res;
Expand Down Expand Up @@ -976,13 +993,6 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
DVLOG(2) << "Running callbacks in dbid " << db_ind;
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});

// If the value has a pending stash, cancel it before any modifications are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
// append) can be applied instead of a full overwrite. Deleting is the responsibility of the commands
if (it.IsOccupied() && it->second.HasIoPending()) {
owner_->tiered_storage()->CancelStash(db_ind, key, &it->second);
}

it.GetInnerIt().SetVersion(NextVersion());
}

Expand Down Expand Up @@ -1401,7 +1411,7 @@ void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbT
cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
if (it->second.IsExternal()) {
tiered_storage->Delete(index, &it->second);
} else if (it->second.HasIoPending()) {
} else if (it->second.HasStashPending()) {
tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second);
}
});
Expand Down
2 changes: 2 additions & 0 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ void CopyValueToBuffer(const PrimeValue& pv, char* dest) {

string GetString(const PrimeValue& pv) {
string res;
DCHECK_EQ(pv.ObjType(), OBJ_STRING);

if (pv.ObjType() != OBJ_STRING)
return res;
res.resize(pv.Size());
Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace std;
ABSL_DECLARE_FLAG(string, dbfilename);
ABSL_DECLARE_FLAG(uint32_t, num_shards);
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");

ABSL_DECLARE_FLAG(size_t, acllog_max_len);
namespace dfly {

std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
Expand Down
56 changes: 34 additions & 22 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
#include "base/logging.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/snapshot.h"
#include "server/table.h"
#include "server/tiering/common.h"
#include "server/tiering/op_manager.h"
#include "server/tiering/small_bins.h"
#include "server/tx_base.h"

ABSL_FLAG(bool, tiered_storage_cache_fetched, true,
"WIP: Load results of offloaded reads to memory");
using namespace facade;

// Memory margin, in bytes. The value is read via absl::GetFlag into
// ShardOpManager::memory_margin_ when the op manager is constructed and is
// consulted before fetched values are loaded back into RAM.
ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 1_MB,
          "In bytes. If memory budget on a shard goes below this limit, tiering stops "
          "hot-loading values into ram.");

ABSL_FLAG(unsigned, tiered_storage_write_depth, 50,
"Maximum number of concurrent stash requests issued by background offload");
Expand All @@ -36,8 +38,6 @@ namespace dfly {
using namespace std;
using namespace util;

using namespace tiering::literals;

using KeyRef = tiering::OpManager::KeyRef;

namespace {
Expand Down Expand Up @@ -80,14 +80,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

public:
ShardOpManager(TieredStorage* ts, DbSlice* db_slice, size_t max_size)
: tiering::OpManager{max_size}, ts_{ts}, db_slice_{db_slice} {
cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched);
: tiering::OpManager{max_size}, ts_{ts}, db_slice_{*db_slice} {
memory_margin_ = absl::GetFlag(FLAGS_tiered_storage_memory_margin);
}

// Clear IO pending flag for entry
void ClearIoPending(OpManager::KeyRef key) {
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
pv->SetStashPending(false);
stats_.total_cancels++;
}
}
Expand All @@ -99,14 +99,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

DbTableStats* GetDbTableStats(DbIndex dbid) {
return db_slice_->MutableStats(dbid);
return db_slice_.MutableStats(dbid);
}

private:
PrimeValue* Find(OpManager::KeyRef key) {
// TODO: Get DbContext for transaction for correct dbid and time
// Bypass all update and stat mechanisms
auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second);
auto it = db_slice_.GetDBTable(key.first)->prime.Find(key.second);
return IsValid(it) ? &it->second : nullptr;
}

Expand Down Expand Up @@ -141,7 +141,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
if (auto pv = Find(key); pv) {
RecordAdded(*pv, segment.length, GetDbTableStats(key.first));

pv->SetIoPending(false);
pv->SetStashPending(false);
pv->SetExternal(segment.offset, segment.length);

stats_.total_stashes++;
Expand All @@ -154,15 +154,19 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
SetExternal({sub_dbid, sub_key}, sub_segment);
}

bool cache_fetched_ = false;
// Returns true when db_slice_'s memory budget, reduced by memory_margin_ and by
// value_len, is still strictly positive.
bool HasEnoughMemoryMargin(int64_t value_len) {
  // `auto` keeps whatever arithmetic type the subtraction yields, so the
  // comparison below behaves exactly like the single-expression form.
  const auto headroom = db_slice_.memory_budget() - memory_margin_ - value_len;
  return headroom > 0;
}

int64_t memory_margin_ = 0;

struct {
size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
size_t total_defrags = 0;
} stats_;

TieredStorage* ts_;
DbSlice* db_slice_;
DbSlice& db_slice_;
};

void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view page) {
Expand All @@ -173,7 +177,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) {
return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment;
};
auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
auto it = db_slice_.GetDBTable(dbid)->prime.FindFirst(hash, predicate);
if (!IsValid(it))
continue;

Expand All @@ -200,7 +204,8 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
// the snapshotting.
// TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.

bool should_upload = modified || (cache_fetched_ && !SliceSnapshot::IsSnaphotInProgress());
bool should_upload =
modified || (HasEnoughMemoryMargin(value.size()) && !SliceSnapshot::IsSnaphotInProgress());

if (!should_upload)
return false;
Expand Down Expand Up @@ -332,7 +337,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
}

StringOrView raw_string = value->GetRawString();
value->SetIoPending(true);
value->SetStashPending(true);

tiering::OpManager::EntryId id;
error_code ec;
Expand Down Expand Up @@ -364,13 +369,13 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
}

void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
DCHECK(value->HasIoPending());
DCHECK(value->HasStashPending());
if (OccupiesWholePages(value->Size())) {
op_manager_->Delete(KeyRef(dbid, key));
} else if (auto bin = bins_->Delete(dbid, key); bin) {
op_manager_->Delete(*bin);
}
value->SetIoPending(false);
value->SetStashPending(false);
}

float TieredStorage::WriteDepthUsage() const {
Expand Down Expand Up @@ -423,11 +428,18 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
disk_stats.max_file_size)
return;

auto cb = [this, dbid, tmp = std::string{}](PrimeIterator it) mutable {
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
string tmp;
auto cb = [this, dbid, &tmp](PrimeIterator it) mutable {
if (ShouldStash(it->second)) {
if (it->first.WasTouched()) {
it->first.SetTouched(false);
} else {
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
}
}
};

PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
PrimeTable& table = op_manager_->db_slice_.GetDBTable(dbid)->prime;
PrimeTable::Cursor start_cursor{};

// Loop while we haven't traversed all entries or reached our stash io device limit.
Expand All @@ -442,7 +454,7 @@ void TieredStorage::RunOffloading(DbIndex dbid) {

bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
auto disk_stats = op_manager_->GetStats().disk_stats;
return !pv.IsExternal() && !pv.HasIoPending() && pv.ObjType() == OBJ_STRING &&
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
pv.Size() >= kMinValueSize &&
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
}
Expand Down
4 changes: 0 additions & 4 deletions src/server/tiered_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ using namespace testing;

ABSL_DECLARE_FLAG(bool, force_epoll);
ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(bool, tiered_storage_cache_fetched);
ABSL_DECLARE_FLAG(bool, backing_file_direct);
ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);

Expand Down Expand Up @@ -53,8 +51,6 @@ class TieredStorageTest : public BaseFamilyTest {
if (GetFlag(FLAGS_tiered_prefix).empty()) {
SetFlag(&FLAGS_tiered_prefix, "/tmp/tiered_storage_test");
}
SetFlag(&FLAGS_tiered_storage_cache_fetched, true);
SetFlag(&FLAGS_backing_file_direct, true);

BaseFamilyTest::SetUp();
}
Expand Down
1 change: 1 addition & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <absl/strings/match.h>

#include "base/flags.h"
#include "base/logging.h"
#include "facade/op_status.h"
#include "redis/redis_aux.h"
Expand Down

0 comments on commit 59540d7

Please sign in to comment.