Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove blpop FindFirst hop after wakeup #1168

Merged
merged 4 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/blocking_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
Transaction* head = wi.get();
DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key;

if (head->NotifySuspended(owner_->committed_txid(), sid)) {
if (head->NotifySuspended(owner_->committed_txid(), sid, key)) {
// We deliberately keep the notified transaction in the queue to know which queue
// must be handled when this transaction finishes.
wq->notify_txid = owner_->committed_txid();
Expand Down
105 changes: 45 additions & 60 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ struct ShardFFResult {
};

// Used by bpopper.
OpResult<ShardFFResult> FindFirst(bool awaked_only, Transaction* trans) {
OpResult<ShardFFResult> FindFirst(Transaction* trans) {
VLOG(2) << "FindFirst::Find " << trans->DebugId();

// Holds Find results: (iterator to a found key, and its index in the passed arguments).
Expand All @@ -145,31 +145,17 @@ OpResult<ShardFFResult> FindFirst(bool awaked_only, Transaction* trans) {
std::vector<OpResult<FFResult>> find_res(shard_set->size());
fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND);

// We must capture notify_txid before we spawn callbacks.
// Otherwise, consider the following scenario:
// 0. The key is added in shard 0, with notify_txid = 100
// 1. The cb runs first on shard1 and does not find anything.
// 2. A tx 99 runs on shard 1, adds a key, updates notify_txid to 99.
// 3. the cb on shard 0 runs and ignores the key due to lower notify_txid.
uint64_t notify_txid = trans->GetNotifyTxid();

auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->GetShardArgs(shard->shard_id());
// if requested to consider awaked shards only, we check the AWAKED_Q flag.
if (awaked_only && (t->GetLocalMask(shard->shard_id()) & Transaction::AWAKED_Q) == 0) {
return OpStatus::OK;
}

if (shard->committed_txid() <= notify_txid) {
OpResult<pair<PrimeIterator, unsigned>> ff_res =
shard->db_slice().FindFirst(t->GetDbContext(), args);
OpResult<pair<PrimeIterator, unsigned>> ff_res =
shard->db_slice().FindFirst(t->GetDbContext(), args);

if (ff_res) {
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
find_res[shard->shard_id()] = move(ff_result);
} else {
find_res[shard->shard_id()] = ff_res.status();
}
if (ff_res) {
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
find_res[shard->shard_id()] = move(ff_result);
} else {
find_res[shard->shard_id()] = ff_res.status();
}

return OpStatus::OK;
Expand Down Expand Up @@ -229,6 +215,7 @@ class BPopper {

private:
void Pop(Transaction* t, EngineShard* shard);
void OpPop(Transaction* t, EngineShard* shard);

ListDir dir_;

Expand Down Expand Up @@ -258,25 +245,25 @@ BPopper::BPopper(ListDir dir) : dir_(dir) {
}

OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
auto tp = msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
bool is_multi = trans->IsMulti();

trans->Schedule();

auto* stats = ServerState::tl_connection_stats();

OpResult<ShardFFResult> result = FindFirst(false, trans);
OpResult<ShardFFResult> result = FindFirst(trans);

if (result.status() == OpStatus::KEY_NOTFOUND) {
if (result.ok()) {
ff_result_ = move(result.value());
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
// Close transaction and return.
if (is_multi) {
// close transaction and return.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(std::move(cb), true);

return OpStatus::TIMED_OUT;
}

// Block
auto wcb = [](Transaction* t, EngineShard* shard) {
return t->GetShardArgs(shard->shard_id());
};
Expand All @@ -288,25 +275,16 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) {

if (!wait_succeeded)
return OpStatus::TIMED_OUT;

// Now we have something for sure.
result = FindFirst(true, trans); // retry - must find something.
}

if (!result) {
} else {
// Could be the wrong-type error.
// cleanups, locks removal etc.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(std::move(cb), true);

DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND);

return result.status();
}

VLOG(1) << "Popping an element " << trans->DebugId();
ff_result_ = move(result.value());

auto cb = [this](Transaction* t, EngineShard* shard) {
Pop(t, shard);
return OpStatus::OK;
Expand All @@ -317,27 +295,35 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
}

void BPopper::Pop(Transaction* t, EngineShard* shard) {
if (shard->shard_id() == ff_result_.sid) {
if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) {
key_ = *wake_key;
OpPop(t, shard);
} else if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST);
CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);

DVLOG(2) << "popping from " << key_ << " " << t->DebugId();
db_slice.PreUpdate(t->GetDbIndex(), it);
value_ = ListPop(dir_, ql);
db_slice.PostUpdate(t->GetDbIndex(), it, key_);
if (quicklistCount(ql) == 0) {
DVLOG(1) << "deleting key " << key_ << " " << t->DebugId();
CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
}
OpArgs op_args = t->GetOpArgs(shard);
if (op_args.shard->journal()) {
string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP";
RecordJournal(op_args, command, ArgSlice{key_}, 1);
}
OpPop(t, shard);
}
}

// Pops one element from the list stored under key_ on the given shard.
//
// The caller must have set key_ beforehand. The key is looked up as OBJ_LIST
// and the lookup is CHECK-ed, so a missing or wrong-typed key aborts.
// The popped element is stored in value_. If the list has no elements left
// afterwards, the key is deleted from the db slice. When the shard has a
// journal, a matching LPOP/RPOP entry for key_ is recorded.
//
// t     - transaction providing the db context/index, debug id and op args.
// shard - shard whose db slice holds key_.
void BPopper::OpPop(Transaction* t, EngineShard* shard) {
  auto& slice = shard->db_slice();

  auto lookup = slice.Find(t->GetDbContext(), key_, OBJ_LIST);
  CHECK(lookup) << t->DebugId() << " " << key_;  // must exist and must be ok.
  PrimeIterator entry = *lookup;
  quicklist* list = GetQL(entry->second);

  DVLOG(2) << "popping from " << key_ << " " << t->DebugId();

  // The mutation is bracketed by PreUpdate/PostUpdate on the same iterator.
  slice.PreUpdate(t->GetDbIndex(), entry);
  value_ = ListPop(dir_, list);
  slice.PostUpdate(t->GetDbIndex(), entry, key_);

  // Remove the key entirely once the list has been drained.
  const bool drained = quicklistCount(list) == 0;
  if (drained) {
    DVLOG(1) << "deleting key " << key_ << " " << t->DebugId();
    CHECK(slice.Del(t->GetDbIndex(), entry));
  }

  OpArgs op_args = t->GetOpArgs(shard);
  if (!op_args.shard->journal())
    return;

  // Journal the blocking pop as its plain, non-blocking counterpart.
  string command;
  if (dir_ == ListDir::LEFT) {
    command = "LPOP";
  } else {
    command = "RPOP";
  }
  RecordJournal(op_args, command, ArgSlice{key_}, 1);
}

Expand Down Expand Up @@ -1007,7 +993,6 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {

// Block
++stats->num_blocked_clients;

bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;

Expand Down
75 changes: 3 additions & 72 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,82 +233,13 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {

pop_fb.Join();

// We can't determine what key was popped, so only check result presence.
// It might not be first kKey3 "C" because of squashing and re-ordering.
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A"));
ASSERT_THAT(Run({"exists", kKey1, kKey2, kKey3}), IntArg(2));
ASSERT_EQ(0, NumWatched());
}

// Runs a BLPOP on kKey1..kKey3 (timeout "0") concurrently with two pushers:
// a plain LPUSH of "A" to kKey1, and a MULTI/EXEC that pushes "B" to kKey1 and
// "C" to kKey2. Expects the BLPOP to return kKey1 with the value of whichever
// pusher recorded the smaller clock via GetDebugInfo().
TEST_F(ListFamilyTest, BLPopSerialize) {
RespExpr blpop_resp;

// Issue the blocking pop from its own fiber on proactor 0.
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});

// NOTE(review): presumably waits until the blpop holds the lock on kKey1 in db 0,
// i.e. until it is actually blocked - confirm against WaitUntilLocked.
WaitUntilLocked(0, kKey1);

LOG(INFO) << "Starting multi";

// Clocks captured by each pusher after it finished; compared at the end of the test.
TxClock cl1, cl2;

// Pusher 1: a single, non-multi LPUSH of "A" to kKey1.
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
// auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
// ASSERT_EQ(resp, "OK");
Run({"lpush", kKey1, "A"});

/*for (unsigned i = 0; i < 10; ++i) {
// dummy command to prolong this transaction and make convergence more complicated.
Run({"exists", kKey1, kKey2, kKey3});
}

resp = Run({"exec"});

// Either this lpush has run first or the one below.
// In any case it must be that between 2 invocations of lpush (wrapped in multi)
// blpop will be triggered and it will empty the list again. Hence, in any case
// lpush kKey1 here and below should return 1.
ASSERT_THAT(resp, ArrLen(11));*/
cl1 = GetDebugInfo("IO1").clock;
LOG(INFO) << "push1 ts: " << cl1;
});

// Pusher 2: MULTI with 10 EXISTS calls followed by two LPUSH commands, then EXEC.
auto p2_fb = pp_->at(2)->LaunchFiber([&] {
auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
ASSERT_EQ(resp, "OK");
for (unsigned i = 0; i < 10; ++i) {
// dummy command to prolong this transaction and make convergence more complicated.
Run({"exists", kKey1, kKey2, kKey3});
}
Run({"lpush", kKey1, "B"});
Run({"lpush", kKey2, "C"});

resp = Run({"exec"});

// 10 EXISTS replies + 2 LPUSH replies.
ASSERT_THAT(resp, ArrLen(12));
/*auto sub_arr = resp.GetVec();
EXPECT_THAT(sub_arr[0], IntArg(1));
EXPECT_THAT(sub_arr[1], IntArg(1));*/

cl2 = GetDebugInfo("IO2").clock;
LOG(INFO) << "push2 ts: " << cl2;
});

// Wait for both pushers first, then for the blocked pop to complete.
p1_fb.Join();
p2_fb.Join();

pop_fb.Join();
// The reply must be a (key, value) pair taken from kKey1.
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING)));

// The popped value must belong to the pusher with the smaller recorded clock.
if (cl2 < cl1) {
EXPECT_EQ(resp_arr[1], "B");
} else {
EXPECT_EQ(resp_arr[1], "A");
}
}

TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
RespExpr blpop_resp;

Expand Down
39 changes: 25 additions & 14 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provi

auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
notify_txid_.load(memory_order_relaxed) != kuint64max;
wakeup_requested_.load(memory_order_relaxed) > 0;
};

cv_status status = cv_status::no_timeout;
Expand Down Expand Up @@ -1257,7 +1257,7 @@ bool Transaction::IsGlobal() const {
// Runs only in the shard thread.
// Returns true if the transaction has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) {
unsigned idx = SidToId(sid);
auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask;
Expand All @@ -1266,6 +1266,12 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
return false;
}

// Wake a transaction only once on the first notify.
// We don't care about preserving the strict order with multiple operations running on blocking
// keys in parallel, because the internal order is not observable from outside either way.
if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0)
return false;

DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id "
<< committed_txid;

Expand All @@ -1277,20 +1283,25 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;

TxId notify_id = notify_txid_.load(memory_order_relaxed);

while (committed_txid < notify_id) {
if (notify_txid_.compare_exchange_weak(notify_id, committed_txid, memory_order_relaxed)) {
// if we improved notify_txid_ - break.
blocking_ec_.notify(); // release barrier.
break;
}
}
return true;
// Find index of awakened key.
auto args = GetShardArgs(sid);
auto it =
find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
DCHECK(it != args.end());
sd.wake_key_pos = it - args.begin();
}

CHECK(sd.local_mask & AWAKED_Q);
return false;
blocking_ec_.notify();
return true;
}

// Returns the key recorded as the wake-up key for this transaction on shard `sid`.
//
// Yields nullopt when the shard's local mask does not have AWAKED_Q set.
// Otherwise returns the shard argument at the stored wake_key_pos.
// wake_key_pos is CHECK-ed against UINT16_MAX first
// (NOTE(review): presumably the "not set" sentinel - confirm at the field's declaration).
optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
  const auto& shard_state = shard_data_[SidToId(sid)];

  const bool awakened = (shard_state.local_mask & AWAKED_Q) != 0;
  if (awakened) {
    CHECK_NE(shard_state.wake_key_pos, UINT16_MAX);
    return GetShardArgs(sid).at(shard_state.wake_key_pos);
  }

  return nullopt;
}

void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
Expand Down
Loading