diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index eff1079cdbbd..ba92077668ff 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -300,8 +300,12 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
 void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
   ++stats->pipelined_cmd_cnt;
 
-  bool empty = self->dispatch_q_.empty();
-  builder->SetBatchMode(!empty);
+  self->pipeline_msg_cnt_--;
+
+  bool do_batch = (self->pipeline_msg_cnt_ > 0);
+  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()) << " " << do_batch;
+
+  builder->SetBatchMode(do_batch);
   self->cc_->async_dispatch = true;
   self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
   self->last_interaction_ = time(nullptr);
@@ -650,13 +654,16 @@ auto Connection::ParseRedis() -> ParserStatus {
         }
       }
       RespToArgList(parse_args_, &cmd_vec_);
+
+      DVLOG(2) << "Sync dispatch " << ToSV(cmd_vec_.front());
+
       CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()};
       service_->DispatchCommand(cmd_list, cc_.get());
       last_interaction_ = time(nullptr);
     } else {
       // Dispatch via queue to speedup input reading.
       RequestPtr req = FromArgs(std::move(parse_args_), tlh);
-
+      ++pipeline_msg_cnt_;
       dispatch_q_.push_back(std::move(req));
       if (dispatch_q_.size() == 1) {
         evc_.notify();
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index 2836a6d0c0b2..1621f69747a7 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -147,6 +147,8 @@ class Connection : public util::Connection {
   RequestPtr FromArgs(RespVec args, mi_heap_t* heap);
 
   std::deque<RequestPtr> dispatch_q_;  // coordinated via evc_.
+  uint32_t pipeline_msg_cnt_ = 0;
+
   static thread_local std::vector<RequestPtr> free_req_pool_;
   util::fibers_ext::EventCount evc_;
 
diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc
index 1e1dbdbe779c..efd91ed6125d 100644
--- a/src/facade/reply_builder.cc
+++ b/src/facade/reply_builder.cc
@@ -47,13 +47,19 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
   DCHECK(sink_);
 
   if (should_batch_) {
-    // TODO: to introduce flushing when too much data is batched.
+    size_t total_size = batch_.size();
     for (unsigned i = 0; i < len; ++i) {
-      std::string_view src((char*)v[i].iov_base, v[i].iov_len);
-      DVLOG(2) << "Appending to stream " << sink_ << " " << src;
-      batch_.append(src.data(), src.size());
+      total_size += v[i].iov_len;
+    }
+
+    if (total_size < 8192) {  // Allow batching with up to 8K of data.
+      for (unsigned i = 0; i < len; ++i) {
+        std::string_view src((char*)v[i].iov_base, v[i].iov_len);
+        DVLOG(2) << "Appending to stream " << src;
+        batch_.append(src.data(), src.size());
+      }
+      return;
     }
-    return;
   }
 
   error_code ec;
@@ -327,6 +333,8 @@ void RedisReplyBuilder::StartArray(unsigned len) {
 }
 
 void RedisReplyBuilder::SendStringArr(StrPtr str_ptr, uint32_t len) {
+  DVLOG(2) << "Sending array of " << len << " strings.";
+
   // When vector length is too long, Send returns EMSGSIZE.
   size_t vec_len = std::min(256u, len);
 
diff --git a/tests/README.md b/tests/README.md
index dbb8af28f669..9a5151313abe 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -18,6 +18,11 @@ You can override the location of the binary using `DRAGONFLY_PATH` environment v
 - use `--df arg=val` to pass custom arguments to all dragonfly instances. Can be used multiple times.
 - use `--log-seeder file` to store all single-db commands from the lastest tests seeder inside file.
 
+for example,
+
+```sh
+pytest dragonfly/connection_test.py -s --df logtostdout --df vmodule=dragonfly_connection=2 -k test_subscribe
+```
 ### Before you start
 Please make sure that you have python 3 installed on you local host.
 If have more both python 2 and python 3 installed on you host, you can run the tests with the following command:
diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py
index ffc014e4a01e..406947edc34f 100644
--- a/tests/dragonfly/connection_test.py
+++ b/tests/dragonfly/connection_test.py
@@ -174,8 +174,8 @@ async def reader(channel: aioredis.client.PubSub, messages, max: int):
     return True, "success"
 
 
-async def run_pipeline_mode(async_client, messages):
-    pipe = async_client.pipeline()
+async def run_pipeline_mode(async_client: aioredis.Redis, messages):
+    pipe = async_client.pipeline(transaction=False)
     for key, val in messages.items():
         pipe.set(key, val)
     result = await pipe.execute()
@@ -327,3 +327,9 @@ async def test_big_command(df_server, size=8 * 1024):
 
     writer.close()
     await writer.wait_closed()
+
+@pytest.mark.asyncio
+async def test_subscribe_pipelined(async_client: aioredis.Redis):
+    pipe = async_client.pipeline(transaction=False)
+    pipe.execute_command('subscribe channel').execute_command('subscribe channel')
+    await pipe.echo('bye bye').execute()
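
Not part of the patch: for anyone who wants to exercise the new batching path by hand, below is a minimal standalone sketch. It assumes a Dragonfly (or Redis-compatible) server already listening on localhost:6379 and the aioredis 2.x client used by the test suite; the key names and command counts are arbitrary. It pipelines a burst of SET/GET commands in one round trip so that the connection's dispatch queue stays non-empty and replies flow through SinkReplyBuilder's batched path.

```python
# Standalone sketch, not part of the patch. Assumes a Dragonfly instance on
# localhost:6379 and the aioredis 2.x client used by the test suite.
import asyncio

import aioredis


async def main():
    client = aioredis.Redis(host="localhost", port=6379)
    pipe = client.pipeline(transaction=False)

    # Queue enough commands that more pipelined messages are still pending
    # while earlier replies are written, which is the condition the patch
    # uses to keep batch mode enabled.
    for i in range(500):
        pipe.set(f"key-{i}", f"value-{i}")
    for i in range(500):
        pipe.get(f"key-{i}")

    results = await pipe.execute()
    # The first 500 replies are SET acks, the rest are the GET values.
    assert results[500:] == [f"value-{i}".encode() for i in range(500)]

    await client.close()


if __name__ == "__main__":
    asyncio.run(main())
```

Starting the server with `vmodule=dragonfly_connection=2` (as in the README example added above) makes the new DVLOG(2) lines visible while the script runs.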