Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix RegisterOnChange methods for journal and db_slice #3171

Merged
merged 7 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (!change_cb_.empty()) {
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
for (const auto& ccb : change_cb_) {
ccb.second(cntx.db_index, bit);
}
CallChangeCallbacks(cntx.db_index, bit);
};
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
}
Expand Down Expand Up @@ -524,9 +522,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt

// It's a new entry.
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
for (const auto& ccb : change_cb_) {
ccb.second(cntx.db_index, key);
}
CallChangeCallbacks(cntx.db_index, key);

// In case we are loading from rdb file or replicating we want to disable conservative memory
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
Expand Down Expand Up @@ -975,9 +971,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FiberAtomicGuard fg;

DVLOG(2) << "Running callbacks in dbid " << db_ind;
for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
}
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});

// If the value has a pending stash, cancel it before any modification are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
Expand Down Expand Up @@ -1089,6 +1083,13 @@ void DbSlice::ExpireAllIfNeeded() {

// Registers `cb` as a change callback and returns the version assigned to it.
// The returned version doubles as the id accepted by UnregisterOnChange().
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
  const uint64_t version = NextVersion();

  // TODO rewrite this logic to be more clear
  // Holding cb_mu_ here guarantees that registration never runs at the same time as an
  // in-flight invocation of the change_cb_ callbacks or of journal_slice::change_cb_arr_.
  // The lock may be released at any point afterwards, because DbSlice::RegisterOnChange
  // and journal_slice::RegisterOnChange are called without preemption.
  std::lock_guard guard(cb_mu_);
  change_cb_.emplace_back(version, std::move(cb));
  return version;
}
Expand All @@ -1099,6 +1100,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
// change_cb_ is ordered by version.
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
<< ", upper_bound=" << upper_bound;

for (const auto& ccb : change_cb_) {
uint64_t cb_version = ccb.first;
DCHECK_LE(cb_version, upper_bound);
Expand All @@ -1113,6 +1115,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
if (it->first == id) {
change_cb_.erase(it);
Expand Down Expand Up @@ -1506,4 +1509,10 @@ void DbSlice::OnCbFinish() {
fetched_items_.clear();
}

// Invokes every callback stored in change_cb_, in storage order, passing each one the
// database index `id` and the change request `cr`.
void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
  for (const auto& registration : change_cb_) {
    // Each entry is a (version, callback) pair; only the callback is needed here.
    const auto& callback = registration.second;
    callback(id, cr);
  }
}

} // namespace dfly
16 changes: 16 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,14 @@ class DbSlice {
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

// Acquires cb_mu_ in shared mode. Every call must be balanced by UnlockChangeCb().
void LockChangeCb() const {
  cb_mu_.lock_shared();
}

// Releases the shared hold on cb_mu_ previously taken by LockChangeCb().
void UnlockChangeCb() const {
  cb_mu_.unlock_shared();
}

private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
Expand Down Expand Up @@ -523,6 +531,8 @@ class DbSlice {
return version_++;
}

void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;

private:
ShardId shard_id_;
uint8_t caching_mode_ : 1;
Expand All @@ -544,6 +554,12 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
mutable absl::flat_hash_set<uint64_t> uniq_fps_;

// To ensure correct data replication, we must serialize the buckets that each running command
// will modify, followed by serializing the command to the journal. We use a mutex to prevent
// interleaving between bucket and journal registrations, and the command execution with its
// journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after
// journaling is completed. Registration for bucket and journal changes is also done without
// preemption.
mutable util::fb2::SharedMutex cb_mu_;
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved
// ordered from the smallest to largest version.
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;

Expand Down
6 changes: 6 additions & 0 deletions src/server/detail/save_stages_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ void SaveStagesController::SaveDfs() {

// Save shard files.
auto cb = [this](Transaction* t, EngineShard* shard) {
// a hack to avoid deadlock in Transaction::RunCallback(...)
shard->db_slice().UnlockChangeCb();
SaveDfsSingle(shard);
shard->db_slice().LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
Expand Down Expand Up @@ -294,7 +297,10 @@ void SaveStagesController::SaveRdb() {
}

auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
// a hack to avoid deadlock in Transaction::RunCallback(...)
shard->db_slice().UnlockChangeCb();
snapshot->StartInShard(shard);
shard->db_slice().LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,14 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
}

// Stores `cb` in change_cb_arr_ under a newly allocated id (taken from next_cb_id_) and
// returns that id.
uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
lock_guard lk(cb_mu_);
// A mutex lock isn't needed because iterators are not invalidated.
// NOTE(review): this remark sits directly below a lock_guard on cb_mu_ — confirm which of the
// two lines is meant to remain.
uint32_t id = next_cb_id_++;
change_cb_arr_.emplace_back(id, std::move(cb));
return id;
}

void JournalSlice::UnregisterOnChange(uint32_t id) {
// We need to wait until the callback has finished before removing it.
lock_guard lk(cb_mu_);
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
[id](const auto& e) { return e.first == id; });
Expand Down
5 changes: 2 additions & 3 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class JournalSlice {
void UnregisterOnChange(uint32_t);

// Returns true when at least one callback is currently stored in change_cb_arr_.
bool HasRegisteredCallbacks() const {
// Shared hold on cb_mu_ for the duration of the emptiness check.
std::shared_lock lk(cb_mu_);
return !change_cb_arr_.empty();
}

Expand All @@ -62,8 +61,8 @@ class JournalSlice {
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
base::IoBuf ring_serialize_buf_;

mutable util::fb2::SharedMutex cb_mu_;
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
mutable util::fb2::SharedMutex cb_mu_; // to prevent removing callback during call
std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;

LSN lsn_ = 1;

Expand Down
6 changes: 6 additions & 0 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(shard, EngineShard::tlocal());

RunnableResult result;
shard->db_slice().LockChangeCb();
try {
result = (*cb_ptr_)(this, shard);

Expand Down Expand Up @@ -664,7 +665,10 @@ void Transaction::RunCallback(EngineShard* shard) {
// Log to journal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
LogAutoJournalOnShard(shard, result);
shard->db_slice().UnlockChangeCb();
MaybeInvokeTrackingCb();
} else {
shard->db_slice().UnlockChangeCb();
}
}

Expand Down Expand Up @@ -1247,9 +1251,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
DCHECK_EQ(unique_shard_cnt_, 1u);

auto* shard = EngineShard::tlocal();
shard->db_slice().LockChangeCb();
auto result = cb(this, shard);
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard, result);
shard->db_slice().UnlockChangeCb();
MaybeInvokeTrackingCb();

DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
Expand Down
Loading