Skip to content

Commit

Permalink
chore: Cancel pending Stashes upon lookup
Browse files Browse the repository at this point in the history
Also implement hot/cold 1-bit replacement strategy.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jul 17, 2024
1 parent 878c690 commit 01032a6
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 31 deletions.
11 changes: 6 additions & 5 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,21 +230,21 @@ class CompactObj {
return mask_ & TOUCHED;
}

void SetTouched(bool e) {
void SetTouched(bool e) const {
if (e) {
mask_ |= TOUCHED;
} else {
mask_ &= ~TOUCHED;
}
}

bool HasIoPending() const {
bool DefragIfNeeded(float ratio);

bool HasStashPending() const {
return mask_ & IO_PENDING;
}

bool DefragIfNeeded(float ratio);

void SetIoPending(bool b) {
void SetStashPending(bool b) const {
if (b) {
mask_ |= IO_PENDING;
} else {
Expand Down Expand Up @@ -451,6 +451,7 @@ class CompactObj {
//
static_assert(sizeof(u_) == 16, "");

// We allow io pending, touched bits to be modified on constant objects.
mutable uint8_t mask_ = 0;

// We currently reserve 5 bits for tags and 3 bits for extending the mask. currently reserved.
Expand Down
1 change: 1 addition & 0 deletions src/server/acl/acl_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "server/conn_context.h"

ABSL_FLAG(size_t, acllog_max_len, 32,
"Specify the number of log entries. Logs are kept locally for each thread "
Expand Down
10 changes: 5 additions & 5 deletions src/server/acl/acl_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
#include <deque>
#include <string>

#include "base/flags.h"
#include "server/conn_context.h"
namespace dfly {

ABSL_DECLARE_FLAG(size_t, acllog_max_len);
class ConnectionContext;

namespace dfly::acl {
namespace acl {

class AclLog {
public:
Expand Down Expand Up @@ -49,4 +48,5 @@ class AclLog {
size_t total_entries_allowed_;
};

} // namespace dfly::acl
} // namespace acl
} // namespace dfly
9 changes: 5 additions & 4 deletions src/server/acl/validator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@

#include <utility>

#include "facade/facade_types.h"
#include "server/acl/acl_log.h"
#include "server/command_registry.h"
#include "server/conn_context.h"

namespace dfly::acl {

class AclKeys;

std::pair<bool, AclLog::Reason> IsUserAllowedToInvokeCommandGeneric(
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, CmdArgList tail_args,
const std::vector<uint64_t>& acl_commands, const AclKeys& keys, facade::CmdArgList tail_args,
const CommandId& id);

bool IsUserAllowedToInvokeCommand(const ConnectionContext& cntx, const CommandId& id,
CmdArgList tail_args);

facade::CmdArgList tail_args);
} // namespace dfly::acl
6 changes: 4 additions & 2 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

#include "server/cluster/incoming_slot_migration.h"

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>

#include "base/flags.h"
#include "base/logging.h"
#include "cluster_utility.h"
#include "server/error.h"
Expand Down
7 changes: 5 additions & 2 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
fetched_items_.insert(res.it->first.AsRef());
}

// Mark this entry as being looked up.
res.it->first.SetTouched(true);

db.top_keys.Touch(key);

std::move(update_stats_on_miss).Cancel();
Expand Down Expand Up @@ -979,7 +982,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
// If the value has a pending stash, cancel it before any modification are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
// append) can be applied instead of a full overwrite. Deleting is reponsibility of the commands
if (it.IsOccupied() && it->second.HasIoPending()) {
if (it.IsOccupied() && it->second.HasStashPending()) {
owner_->tiered_storage()->CancelStash(db_ind, key, &it->second);
}

Expand Down Expand Up @@ -1401,7 +1404,7 @@ void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbT
cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
if (it->second.IsExternal()) {
tiered_storage->Delete(index, &it->second);
} else if (it->second.HasIoPending()) {
} else if (it->second.HasStashPending()) {
tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second);
}
});
Expand Down
13 changes: 12 additions & 1 deletion src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ void CopyValueToBuffer(const PrimeValue& pv, char* dest) {

string GetString(const PrimeValue& pv) {
string res;
DCHECK_EQ(pv.ObjType(), OBJ_STRING);

if (pv.ObjType() != OBJ_STRING)
return res;
res.resize(pv.Size());
Expand Down Expand Up @@ -453,6 +455,10 @@ SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool f
} else {
db_slice.IncRamHits();
CopyValueToBuffer(it->second, next);
it->first.SetTouched(true);
if (it->second.HasStashPending()) {
shard->tiered_storage()->CancelStash(index, it.key(), &it->second);
}
}

size_t size = it->second.Size();
Expand Down Expand Up @@ -535,11 +541,16 @@ struct GetReplies {

StringValue StringValue::Read(DbIndex dbid, string_view key, const PrimeValue& pv,
DbSlice* db_slice) {
auto* ts = db_slice->shard_owner()->tiered_storage();
if (pv.IsExternal()) {
db_slice->IncRamMisses();
return StringValue{db_slice->shard_owner()->tiered_storage()->Read(dbid, key, pv)};
return StringValue{ts->Read(dbid, key, pv)};
}

db_slice->IncRamHits();
if (pv.HasStashPending()) {
ts->CancelStash(dbid, key, &pv);
}
return StringValue(GetString(pv));
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using namespace std;
ABSL_DECLARE_FLAG(string, dbfilename);
ABSL_DECLARE_FLAG(uint32_t, num_shards);
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");

ABSL_DECLARE_FLAG(size_t, acllog_max_len);
namespace dfly {

std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
Expand Down
25 changes: 16 additions & 9 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
// Clear IO pending flag for entry
void ClearIoPending(OpManager::KeyRef key) {
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
pv->SetStashPending(false);
stats_.total_cancels++;
}
}
Expand Down Expand Up @@ -143,7 +143,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
if (auto pv = Find(key); pv) {
RecordAdded(*pv, segment.length, GetDbTableStats(key.first));

pv->SetIoPending(false);
pv->SetStashPending(false);
pv->SetExternal(segment.offset, segment.length);

stats_.total_stashes++;
Expand Down Expand Up @@ -339,7 +339,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
}

StringOrView raw_string = value->GetRawString();
value->SetIoPending(true);
value->SetStashPending(true);

tiering::OpManager::EntryId id;
error_code ec;
Expand Down Expand Up @@ -370,14 +370,14 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
RecordDeleted(*value, segment.length, op_manager_->GetDbTableStats(dbid));
}

void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
DCHECK(value->HasIoPending());
void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, const PrimeValue* value) {
DCHECK(value->HasStashPending());
if (OccupiesWholePages(value->Size())) {
op_manager_->Delete(KeyRef(dbid, key));
} else if (auto bin = bins_->Delete(dbid, key); bin) {
op_manager_->Delete(*bin);
}
value->SetIoPending(false);
value->SetStashPending(false);
}

float TieredStorage::WriteDepthUsage() const {
Expand Down Expand Up @@ -430,8 +430,15 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
disk_stats.max_file_size)
return;

auto cb = [this, dbid, tmp = std::string{}](PrimeIterator it) mutable {
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
string tmp;
auto cb = [this, dbid, &tmp](PrimeIterator it) mutable {
if (ShouldStash(it->second)) {
if (it->first.WasTouched()) {
it->first.SetTouched(false);
} else {
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
}
}
};

PrimeTable& table = op_manager_->db_slice_.GetDBTable(dbid)->prime;
Expand All @@ -449,7 +456,7 @@ void TieredStorage::RunOffloading(DbIndex dbid) {

bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
auto disk_stats = op_manager_->GetStats().disk_stats;
return !pv.IsExternal() && !pv.HasIoPending() && pv.ObjType() == OBJ_STRING &&
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
pv.Size() >= kMinValueSize &&
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TieredStorage {
void Delete(DbIndex dbid, PrimeValue* value);

// Cancel pending stash for value, must have IO_PENDING flag set
void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value);
void CancelStash(DbIndex dbid, std::string_view key, const PrimeValue* value);

// Percentage (0-1) of currently used storage_write_depth for ongoing stashes
float WriteDepthUsage() const;
Expand Down Expand Up @@ -139,7 +139,7 @@ class TieredStorage {
void Delete(PrimeValue* value) {
}

void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
void CancelStash(DbIndex dbid, std::string_view key, const PrimeValue* value) {
}

bool ShouldStash(const PrimeValue& pv) const {
Expand Down
1 change: 1 addition & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <absl/strings/match.h>

#include "base/flags.h"
#include "base/logging.h"
#include "facade/op_status.h"
#include "redis/redis_aux.h"
Expand Down

0 comments on commit 01032a6

Please sign in to comment.