From a7d405251de4435975f70e225de24f3f3a7c516b Mon Sep 17 00:00:00 2001
From: shahar
Date: Tue, 19 Nov 2024 17:18:53 +0200
Subject: [PATCH] fix: Huge entries fail to load outside RDB / replication

We have an internal utility tool that we use to deserialize values in some use cases:

* `RESTORE`
* Cluster slot migration
* `RENAME`, if the source and target shards are different

We [recently](https://github.com/dragonflydb/dragonfly/issues/3760) changed this area of the code, which caused this regression as it only handled RDB / replication streams.

Fixes #4143
---
 src/server/debugcmd.cc          |  2 +-
 src/server/generic_family.cc    | 65 ++++++++++++++++++++++-----------
 tests/dragonfly/cluster_test.py | 47 ++++++++++++++++++++++++
 tests/dragonfly/generic_test.py | 39 ++++++++++++++++++++
 4 files changed, 130 insertions(+), 23 deletions(-)

diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index e8c264c74191..809f28100cc1 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -401,7 +401,7 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
         " to meet value size.",
         " If RAND is specified then value will be set to random hex string in specified size.",
         " If SLOTS is specified then create keys only in given slots range.",
-        " TYPE specifies data type (must be STRING/LIST/SET/HSET/ZSET/JSON), default STRING.",
+        " TYPE specifies data type (must be STRING/LIST/SET/HASH/ZSET/JSON), default STRING.",
         " ELEMENTS specifies how many sub elements if relevant (like entries in a list / set).",
         "OBJHIST",
         " Prints histogram of object sizes.",
diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc
index 0b3aa70bb5ee..a29b556675b3 100644
--- a/src/server/generic_family.cc
+++ b/src/server/generic_family.cc
@@ -157,25 +157,32 @@ class RdbRestoreValue : protected RdbLoaderBase {
                                            const RestoreArgs& args, EngineShard* shard);
 
  private:
-  std::optional Parse(std::string_view payload);
+  std::optional Parse(io::Source* source);
+  int rdb_type_ = -1;
 };
 
-std::optional RdbRestoreValue::Parse(std::string_view payload) {
-  InMemSource source(payload);
-  src_ = &source;
-  if (io::Result type_id = FetchType(); type_id && rdbIsObjectTypeDF(type_id.value())) {
-    OpaqueObj obj;
-    error_code ec = ReadObj(type_id.value(), &obj);  // load the type from the input stream
-    if (ec) {
-      LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value();
-      return std::nullopt;
+std::optional RdbRestoreValue::Parse(io::Source* source) {
+  src_ = source;
+  if (pending_read_.remaining == 0) {
+    io::Result type_id = FetchType();
+    if (type_id && rdbIsObjectTypeDF(type_id.value())) {
+      rdb_type_ = *type_id;
     }
+  }
 
-    return std::optional(std::move(obj));
-  } else {
+  if (rdb_type_ == -1) {
     LOG(ERROR) << "failed to load type id from the input stream or type id is invalid";
     return std::nullopt;
   }
+
+  OpaqueObj obj;
+  error_code ec = ReadObj(rdb_type_, &obj);  // load the type from the input stream
+  if (ec) {
+    LOG(ERROR) << "failed to load data for type id " << rdb_type_;
+    return std::nullopt;
+  }
+
+  return std::optional(std::move(obj));
 }
 
 std::optional RdbRestoreValue::Add(std::string_view data,
@@ -183,17 +190,31 @@ std::optional RdbRestoreValue::Add(std::string_view data,
                                                           const DbContext& cntx,
                                                           const RestoreArgs& args,
                                                           EngineShard* shard) {
-  auto opaque_res = Parse(data);
-  if (!opaque_res) {
-    return std::nullopt;
-  }
-
+  InMemSource data_src(data);
   PrimeValue pv;
-  if (auto ec = FromOpaque(*opaque_res, &pv); ec) {
-    // we failed - report and exit
-    LOG(WARNING) << "error while trying to save data: " << ec;
-    return std::nullopt;
-  }
+  bool first_parse = true;
+  do {
+    auto opaque_res = Parse(&data_src);
+    if (!opaque_res) {
+      return std::nullopt;
+    }
+
+    LoadConfig config;
+    if (first_parse) {
+      first_parse = false;
+    } else {
+      config.append = true;
+    }
+    if (pending_read_.remaining > 0) {
+      config.streamed = true;
+    }
+
+    if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) {
+      // we failed - report and exit
+      LOG(WARNING) << "error while trying to read data: " << ec;
+      return std::nullopt;
+    }
+  } while (pending_read_.remaining > 0);
 
   if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) {
     res->it->first.SetSticky(args.Sticky());
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 66f70856e236..d20d0d319f16 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1773,6 +1773,53 @@ async def node1size0():
         assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}")
 
 
+@pytest.mark.parametrize("type", ["list", "hash", "string", "set", "zset"])
+@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
+@pytest.mark.asyncio
+async def test_cluster_migration_huge_list(df_factory: DflyInstanceFactory, type):
+    instances = [
+        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
+    ]
+    df_factory.start_all(instances)
+
+    nodes = [await create_node_info(instance) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug(f"Generating huge {type}")
+    await nodes[0].client.execute_command(
+        f"debug populate 1 k 10000 RAND TYPE {type} ELEMENTS 1000"
+    )
+
+    async def get_length(client):
+        if type == "list":
+            return await client.llen("k:0")
+        elif type == "hash":
+            return await client.hlen("k:0")
+        elif type == "string":
+            return await client.strlen("k:0")
+        elif type == "set":
+            return await client.scard("k:0")
+        else:
+            assert type == "zset"
+            return await client.zcard("k:0")
+
+    size = await get_length(nodes[0].client)
+
+    nodes[0].migrations = [
+        MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
+    ]
+    logging.debug("Migrating slots")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("Waiting for migration to finish")
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
+
+    assert size == await get_length(nodes[1].client)
+
+
 def parse_lag(replication_info: str):
     lags = re.findall("lag=([0-9]+)\r\n", replication_info)
     assert len(lags) == 1
diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py
index 020d364a9c23..21848686800d 100644
--- a/tests/dragonfly/generic_test.py
+++ b/tests/dragonfly/generic_test.py
@@ -1,4 +1,5 @@
 import os
+import logging
 import pytest
 import redis
 import asyncio
@@ -168,3 +169,41 @@ async def test_denyoom_commands(df_factory):
 
     # mget should not be rejected
     await client.execute_command("mget x")
+
+
+@pytest.mark.parametrize("type", ["list", "hash", "string", "set", "zset"])
+@dfly_args({"proactor_threads": 4})
+@pytest.mark.asyncio
+async def test_rename_huge_values(df_factory, type):
+    df_server = df_factory.create()
+    df_server.start()
+    client = df_server.client()
+
+    logging.debug(f"Generating huge {type}")
+    await client.execute_command(f"debug populate 1 k 10000 RAND TYPE {type} ELEMENTS 1000")
+
+    async def get_length(key):
+        if type == "list":
+            return await client.llen(key)
+        elif type == "hash":
+            return await client.hlen(key)
+        elif type == "string":
+            return await client.strlen(key)
+        elif type == "set":
+            return await client.scard(key)
+        else:
+            assert type == "zset"
+            return await client.zcard(key)
+
+    size = await get_length("k:0")
+    logging.debug(f"size {size}")
+    keys = await client.execute_command("keys *")
+    logging.debug(f"keys {keys}")
+
+    # Rename multiple times to make sure the key moves between shards
+    old_name = "k:0"
+    for i in range(10):
+        new_name = f"new:{i}"
+        await client.execute_command(f"rename {old_name} {new_name}")
+        assert size == await get_length(new_name)
+        old_name = new_name