Skip to content

Commit

Permalink
fix: fix iterators issues
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Jun 13, 2024
1 parent 8d301d4 commit 5775f95
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 26 deletions.
38 changes: 19 additions & 19 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,9 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
if (!change_cb_.empty()) {
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
for (const auto& ccb : change_cb_) {
auto cb = ccb; // we need a copy of shared_ptr to prevent it removing during callback
cb->second(cntx.db_index, bit);
}
CallChangeCallbacks(cntx.db_index, bit);
};
db.prime.CVCUponBump(change_cb_.back()->first, res.it, bump_cb);
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
}
auto bump_it = db.prime.BumpUp(res.it, PrimeBumpPolicy{fetched_items_});
if (bump_it != res.it) { // the item was bumped
Expand Down Expand Up @@ -525,10 +522,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt

// It's a new entry.
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
for (const auto& ccb : change_cb_) {
auto cb = ccb; // we need a copy of shared_ptr to prevent it removing during callback
cb->second(cntx.db_index, key);
}
CallChangeCallbacks(cntx.db_index, key);

// In case we are loading from rdb file or replicating we want to disable conservative memory
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
Expand Down Expand Up @@ -977,10 +971,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
FiberAtomicGuard fg;

DVLOG(2) << "Running callbacks in dbid " << db_ind;
for (const auto& ccb : change_cb_) {
auto cb = ccb; // we need a copy of shared_ptr to prevent it removing during callback
cb->second(db_ind, ChangeReq{it.GetInnerIt()});
}
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});

// If the value has a pending stash, cancel it before any modification are applied.
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
Expand Down Expand Up @@ -1092,8 +1083,8 @@ void DbSlice::ExpireAllIfNeeded() {

uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
uint64_t ver = NextVersion();
change_cb_.emplace_back(
std::make_shared<std::pair<uint64_t, ChangeCallback>>(ver, std::move(cb)));
// mutex lock isn't needed due to iterators are not invalidated
change_cb_.emplace_back(ver, std::move(cb));
return ver;
}

Expand All @@ -1103,23 +1094,25 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
// change_cb_ is ordered by version.
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
<< ", upper_bound=" << upper_bound;

lock_guard lk(cb_mu_);
for (const auto& ccb : change_cb_) {
auto cb = ccb; // we need a copy of shared_ptr to prevent it removing during callback
uint64_t cb_version = cb->first;
uint64_t cb_version = ccb.first;
DCHECK_LE(cb_version, upper_bound);
if (cb_version == upper_bound) {
return;
}
if (bucket_version < cb_version) {
cb->second(db_ind, ChangeReq{it.GetInnerIt()});
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
}
}
}

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
if ((*it)->first == id) {
if (it->first == id) {
change_cb_.erase(it);
return;
}
Expand Down Expand Up @@ -1511,4 +1504,11 @@ void DbSlice::OnCbFinish() {
fetched_items_.clear();
}

void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
lock_guard lk(cb_mu_);
for (const auto& ccb : change_cb_) {
ccb.second(id, cr);
}
}

} // namespace dfly
5 changes: 4 additions & 1 deletion src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ class DbSlice {
return version_++;
}

void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;

private:
ShardId shard_id_;
uint8_t caching_mode_ : 1;
Expand All @@ -544,8 +546,9 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
mutable absl::flat_hash_set<uint64_t> uniq_fps_;

mutable util::fb2::Mutex cb_mu_; // to prevent removing callback during call
// ordered from the smallest to largest version.
std::vector<std::shared_ptr<std::pair<uint64_t, ChangeCallback>>> change_cb_;
std::list<std::pair<uint64_t, ChangeCallback>> change_cb_;

// Used in temporary computations in Find item and CbFinish
mutable absl::flat_hash_set<CompactObjectView> fetched_items_;
Expand Down
12 changes: 7 additions & 5 deletions src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,26 +187,28 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {

// TODO: Remove the callbacks, replace with notifiers
{
lock_guard lk(cb_mu_);
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
<< " num callbacks: " << change_cb_arr_.size();

for (const auto& k_v : change_cb_arr_) {
auto cb = k_v; // we need a copy of shared_ptr to prevent it removing during callback
cb->second(*item, await);
k_v.second(*item, await);
}
}
}

uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
// mutex lock isn't needed due to iterators are not invalidated
uint32_t id = next_cb_id_++;
change_cb_arr_.emplace_back(
std::make_shared<std::pair<uint32_t, ChangeCallback>>(id, std::move(cb)));
change_cb_arr_.emplace_back(id, std::move(cb));
return id;
}

void JournalSlice::UnregisterOnChange(uint32_t id) {
// we need to wait until callback is finished before remove it
lock_guard lk(cb_mu_);
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
[id](const auto& e) { return e->first == id; });
[id](const auto& e) { return e.first == id; });
CHECK(it != change_cb_arr_.end());
change_cb_arr_.erase(it);
}
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class JournalSlice {
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
base::IoBuf ring_serialize_buf_;

std::vector<std::shared_ptr<std::pair<uint32_t, ChangeCallback>>> change_cb_arr_;
mutable util::fb2::Mutex cb_mu_; // to prevent removing callback during call
std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;

LSN lsn_ = 1;

Expand Down

0 comments on commit 5775f95

Please sign in to comment.