diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 901b2c9996e8..6d5aed56586b 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -458,9 +458,7 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: if (!change_cb_.empty()) { auto bump_cb = [&](PrimeTable::bucket_iterator bit) { DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index; - for (const auto& ccb : change_cb_) { - ccb.second(cntx.db_index, bit); - } + CallChangeCallbacks(cntx.db_index, bit); }; db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb); } @@ -524,9 +522,7 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt // It's a new entry. DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index; - for (const auto& ccb : change_cb_) { - ccb.second(cntx.db_index, key); - } + CallChangeCallbacks(cntx.db_index, key); // In case we are loading from rdb file or replicating we want to disable conservative memory // checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max @@ -975,9 +971,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) { FiberAtomicGuard fg; DVLOG(2) << "Running callbacks in dbid " << db_ind; - for (const auto& ccb : change_cb_) { - ccb.second(db_ind, ChangeReq{it.GetInnerIt()}); - } + CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()}); // If the value has a pending stash, cancel it before any modification are applied. // Note: we don't delete offloaded values before updates, because a read-modify operation (like @@ -1089,6 +1083,13 @@ void DbSlice::ExpireAllIfNeeded() { uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { uint64_t ver = NextVersion(); + + // TODO rewrite this logic to be more clear + // this mutex lock is needed to check that this method is not called simultaneously with + // change_cb_ calls and journal_slice::change_cb_arr_ calls. 
+ // It can be unlocked anytime because DbSlice::RegisterOnChange + // and journal_slice::RegisterOnChange are called without preemption + std::lock_guard lk(cb_mu_); change_cb_.emplace_back(ver, std::move(cb)); return ver; } @@ -1099,6 +1100,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ // change_cb_ is ordered by version. DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version << ", upper_bound=" << upper_bound; + for (const auto& ccb : change_cb_) { uint64_t cb_version = ccb.first; DCHECK_LE(cb_version, upper_bound); @@ -1113,6 +1115,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ //! Unregisters the callback. void DbSlice::UnregisterOnChange(uint64_t id) { + lock_guard lk(cb_mu_); // we need to wait until callback is finished before removing it for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) { if (it->first == id) { change_cb_.erase(it); @@ -1506,4 +1509,10 @@ void DbSlice::OnCbFinish() { fetched_items_.clear(); } +void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const { + for (const auto& ccb : change_cb_) { + ccb.second(id, cr); + } +} + } // namespace dfly diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 5da9ce571617..6e5184a67b3c 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -469,6 +469,14 @@ class DbSlice { void PerformDeletion(Iterator del_it, DbTable* table); void PerformDeletion(PrimeIterator del_it, DbTable* table); + void LockChangeCb() const { + return cb_mu_.lock_shared(); + } + + void UnlockChangeCb() const { + return cb_mu_.unlock_shared(); + } + private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); @@ -523,6 +531,8 @@ class DbSlice { return version_++; } + void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const; + private: ShardId shard_id_; uint8_t 
caching_mode_ : 1; @@ -544,6 +554,12 @@ class DbSlice { // Used in temporary computations in Acquire/Release. mutable absl::flat_hash_set uniq_fps_; + // To ensure correct data replication, we must serialize the buckets that each running command + // will modify, followed by serializing the command to the journal. We use a mutex to prevent + // interleaving between bucket and journal registrations, and the command execution with its + // journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after + // journaling is completed. Registering to bucket and journal changes is also done without preemption + mutable util::fb2::SharedMutex cb_mu_; // ordered from the smallest to largest version. std::vector> change_cb_; diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index fc823074ba46..0ef993c4ce6f 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -254,7 +254,10 @@ void SaveStagesController::SaveDfs() { // Save shard files. auto cb = [this](Transaction* t, EngineShard* shard) { + // a hack to avoid deadlock in Transaction::RunCallback(...) + shard->db_slice().UnlockChangeCb(); SaveDfsSingle(shard); + shard->db_slice().LockChangeCb(); return OpStatus::OK; }; trans_->ScheduleSingleHop(std::move(cb)); @@ -294,7 +297,10 @@ void SaveStagesController::SaveRdb() { } auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) { + // a hack to avoid deadlock in Transaction::RunCallback(...) 
+ shard->db_slice().UnlockChangeCb(); snapshot->StartInShard(shard); + shard->db_slice().LockChangeCb(); return OpStatus::OK; }; trans_->ScheduleSingleHop(std::move(cb)); diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index f7c068f4613b..3f00813b71df 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -198,13 +198,14 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { } uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) { - lock_guard lk(cb_mu_); + // mutex lock isn't needed because iterators are not invalidated uint32_t id = next_cb_id_++; change_cb_arr_.emplace_back(id, std::move(cb)); return id; } void JournalSlice::UnregisterOnChange(uint32_t id) { + // we need to wait until callback is finished before removing it lock_guard lk(cb_mu_); auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(), [id](const auto& e) { return e.first == id; }); diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 2752eb463c56..8534d78f7aae 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -47,7 +47,6 @@ class JournalSlice { void UnregisterOnChange(uint32_t); bool HasRegisteredCallbacks() const { - std::shared_lock lk(cb_mu_); return !change_cb_arr_.empty(); } @@ -62,8 +61,8 @@ class JournalSlice { std::optional> ring_buffer_; base::IoBuf ring_serialize_buf_; - mutable util::fb2::SharedMutex cb_mu_; - std::vector> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_); + mutable util::fb2::SharedMutex cb_mu_; // to prevent removing a callback during a call + std::list> change_cb_arr_; LSN lsn_ = 1; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 1a17ca2dbde0..002a20988fec 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -627,6 +627,7 @@ void Transaction::RunCallback(EngineShard* shard) { DCHECK_EQ(shard, EngineShard::tlocal()); RunnableResult result; + 
shard->db_slice().LockChangeCb(); try { result = (*cb_ptr_)(this, shard); @@ -664,7 +665,10 @@ void Transaction::RunCallback(EngineShard* shard) { // Log to journal only once the command finished running if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) { LogAutoJournalOnShard(shard, result); + shard->db_slice().UnlockChangeCb(); MaybeInvokeTrackingCb(); + } else { + shard->db_slice().UnlockChangeCb(); } } @@ -1247,9 +1251,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { DCHECK_EQ(unique_shard_cnt_, 1u); auto* shard = EngineShard::tlocal(); + shard->db_slice().LockChangeCb(); auto result = cb(this, shard); shard->db_slice().OnCbFinish(); LogAutoJournalOnShard(shard, result); + shard->db_slice().UnlockChangeCb(); MaybeInvokeTrackingCb(); DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it