Skip to content

Commit

Permalink
fix: fix memcache bugs (#1745)
Browse files Browse the repository at this point in the history
1. If the first request sent to the connection is large (2kb or more)
   Dragonfly was closing the connection.
2. Changed server side error reporting according to memcache protocol:
   https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L172
3. Fixed the wrong casting in DispatchCommand.
4. Remove practically unused code that translated opstatus to strings.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Aug 27, 2023
1 parent 75fdc16 commit 6dd51de
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 71 deletions.
22 changes: 18 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ bool MatchHttp11Line(string_view line) {
}

constexpr size_t kMinReadSize = 256;
constexpr size_t kMaxReadSize = 32_KB;
constexpr size_t kMaxReadSize = 64_KB;

constexpr size_t kMaxDispatchQMemory = 5_MB;

Expand Down Expand Up @@ -467,6 +467,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

// Main loop.
if (parse_status != ERROR && !ec) {
if (io_buf_.AppendLen() < 64) {
io_buf_.EnsureCapacity(io_buf_.Capacity() * 2);
}
auto res = IoLoop(peer, orig_builder);

if (holds_alternative<error_code>(res)) {
Expand Down Expand Up @@ -646,8 +649,8 @@ auto Connection::ParseMemcache() -> ParserStatus {
return NEED_MORE;
}

if (result == MemcacheParser::PARSE_ERROR) {
builder->SendError(""); // ERROR.
if (result == MemcacheParser::PARSE_ERROR || result == MemcacheParser::UNKNOWN_CMD) {
builder->SendSimpleString("ERROR");
} else if (result == MemcacheParser::BAD_DELTA) {
builder->SendClientError("invalid numeric delta argument");
} else if (result != MemcacheParser::OK) {
Expand Down Expand Up @@ -679,6 +682,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
FetchBuilderStats(stats_, orig_builder);

io::MutableBytes append_buf = io_buf_.AppendBuffer();
DCHECK(!append_buf.empty());

phase_ = READ_SOCKET;

::io::Result<size_t> recv_sz = peer->Recv(append_buf);
Expand Down Expand Up @@ -713,7 +718,9 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil

if (parser_hint > capacity) {
io_buf_.Reserve(std::min(kMaxReadSize, parser_hint));
} else if (append_buf.size() == *recv_sz && append_buf.size() > capacity / 2) {
}

if (io_buf_.AppendLen() < 64u) {
// Last io used most of the io_buf to the end.
io_buf_.Reserve(capacity * 2); // Valid growth range.
}
Expand All @@ -722,6 +729,13 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
VLOG(1) << "Growing io_buf to " << io_buf_.Capacity();
stats_->read_buf_capacity += (io_buf_.Capacity() - capacity);
}
DCHECK_GT(io_buf_.AppendLen(), 0U);
} else if (io_buf_.AppendLen() == 0) {
// We have a full buffer and we can not progress with parsing.
// This means that we have request too large.
LOG(ERROR) << "Request is too large, closing connection";
parse_status = ERROR;
break;
}
} else if (parse_status != OK) {
break;
Expand Down
47 changes: 1 addition & 46 deletions src/facade/op_status.cc
Original file line number Diff line number Diff line change
@@ -1,48 +1,3 @@
#include "facade/op_status.h"

namespace facade {

const char* DebugString(OpStatus op) {
switch (op) {
case OpStatus::OK:
return "OK";
case OpStatus::KEY_EXISTS:
return "KEY EXISTS";
case OpStatus::KEY_NOTFOUND:
return "KEY NOTFOUND";
case OpStatus::SKIPPED:
return "SKIPPED";
case OpStatus::INVALID_VALUE:
return "INVALID VALUE";
case OpStatus::OUT_OF_RANGE:
return "OUT OF RANGE";
case OpStatus::WRONG_TYPE:
return "WRONG TYPE";
case OpStatus::TIMED_OUT:
return "TIMED OUT";
case OpStatus::OUT_OF_MEMORY:
return "OUT OF MEMORY";
case OpStatus::INVALID_FLOAT:
return "INVALID FLOAT";
case OpStatus::INVALID_INT:
return "INVALID INT";
case OpStatus::SYNTAX_ERR:
return "INVALID SYNTAX";
case OpStatus::BUSY_GROUP:
return "BUSY GROUP";
case OpStatus::STREAM_ID_SMALL:
return "STREAM ID TO SMALL";
case OpStatus::ENTRIES_ADDED_SMALL:
return "ENTRIES ADDED IS TO SMALL";
case OpStatus::INVALID_NUMERIC_RESULT:
return "INVALID NUMERIC RESULT";
case OpStatus::CANCELLED:
return "CANCELLED";
}
return "Unknown Error Code"; // we should not be here, but this is how enums works in c++
}
const char* OpResultBase::DebugFormat() const {
return DebugString(st_);
}

} // namespace facade
namespace facade {} // namespace facade
2 changes: 0 additions & 2 deletions src/facade/op_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ enum class OpStatus : uint16_t {
CANCELLED,
};

const char* DebugString(OpStatus op);

class OpResultBase {
public:
OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void MCReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
}

void MCReplyBuilder::SendError(string_view str, std::string_view type) {
SendSimpleString("ERROR");
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
}

void MCReplyBuilder::SendProtocolError(std::string_view str) {
Expand Down
24 changes: 9 additions & 15 deletions src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,25 +232,22 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
OpStatus::OUT_OF_MEMORY, OpStatus::INVALID_FLOAT, OpStatus::INVALID_INT,
OpStatus::SYNTAX_ERR, OpStatus::BUSY_GROUP, OpStatus::INVALID_NUMERIC_RESULT};
for (const auto& err : error_codes) {
const std::string_view error_code_name = DebugString(err);
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
<< " invalid start char for " << error_code_name;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF))
<< " failed to find correct termination at " << error_code_name;
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF)) << " failed to find correct termination at " << err;
ASSERT_EQ(builder_->err_count().at(error_type), 1)
<< " number of error count is invalid for " << error_code_name;
<< " number of error count is invalid for " << err;
ASSERT_EQ(str(), BuildExpectedErrorString(error_name))
<< " error different from expected - '" << str() << "'";

auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
<< " verify for the result is invalid for " << error_code_name;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
<< " verify for the result is invalid for " << err;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
}
}

Expand All @@ -261,24 +258,21 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
uint64_t error_count = 0;
for (const auto& err : none_unique_codes) {
const std::string_view error_code_name = DebugString(err);
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
<< " invalid start char for " << error_code_name;
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
auto current_error_count = builder_->err_count().at(error_type);
error_count++;
ASSERT_EQ(current_error_count, error_count)
<< " number of error count is invalid for " << error_code_name;
ASSERT_EQ(current_error_count, error_count) << " number of error count is invalid for " << err;
auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
<< " verify for the result is invalid for " << error_code_name;
<< " verify for the result is invalid for " << err;

ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
}
}

Expand Down
1 change: 0 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,6 @@ void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) {
<< result.value().size();
(*cntx)->SendBulkString(*result);
} else {
DVLOG(1) << "Dump failed: " << result.DebugFormat() << key << " nil";
(*cntx)->SendNull();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
int64_t used_memory = etl.GetUsedMemory(start_ns);
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
return (*cntx)->SendError(kOutOfMemory);
return cntx->reply_builder()->SendError(kOutOfMemory);
}
}

Expand Down
17 changes: 16 additions & 1 deletion tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from redis import asyncio as aioredis
from redis.exceptions import ConnectionError as redis_conn_error
import async_timeout

import pymemcache
from dataclasses import dataclass

from . import DflyInstance, dfly_args
Expand Down Expand Up @@ -516,3 +516,18 @@ async def test_squashed_pipeline(async_client: aioredis.Redis):
async def test_squashed_pipeline_seeder(df_server, df_seeder_factory):
seeder = df_seeder_factory.create(port=df_server.port, keys=10_000)
await seeder.run(target_deviation=0.1)


@pytest.mark.asyncio
async def test_memcached_large_request(df_local_factory):
server = df_local_factory.create(
port=BASE_PORT,
memcached_port=11211,
proactor_threads=2,
)

server.start()

memcached_client = pymemcache.Client(("localhost", server.mc_port), default_noreply=False)

assert memcached_client.set(b"key", b"d" * 4096, noreply=False)

0 comments on commit 6dd51de

Please sign in to comment.