fix: correctly set batching mode during pubsub
Previously, we enabled batch mode whenever the dispatch queue was not empty,
but the dispatch queue can also hold other async messages related to pubsub or monitor.
Now we enable batching only if more pipeline commands remain in the queue.
Fixes #935.

Signed-off-by: Roman Gershman <[email protected]>
romange committed Mar 13, 2023
1 parent 97e38ae commit 1b9b286
Showing 5 changed files with 39 additions and 10 deletions.
13 changes: 10 additions & 3 deletions src/facade/dragonfly_connection.cc
@@ -300,8 +300,12 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {

 void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
   ++stats->pipelined_cmd_cnt;
-  bool empty = self->dispatch_q_.empty();
-  builder->SetBatchMode(!empty);
+  self->pipeline_msg_cnt_--;
+
+  bool do_batch = (self->pipeline_msg_cnt_ > 0);
+  DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()) << " " << do_batch;
+
+  builder->SetBatchMode(do_batch);
   self->cc_->async_dispatch = true;
   self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
   self->last_interaction_ = time(nullptr);
@@ -650,13 +654,16 @@ auto Connection::ParseRedis() -> ParserStatus {
     }
   }
   RespToArgList(parse_args_, &cmd_vec_);

+  DVLOG(2) << "Sync dispatch " << ToSV(cmd_vec_.front());
+
   CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()};
   service_->DispatchCommand(cmd_list, cc_.get());
   last_interaction_ = time(nullptr);
 } else {
   // Dispatch via queue to speedup input reading.
   RequestPtr req = FromArgs(std::move(parse_args_), tlh);

+  ++pipeline_msg_cnt_;
   dispatch_q_.push_back(std::move(req));
   if (dispatch_q_.size() == 1) {
     evc_.notify();
2 changes: 2 additions & 0 deletions src/facade/dragonfly_connection.h
@@ -147,6 +147,8 @@ class Connection : public util::Connection {
   RequestPtr FromArgs(RespVec args, mi_heap_t* heap);

   std::deque<RequestPtr> dispatch_q_;  // coordinated via evc_.
+  uint32_t pipeline_msg_cnt_ = 0;
+
   static thread_local std::vector<RequestPtr> free_req_pool_;
   util::fibers_ext::EventCount evc_;

19 changes: 14 additions & 5 deletions src/facade/reply_builder.cc
@@ -47,13 +47,20 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
   DCHECK(sink_);

   if (should_batch_) {
-    // TODO: to introduce flushing when too much data is batched.
+    size_t total_size = batch_.size();
     for (unsigned i = 0; i < len; ++i) {
-      std::string_view src((char*)v[i].iov_base, v[i].iov_len);
-      DVLOG(2) << "Appending to stream " << sink_ << " " << src;
-      batch_.append(src.data(), src.size());
+      total_size += v[i].iov_len;
     }
+
+    if (total_size < 8192) {
+      // TODO: to introduce flushing when too much data is batched.
+      for (unsigned i = 0; i < len; ++i) {
+        std::string_view src((char*)v[i].iov_base, v[i].iov_len);
+        DVLOG(2) << "Appending to stream " << src;
+        batch_.append(src.data(), src.size());
+      }
+      return;
+    }
-    return;
   }

   error_code ec;
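
In plainer terms, the change above makes the batching branch size-aware: while batching is enabled, replies keep accumulating in `batch_` only as long as the total pending size stays below 8192 bytes; anything larger falls through to the regular send path. The Python sketch below is only an abstract model of that decision; the class and method names are illustrative assumptions, not Dragonfly APIs.

```python
# Abstract model of the size-aware batching decision -- illustrative only.
# The 8192-byte threshold comes from the hunk above; everything else is assumed.
BATCH_LIMIT = 8192


class BatchingSink:
    """Buffers small replies while batching is on; larger payloads bypass the buffer."""

    def __init__(self, sink):
        self.sink = sink            # anything with a write(bytes) method
        self.batch = bytearray()    # pending batched bytes
        self.should_batch = False   # analogous to SetBatchMode()

    def send(self, chunks: list[bytes]) -> None:
        if self.should_batch:
            total = len(self.batch) + sum(len(c) for c in chunks)
            if total < BATCH_LIMIT:
                for c in chunks:
                    self.batch.extend(c)  # keep buffering; nothing hits the socket yet
                return
        # Batching is off, or the pending batch would grow past the limit:
        # flush whatever was buffered and write the new data directly.
        # (The real Send() handles this path outside the hunk shown above.)
        if self.batch:
            self.sink.write(bytes(self.batch))
            self.batch.clear()
        for c in chunks:
            self.sink.write(c)
```
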
@@ -327,6 +334,8 @@ void RedisReplyBuilder::StartArray(unsigned len) {
 }

 void RedisReplyBuilder::SendStringArr(StrPtr str_ptr, uint32_t len) {
+  DVLOG(2) << "Sending array of " << len << " strings.";
+
   // When vector length is too long, Send returns EMSGSIZE.
   size_t vec_len = std::min<size_t>(256u, len);

5 changes: 5 additions & 0 deletions tests/README.md
@@ -18,6 +18,11 @@ You can override the location of the binary using `DRAGONFLY_PATH` environment variable
- use `--df arg=val` to pass custom arguments to all dragonfly instances. Can be used multiple times.
- use `--log-seeder file` to store all single-db commands from the latest tests seeder inside file.

For example,

```sh
pytest dragonfly/connection_test.py -s --df logtostdout --df vmodule=dragonfly_connection=2 -k test_subscribe
```
### Before you start
Please make sure that you have Python 3 installed on your local host.
If you have both Python 2 and Python 3 installed on your host, you can run the tests with the following command:
10 changes: 8 additions & 2 deletions tests/dragonfly/connection_test.py
@@ -174,8 +174,8 @@ async def reader(channel: aioredis.client.PubSub, messages, max: int):
     return True, "success"


-async def run_pipeline_mode(async_client, messages):
-    pipe = async_client.pipeline()
+async def run_pipeline_mode(async_client: aioredis.Redis, messages):
+    pipe = async_client.pipeline(transaction=False)
     for key, val in messages.items():
         pipe.set(key, val)
     result = await pipe.execute()
@@ -327,3 +327,9 @@ async def test_big_command(df_server, size=8 * 1024):

     writer.close()
     await writer.wait_closed()
+
+@pytest.mark.asyncio
+async def test_subscribe_pipelined(async_client: aioredis.Redis):
+    pipe = async_client.pipeline(transaction=False)
+    pipe.execute_command('subscribe channel').execute_command('subscribe channel')
+    await pipe.echo('bye bye').execute()
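
For local experimentation, a rough standalone equivalent of the new `test_subscribe_pipelined` test might look like the sketch below. It assumes redis-py ≥ 4.2 (whose `redis.asyncio` module provides the aioredis-style client used by the tests) and a Dragonfly instance already listening on `localhost:6379`; neither assumption comes from the commit itself.

```python
# Standalone sketch of the pipelined-subscribe scenario; assumptions noted above.
import asyncio

import redis.asyncio as aioredis


async def main():
    client = aioredis.Redis(host="localhost", port=6379)

    # Pipeline two SUBSCRIBE commands followed by a regular command on the same
    # connection. Before this fix, the pubsub messages sitting in the dispatch
    # queue could flip the connection into batch mode, delaying the final reply.
    pipe = client.pipeline(transaction=False)
    pipe.execute_command("subscribe channel").execute_command("subscribe channel")
    replies = await pipe.echo("bye bye").execute()
    print(replies)  # with the fix, the pipeline completes promptly

    await client.close()


asyncio.run(main())
```
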
