Skip to content

Commit

Permalink
chore: Expose a corruption when connection writes interleaving messages
Browse files Browse the repository at this point in the history
The problem happens when a publisher sends a message and a new subscriber registers.
In that case it sends "subscribe" response and the publish messages and those
interleave sometimes.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Mar 11, 2023
1 parent 5c57e4e commit 36e561a
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 5 deletions.
4 changes: 4 additions & 0 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
++stats->async_writes_cnt;
const PubMessage& pub_msg = msg.pub_msg;
string_view arr[4];
DCHECK(!rbuilder->is_sending);
rbuilder->is_sending = true;
if (pub_msg.pattern.empty()) {
DVLOG(1) << "Sending message, from channel: " << pub_msg.channel << " " << *pub_msg.message;
arr[0] = "message";
arr[1] = pub_msg.channel;
arr[2] = *pub_msg.message;
Expand All @@ -287,6 +290,7 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
arr[3] = *pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr, 4});
}
rbuilder->is_sending = false;
}

void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
Expand Down
2 changes: 2 additions & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class SinkReplyBuilder {
virtual void SendStored() = 0;
virtual void SendSetSkipped() = 0;

bool is_sending = false;

protected:
void Send(const iovec* v, uint32_t len);

Expand Down
13 changes: 8 additions & 5 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,19 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis

if (to_reply) {
const char* action[2] = {"unsubscribe", "subscribe"};

facade::RedisReplyBuilder* rbuilder = this->operator->();
DCHECK(!rbuilder->is_sending);
rbuilder->is_sending = true;
for (size_t i = 0; i < result.size(); ++i) {
(*this)->StartArray(3);
(*this)->SendBulkString(action[to_add]);
(*this)->SendBulkString(ArgS(args, i)); // channel
rbuilder->StartArray(3);
rbuilder->SendBulkString(action[to_add]);
rbuilder->SendBulkString(ArgS(args, i)); // channel

// number of subscribed channels for this connection *right after*
// we subscribe.
(*this)->SendLong(result[i]);
rbuilder->SendLong(result[i]);
}
rbuilder->is_sending = false;
}
}

Expand Down
4 changes: 4 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pip install -r dragonfly/requirements.txt
to run pytest, run:
`pytest -xv dragonfly`

to run selectively, use:
`pytest -xv dragonfly -k <substring>`
For more pytest flags [check here](https://fig.io/manual/pytest).

## Writing tests
The [Getting Started](https://docs.pytest.org/en/7.1.x/getting-started.html) guide is a great resource to become familiar with writing pytest test cases.

Expand Down
38 changes: 38 additions & 0 deletions tests/dragonfly/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import aioredis
import async_timeout

from . import DflyInstance


async def run_monitor_eval(monitor, expected):
async with monitor as mon:
Expand Down Expand Up @@ -278,6 +280,42 @@ def generate(max):
assert state, message


@pytest.mark.asyncio
async def test_subsribers_with_active_publisher(df_server: DflyInstance, max_connections=100):
# TODO: I am not how to customize the max connections for the pool.
async_pool = aioredis.ConnectionPool(host="localhost", port=df_server.port,
db=0, decode_responses=True, max_connections=max_connections)

async def publish_worker():
client = aioredis.Redis(connection_pool=async_pool)
for i in range(0, 2000):
await client.publish("channel", f"message-{i}")
await client.close()

async def channel_reader(channel: aioredis.client.PubSub):
for i in range(0, 150):
try:
async with async_timeout.timeout(1):
message = await channel.get_message(ignore_subscribe_messages=True)
except asyncio.TimeoutError:
break

async def subscribe_worker():
client = aioredis.Redis(connection_pool=async_pool)
pubsub = client.pubsub()
async with pubsub as p:
await pubsub.subscribe("channel")
await channel_reader(pubsub)
await pubsub.unsubscribe("channel")

# Create a publisher that sends constantly messages to the channel
# Then create subscribers that will subscribe to already active channel
pub_task = asyncio.create_task(publish_worker())
await asyncio.gather(*(subscribe_worker() for _ in range(max_connections - 10)))
await pub_task
await async_pool.disconnect()


@pytest.mark.asyncio
async def test_big_command(df_server, size=8 * 1024):
reader, writer = await asyncio.open_connection('127.0.0.1', df_server.port)
Expand Down

0 comments on commit 36e561a

Please sign in to comment.