Skip to content

Commit

Permalink
chore(server): Improve the implementation of SendStringArr. (#178)
Browse files Browse the repository at this point in the history
1. Make it use vectorized send instead of concatenating everything into a single string.
2. vectorized SendStringArr could fail sending large arrays for lengths higher than 512 (returned EMSGSIZE).
   We improved the implementation so it would send those arrays in chunks of 256 items.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jun 26, 2022
1 parent 1f42926 commit 605b1fd
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 27 deletions.
24 changes: 12 additions & 12 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
res.append("invalid multibulk length\r\n");
}

auto size_res = peer->Send(::io::Buffer(res));
if (!size_res) {
LOG(WARNING) << "Error " << size_res.error();
error_code ec = peer->Write(::io::Buffer(res));
if (ec) {
LOG(WARNING) << "Error " << ec;
}
}

Expand Down Expand Up @@ -344,10 +344,10 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}

error_code ec;
error_code ec = cc_->reply_builder()->GetError();

// Main loop.
if (parse_status != ERROR) {
if (parse_status != ERROR && !ec) {
auto res = IoLoop(peer);

if (holds_alternative<error_code>(res)) {
Expand Down Expand Up @@ -380,15 +380,15 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

if (redis_parser_) {
SendProtocolError(RedisParser::Result(parser_error_), peer);
peer->Shutdown(SHUT_RDWR);
} else {
string_view sv{"CLIENT_ERROR bad command line format\r\n"};
auto size_res = peer->Send(::io::Buffer(sv));
if (!size_res) {
LOG(WARNING) << "Error " << size_res.error();
ec = size_res.error();
error_code ec2 = peer->Write(::io::Buffer(sv));
if (ec2) {
LOG(WARNING) << "Error " << ec2;
ec = ec;
}
}
peer->Shutdown(SHUT_RDWR);
}

if (ec && !FiberSocketBase::IsConnClosed(ec)) {
Expand Down Expand Up @@ -532,9 +532,9 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
++stats->io_read_cnt;
SetPhase("process");

if (redis_parser_)
if (redis_parser_) {
parse_status = ParseRedis();
else {
} else {
DCHECK(memcache_parser_);
parse_status = ParseMemcache();
}
Expand Down
70 changes: 61 additions & 9 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
#include "facade/reply_builder.h"

#include <absl/container/fixed_array.h>
#include <absl/strings/numbers.h>
#include <absl/strings/str_cat.h>
#include <double-conversion/double-to-string.h>
Expand Down Expand Up @@ -89,13 +90,13 @@ void SinkReplyBuilder::SendRaw(std::string_view raw) {
}

void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
iovec v[msg_vec.size()];
absl::FixedArray<iovec, 16> arr(msg_vec.size());

for (unsigned i = 0; i < msg_vec.size(); ++i) {
v[i].iov_base = const_cast<char*>(msg_vec[i].data());
v[i].iov_len = msg_vec[i].size();
arr[i].iov_base = const_cast<char*>(msg_vec[i].data());
arr[i].iov_len = msg_vec[i].size();
}
Send(v, msg_vec.size());
Send(arr.data(), msg_vec.size());
}

MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
Expand Down Expand Up @@ -305,14 +306,65 @@ void RedisReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) {
SendRaw(res);
}

// This implementation a bit complicated because it uses vectorized
// send to send an array. The problem with that is the OS limits vector length
// to low numbers (around 1024). Therefore, to make it robust we send the array in batches.
// We limit the vector length to 256 and when it fills up we flush it to the socket and continue
// iterating.
void RedisReplyBuilder::SendStringArr(absl::Span<const string> arr) {
string res = absl::StrCat("*", arr.size(), kCRLF);
// When vector length is too long, Send returns EMSGSIZE.
size_t vec_len = std::min<size_t>(256u, arr.size());

for (size_t i = 0; i < arr.size(); ++i) {
StrAppend(&res, "$", arr[i].size(), kCRLF);
res.append(arr[i]).append(kCRLF);
absl::FixedArray<iovec, 16> vec(vec_len * 2 + 2);
absl::FixedArray<char, 64> meta((vec_len + 1) * 16);
char* next = meta.data();

*next++ = '*';
next = absl::numbers_internal::FastIntToBuffer(arr.size(), next);
*next++ = '\r';
*next++ = '\n';
vec[0].iov_base = meta.data();
vec[0].iov_len = next - meta.data();
char* start = next;

unsigned vec_indx = 1;
for (unsigned i = 0; i < arr.size(); ++i) {
const auto& src = arr[i];
*next++ = '$';
next = absl::numbers_internal::FastIntToBuffer(src.size(), next);
*next++ = '\r';
*next++ = '\n';
vec[vec_indx].iov_base = start;
vec[vec_indx].iov_len = next - start;
DCHECK_GT(next - start, 0);

start = next;
++vec_indx;

vec[vec_indx].iov_base = const_cast<char*>(src.data());
vec[vec_indx].iov_len = src.size();

*next++ = '\r';
*next++ = '\n';
++vec_indx;

if (vec_indx + 1 >= vec.size()) {
if (i < arr.size() - 1 || vec_indx == vec.size()) {
Send(vec.data(), vec_indx);
if (ec_)
return;

vec_indx = 0;
start = meta.data();
next = start + 2;
start[0] = '\r';
start[1] = '\n';
}
}
}
SendRaw(res);
vec[vec_indx].iov_base = start;
vec[vec_indx].iov_len = 2;
Send(vec.data(), vec_indx + 1);
}

void RedisReplyBuilder::StartArray(unsigned len) {
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <string_view>

#include "facade/op_status.h"
#include "io/sync_stream_interface.h"
#include "io/io.h"

namespace facade {

Expand Down
2 changes: 2 additions & 0 deletions src/redis/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ add_library(redis_lib crc64.c crcspeed.c debug.c dict.c intset.c

cxx_link(redis_lib ${ZMALLOC_DEPS})

target_compile_options(redis_lib PRIVATE -Wno-maybe-uninitialized)

if (REDIS_ZMALLOC_MI)
target_compile_definitions(redis_lib PUBLIC USE_ZMALLOC_MI)
endif()
5 changes: 1 addition & 4 deletions src/server/hset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,7 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g
OpResult<vector<string>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));

if (result) {
(*cntx)->StartArray(result->size());
for (const auto& s : *result) {
(*cntx)->SendBulkString(s);
}
(*cntx)->SendStringArr(absl::Span<const string>{*result});
} else {
(*cntx)->SendError(result.status());
}
Expand Down

0 comments on commit 605b1fd

Please sign in to comment.