Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix memcache bugs #1745

Merged
merged 1 commit into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ bool MatchHttp11Line(string_view line) {
}

constexpr size_t kMinReadSize = 256;
constexpr size_t kMaxReadSize = 32_KB;
constexpr size_t kMaxReadSize = 64_KB;

constexpr size_t kMaxDispatchQMemory = 5_MB;

Expand Down Expand Up @@ -467,6 +467,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

// Main loop.
if (parse_status != ERROR && !ec) {
if (io_buf_.AppendLen() < 64) {
io_buf_.EnsureCapacity(io_buf_.Capacity() * 2);
}
auto res = IoLoop(peer, orig_builder);

if (holds_alternative<error_code>(res)) {
Expand Down Expand Up @@ -646,8 +649,8 @@ auto Connection::ParseMemcache() -> ParserStatus {
return NEED_MORE;
}

if (result == MemcacheParser::PARSE_ERROR) {
builder->SendError(""); // ERROR.
if (result == MemcacheParser::PARSE_ERROR || result == MemcacheParser::UNKNOWN_CMD) {
builder->SendSimpleString("ERROR");
} else if (result == MemcacheParser::BAD_DELTA) {
builder->SendClientError("invalid numeric delta argument");
} else if (result != MemcacheParser::OK) {
Expand Down Expand Up @@ -679,6 +682,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
FetchBuilderStats(stats_, orig_builder);

io::MutableBytes append_buf = io_buf_.AppendBuffer();
DCHECK(!append_buf.empty());

phase_ = READ_SOCKET;

::io::Result<size_t> recv_sz = peer->Recv(append_buf);
Expand Down Expand Up @@ -713,7 +718,9 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil

if (parser_hint > capacity) {
io_buf_.Reserve(std::min(kMaxReadSize, parser_hint));
} else if (append_buf.size() == *recv_sz && append_buf.size() > capacity / 2) {
}

if (io_buf_.AppendLen() < 64u) {
// Last io used most of the io_buf to the end.
io_buf_.Reserve(capacity * 2); // Valid growth range.
}
Expand All @@ -722,6 +729,13 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
VLOG(1) << "Growing io_buf to " << io_buf_.Capacity();
stats_->read_buf_capacity += (io_buf_.Capacity() - capacity);
}
DCHECK_GT(io_buf_.AppendLen(), 0U);
} else if (io_buf_.AppendLen() == 0) {
// We have a full buffer and we can not progress with parsing.
// This means that we have request too large.
LOG(ERROR) << "Request is too large, closing connection";
parse_status = ERROR;
break;
}
} else if (parse_status != OK) {
break;
Expand Down
47 changes: 1 addition & 46 deletions src/facade/op_status.cc
Original file line number Diff line number Diff line change
@@ -1,48 +1,3 @@
#include "facade/op_status.h"

namespace facade {

const char* DebugString(OpStatus op) {
switch (op) {
case OpStatus::OK:
return "OK";
case OpStatus::KEY_EXISTS:
return "KEY EXISTS";
case OpStatus::KEY_NOTFOUND:
return "KEY NOTFOUND";
case OpStatus::SKIPPED:
return "SKIPPED";
case OpStatus::INVALID_VALUE:
return "INVALID VALUE";
case OpStatus::OUT_OF_RANGE:
return "OUT OF RANGE";
case OpStatus::WRONG_TYPE:
return "WRONG TYPE";
case OpStatus::TIMED_OUT:
return "TIMED OUT";
case OpStatus::OUT_OF_MEMORY:
return "OUT OF MEMORY";
case OpStatus::INVALID_FLOAT:
return "INVALID FLOAT";
case OpStatus::INVALID_INT:
return "INVALID INT";
case OpStatus::SYNTAX_ERR:
return "INVALID SYNTAX";
case OpStatus::BUSY_GROUP:
return "BUSY GROUP";
case OpStatus::STREAM_ID_SMALL:
return "STREAM ID TO SMALL";
case OpStatus::ENTRIES_ADDED_SMALL:
return "ENTRIES ADDED IS TO SMALL";
case OpStatus::INVALID_NUMERIC_RESULT:
return "INVALID NUMERIC RESULT";
case OpStatus::CANCELLED:
return "CANCELLED";
}
return "Unknown Error Code"; // we should not be here, but this is how enums works in c++
}
const char* OpResultBase::DebugFormat() const {
return DebugString(st_);
}

} // namespace facade
namespace facade {} // namespace facade
2 changes: 0 additions & 2 deletions src/facade/op_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ enum class OpStatus : uint16_t {
CANCELLED,
};

const char* DebugString(OpStatus op);

class OpResultBase {
public:
OpResultBase(OpStatus st = OpStatus::OK) : st_(st) {
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void MCReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
}

void MCReplyBuilder::SendError(string_view str, std::string_view type) {
SendSimpleString("ERROR");
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
}

void MCReplyBuilder::SendProtocolError(std::string_view str) {
Expand Down
24 changes: 9 additions & 15 deletions src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,25 +232,22 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
OpStatus::OUT_OF_MEMORY, OpStatus::INVALID_FLOAT, OpStatus::INVALID_INT,
OpStatus::SYNTAX_ERR, OpStatus::BUSY_GROUP, OpStatus::INVALID_NUMERIC_RESULT};
for (const auto& err : error_codes) {
const std::string_view error_code_name = DebugString(err);
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
<< " invalid start char for " << error_code_name;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF))
<< " failed to find correct termination at " << error_code_name;
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF)) << " failed to find correct termination at " << err;
ASSERT_EQ(builder_->err_count().at(error_type), 1)
<< " number of error count is invalid for " << error_code_name;
<< " number of error count is invalid for " << err;
ASSERT_EQ(str(), BuildExpectedErrorString(error_name))
<< " error different from expected - '" << str() << "'";

auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
<< " verify for the result is invalid for " << error_code_name;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
<< " verify for the result is invalid for " << err;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
}
}

Expand All @@ -261,24 +258,21 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
OpStatus::TIMED_OUT, OpStatus::STREAM_ID_SMALL};
uint64_t error_count = 0;
for (const auto& err : none_unique_codes) {
const std::string_view error_code_name = DebugString(err);
const std::string_view error_name = RedisReplyBuilder::StatusToMsg(err);
const std::string_view error_type = GetErrorType(error_name);

sink_.Clear();
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart))
<< " invalid start char for " << error_code_name;
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
auto current_error_count = builder_->err_count().at(error_type);
error_count++;
ASSERT_EQ(current_error_count, error_count)
<< " number of error count is invalid for " << error_code_name;
ASSERT_EQ(current_error_count, error_count) << " number of error count is invalid for " << err;
auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()))
<< " verify for the result is invalid for " << error_code_name;
<< " verify for the result is invalid for " << err;

ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << error_code_name;
ASSERT_TRUE(parsing_output.IsError()) << " expecting error for " << err;
}
}

Expand Down
1 change: 0 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,6 @@ void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) {
<< result.value().size();
(*cntx)->SendBulkString(*result);
} else {
DVLOG(1) << "Dump failed: " << result.DebugFormat() << key << " nil";
(*cntx)->SendNull();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
int64_t used_memory = etl.GetUsedMemory(start_ns);
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
return (*cntx)->SendError(kOutOfMemory);
return cntx->reply_builder()->SendError(kOutOfMemory);
}
}

Expand Down
17 changes: 16 additions & 1 deletion tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from redis import asyncio as aioredis
from redis.exceptions import ConnectionError as redis_conn_error
import async_timeout

import pymemcache
from dataclasses import dataclass

from . import DflyInstance, dfly_args
Expand Down Expand Up @@ -516,3 +516,18 @@ async def test_squashed_pipeline(async_client: aioredis.Redis):
async def test_squashed_pipeline_seeder(df_server, df_seeder_factory):
seeder = df_seeder_factory.create(port=df_server.port, keys=10_000)
await seeder.run(target_deviation=0.1)


@pytest.mark.asyncio
async def test_memcached_large_request(df_local_factory):
server = df_local_factory.create(
port=BASE_PORT,
memcached_port=11211,
proactor_threads=2,
)

server.start()

memcached_client = pymemcache.Client(("localhost", server.mc_port), default_noreply=False)

assert memcached_client.set(b"key", b"d" * 4096, noreply=False)