Skip to content

Commit

Permalink
chore: singlehop XREAD
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Sep 28, 2024
1 parent 55699a9 commit 971718e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
36 changes: 26 additions & 10 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2948,21 +2948,33 @@ void XReadGeneric2(CmdArgList args, ConnectionContext* cntx, bool read_group) {

auto* tx = cntx->transaction;

// Determine if streams have entries
// Determine if streams have entries or any error occured
AggregateValue<optional<facade::ErrorReply>> err;
atomic_bool have_entries = false;

auto cb = [&](auto* tx, auto* es) {
// With a single shard we can call OpRead in a single hop, falling back to
// avoid concluding if no entries are available.
bool try_fastread = tx->GetUniqueShardCnt() == 1;
vector<RecordVec> fastread_prefetched;

auto cb = [&](auto* tx, auto* es) -> Transaction::RunnableResult {
auto op_args = tx->GetOpArgs(es);
for (string_view skey : tx->GetShardArgs(es->shard_id())) {
if (auto res = HasEntries2(op_args, skey, &*opts); holds_alternative<facade::ErrorReply>(res))
err = get<facade::ErrorReply>(res);
else if (holds_alternative<bool>(res) && get<bool>(res))
have_entries.store(true, memory_order_relaxed);
}

if (try_fastread) {
if (have_entries.load(memory_order_relaxed))
fastread_prefetched = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), *opts);
else
return {OpStatus::OK, Transaction::RunnableResult::AVOID_CONCLUDING};
}
return OpStatus::OK;
};
tx->Execute(cb, false);
tx->Execute(cb, try_fastread);

if (err) {
tx->Conclude();
Expand All @@ -2973,13 +2985,17 @@ void XReadGeneric2(CmdArgList args, ConnectionContext* cntx, bool read_group) {
return XReadBlock(&*opts, cntx);

vector<vector<RecordVec>> xread_resp;
xread_resp.resize(shard_set->size());
auto read_cb = [&](Transaction* t, EngineShard* shard) {
ShardId sid = shard->shard_id();
xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), *opts);
return OpStatus::OK;
};
tx->Execute(std::move(read_cb), true);
if (try_fastread && have_entries.load(memory_order_relaxed)) {
xread_resp = {std::move(fastread_prefetched)};
} else {
xread_resp.resize(shard_set->size());
auto read_cb = [&](Transaction* t, EngineShard* shard) {
ShardId sid = shard->shard_id();
xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), *opts);
return OpStatus::OK;
};
tx->Execute(std::move(read_cb), true);
}

// Count number of streams and merge final results in correct order
int resolved_streams = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST_F(StreamFamilyTest, XRead) {
// Receive all records from a single stream, in a single hop
auto resp = Run({"xread", "streams", "foo", "0"});
EXPECT_THAT(resp.GetVec(), ElementsAre("foo", ArrLen(3)));
// EXPECT_EQ(GetMetrics().shard_stats.tx_optimistic_total, 5u); todo temporary disabled
EXPECT_EQ(GetMetrics().shard_stats.tx_optimistic_total, 5u);

// Receive all records from both streams.
resp = Run({"xread", "streams", "foo", "bar", "0", "0"});
Expand Down

0 comments on commit 971718e

Please sign in to comment.