From ff10ea7adb278699a0996198641d796802588ea7 Mon Sep 17 00:00:00 2001 From: kostas Date: Tue, 24 Oct 2023 20:42:03 +0300 Subject: [PATCH] chore(reply_builder): add dcheck that each command invocation has replied --- src/facade/reply_builder.cc | 12 +++++++++++- src/facade/reply_builder.h | 5 +++++ src/facade/reply_capture.cc | 1 + src/server/main_service.cc | 2 ++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 9221a0fbed5b..44bb7bf32051 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -37,7 +37,7 @@ DoubleToStringConverter dfly_conv(kConvFlags, "inf", "nan", 'e', -6, 21, 6, 0); } // namespace SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink) - : sink_(sink), should_batch_(false), should_aggregate_(false) { + : sink_(sink), should_batch_(false), should_aggregate_(false), has_replied_(false) { } void SinkReplyBuilder::CloseConnection() { @@ -49,6 +49,8 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { DCHECK(sink_); constexpr size_t kMaxBatchSize = 1024; + has_replied_ = true; + size_t bsize = 0; for (unsigned i = 0; i < len; ++i) { bsize += v[i].iov_len; @@ -92,6 +94,14 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { } } +void SinkReplyBuilder::ExpectReply() { + has_replied_ = false; +} + +bool SinkReplyBuilder::HasReplied() const { + return has_replied_; +} + void SinkReplyBuilder::SendRaw(std::string_view raw) { iovec v = {IoVec(raw)}; diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index e88db9bae913..81de917e589b 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -113,6 +113,9 @@ class SinkReplyBuilder { bool is_nested_ = true; }; + void ExpectReply(); + bool HasReplied() const; + protected: void SendRaw(std::string_view str); // Sends raw without any formatting. void SendRawVec(absl::Span msg_vec); @@ -134,6 +137,8 @@ class SinkReplyBuilder { // Similarly to batch mode but is controlled by at operation level. bool should_aggregate_ : 1; + bool has_replied_ : 1; + bool has_batch_replied_ : 1; }; class MCReplyBuilder : public SinkReplyBuilder { diff --git a/src/facade/reply_capture.cc b/src/facade/reply_capture.cc index b15b242604fd..d9fff6501d87 100644 --- a/src/facade/reply_capture.cc +++ b/src/facade/reply_capture.cc @@ -130,6 +130,7 @@ void CapturingReplyBuilder::SendDirect(Payload&& val) { } void CapturingReplyBuilder::Capture(Payload val) { + has_replied_ = true; if (!stack_.empty()) { stack_.top().first->arr.push_back(std::move(val)); stack_.top().second--; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 4bdcbb600fb2..db1692b97a79 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1066,12 +1066,14 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo DispatchMonitor(cntx, cid, tail_args); } + cntx->reply_builder()->ExpectReply(); try { cid->Invoke(tail_args, cntx); } catch (std::exception& e) { LOG(ERROR) << "Internal error, system probably unstable " << e.what(); return false; } + DCHECK(cntx->reply_builder()->HasReplied()); if (record_stats) { DCHECK(cntx->transaction);