Skip to content

Commit

Permalink
chore(reply_builder): add dcheck that each command invocation has rep…
Browse files Browse the repository at this point in the history
…lied
  • Loading branch information
kostasrim committed Oct 24, 2023
1 parent 1a813ce commit ff10ea7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ DoubleToStringConverter dfly_conv(kConvFlags, "inf", "nan", 'e', -6, 21, 6, 0);
} // namespace

SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink)
: sink_(sink), should_batch_(false), should_aggregate_(false) {
: sink_(sink), should_batch_(false), should_aggregate_(false), has_replied_(false) {
}

void SinkReplyBuilder::CloseConnection() {
Expand All @@ -49,6 +49,8 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
DCHECK(sink_);
constexpr size_t kMaxBatchSize = 1024;

has_replied_ = true;

size_t bsize = 0;
for (unsigned i = 0; i < len; ++i) {
bsize += v[i].iov_len;
Expand Down Expand Up @@ -92,6 +94,14 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
}
}

void SinkReplyBuilder::ExpectReply() {
has_replied_ = false;
}

bool SinkReplyBuilder::HasReplied() const {
return has_replied_;
}

void SinkReplyBuilder::SendRaw(std::string_view raw) {
iovec v = {IoVec(raw)};

Expand Down
5 changes: 5 additions & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class SinkReplyBuilder {
bool is_nested_ = true;
};

void ExpectReply();
bool HasReplied() const;

protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
void SendRawVec(absl::Span<const std::string_view> msg_vec);
Expand All @@ -134,6 +137,8 @@ class SinkReplyBuilder {

// Similarly to batch mode but is controlled by at operation level.
bool should_aggregate_ : 1;
bool has_replied_ : 1;
bool has_batch_replied_ : 1;
};

class MCReplyBuilder : public SinkReplyBuilder {
Expand Down
1 change: 1 addition & 0 deletions src/facade/reply_capture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ void CapturingReplyBuilder::SendDirect(Payload&& val) {
}

void CapturingReplyBuilder::Capture(Payload val) {
has_replied_ = true;
if (!stack_.empty()) {
stack_.top().first->arr.push_back(std::move(val));
stack_.top().second--;
Expand Down
2 changes: 2 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1066,12 +1066,14 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
DispatchMonitor(cntx, cid, tail_args);
}

cntx->reply_builder()->ExpectReply();
try {
cid->Invoke(tail_args, cntx);
} catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
return false;
}
DCHECK(cntx->reply_builder()->HasReplied());

if (record_stats) {
DCHECK(cntx->transaction);
Expand Down

0 comments on commit ff10ea7

Please sign in to comment.