diff --git a/helio b/helio index 445ebf76b96b..8a6e9279cdf7 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 445ebf76b96b7360982f2b3c4558ec347ba3adb5 +Subproject commit 8a6e9279cdf7182934afdf0af7c3735ed2b7a75c diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 1435804db9ee..f06f3efb0508 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -46,9 +46,9 @@ void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) { res.append("invalid multibulk length\r\n"); } - auto size_res = peer->Send(::io::Buffer(res)); - if (!size_res) { - LOG(WARNING) << "Error " << size_res.error(); + error_code ec = peer->Write(::io::Buffer(res)); + if (ec) { + LOG(WARNING) << "Error " << ec; } } @@ -344,10 +344,10 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { } } - error_code ec; + error_code ec = cc_->reply_builder()->GetError(); // Main loop. - if (parse_status != ERROR) { + if (parse_status != ERROR && !ec) { auto res = IoLoop(peer); if (holds_alternative(res)) { @@ -380,15 +380,15 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { if (redis_parser_) { SendProtocolError(RedisParser::Result(parser_error_), peer); - peer->Shutdown(SHUT_RDWR); } else { string_view sv{"CLIENT_ERROR bad command line format\r\n"}; - auto size_res = peer->Send(::io::Buffer(sv)); - if (!size_res) { - LOG(WARNING) << "Error " << size_res.error(); - ec = size_res.error(); + error_code ec2 = peer->Write(::io::Buffer(sv)); + if (ec2) { + LOG(WARNING) << "Error " << ec2; + ec = ec; } } + peer->Shutdown(SHUT_RDWR); } if (ec && !FiberSocketBase::IsConnClosed(ec)) { @@ -532,9 +532,9 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantio_read_cnt; SetPhase("process"); - if (redis_parser_) + if (redis_parser_) { parse_status = ParseRedis(); - else { + } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); } diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 53725451dca6..3014691519f2 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -3,6 +3,7 @@ // #include "facade/reply_builder.h" +#include #include #include #include @@ -89,13 +90,13 @@ void SinkReplyBuilder::SendRaw(std::string_view raw) { } void SinkReplyBuilder::SendRawVec(absl::Span msg_vec) { - iovec v[msg_vec.size()]; + absl::FixedArray arr(msg_vec.size()); for (unsigned i = 0; i < msg_vec.size(); ++i) { - v[i].iov_base = const_cast(msg_vec[i].data()); - v[i].iov_len = msg_vec[i].size(); + arr[i].iov_base = const_cast(msg_vec[i].data()); + arr[i].iov_len = msg_vec[i].size(); } - Send(v, msg_vec.size()); + Send(arr.data(), msg_vec.size()); } MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) { @@ -305,14 +306,65 @@ void RedisReplyBuilder::SendStringArr(absl::Span arr) { SendRaw(res); } +// This implementation a bit complicated because it uses vectorized +// send to send an array. The problem with that is the OS limits vector length +// to low numbers (around 1024). Therefore, to make it robust we send the array in batches. +// We limit the vector length to 256 and when it fills up we flush it to the socket and continue +// iterating. void RedisReplyBuilder::SendStringArr(absl::Span arr) { - string res = absl::StrCat("*", arr.size(), kCRLF); + // When vector length is too long, Send returns EMSGSIZE. + size_t vec_len = std::min(256u, arr.size()); - for (size_t i = 0; i < arr.size(); ++i) { - StrAppend(&res, "$", arr[i].size(), kCRLF); - res.append(arr[i]).append(kCRLF); + absl::FixedArray vec(vec_len * 2 + 2); + absl::FixedArray meta((vec_len + 1) * 16); + char* next = meta.data(); + + *next++ = '*'; + next = absl::numbers_internal::FastIntToBuffer(arr.size(), next); + *next++ = '\r'; + *next++ = '\n'; + vec[0].iov_base = meta.data(); + vec[0].iov_len = next - meta.data(); + char* start = next; + + unsigned vec_indx = 1; + for (unsigned i = 0; i < arr.size(); ++i) { + const auto& src = arr[i]; + *next++ = '$'; + next = absl::numbers_internal::FastIntToBuffer(src.size(), next); + *next++ = '\r'; + *next++ = '\n'; + vec[vec_indx].iov_base = start; + vec[vec_indx].iov_len = next - start; + DCHECK_GT(next - start, 0); + + start = next; + ++vec_indx; + + vec[vec_indx].iov_base = const_cast(src.data()); + vec[vec_indx].iov_len = src.size(); + + *next++ = '\r'; + *next++ = '\n'; + ++vec_indx; + + if (vec_indx + 1 >= vec.size()) { + if (i < arr.size() - 1 || vec_indx == vec.size()) { + Send(vec.data(), vec_indx); + if (ec_) + return; + + vec_indx = 0; + start = meta.data(); + next = start + 2; + start[0] = '\r'; + start[1] = '\n'; + } + } } - SendRaw(res); + vec[vec_indx].iov_base = start; + vec[vec_indx].iov_len = 2; + Send(vec.data(), vec_indx + 1); } void RedisReplyBuilder::StartArray(unsigned len) { diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index edd83cf01459..99a58a9870a2 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -7,7 +7,7 @@ #include #include "facade/op_status.h" -#include "io/sync_stream_interface.h" +#include "io/io.h" namespace facade { diff --git a/src/redis/CMakeLists.txt b/src/redis/CMakeLists.txt index 715c688eb547..5f817146f8ba 100644 --- a/src/redis/CMakeLists.txt +++ b/src/redis/CMakeLists.txt @@ -15,6 +15,8 @@ add_library(redis_lib crc64.c crcspeed.c debug.c dict.c intset.c cxx_link(redis_lib ${ZMALLOC_DEPS}) +target_compile_options(redis_lib PRIVATE -Wno-maybe-uninitialized) + if (REDIS_ZMALLOC_MI) target_compile_definitions(redis_lib PUBLIC USE_ZMALLOC_MI) endif() diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 8a74dbd4b331..f40efaff0289 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -290,10 +290,7 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result) { - (*cntx)->StartArray(result->size()); - for (const auto& s : *result) { - (*cntx)->SendBulkString(s); - } + (*cntx)->SendStringArr(absl::Span{*result}); } else { (*cntx)->SendError(result.status()); }