Skip to content

Commit

Permalink
fix: fix memcache bugs
Browse files Browse the repository at this point in the history
1. If the first request sent to the connection is large (2kb or more)
   Dragonfly was closing the connection.
2. Changed server side error reporting according to memcache protocol:
   https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L172
3. Fixed the wrong casting in DispatchCommand.
4. Remove practically unused code that translated opstatus to strings.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Aug 27, 2023
1 parent f1ac4b0 commit f5ef274
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 70 deletions.
22 changes: 18 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ bool MatchHttp11Line(string_view line) {
}

constexpr size_t kMinReadSize = 256;
constexpr size_t kMaxReadSize = 32_KB;
constexpr size_t kMaxReadSize = 64_KB;

constexpr size_t kMaxDispatchQMemory = 5_MB;

Expand Down Expand Up @@ -467,6 +467,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

// Main loop.
if (parse_status != ERROR && !ec) {
if (io_buf_.AppendLen() < 64) {
io_buf_.EnsureCapacity(io_buf_.Capacity() * 2);
}
auto res = IoLoop(peer, orig_builder);

if (holds_alternative<error_code>(res)) {
Expand Down Expand Up @@ -646,8 +649,8 @@ auto Connection::ParseMemcache() -> ParserStatus {
return NEED_MORE;
}

if (result == MemcacheParser::PARSE_ERROR) {
builder->SendError(""); // ERROR.
if (result == MemcacheParser::PARSE_ERROR || result == MemcacheParser::UNKNOWN_CMD) {
builder->SendSimpleString("ERROR");
} else if (result == MemcacheParser::BAD_DELTA) {
builder->SendClientError("invalid numeric delta argument");
} else if (result != MemcacheParser::OK) {
Expand Down Expand Up @@ -679,6 +682,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
FetchBuilderStats(stats_, orig_builder);

io::MutableBytes append_buf = io_buf_.AppendBuffer();
DCHECK(!append_buf.empty());

phase_ = READ_SOCKET;

::io::Result<size_t> recv_sz = peer->Recv(append_buf);
Expand Down Expand Up @@ -713,7 +718,9 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil

if (parser_hint > capacity) {
io_buf_.Reserve(std::min(kMaxReadSize, parser_hint));
} else if (append_buf.size() == *recv_sz && append_buf.size() > capacity / 2) {
}

if (io_buf_.AppendLen() < 64u) {
// Last io used most of the io_buf to the end.
io_buf_.Reserve(capacity * 2); // Valid growth range.
}
Expand All @@ -722,6 +729,13 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
VLOG(1) << "Growing io_buf to " << io_buf_.Capacity();
stats_->read_buf_capacity += (io_buf_.Capacity() - capacity);
}
DCHECK_GT(io_buf_.AppendLen(), 0U);
} else if (io_buf_.AppendLen() == 0) {
// We have a full buffer and we can not progress with parsing.
// This means that we have request too large.
LOG(ERROR) << "Request is too large, closing connection";
parse_status = ERROR;
break;
}
} else if (parse_status != OK) {
break;
Expand Down
47 changes: 1 addition & 46 deletions src/facade/op_status.cc
Original file line number Diff line number Diff line change
@@ -1,48 +1,3 @@
#include "facade/op_status.h"

namespace facade {

const char* DebugString(OpStatus op) {
switch (op) {
case OpStatus::OK:
return "OK";
case OpStatus::KEY_EXISTS:
return "KEY EXISTS";
case OpStatus::KEY_NOTFOUND:
return "KEY NOTFOUND";
case OpStatus::SKIPPED:
return "SKIPPED";
case OpStatus::INVALID_VALUE:
return "INVALID VALUE";
case OpStatus::OUT_OF_RANGE:
return "OUT OF RANGE";
case OpStatus::WRONG_TYPE:
return "WRONG TYPE";
case OpStatus::TIMED_OUT:
return "TIMED OUT";
case OpStatus::OUT_OF_MEMORY:
return "OUT OF MEMORY";
case OpStatus::INVALID_FLOAT:
return "INVALID FLOAT";
case OpStatus::INVALID_INT:
return "INVALID INT";
case OpStatus::SYNTAX_ERR:
return "INVALID SYNTAX";
case OpStatus::BUSY_GROUP:
return "BUSY GROUP";
case OpStatus::STREAM_ID_SMALL:
return "STREAM ID TO SMALL";
case OpStatus::ENTRIES_ADDED_SMALL:
return "ENTRIES ADDED IS TO SMALL";
case OpStatus::INVALID_NUMERIC_RESULT:
return "INVALID NUMERIC RESULT";
case OpStatus::CANCELLED:
return "CANCELLED";
}
return "Unknown Error Code"; // we should not be here, but this is how enums works in c++
}
const char* OpResultBase::DebugFormat() const {
return DebugString(st_);
}

} // namespace facade
namespace facade {} // namespace facade
2 changes: 0 additions & 2 deletions src/facade/op_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ enum class OpStatus : uint16_t {
CANCELLED,
};

const char* DebugString(OpStatus op);

class OpResultBase {
public:
OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void MCReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
}

void MCReplyBuilder::SendError(string_view str, std::string_view type) {
SendSimpleString("ERROR");
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
}

void MCReplyBuilder::SendProtocolError(std::string_view str) {
Expand Down
24 changes: 9 additions & 15 deletions src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,25 +232,22 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
OpStatus::OUT_OF_MEMORY, OpStatus::INVALID_FLOAT, OpStatus::INVALID_INT,
OpStatus::SYNTAX_ERR, OpStatus::BUSY_GROUP, OpStatus::INVALID_NUMERIC_RESULT};
for (const auto& err : error_codes) {
const std::string_view error_code_name = DebugString(err);
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
<< " invalid start char for " << error_code_name;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF))
<< " failed to find correct termination at " << error_code_name;
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF)) << " failed to find correct termination at " << err;
ASSERT_EQ(builder_->err_count().at(error_type), 1)
<< " number of error count is invalid for " << error_code_name;
<< " number of error count is invalid for " << err;
ASSERT_EQ(str(), BuildExpectedErrorString(error_name))
<< " error different from expected - '" << str() << "'";

auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
<< " verify for the result is invalid for " << error_code_name;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
<< " verify for the result is invalid for " << err;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
}
}

Expand All @@ -261,24 +258,21 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
uint64_t error_count = 0;
for (const auto& err : none_unique_codes) {
const std::string_view error_code_name = DebugString(err);
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
<< " invalid start char for " << error_code_name;
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
auto current_error_count = builder_->err_count().at(error_type);
error_count++;
ASSERT_EQ(current_error_count, error_count)
<< " number of error count is invalid for " << error_code_name;
ASSERT_EQ(current_error_count, error_count) << " number of error count is invalid for " << err;
auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
<< " verify for the result is invalid for " << error_code_name;
<< " verify for the result is invalid for " << err;

ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
}
}

Expand Down
1 change: 0 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,6 @@ void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) {
<< result.value().size();
(*cntx)->SendBulkString(*result);
} else {
DVLOG(1) << "Dump failed: " << result.DebugFormat() << key << " nil";
(*cntx)->SendNull();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
int64_t used_memory = etl.GetUsedMemory(start_ns);
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
return (*cntx)->SendError(kOutOfMemory);
return cntx->reply_builder()->SendError(kOutOfMemory);
}
}

Expand Down

0 comments on commit f5ef274

Please sign in to comment.