Skip to content

Commit

Permalink
fix: fix RegisterOnChange methods for journal and db_slice (dragonfly…
Browse files Browse the repository at this point in the history
…db#3171)

* fix: fix RegisterOnChange methods for journal and db_slice. Call db_slice and journal callbacks atomically. Made a hack to avoid deadlock during SAVE
  • Loading branch information
BorysTheDev authored and dranikpg committed Jun 23, 2024
1 parent a42f9f6 commit 81db231
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 13 deletions.
27 changes: 18 additions & 9 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (!change_cb_.empty()) {
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
for (const auto& ccb : change_cb_) {
ccb.second(cntx.db_index, bit);
}
CallChangeCallbacks(cntx.db_index, bit);
};
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
}
Expand Down Expand Up @@ -524,9 +522,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt

// It's a new entry.
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
for (const auto& ccb : change_cb_) {
ccb.second(cntx.db_index, key);
}
CallChangeCallbacks(cntx.db_index, key);

// In case we are loading from rdb file or replicating we want to disable conservative memory
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
Expand Down Expand Up @@ -975,9 +971,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FiberAtomicGuard fg;

DVLOG(2) << "Running callbacks in dbid " << db_ind;
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
}
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});

// If the value has a pending stash, cancel it before any modification are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
Expand Down Expand Up @@ -1089,6 +1083,13 @@ void DbSlice::ExpireAllIfNeeded() {

// Registers `cb` as a change callback and returns the version number assigned to it.
// The returned value is the id stored alongside the callback in change_cb_.
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
  const uint64_t version = NextVersion();

  // TODO: rewrite this logic to be clearer.
  // Taking cb_mu_ exclusively guards against this method running concurrently with
  // change_cb_ invocations and with journal_slice::change_cb_arr_ invocations.
  // Per the original author's note, the lock may be released at any point afterwards,
  // because DbSlice::RegisterOnChange and journal_slice::RegisterOnChange are called
  // back-to-back without preemption.
  std::lock_guard guard(cb_mu_);
  change_cb_.emplace_back(version, std::move(cb));
  return version;
}
Expand All @@ -1099,6 +1100,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
// change_cb_ is ordered by version.
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
<< ", upper_bound=" << upper_bound;

for (const auto& ccb : change_cb_) {
uint64_t cb_version = ccb.first;
DCHECK_LE(cb_version, upper_bound);
Expand All @@ -1113,6 +1115,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
if (it->first == id) {
change_cb_.erase(it);
Expand Down Expand Up @@ -1506,4 +1509,10 @@ void DbSlice::OnCbFinish() {
fetched_items_.clear();
}

// Invokes every callback stored in change_cb_, in container order, passing each one
// the database index `id` and the change request `cr`.
void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
  for (const auto& [version, callback] : change_cb_) {
    callback(id, cr);
  }
}

} // namespace dfly
16 changes: 16 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,14 @@ class DbSlice {
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

// Acquires cb_mu_ in shared mode. Must be balanced by a later UnlockChangeCb() call.
void LockChangeCb() const {
  cb_mu_.lock_shared();
}

// Releases the shared hold on cb_mu_ previously taken via LockChangeCb().
void UnlockChangeCb() const {
  cb_mu_.unlock_shared();
}

private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
Expand Down Expand Up @@ -523,6 +531,8 @@ class DbSlice {
return version_++;
}

void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;

private:
ShardId shard_id_;
uint8_t caching_mode_ : 1;
Expand All @@ -544,6 +554,12 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
mutable absl::flat_hash_set<uint64_t> uniq_fps_;

// To ensure correct data replication, we must serialize the buckets that each running command
// will modify, followed by serializing the command to the journal. We use a mutex to prevent
// interleaving between bucket and journal registrations, and the command execution with its
// journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after
// journaling is completed. Registration for bucket and journal changes is also done without
// preemption.
mutable util::fb2::SharedMutex cb_mu_;
// ordered from the smallest to largest version.
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;

Expand Down
6 changes: 6 additions & 0 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ void SaveStagesController::SaveDfs() {

// Save shard files.
auto cb = [this](Transaction* t, EngineShard* shard) {
// a hack to avoid deadlock in Transaction::RunCallback(...)
shard->db_slice().UnlockChangeCb();
SaveDfsSingle(shard);
shard->db_slice().LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
Expand Down Expand Up @@ -294,7 +297,10 @@ void SaveStagesController::SaveRdb() {
}

auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
// a hack to avoid deadlock in Transaction::RunCallback(...)
shard->db_slice().UnlockChangeCb();
snapshot->StartInShard(shard);
shard->db_slice().LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,14 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
}

// Stores `cb` in change_cb_arr_ under a freshly allocated id (taken from next_cb_id_)
// and returns that id.
uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
lock_guard lk(cb_mu_);
// mutex lock isn't needed because iterators are not invalidated
// NOTE(review): the comment above and the lock_guard line disagree - confirm which one
// reflects the intended code.
uint32_t id = next_cb_id_++;
change_cb_arr_.emplace_back(id, std::move(cb));
return id;
}

void JournalSlice::UnregisterOnChange(uint32_t id) {
// we need to wait until the callback has finished before removing it
lock_guard lk(cb_mu_);
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
[id](const auto& e) { return e.first == id; });
Expand Down
5 changes: 2 additions & 3 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class JournalSlice {
void UnregisterOnChange(uint32_t);

// Returns true when change_cb_arr_ holds at least one registered callback.
bool HasRegisteredCallbacks() const {
std::shared_lock lk(cb_mu_);
return !change_cb_arr_.empty();
}

Expand All @@ -62,8 +61,8 @@ class JournalSlice {
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
base::IoBuf ring_serialize_buf_;

mutable util::fb2::SharedMutex cb_mu_;
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
mutable util::fb2::SharedMutex cb_mu_; // to prevent removing callback during call
std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;

LSN lsn_ = 1;

Expand Down
6 changes: 6 additions & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(shard, EngineShard::tlocal());

RunnableResult result;
shard->db_slice().LockChangeCb();
try {
result = (*cb_ptr_)(this, shard);

Expand Down Expand Up @@ -664,7 +665,10 @@ void Transaction::RunCallback(EngineShard* shard) {
// Log to journal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
LogAutoJournalOnShard(shard, result);
shard->db_slice().UnlockChangeCb();
MaybeInvokeTrackingCb();
} else {
shard->db_slice().UnlockChangeCb();
}
}

Expand Down Expand Up @@ -1247,9 +1251,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
DCHECK_EQ(unique_shard_cnt_, 1u);

auto* shard = EngineShard::tlocal();
shard->db_slice().LockChangeCb();
auto result = cb(this, shard);
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard, result);
shard->db_slice().UnlockChangeCb();
MaybeInvokeTrackingCb();

DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
Expand Down

0 comments on commit 81db231

Please sign in to comment.