diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index e7beb92613a9..0ea4c02bd7c0 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2948,11 +2948,16 @@ void XReadGeneric2(CmdArgList args, ConnectionContext* cntx, bool read_group) { auto* tx = cntx->transaction; - // Determine if streams have entries + // Determine if streams have entries or any error occured AggregateValue> err; atomic_bool have_entries = false; - auto cb = [&](auto* tx, auto* es) { + // With a single shard we can call OpRead in a single hop, falling back to + // avoid concluding if no entries are available. + bool try_fastread = tx->GetUniqueShardCnt() == 1; + vector fastread_prefetched; + + auto cb = [&](auto* tx, auto* es) -> Transaction::RunnableResult { auto op_args = tx->GetOpArgs(es); for (string_view skey : tx->GetShardArgs(es->shard_id())) { if (auto res = HasEntries2(op_args, skey, &*opts); holds_alternative(res)) @@ -2960,9 +2965,16 @@ void XReadGeneric2(CmdArgList args, ConnectionContext* cntx, bool read_group) { else if (holds_alternative(res) && get(res)) have_entries.store(true, memory_order_relaxed); } + + if (try_fastread) { + if (have_entries.load(memory_order_relaxed)) + fastread_prefetched = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), *opts); + else + return {OpStatus::OK, Transaction::RunnableResult::AVOID_CONCLUDING}; + } return OpStatus::OK; }; - tx->Execute(cb, false); + tx->Execute(cb, try_fastread); if (err) { tx->Conclude(); @@ -2973,13 +2985,17 @@ void XReadGeneric2(CmdArgList args, ConnectionContext* cntx, bool read_group) { return XReadBlock(&*opts, cntx); vector> xread_resp; - xread_resp.resize(shard_set->size()); - auto read_cb = [&](Transaction* t, EngineShard* shard) { - ShardId sid = shard->shard_id(); - xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), *opts); - return OpStatus::OK; - }; - tx->Execute(std::move(read_cb), true); + if (try_fastread && have_entries.load(memory_order_relaxed)) { + xread_resp = {std::move(fastread_prefetched)}; + } else { + xread_resp.resize(shard_set->size()); + auto read_cb = [&](Transaction* t, EngineShard* shard) { + ShardId sid = shard->shard_id(); + xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), *opts); + return OpStatus::OK; + }; + tx->Execute(std::move(read_cb), true); + } // Count number of streams and merge final results in correct order int resolved_streams = 0; diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index fd1cadfe8f02..bedde9b80df0 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -133,7 +133,7 @@ TEST_F(StreamFamilyTest, XRead) { // Receive all records from a single stream, in a single hop auto resp = Run({"xread", "streams", "foo", "0"}); EXPECT_THAT(resp.GetVec(), ElementsAre("foo", ArrLen(3))); - // EXPECT_EQ(GetMetrics().shard_stats.tx_optimistic_total, 5u); todo temporary disabled + EXPECT_EQ(GetMetrics().shard_stats.tx_optimistic_total, 5u); // Receive all records from both streams. resp = Run({"xread", "streams", "foo", "bar", "0", "0"});