Skip to content

Commit

Permalink
fix: call db_slice and journal callbacks atomically
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Jun 18, 2024
1 parent 7cc46a3 commit 9301ebf
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
10 changes: 7 additions & 3 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,13 @@ void DbSlice::ExpireAllIfNeeded() {

uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
uint64_t ver = NextVersion();
// mutex lock isn't needed due to iterators are not invalidated

// TODO rewrite this logic to be more clear
// this mutex lock is needed to check that this method is not called simultaneously with
// change_cb_ calls and journal_slice::change_cb_arr_ calls.
// It can be unlocked anytime because DbSlice::RegisterOnChange
// and journal_slice::RegisterOnChange calls without preemption
std::lock_guard lk(cb_mu_);
change_cb_.emplace_back(ver, std::move(cb));
return ver;
}
Expand All @@ -1095,7 +1101,6 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
<< ", upper_bound=" << upper_bound;

std::shared_lock lk(cb_mu_);
for (const auto& ccb : change_cb_) {
uint64_t cb_version = ccb.first;
DCHECK_LE(cb_version, upper_bound);
Expand Down Expand Up @@ -1505,7 +1510,6 @@ void DbSlice::OnCbFinish() {
}

void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
std::shared_lock lk(cb_mu_);
for (const auto& ccb : change_cb_) {
ccb.second(id, cr);
}
Expand Down
9 changes: 7 additions & 2 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,11 @@ class DbSlice {
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

// this is workaround to execute callbacks for db_slice and journal atomically
[[nodiscard]] auto GetChangeCbLock() const {
return std::shared_lock(cb_mu_);
}

private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
Expand Down Expand Up @@ -546,9 +551,9 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
mutable absl::flat_hash_set<uint64_t> uniq_fps_;

mutable util::fb2::SharedMutex cb_mu_; // to prevent removing callback during call
mutable util::fb2::SharedMutex cb_mu_;
// ordered from the smallest to largest version.
std::list<std::pair<uint64_t, ChangeCallback>> change_cb_;
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;

// Used in temporary computations in Find item and CbFinish
mutable absl::flat_hash_set<CompactObjectView> fetched_items_;
Expand Down
5 changes: 0 additions & 5 deletions src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,8 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
<< " num callbacks: " << change_cb_arr_.size();

auto callbacks_num = change_cb_arr_.size();
for (const auto& k_v : change_cb_arr_) {
k_v.second(*item, await);
--callbacks_num;
// durin calbacks call we can add one more callback so we need to prevent call it
if (callbacks_num == 0)
break;
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(shard, EngineShard::tlocal());

RunnableResult result;
auto change_callback_lock = shard->db_slice().GetChangeCbLock();
try {
result = (*cb_ptr_)(this, shard);

Expand Down Expand Up @@ -664,6 +665,7 @@ void Transaction::RunCallback(EngineShard* shard) {
// Log to journal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
LogAutoJournalOnShard(shard, result);
change_callback_lock.unlock();
MaybeInvokeTrackingCb();
}
}
Expand Down Expand Up @@ -1247,9 +1249,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
DCHECK_EQ(unique_shard_cnt_, 1u);

auto* shard = EngineShard::tlocal();
auto change_callback_lock = shard->db_slice().GetChangeCbLock();
auto result = cb(this, shard);
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard, result);
change_callback_lock.unlock();
MaybeInvokeTrackingCb();

DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
Expand Down

0 comments on commit 9301ebf

Please sign in to comment.