Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tiering): add protection against overruning memory budget #3327

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ class CompactObj {
}
}

bool HasIoPending() const {
bool DefragIfNeeded(float ratio);

bool HasStashPending() const {
return mask_ & IO_PENDING;
}

bool DefragIfNeeded(float ratio);

void SetIoPending(bool b) {
void SetStashPending(bool b) {
if (b) {
mask_ |= IO_PENDING;
} else {
Expand Down Expand Up @@ -451,7 +451,7 @@ class CompactObj {
//
static_assert(sizeof(u_) == 16, "");

mutable uint8_t mask_ = 0;
uint8_t mask_ = 0;

// We currently reserve 5 bits for tags and 3 bits for extending the mask. currently reserved.
uint8_t taglen_ = 0;
Expand Down
9 changes: 5 additions & 4 deletions src/server/acl/acl_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

#include "server/acl/acl_family.h"

#include "absl/container/flat_hash_map.h"
#include "absl/flags/internal/flag.h"
#include "absl/strings/ascii.h"
#include "absl/strings/str_cat.h"
#include <absl/container/flat_hash_map.h>
#include <absl/strings/ascii.h>
#include <absl/strings/str_cat.h>

#include "base/flags.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
Expand Down
1 change: 1 addition & 0 deletions src/server/acl/acl_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "server/conn_context.h"

ABSL_FLAG(size_t, acllog_max_len, 32,
"Specify the number of log entries. Logs are kept locally for each thread "
Expand Down
10 changes: 5 additions & 5 deletions src/server/acl/acl_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
#include <deque>
#include <string>

#include "base/flags.h"
#include "server/conn_context.h"
namespace dfly {

ABSL_DECLARE_FLAG(size_t, acllog_max_len);
class ConnectionContext;

namespace dfly::acl {
namespace acl {

class AclLog {
public:
Expand Down Expand Up @@ -49,4 +48,5 @@ class AclLog {
size_t total_entries_allowed_;
};

} // namespace dfly::acl
} // namespace acl
} // namespace dfly
9 changes: 5 additions & 4 deletions src/server/acl/validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@

#include <utility>

#include "facade/facade_types.h"
#include "server/acl/acl_log.h"
#include "server/command_registry.h"
#include "server/conn_context.h"

namespace dfly::acl {

class AclKeys;

std::pair<bool, AclLog::Reason> IsUserAllowedToInvokeCommandGeneric(
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, CmdArgList tail_args,
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, facade::CmdArgList tail_args,
const CommandId& id);

bool IsUserAllowedToInvokeCommand(const ConnectionContext& cntx, const CommandId& id,
CmdArgList tail_args);

facade::CmdArgList tail_args);
} // namespace dfly::acl
6 changes: 4 additions & 2 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

#include "server/cluster/incoming_slot_migration.h"

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>

#include "base/flags.h"
#include "base/logging.h"
#include "cluster_utility.h"
#include "server/error.h"
Expand Down
26 changes: 18 additions & 8 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,18 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
fetched_items_.insert(res.it->first.AsRef());
}

// If the value has a pending stash, cancel it before any modification are applied.
// Rationale: we either look it up for reads - and then it's hot, or alternatively,
// we follow up with modifications during mutation operations, and in that case storing on disk
// does not make much sense.
if (res.it->second.HasStashPending()) {
owner_->tiered_storage()->CancelStash(cntx.db_index, key, &res.it->second);
}

// Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness
// attribute of the entry in case of value overtides.
res.it->first.SetTouched(true);

db.top_keys.Touch(key);

std::move(update_stats_on_miss).Cancel();
Expand All @@ -491,6 +503,11 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (cluster::IsClusterEnabled()) {
db.slots_stats[cluster::KeySlot(key)].total_reads++;
}
if (res.it->second.IsExternal()) {
events_.ram_misses++;
} else {
events_.ram_hits++;
}
break;
}
return res;
Expand Down Expand Up @@ -976,13 +993,6 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
DVLOG(2) << "Running callbacks in dbid " << db_ind;
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});

// If the value has a pending stash, cancel it before any modification are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
// append) can be applied instead of a full overwrite. Deleting is reponsibility of the commands
if (it.IsOccupied() && it->second.HasIoPending()) {
owner_->tiered_storage()->CancelStash(db_ind, key, &it->second);
}

it.GetInnerIt().SetVersion(NextVersion());
}

Expand Down Expand Up @@ -1401,7 +1411,7 @@ void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbT
cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
if (it->second.IsExternal()) {
tiered_storage->Delete(index, &it->second);
} else if (it->second.HasIoPending()) {
} else if (it->second.HasStashPending()) {
tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second);
}
});
Expand Down
2 changes: 2 additions & 0 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ void CopyValueToBuffer(const PrimeValue& pv, char* dest) {

string GetString(const PrimeValue& pv) {
string res;
DCHECK_EQ(pv.ObjType(), OBJ_STRING);

if (pv.ObjType() != OBJ_STRING)
return res;
res.resize(pv.Size());
Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace std;
ABSL_DECLARE_FLAG(string, dbfilename);
ABSL_DECLARE_FLAG(uint32_t, num_shards);
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");

ABSL_DECLARE_FLAG(size_t, acllog_max_len);
namespace dfly {

std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
Expand Down
56 changes: 34 additions & 22 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
#include "base/logging.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/snapshot.h"
#include "server/table.h"
#include "server/tiering/common.h"
#include "server/tiering/op_manager.h"
#include "server/tiering/small_bins.h"
#include "server/tx_base.h"

ABSL_FLAG(bool, tiered_storage_cache_fetched, true,
"WIP: Load results of offloaded reads to memory");
using namespace facade;

ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 1_MB,
"In bytes. If memory budget on a shard goes blow this limit, tiering stops "
"hot-loading values into ram.");

ABSL_FLAG(unsigned, tiered_storage_write_depth, 50,
"Maximum number of concurrent stash requests issued by background offload");
Expand All @@ -36,8 +38,6 @@ namespace dfly {
using namespace std;
using namespace util;

using namespace tiering::literals;

using KeyRef = tiering::OpManager::KeyRef;

namespace {
Expand Down Expand Up @@ -80,14 +80,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {

public:
ShardOpManager(TieredStorage* ts, DbSlice* db_slice, size_t max_size)
: tiering::OpManager{max_size}, ts_{ts}, db_slice_{db_slice} {
cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched);
: tiering::OpManager{max_size}, ts_{ts}, db_slice_{*db_slice} {
memory_margin_ = absl::GetFlag(FLAGS_tiered_storage_memory_margin);
}

// Clear IO pending flag for entry
void ClearIoPending(OpManager::KeyRef key) {
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
pv->SetStashPending(false);
stats_.total_cancels++;
}
}
Expand All @@ -99,14 +99,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

DbTableStats* GetDbTableStats(DbIndex dbid) {
return db_slice_->MutableStats(dbid);
return db_slice_.MutableStats(dbid);
}

private:
PrimeValue* Find(OpManager::KeyRef key) {
// TODO: Get DbContext for transaction for correct dbid and time
// Bypass all update and stat mechanisms
auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second);
auto it = db_slice_.GetDBTable(key.first)->prime.Find(key.second);
return IsValid(it) ? &it->second : nullptr;
}

Expand Down Expand Up @@ -141,7 +141,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
if (auto pv = Find(key); pv) {
RecordAdded(*pv, segment.length, GetDbTableStats(key.first));

pv->SetIoPending(false);
pv->SetStashPending(false);
pv->SetExternal(segment.offset, segment.length);

stats_.total_stashes++;
Expand All @@ -154,15 +154,19 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
SetExternal({sub_dbid, sub_key}, sub_segment);
}

bool cache_fetched_ = false;
bool HasEnoughMemoryMargin(int64_t value_len) {
return db_slice_.memory_budget() - memory_margin_ - value_len > 0;
}

int64_t memory_margin_ = 0;

struct {
size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
size_t total_defrags = 0;
} stats_;

TieredStorage* ts_;
DbSlice* db_slice_;
DbSlice& db_slice_;
};

void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view page) {
Expand All @@ -173,7 +177,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) {
return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment;
};
auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
auto it = db_slice_.GetDBTable(dbid)->prime.FindFirst(hash, predicate);
if (!IsValid(it))
continue;

Expand All @@ -200,7 +204,8 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
// the snapshotting.
// TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.

bool should_upload = modified || (cache_fetched_ && !SliceSnapshot::IsSnaphotInProgress());
bool should_upload =
modified || (HasEnoughMemoryMargin(value.size()) && !SliceSnapshot::IsSnaphotInProgress());

if (!should_upload)
return false;
Expand Down Expand Up @@ -332,7 +337,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
}

StringOrView raw_string = value->GetRawString();
value->SetIoPending(true);
value->SetStashPending(true);

tiering::OpManager::EntryId id;
error_code ec;
Expand Down Expand Up @@ -364,13 +369,13 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
}

void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
DCHECK(value->HasIoPending());
DCHECK(value->HasStashPending());
if (OccupiesWholePages(value->Size())) {
op_manager_->Delete(KeyRef(dbid, key));
} else if (auto bin = bins_->Delete(dbid, key); bin) {
op_manager_->Delete(*bin);
}
value->SetIoPending(false);
value->SetStashPending(false);
}

float TieredStorage::WriteDepthUsage() const {
Expand Down Expand Up @@ -423,11 +428,18 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
disk_stats.max_file_size)
return;

auto cb = [this, dbid, tmp = std::string{}](PrimeIterator it) mutable {
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
string tmp;
auto cb = [this, dbid, &tmp](PrimeIterator it) mutable {
if (ShouldStash(it->second)) {
if (it->first.WasTouched()) {
it->first.SetTouched(false);
} else {
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
}
}
romange marked this conversation as resolved.
Show resolved Hide resolved
};

PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
PrimeTable& table = op_manager_->db_slice_.GetDBTable(dbid)->prime;
PrimeTable::Cursor start_cursor{};

// Loop while we haven't traversed all entries or reached our stash io device limit.
Expand All @@ -442,7 +454,7 @@ void TieredStorage::RunOffloading(DbIndex dbid) {

bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
auto disk_stats = op_manager_->GetStats().disk_stats;
return !pv.IsExternal() && !pv.HasIoPending() && pv.ObjType() == OBJ_STRING &&
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
pv.Size() >= kMinValueSize &&
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
}
Expand Down
4 changes: 0 additions & 4 deletions src/server/tiered_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ using namespace testing;

ABSL_DECLARE_FLAG(bool, force_epoll);
ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(bool, tiered_storage_cache_fetched);
ABSL_DECLARE_FLAG(bool, backing_file_direct);
ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);

Expand Down Expand Up @@ -53,8 +51,6 @@ class TieredStorageTest : public BaseFamilyTest {
if (GetFlag(FLAGS_tiered_prefix).empty()) {
SetFlag(&FLAGS_tiered_prefix, "/tmp/tiered_storage_test");
}
SetFlag(&FLAGS_tiered_storage_cache_fetched, true);
SetFlag(&FLAGS_backing_file_direct, true);

BaseFamilyTest::SetUp();
}
Expand Down
1 change: 1 addition & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <absl/strings/match.h>

#include "base/flags.h"
#include "base/logging.h"
#include "facade/op_status.h"
#include "redis/redis_aux.h"
Expand Down
Loading