Skip to content

Commit

Permalink
Merge branch 'main' into kpr7
Browse files Browse the repository at this point in the history
  • Loading branch information
kostasrim committed Aug 13, 2024
2 parents a890049 + db7bd06 commit 9b82757
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 118 deletions.
5 changes: 0 additions & 5 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,6 @@ class UniquePicksGenerator : public PicksGenerator {
absl::BitGen bitgen_{};
};

struct ConditionFlag {
util::fb2::CondVarAny cond_var;
bool flag = false;
};

// Helper class used to guarantee atomicity between serialization of buckets
class ThreadLocalMutex {
public:
Expand Down
30 changes: 24 additions & 6 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
RecordExpiry(cntx_.db_index, key);
}

// Safe we already acquired std::unique_lock lk(db_slice_->GetSerializationMutex());
// on the flows that call this function
db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table);

++evicted_;
Expand Down Expand Up @@ -479,6 +480,8 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:

if (caching_mode_ && IsValid(res.it)) {
if (!change_cb_.empty()) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock lk(local_mu_);
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
CallChangeCallbacks(cntx.db_index, key, bit);
};
Expand Down Expand Up @@ -570,6 +573,9 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
auto status = res.status();
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock lk(local_mu_);

// It's a new entry.
CallChangeCallbacks(cntx.db_index, key, {key});

Expand Down Expand Up @@ -682,6 +688,8 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
}

bool DbSlice::Del(Context cntx, Iterator it) {
std::unique_lock lk(local_mu_);

if (!IsValid(it)) {
return false;
}
Expand Down Expand Up @@ -801,6 +809,10 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

void DbSlice::FlushDb(DbIndex db_ind) {
// We should not flush if serialization of a big value is in progress because this
// could lead to UB or assertion failures (while DashTable::Traverse is iterating over
// a logical bucket).
std::unique_lock lk(local_mu_);
// clear client tracking map.
client_tracking_map_.clear();

Expand All @@ -822,6 +834,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
}

void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
std::unique_lock lk(local_mu_);
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
Expand All @@ -831,6 +844,7 @@ void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
}

bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) {
std::unique_lock lk(local_mu_);
if (main_it->second.HasExpire()) {
auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
Expand Down Expand Up @@ -1052,6 +1066,8 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
}

void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock lk(local_mu_);
CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
it.GetInnerIt().SetVersion(NextVersion());
}
Expand Down Expand Up @@ -1219,13 +1235,13 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx

unsigned i = 0;
for (; i < count / 3; ++i) {
db.expire_cursor = Traverse(&db.expire, db.expire_cursor, cb);
db.expire_cursor = db.expire.Traverse(db.expire_cursor, cb);
}

// continue traversing only if we had strong deletion rate based on the first sample.
if (result.deleted * 4 > result.traversed) {
for (; i < count; ++i) {
db.expire_cursor = Traverse(&db.expire, db.expire_cursor, cb);
db.expire_cursor = db.expire.Traverse(db.expire_cursor, cb);
}
}

Expand Down Expand Up @@ -1344,10 +1360,14 @@ void DbSlice::CreateDb(DbIndex db_ind) {

void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
ConnectionState::ExecInfo* exec_info) {
// Because we might insert while another fiber is preempted
std::unique_lock lk(local_mu_);
db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
}

void DbSlice::UnregisterConnectionWatches(const ConnectionState::ExecInfo* exec_info) {
// Because we might remove while another fiber is preempted and miss a notification
std::unique_lock lk(local_mu_);
for (const auto& [db_indx, key] : exec_info->watched_keys) {
auto& watched_keys = db_arr_[db_indx]->watched_keys;
if (auto it = watched_keys.find(key); it != watched_keys.end()) {
Expand Down Expand Up @@ -1391,7 +1411,7 @@ void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbT
// Delete all tiered entries
PrimeTable::Cursor cursor;
do {
cursor = Traverse(&db_ptr->prime, cursor, [&](PrimeIterator it) {
cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
if (it->second.IsExternal()) {
tiered_storage->Delete(index, &it->second);
} else if (it->second.HasStashPending()) {
Expand Down Expand Up @@ -1523,8 +1543,6 @@ void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const Change
return;

DVLOG(2) << "Running callbacks for key " << key << " in dbid " << id;
FetchedItemsRestorer fetched_restorer(&fetched_items_);
std::unique_lock lk(local_mu_);

const size_t limit = change_cb_.size();
auto ccb = change_cb_.begin();
Expand Down
40 changes: 21 additions & 19 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,46 +305,50 @@ class DbSlice {
AddOrFindResult& operator=(ItAndUpdater&& o);
};

OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key)
ABSL_LOCKS_EXCLUDED(local_mu_);

// Same as AddOrSkip, but overwrites in case entry exists.
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms);
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);

// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
// Returns OpStatus::OUT_OF_MEMORY if bad_alloc is thrown
OpResult<ItAndUpdater> AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms);
uint64_t expire_at_ms) ABSL_LOCKS_EXCLUDED(local_mu_);

// Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry
// already expired and was deleted;
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, Iterator prime_it, ExpIterator exp_it,
const ExpireParams& params);
const ExpireParams& params) ABSL_LOCKS_EXCLUDED(local_mu_);

// Adds expiry information.
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at);
void AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);

// Removes the corresponing expiry information if exists.
// Returns true if expiry existed (and removed).
bool RemoveExpire(DbIndex db_ind, Iterator main_it);
bool RemoveExpire(DbIndex db_ind, Iterator main_it) ABSL_LOCKS_EXCLUDED(local_mu_);

// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
// Does not change expiry if at != 0 and expiry already exists.
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at);
bool UpdateExpire(DbIndex db_ind, Iterator main_it, uint64_t at) ABSL_LOCKS_EXCLUDED(local_mu_);

void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;

// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);

bool Del(Context cntx, Iterator it);
// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);

bool Del(Context cntx, Iterator it) ABSL_LOCKS_EXCLUDED(local_mu_);

constexpr static DbIndex kDbAll = 0xFFFF;

// Flushes db_ind or all databases if kDbAll is passed
void FlushDb(DbIndex db_ind);
void FlushDb(DbIndex db_ind) ABSL_LOCKS_EXCLUDED(local_mu_);

// Flushes the data of given slot ranges.
void FlushSlots(cluster::SlotRanges slot_ranges);
Expand Down Expand Up @@ -435,7 +439,7 @@ class DbSlice {
void FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound);

//! Unregisters the callback.
void UnregisterOnChange(uint64_t id);
void UnregisterOnChange(uint64_t id) ABSL_LOCKS_EXCLUDED(local_mu_);

struct DeleteExpiredStats {
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
Expand All @@ -451,7 +455,6 @@ class DbSlice {
// Returnes number of (elements,bytes) freed due to evictions.
std::pair<uint64_t, size_t> FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id,
size_t increase_goal_bytes);
void ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes);

int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;

Expand Down Expand Up @@ -493,20 +496,17 @@ class DbSlice {
client_tracking_map_[key].insert(conn_ref);
}

// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

// Provides access to the internal lock of db_slice for flows that serialize
// entries with preemption and need to synchronize with Traverse below which
// acquires the same lock.
ThreadLocalMutex* GetSerializationMutex() {
return &local_mu_;
ThreadLocalMutex& GetSerializationMutex() {
return local_mu_;
}

// Wrapper around DashTable::Traverse that allows preemptions
template <typename Cb, typename DashTable>
PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb) {
PrimeTable::Cursor Traverse(DashTable* pt, PrimeTable::Cursor cursor, Cb&& cb)
ABSL_LOCKS_EXCLUDED(local_mu_) {
std::unique_lock lk(local_mu_);
return pt->Traverse(cursor, std::forward<Cb>(cb));
}
Expand All @@ -532,6 +532,7 @@ class DbSlice {
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);

void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

// Send invalidation message to the clients that are tracking the change to a key.
void SendInvalidationTrackingMessage(std::string_view key);
Expand Down Expand Up @@ -562,7 +563,8 @@ class DbSlice {
return version_++;
}

void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;
void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(local_mu_);

// Used to provide exclusive access while Traversing segments
mutable ThreadLocalMutex local_mu_;
Expand Down
17 changes: 15 additions & 2 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ class DflyEngineTest : public BaseFamilyTest {
}
};

class DflyEngineTestWithRegistry : public BaseFamilyTest {
protected:
DflyEngineTestWithRegistry() : BaseFamilyTest() {
num_threads_ = kPoolThreadCount;
ResetService();
}
};

class SingleThreadDflyEngineTest : public BaseFamilyTest {
protected:
SingleThreadDflyEngineTest() : BaseFamilyTest() {
Expand Down Expand Up @@ -289,7 +297,7 @@ TEST_F(DflyEngineTest, ScriptFlush) {
EXPECT_THAT(1, resp.GetInt());
}

TEST_F(DflyEngineTest, Hello) {
TEST_F(DflyEngineTestWithRegistry, Hello) {
auto resp = Run({"hello"});
ASSERT_THAT(resp, ArrLen(14));
resp = Run({"hello", "2"});
Expand All @@ -316,9 +324,14 @@ TEST_F(DflyEngineTest, Hello) {
ErrArg("WRONGPASS invalid username-password pair or user is disabled."));

resp = Run({"hello", "3", "AUTH", "default", ""});
ASSERT_THAT(resp, ErrArg("WRONGPASS invalid username-password pair or user is disabled."));

TestInitAclFam();

resp = Run({"hello", "3", "AUTH", "default", "tmp"});
ASSERT_THAT(resp, ArrLen(14));

resp = Run({"hello", "3", "AUTH", "default", "", "SETNAME", "myname"});
resp = Run({"hello", "3", "AUTH", "default", "tmp", "SETNAME", "myname"});
ASSERT_THAT(resp, ArrLen(14));
}

Expand Down
12 changes: 9 additions & 3 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ void EngineShard::Heartbeat() {
RetireExpiredAndEvict();
}

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());

// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
// FreeMemWithEvictionStep, i.e. if memory_budget is below the limit.
Expand All @@ -621,7 +624,6 @@ void EngineShard::Heartbeat() {
<< " tiering_threshold: " << tiering_offload_threshold
<< ", cool memory: " << tiered_storage_->CoolMemoryUsage();

DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
Expand All @@ -631,6 +633,10 @@ void EngineShard::Heartbeat() {
}

void EngineShard::RetireExpiredAndEvict() {
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
// Some of the functions below might acquire the lock again so we need to unlock it
std::unique_lock lk(db_slice.GetSerializationMutex());
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;

Expand All @@ -651,8 +657,6 @@ void EngineShard::RetireExpiredAndEvict() {
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();

// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
Expand All @@ -674,6 +678,8 @@ void EngineShard::RetireExpiredAndEvict() {
}
}

// Because TriggerOnJournalWriteToSink will lock the same lock leading to a deadlock.
lk.unlock();
// Journal entries for expired entries are not writen to socket in the loop above.
// Trigger write to socket when loop finishes.
if (auto journal = EngineShard::tlocal()->journal(); journal) {
Expand Down
Loading

0 comments on commit 9b82757

Please sign in to comment.