Skip to content

Commit

Permalink
fix: broken memcached error reporting (#1741)
Browse files Browse the repository at this point in the history
* fix DispatchCommand error reporting when memcached protocol is used (one example is when we use SET command on the replica -- previously we crashed now we properly report an error)
* SendError(ErrorReply) moved to SinkReplyBuilder from RedisReplyBuilder
* SendError(OpStatus) moved to SinkReplyBuilder from RedisReplyBuilder
* added tests for SendError(ErrorReply) in RedisReplyBuilder
  • Loading branch information
kostasrim authored Aug 28, 2023
1 parent 35a5433 commit 1855c1c
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 58 deletions.
14 changes: 14 additions & 0 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ConnectionContext {
}

// A convenient proxy for redis interface.
// Use with caution -- should only be used only
// in execution paths that are Redis *only*
RedisReplyBuilder* operator->();

SinkReplyBuilder* reply_builder() {
Expand All @@ -50,6 +52,18 @@ class ConnectionContext {
return res;
}

void SendError(std::string_view str, std::string_view type = std::string_view{}) {
rbuilder_->SendError(str, type);
}

void SendError(ErrorReply&& error) {
rbuilder_->SendError(std::move(error));
}

void SendSimpleString(std::string_view str) {
rbuilder_->SendSimpleString(str);
}

// connection state / properties.
bool conn_closing : 1;
bool req_auth : 1;
Expand Down
36 changes: 35 additions & 1 deletion src/facade/op_status.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
#include "facade/op_status.h"

namespace facade {} // namespace facade
#include "base/logging.h"
#include "facade/error.h"
#include "facade/resp_expr.h"

namespace facade {

std::string_view StatusToMsg(OpStatus status) {
switch (status) {
case OpStatus::OK:
return "OK";
case OpStatus::KEY_NOTFOUND:
return kKeyNotFoundErr;
case OpStatus::WRONG_TYPE:
return kWrongTypeErr;
case OpStatus::OUT_OF_RANGE:
return kIndexOutOfRange;
case OpStatus::INVALID_FLOAT:
return kInvalidFloatErr;
case OpStatus::INVALID_INT:
return kInvalidIntErr;
case OpStatus::SYNTAX_ERR:
return kSyntaxErr;
case OpStatus::OUT_OF_MEMORY:
return kOutOfMemory;
case OpStatus::BUSY_GROUP:
return "-BUSYGROUP Consumer Group name already exists";
case OpStatus::INVALID_NUMERIC_RESULT:
return kInvalidNumericResult;
default:
LOG(ERROR) << "Unsupported status " << status;
return "Internal error";
}
}

} // namespace facade
2 changes: 2 additions & 0 deletions src/facade/op_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ inline bool operator==(OpStatus st, const OpResultBase& ob) {
return ob.operator==(st);
}

std::string_view StatusToMsg(OpStatus status);

} // namespace facade

namespace std {
Expand Down
60 changes: 16 additions & 44 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,22 @@ void SinkReplyBuilder::SendRaw(std::string_view raw) {
Send(&v, 1);
}

void SinkReplyBuilder::SendError(ErrorReply error) {
if (error.status)
return SendError(*error.status);

string_view message_sv = visit([](auto&& str) -> string_view { return str; }, error.message);
SendError(message_sv, error.kind);
}

void SinkReplyBuilder::SendError(OpStatus status) {
if (status == OpStatus::OK) {
SendOk();
} else {
SendError(StatusToMsg(status));
}
}

void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
absl::FixedArray<iovec, 16> arr(msg_vec.size());

Expand Down Expand Up @@ -223,14 +239,6 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
}
}

void RedisReplyBuilder::SendError(ErrorReply error) {
if (error.status)
return SendError(*error.status);

string_view message_sv = visit([](auto&& str) -> string_view { return str; }, error.message);
SendError(message_sv, error.kind);
}

void RedisReplyBuilder::SendProtocolError(std::string_view str) {
SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error");
}
Expand Down Expand Up @@ -277,42 +285,6 @@ void RedisReplyBuilder::SendBulkString(std::string_view str) {
return Send(v, ABSL_ARRAYSIZE(v));
}

std::string_view RedisReplyBuilder::StatusToMsg(OpStatus status) {
switch (status) {
case OpStatus::OK:
return "OK";
case OpStatus::KEY_NOTFOUND:
return kKeyNotFoundErr;
case OpStatus::WRONG_TYPE:
return kWrongTypeErr;
case OpStatus::OUT_OF_RANGE:
return kIndexOutOfRange;
case OpStatus::INVALID_FLOAT:
return kInvalidFloatErr;
case OpStatus::INVALID_INT:
return kInvalidIntErr;
case OpStatus::SYNTAX_ERR:
return kSyntaxErr;
case OpStatus::OUT_OF_MEMORY:
return kOutOfMemory;
case OpStatus::BUSY_GROUP:
return "-BUSYGROUP Consumer Group name already exists";
case OpStatus::INVALID_NUMERIC_RESULT:
return kInvalidNumericResult;
default:
LOG(ERROR) << "Unsupported status " << status;
return "Internal error";
}
}

void RedisReplyBuilder::SendError(OpStatus status) {
if (status == OpStatus::OK) {
SendOk();
} else {
SendError(StatusToMsg(status));
}
}

void RedisReplyBuilder::SendLong(long num) {
string str = absl::StrCat(":", num, kCRLF);
SendRaw(str);
Expand Down
9 changes: 3 additions & 6 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class SinkReplyBuilder {
}

virtual void SendError(std::string_view str, std::string_view type = {}) = 0; // MC and Redis
virtual void SendError(ErrorReply error);
virtual void SendError(OpStatus status);

virtual void SendStored() = 0; // Reply for set commands.
virtual void SendSetSkipped() = 0;
Expand Down Expand Up @@ -177,13 +179,12 @@ class RedisReplyBuilder : public SinkReplyBuilder {
void SetResp3(bool is_resp3);

void SendError(std::string_view str, std::string_view type = {}) override;
virtual void SendError(ErrorReply error);
using SinkReplyBuilder::SendError;

void SendMGetResponse(absl::Span<const OptResp>) override;

void SendStored() override;
void SendSetSkipped() override;
virtual void SendError(OpStatus status);
void SendProtocolError(std::string_view str) override;

virtual void SendNullArray(); // Send *-1
Expand All @@ -206,10 +207,6 @@ class RedisReplyBuilder : public SinkReplyBuilder {

static char* FormatDouble(double val, char* dest, unsigned dest_len);

// You normally should not call this - maps the status
// into the string that would be sent
static std::string_view StatusToMsg(OpStatus status);

protected:
struct WrappedStrSpan : public StrSpan {
size_t Size() const;
Expand Down
29 changes: 27 additions & 2 deletions src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
OpStatus::OUT_OF_MEMORY, OpStatus::INVALID_FLOAT, OpStatus::INVALID_INT,
OpStatus::SYNTAX_ERR, OpStatus::BUSY_GROUP, OpStatus::INVALID_NUMERIC_RESULT};
for (const auto& err : error_codes) {
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_name = StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
Expand All @@ -251,14 +251,39 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
}
}

TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) {
ErrorReply err{OpStatus::OUT_OF_RANGE};
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
ASSERT_EQ(builder_->err_count().at(kIndexOutOfRange), 1);
ASSERT_EQ(str(), BuildExpectedErrorString(kIndexOutOfRange));

auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()));
ASSERT_TRUE(parsing_output.IsError());
sink_.Clear();

err = ErrorReply{"e1", "e2"};
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
ASSERT_EQ(builder_->err_count().at("e2"), 1);
ASSERT_EQ(str(), BuildExpectedErrorString("e1"));

parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()));
ASSERT_TRUE(parsing_output.IsError());
}

TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
// All these op codes creating the same error message
OpStatus none_unique_codes[] = {OpStatus::ENTRIES_ADDED_SMALL, OpStatus::SKIPPED,
OpStatus::KEY_EXISTS, OpStatus::INVALID_VALUE,
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
uint64_t error_count = 0;
for (const auto& err : none_unique_codes) {
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_name = StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
Expand Down
8 changes: 4 additions & 4 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
const auto [cid, args_no_cmd] = FindCmd(args);

if (cid == nullptr) {
return (*cntx)->SendError(ReportUnknownCmd(ArgS(args, 0)));
return cntx->SendError(ReportUnknownCmd(ArgS(args, 0)));
}

ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
Expand All @@ -899,7 +899,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (auto& exec_info = dfly_cntx->conn_state.exec_info; exec_info.IsCollecting())
exec_info.state = ConnectionState::ExecInfo::EXEC_ERROR;

(*dfly_cntx)->SendError(std::move(*err));
dfly_cntx->SendError(std::move(*err));
return;
}

Expand All @@ -909,13 +909,13 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
StoredCmd stored_cmd{cid, args_no_cmd};
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));

return (*cntx)->SendSimpleString("QUEUED");
return cntx->SendSimpleString("QUEUED");
}

uint64_t start_ns = absl::GetCurrentTimeNanos();

if (cid->opt_mask() & CO::DENYOOM) {
int64_t used_memory = etl.GetUsedMemory(start_ns);
uint64_t used_memory = etl.GetUsedMemory(start_ns);
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
return cntx->reply_builder()->SendError(kOutOfMemory);
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def admin_port(self) -> int:
def mc_port(self) -> int:
if self.params.existing_mc_port:
return self.params.existing_mc_port
return int(self.args.get("mc_port", "11211"))
return int(self.args.get("memcached_port", "11211"))

@staticmethod
def format_args(args):
Expand Down
35 changes: 35 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from redis import asyncio as aioredis
from .utility import *
from . import DflyInstanceFactory, dfly_args
import pymemcache
import logging

BASE_PORT = 1111
Expand Down Expand Up @@ -1506,3 +1507,37 @@ async def test_replicaof_flag_disconnect(df_local_factory):

role = await c_replica.role()
assert role[0] == b"master"


@pytest.mark.asyncio
async def test_df_crash_on_memcached_error(df_local_factory):
master = df_local_factory.create(
port=BASE_PORT,
memcached_port=11211,
proactor_threads=2,
)

replica = df_local_factory.create(
port=master.port + 1,
memcached_port=master.mc_port + 1,
proactor_threads=2,
)

master.start()
replica.start()

c_master = aioredis.Redis(port=master.port)
await wait_available_async(c_master)

c_replica = aioredis.Redis(port=replica.port)
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
await wait_for_replica_status(c_replica, status="up")
await c_replica.close()

memcached_client = pymemcache.Client(f"localhost:{replica.mc_port}")

with pytest.raises(pymemcache.exceptions.MemcacheClientError):
memcached_client.set(b"key", b"data", noreply=False)

await c_master.close()

0 comments on commit 1855c1c

Please sign in to comment.