From 2b199518dba679f253af3a80fc952fa28b126f5d Mon Sep 17 00:00:00 2001 From: shahar Date: Tue, 15 Oct 2024 10:56:25 +0300 Subject: [PATCH] fix: Support replicating Valkey and Redis 7.2 Until now, we only tested Dragonfly against Redis 6.2. It appears that something has changed in the way Redis sends stable sync commands, and now they also forward `MULTI` and `EXEC` as part of their replication. Since we do not allow all commands to run under `MULTI`/`EXEC`, specifically `SELECT`, a Dragonfly replica of such servers failed these commands and became inconsistent with the data on the master. The proposed fix is to simply ignore (i.e. not execute) `MULTI`/`EXEC` coming from a Redis/Valkey master, and run the commands within those transactions individually, like we do for other transactions. The escaped command name is kept in an owning `std::string`, because `absl::CHexEscape` returns a temporary string and a `string_view` bound to it would dangle before the `MULTI`/`EXEC` comparison runs. To test this we randomly choose a redis/valkey server based on 3 available installed binaries and test against them. --- src/server/replica.cc | 26 ++++++++++++++++---------- tests/dragonfly/instance.py | 4 +++- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/server/replica.cc b/src/server/replica.cc index f618164dda68..df22b1e67321 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -635,18 +635,24 @@ error_code Replica::ConsumeRedisStream() { } if (!LastResponseArgs().empty()) { - VLOG(2) << "Got command " << absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf())) - << "\n consumed: " << response->total_read; - - if (LastResponseArgs()[0].GetBuf()[0] == '\r') { - for (const auto& arg : LastResponseArgs()) { - LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf())); + std::string cmd = absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf())); + + // Valkey and Redis may send MULTI and EXEC as part of their replication commands. + // Dragonfly disallows some commands, such as SELECT, inside of MULTI/EXEC, so here we simply + // ignore MULTI/EXEC and execute their inner commands individually. 
+ if (!absl::EqualsIgnoreCase(cmd, "MULTI") && !absl::EqualsIgnoreCase(cmd, "EXEC")) { + VLOG(2) << "Got command " << cmd << "\n consumed: " << response->total_read; + + if (LastResponseArgs()[0].GetBuf()[0] == '\r') { + for (const auto& arg : LastResponseArgs()) { + LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf())); + } } - } - facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector); - CmdArgList arg_list{args_vector.data(), args_vector.size()}; - service_.DispatchCommand(arg_list, &conn_context); + facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector); + CmdArgList arg_list{args_vector.data(), args_vector.size()}; + service_.DispatchCommand(arg_list, &conn_context); + } } io_buf.ConsumeInput(response->left_in_buffer); diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 019d27558db2..d7dd7405ee26 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -3,6 +3,7 @@ import threading import time import subprocess +import random import aiohttp import logging from dataclasses import dataclass @@ -455,8 +456,9 @@ def __init__(self, port): self.proc = None def start(self, **kwargs): + servers = ["redis-server-6.2.11", "redis-server-7.2.2", "valkey-server-8.0.1"] command = [ - "redis-server-6.2.11", + random.choice(servers), f"--port {self.port}", "--save ''", "--appendonly no",