Skip to content

Commit

Permalink
refactor: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Jan 2, 2024
1 parent 2b9d9e7 commit 72976f7
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
6 changes: 3 additions & 3 deletions src/server/blocking_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ using namespace std;

struct WatchItem {
Transaction* trans;
KeyReadyChecker keyReadyChecker;
KeyReadyChecker key_ready_checker;

Transaction* get() const {
return trans;
}

WatchItem(Transaction* t, KeyReadyChecker krc) : trans(t), keyReadyChecker(std::move(krc)) {
WatchItem(Transaction* t, KeyReadyChecker krc) : trans(t), key_ready_checker(std::move(krc)) {
}
};

Expand Down Expand Up @@ -287,7 +287,7 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
auto& wi = queue.front();
Transaction* head = wi.get();
// We check may the transaction be notified otherwise move it to the end of the queue
if (wi.keyReadyChecker(owner_, context, head, key)) {
if (wi.key_ready_checker(owner_, context, head, key)) {
DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key;
if (head->NotifySuspended(owner_->committed_txid(), sid, key)) {
wq->state = WatchQueue::ACTIVE;
Expand Down
6 changes: 3 additions & 3 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,11 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); };

*block_flag = true;
const auto keyChecker = [req_obj_type](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
const auto key_checker = [req_obj_type](EngineShard* owner, const DbContext& context,
Transaction*, std::string_view key) -> bool {
return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok();
};
auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), keyChecker);
auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker);
*block_flag = false;

if (status != OpStatus::OK)
Expand Down
12 changes: 6 additions & 6 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -881,12 +881,12 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {

auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };

const auto keyChecker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok();
};
// Block
if (auto status = t->WaitOnWatch(tp, std::move(wcb), keyChecker); status != OpStatus::OK)
if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker); status != OpStatus::OK)
return status;

t->Execute(cb_move, true);
Expand All @@ -910,12 +910,12 @@ OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };

const auto keyChecker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok();
};

if (auto status = t->WaitOnWatch(tp, std::move(wcb), keyChecker); status != OpStatus::OK)
if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker); status != OpStatus::OK)
return status;

return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true);
Expand Down
11 changes: 6 additions & 5 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2817,26 +2817,27 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) {
auto tp = (opts.timeout) ? chrono::steady_clock::now() + chrono::milliseconds(opts.timeout)
: Transaction::time_point::max();

const auto& keyChecker = [&opts](EngineShard* owner, const DbContext& context, Transaction* tx,
const auto key_checker = [&opts](EngineShard* owner, const DbContext& context, Transaction* tx,
std::string_view key) -> bool {
auto res_it = owner->db_slice().FindReadOnly(context, key, OBJ_STREAM);
if (!res_it.ok())
return false;

auto sitem = opts.stream_ids.at(key);
if (sitem.id.val.ms != UINT64_MAX && sitem.id.val.seq != UINT64_MAX)
return true;

const CompactObj& cobj = (*res_it)->second;
stream* s = GetReadOnlyStream(cobj);
streamID last_id = s->last_id;
if (s->length) {
streamLastValidID(s, &last_id);
}

auto sitem = opts.stream_ids.at(key);
if (sitem.id.val.ms != UINT64_MAX && sitem.id.val.seq != UINT64_MAX)
return true;
return streamCompareID(&last_id, &sitem.group->last_id) > 0;
};

if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb), keyChecker);
if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb), key_checker);
status != OpStatus::OK)
return rb->SendNullArray();

Expand Down

0 comments on commit 72976f7

Please sign in to comment.