diff --git a/src/core/interpreter.cc b/src/core/interpreter.cc
index 5fc4a7371704..8963e629540e 100644
--- a/src/core/interpreter.cc
+++ b/src/core/interpreter.cc
@@ -4,11 +4,13 @@
 #include "core/interpreter.h"

+#include <absl/base/casts.h>
 #include
 #include
 #include
 #include
 #include
+#include <xxhash.h>

 #include
 #include
@@ -303,6 +305,41 @@ void ToHex(const uint8_t* src, char* dest) {
   dest[40] = '\0';
 }

+int DragonflyHashCommand(lua_State* lua) {
+  int argc = lua_gettop(lua);
+  if (argc != 2) {
+    lua_pushstring(lua, "wrong number of arguments");
+    return lua_error(lua);
+  }
+
+  XXH64_hash_t hash = absl::bit_cast<XXH64_hash_t>(lua_tointeger(lua, 1));
+  auto update_hash = [&hash](string_view sv) { hash = XXH64(sv.data(), sv.length(), hash); };
+
+  auto digest_value = [&hash, &update_hash, lua](int pos) {
+    if (int type = lua_type(lua, pos); type == LUA_TSTRING) {
+      const char* str = lua_tostring(lua, pos);
+      update_hash(string_view{str, strlen(str)});
+    } else {
+      CHECK_EQ(type, LUA_TNUMBER) << "Only strings and integers can be hashed";
+      update_hash(to_string(lua_tointeger(lua, pos)));
+    }
+  };
+
+  if (lua_type(lua, 2) == LUA_TTABLE) {
+    lua_pushnil(lua);
+    while (lua_next(lua, 2) != 0) {
+      digest_value(-2);  // key, included for correct hashing
+      digest_value(-1);  // value
+      lua_pop(lua, 1);
+    }
+  } else {
+    digest_value(2);
+  }
+
+  lua_pushinteger(lua, absl::bit_cast<lua_Integer>(hash));
+  return 1;
+}
+
 int RedisSha1Command(lua_State* lua) {
   int argc = lua_gettop(lua);
   if (argc != 1) {
@@ -387,6 +424,17 @@ Interpreter::Interpreter() {
   *ptr = this;  // SaveOnRegistry(lua_, kInstanceKey, this);

+  /* Register the dragonfly commands table and fields */
+  lua_newtable(lua_);
+
+  lua_pushstring(lua_, "ihash");
+  lua_pushcfunction(lua_, DragonflyHashCommand);
+  lua_settable(lua_, -3);
+
+  /* Finally set the table as the 'dragonfly' global var. */
+  lua_setglobal(lua_, "dragonfly");
+  CHECK(lua_checkstack(lua_, 64));
+
   /* Register the redis commands table and fields */
   lua_newtable(lua_);
diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py
index 66c0d36cc622..a8f80ab14bff 100644
--- a/tests/dragonfly/instance.py
+++ b/tests/dragonfly/instance.py
@@ -319,7 +319,7 @@ def create(self, existing_port=None, **kwargs) -> DflyInstance:
         args = {**self.args, **kwargs}
         args.setdefault("dbfilename", "")
         vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1"
-        args.setdefault("vmodule", vmod)
+        # args.setdefault("vmodule", vmod)

         for k, v in args.items():
             args[k] = v.format(**self.params.env) if isinstance(v, str) else v
diff --git a/tests/dragonfly/requirements.txt b/tests/dragonfly/requirements.txt
index 6d15d8dd92ce..185537b9b5c9 100644
--- a/tests/dragonfly/requirements.txt
+++ b/tests/dragonfly/requirements.txt
@@ -23,3 +23,4 @@ redis-om==0.2.1
 pytest-emoji==0.2.0
 pytest-icdiff==0.8
 pytest-timeout==2.2.0
+asyncio==3.4.3
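As context for the `interpreter.cc` change above: `dragonfly.ihash` takes a seed and a single string, integer, or table, and returns the updated XXH64 digest as a Lua integer. A minimal usage sketch (illustrative values; runnable via `EVAL` on a build that includes this patch):

```lua
-- sketch: chaining the dragonfly.ihash builtin registered above
local h = dragonfly.ihash(0, 'hello')  -- hash a string with seed 0
h = dragonfly.ihash(h, 42)             -- integers are digested via their decimal string form
h = dragonfly.ihash(h, {'a', 'b'})     -- tables: every key/value pair is digested
return h
```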
diff --git a/tests/dragonfly/seeder/README.md b/tests/dragonfly/seeder/README.md
new file mode 100644
index 000000000000..56c80385654f
--- /dev/null
+++ b/tests/dragonfly/seeder/README.md
@@ -0,0 +1,65 @@
+## Seeder library
+
+Please use the testing framework's factories to obtain proper seeder instances!
+
+### 1. Filling data
+
+The seeder tries to maintain a specific number of keys, quickly filling or emptying the instance to reach the target. Once the target is reached, it also issues modification commands, maintaining an equilibrium under mixed load.
+
+```python
+# Configure how many keys we want
+s = Seeder(key_target=10_000)
+
+# Fill instance with keys until it's 10k +- 1%
+# Will create many new keys with data and reach equilibrium
+await s.run(client, target_deviation=0.01)
+assert abs(await client.dbsize() - 10_000) <= 100
+
+# Run 5k operations, balanced mix of create/delete/modify
+await s.run(client, target_ops=5000)
+
+# Now we want only 500 keys, issue many deletes
+s.change_key_target(500)
+await s.run(client, target_deviation=0.01)
+```
+
+### 2. Checking consistency
+
+Use `Seeder.capture()` to calculate "state hashes" based on all the data inside an instance. Equal data produces equal hashes (equal hashes don't guarantee equal data, but what are the odds...).
+
+```python
+# Fill master with 10k (+- 1%) keys
+s = Seeder(key_target=10_000)
+await s.run(master, target_deviation=0.01)
+
+# "Replicate" or other operations
+replicate(master, replica)
+
+# Ensure master and replica have the same state hashes
+master_hashes, replica_hashes = await asyncio.gather(
+    Seeder.capture(master),  # note it's a static method
+    Seeder.capture(replica),
+)
+assert master_hashes == replica_hashes
+```
+
+### 3. Working with load
+
+A seeder's `run(client)` can be called without any target; in that case it can only be stopped explicitly:
+
+```python
+# Fill instance with keys
+s = Seeder()
+await s.run(client, target_deviation=0.01)
+
+# Start seeder without a target.
+# Because the instance reached its key target, the seeder
+# will issue a balanced mix of modifications/additions/deletions
+seeding_task = asyncio.create_task(s.run(client))
+
+# Do operations under fuzzy load
+save(client)
+
+await s.stop(client)  # request stop, no immediate effect
+await seeding_task  # wait for actual stop and cleanup
+```
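The package `__init__.py` that follows assembles its Lua scripts through plain textual substitution: `_load_script` replaces every `-- import:<name> --` marker with the contents of `script-<name>.lua`. A self-contained Lua sketch of the same expansion logic (the inline `libs` table stands in for reading the library files from disk):

```lua
-- minimal sketch of the "-- import:<name> --" expansion done by Seeder._load_script below
local libs = { utillib = 'local function LU_collect_keys(p, t) return {} end' }

local script = [[
-- import:utillib --
return LU_collect_keys('prefix-', 'string')
]]

for name in script:gmatch('%-%- import:(.-) %-%-') do
    script = script:gsub('%-%- import:' .. name .. ' %-%-', libs[name])
end

print(script)  -- the marker line is now the pasted library body
```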
diff --git a/tests/dragonfly/seeder/__init__.py b/tests/dragonfly/seeder/__init__.py
new file mode 100644
index 000000000000..6bbc5dce89b4
--- /dev/null
+++ b/tests/dragonfly/seeder/__init__.py
@@ -0,0 +1,117 @@
+import asyncio
+import random
+import re
+import typing
+import redis.asyncio as aioredis
+from dataclasses import dataclass
+
+try:
+    from importlib import resources as impresources
+except ImportError:
+    # CI runs on python < 3.8
+    import importlib_resources as impresources
+
+
+class SeederBase:
+    UID_COUNTER = 1  # multiple generators should not conflict on keys
+    CACHED_SCRIPTS = {}
+    TYPES = ["string"]
+
+    @classmethod
+    async def capture(clz, client: aioredis.Redis) -> typing.List[int]:
+        """Generate a hash capture for all data stored in the instance pointed to by the client"""
+
+        sha = await client.script_load(clz._load_script("hash"))
+        return await asyncio.gather(*(client.evalsha(sha, 0, data_type) for data_type in clz.TYPES))
+
+    @classmethod
+    def _next_id(clz):
+        clz.UID_COUNTER += 1
+        return clz.UID_COUNTER
+
+    @staticmethod
+    def _read_file(fname):
+        try:
+            script_file = impresources.files(__package__) / fname
+            with script_file.open("rt") as f:
+                return f.read()
+        except AttributeError:
+            return impresources.read_text(__package__, fname)
+
+    @classmethod
+    def _load_script(clz, fname):
+        if fname in clz.CACHED_SCRIPTS:
+            return clz.CACHED_SCRIPTS[fname]
+
+        script = clz._read_file(f"script-{fname}.lua")
+        requested = re.findall(r"-- import:(.*?) --", script)
+        for request in requested:
+            lib = clz._read_file(f"script-{request}.lua")
+            script = script.replace(f"-- import:{request} --", lib)
+
+        clz.CACHED_SCRIPTS[fname] = script
+        return script
+
+
+class Seeder(SeederBase):
+    @dataclass
+    class Unit:
+        prefix: str
+        type: str
+        counter: int
+        stop_key: str
+
+    units: typing.List[Unit]
+
+    def __init__(self, units=10, key_target=10_000, data_size=10):
+        self.uid = Seeder._next_id()
+        self.key_target = key_target
+        self.data_size = data_size
+        self.units = [
+            Seeder.Unit(
+                prefix=f"k-s{self.uid}u{i}-",
+                type=random.choice(Seeder.TYPES),
+                counter=0,
+                stop_key=f"_s{self.uid}u{i}-stop",
+            )
+            for i in range(units)
+        ]
+
+    async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=None):
+        """Run the seeder until one of the targets is reached, or until stopped if none are set"""
+
+        using_stopkey = target_ops is None and target_deviation is None
+        args = [
+            self.key_target / len(self.units),
+            target_ops if target_ops is not None else 0,
+            target_deviation if target_deviation is not None else -1,
+            self.data_size,
+        ]
+
+        sha = await client.script_load(Seeder._load_script("generate"))
+        await asyncio.gather(
+            *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
+        )
+
+    async def stop(self, client: aioredis.Redis):
+        """Request the seeder to stop if it's running without a target; the future returned from run() must still be awaited"""
+
+        await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units))
+
+    def change_key_target(self, target: int):
+        """Change the key target, applied only on succeeding runs"""
+
+        self.key_target = max(target, 100)  # math breaks with low values
+
+    @staticmethod
+    async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey, args):
+        await client.delete(unit.stop_key)
+
+        args = [
+            unit.prefix,
+            unit.type,
+            unit.counter,
+            unit.stop_key if using_stopkey else "",
+        ] + args
+
+        unit.counter = await client.evalsha(sha, 0, *args)
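Since the positional arguments built by `_run_unit` above must line up exactly with the `ARGV` reads at the top of `script-generate.lua` below, here is the shared contract spelled out (a reading aid, not part of the diff):

```lua
-- ARGV layout shared by Seeder._run_unit (Python, above) and script-generate.lua (below)
-- ARGV[1] = unit.prefix             -- key prefix owned by this unit
-- ARGV[2] = unit.type               -- data type to generate, e.g. "string"
-- ARGV[3] = unit.counter            -- key counter carried over between runs
-- ARGV[4] = stop key or ""          -- non-empty only when running without a target
-- ARGV[5] = key_target / #units     -- per-unit share of the key target
-- ARGV[6] = target_ops or 0         -- 0 means no operation target
-- ARGV[7] = target_deviation or -1  -- negative means no deviation target
-- ARGV[8] = data_size               -- approximate size of generated values
```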
diff --git a/tests/dragonfly/seeder/script-generate.lua b/tests/dragonfly/seeder/script-generate.lua
new file mode 100644
index 000000000000..1f378cfe5b0c
--- /dev/null
+++ b/tests/dragonfly/seeder/script-generate.lua
@@ -0,0 +1,126 @@
+#!lua flags=disable-atomicity
+
+--[[
+Script for quickly generating various data
+]] --
+-- import:genlib --
+-- import:utillib --
+
+-- inputs: unit identifiers
+local prefix = ARGV[1]
+local type = ARGV[2]
+local key_counter = tonumber(ARGV[3])
+local stop_key = ARGV[4]
+
+-- inputs: task specific
+local key_target = tonumber(ARGV[5])
+local total_ops = tonumber(ARGV[6])
+local min_dev = tonumber(ARGV[7])
+local data_size = tonumber(ARGV[8])
+
+-- collect all keys belonging to this script,
+-- assumes exclusive ownership
+local keys = LU_collect_keys(prefix, type)
+
+local addfunc = LG_funcs['add_' .. type]
+local modfunc = LG_funcs['mod_' .. type]
+
+local function action_add()
+    local key = prefix .. tostring(key_counter)
+    key_counter = key_counter + 1
+    table.insert(keys, key)
+    addfunc(key, data_size)
+end
+
+local function action_mod()
+    local key = keys[math.random(#keys)]
+    modfunc(key, data_size)
+end
+
+local function action_del()
+    local key_idx = math.random(#keys)
+    keys[key_idx], keys[#keys] = keys[#keys], keys[key_idx]
+
+    local key = table.remove(keys)
+    redis.acall('DEL', key)
+end
+
+-- set the equilibrium point as the key target, see the intensity calculations below
+local real_target = key_target
+key_target = key_target / 0.956
+
+-- cumulative probabilities: [add, add + delete, modify = 1 - (add + delete)]
+local p_add = 0
+local p_del = 0
+
+local counter = 0
+while true do
+    counter = counter + 1
+
+    -- break if we reached target ops
+    if total_ops > 0 and counter > total_ops then
+        break
+    end
+
+    -- break if we reached our target deviation
+    if min_dev > 0 and math.abs(#keys - real_target) / real_target < min_dev then
+        break
+    end
+
+    -- break if the stop key was set (checked only every 100 ops to not slow down)
+    if stop_key ~= '' and counter % 100 == 0 and redis.call('EXISTS', stop_key) then
+        break
+    end
+
+    -- fast path: if we have less than half of the target, always grow
+    if #keys * 2 < key_target then
+        action_add()
+        goto continue
+    end
+
+    -- update probabilities only every 10 iterations
+    if counter % 10 == 0 then
+        -- calculate intensities (not normalized probabilities)
+        -- please see the attached plots in the PR to understand convergence
+
+        -- the add intensity is monotonically decreasing with the keycount growing,
+        -- the delete intensity is monotonically increasing with the keycount growing,
+        -- the point where the intensities are equal is the equilibrium point,
+        -- based on the formulas it's ~0.956 * key_target
+        local i_add = math.max(0, 1 - (#keys / key_target) ^ 16)
+        local i_del = (#keys / key_target) ^ 16
+
+        -- we are only interested in large amounts of modification commands when we are in
+        -- equilibrium, where there are no low intensities
+        local i_mod = math.max(0, 7 * math.min(i_add, i_del) ^ 3)
+
+        -- transform intensities to [0, 1] probability ranges
+        local sum = i_add + i_del + i_mod
+        p_add = i_add / sum
+        p_del = p_add + i_del / sum
+    end
+
+    -- generate a random action
+    local p = math.random()
+    if p < p_add then
+        action_add()
+    elseif p < p_del then
+        action_del()
+    else
+        action_mod()
+    end
+
+    ::continue::
+end
+
+-- clear the stop key
+if stop_key ~= '' then
+    redis.call('DEL', stop_key)
+end
+
+return key_counter
diff --git a/tests/dragonfly/seeder/script-genlib.lua b/tests/dragonfly/seeder/script-genlib.lua
new file mode 100644
index 000000000000..e07f0f0de989
--- /dev/null
+++ b/tests/dragonfly/seeder/script-genlib.lua
@@ -0,0 +1,20 @@
+local LG_funcs = {}
+
+-- strings
+
+function LG_funcs.add_string(key, dsize)
+    local char = string.char(math.random(65, 90))
+    redis.apcall('SET', key, string.rep(char, dsize))
+end
+
+function LG_funcs.mod_string(key, dsize)
+    -- APPEND and SETRANGE are the only modifying operations for strings,
+    -- issue APPEND rarely to not grow the data too much
+    if math.random() < 0.05 then
+        redis.apcall('APPEND', key, '+')
+    else
+        local char = string.char(math.random(65, 90))
+        local replacement = string.rep(char, math.random(0, dsize / 2))
+        redis.apcall('SETRANGE', key, math.random(0, dsize / 2), replacement)
+    end
+end
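A quick sanity check of the magic constant in `script-generate.lua` above: the add/delete intensities `1 - x^16` and `x^16` (with `x = #keys / key_target`) intersect where `x^16 = 0.5`, which is exactly what the `key_target / 0.956` rescaling encodes:

```lua
-- equilibrium of i_add(x) = 1 - x^16 and i_del(x) = x^16:
--   1 - x^16 = x^16  =>  x^16 = 0.5  =>  x = 0.5^(1/16)
local x = 0.5 ^ (1 / 16)
print(x)  -- ~0.956: keys settle at ~0.956 of the scaled target,
          -- so dividing by 0.956 makes the real target the equilibrium point
```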
diff --git a/tests/dragonfly/seeder/script-hash.lua b/tests/dragonfly/seeder/script-hash.lua
new file mode 100644
index 000000000000..643fc83e5542
--- /dev/null
+++ b/tests/dragonfly/seeder/script-hash.lua
@@ -0,0 +1,34 @@
+#!lua flags=disable-atomicity
+--[[
+Script for quickly computing a single 64-bit hash for keys of the types specified in ARGV[].
+Keys of every type are sorted lexicographically to ensure a consistent order.
+]]--
+
+-- import:hashlib --
+-- import:utillib --
+
+-- inputs
+local requested_types = ARGV
+
+local OUT_HASH = 0
+
+local function process(type)
+    local keys = LU_collect_keys('', type)
+    local hfunc = LH_funcs[type]
+
+    -- sort to provide a consistent order
+    table.sort(keys)
+    for _, key in ipairs(keys) do
+        -- add the key to the hash
+        OUT_HASH = dragonfly.ihash(OUT_HASH, key)
+        -- hand the hash over to the callback
+        OUT_HASH = hfunc(key, OUT_HASH)
+    end
+end
+
+for _, type in ipairs(requested_types) do
+    process(type)
+end
+
+return OUT_HASH
diff --git a/tests/dragonfly/seeder/script-hashlib.lua b/tests/dragonfly/seeder/script-hashlib.lua
new file mode 100644
index 000000000000..d2812e5dd890
--- /dev/null
+++ b/tests/dragonfly/seeder/script-hashlib.lua
@@ -0,0 +1,39 @@
+local LH_funcs = {}
+
+function LH_funcs.string(key, hash)
+    -- add the value to the hash
+    return dragonfly.ihash(hash, redis.call('GET', key))
+end
+
+function LH_funcs.list(key, hash)
+    -- add values to the hash
+    return dragonfly.ihash(hash, redis.call('LRANGE', key, 0, -1))
+end
+
+function LH_funcs.set(key, hash)
+    -- add values to the hash, sorting them first to avoid ambiguity
+    local items = redis.call('SMEMBERS', key)
+    table.sort(items)
+    return dragonfly.ihash(hash, items)
+end
+
+function LH_funcs.zset(key, hash)
+    -- add values to the hash, ZRANGE always returns sorted values
+    return dragonfly.ihash(hash, redis.call('ZRANGE', key, 0, -1, 'WITHSCORES'))
+end
+
+function LH_funcs.hash(key, hash)
+    -- add values to the hash, first converting to key-value pairs and sorting
+    local items = redis.call('HGETALL', key)
+    local paired_items = {}
+    for i = 1, #items, 2 do
+        table.insert(paired_items, items[i] .. '->' .. items[i + 1])
+    end
+    table.sort(paired_items)
+    return dragonfly.ihash(hash, paired_items)
+end
+
+function LH_funcs.json(key, hash)
+    -- add values to the hash, note that JSON.GET returns just a string
+    return dragonfly.ihash(hash, redis.call('JSON.GET', key))
+end
diff --git a/tests/dragonfly/seeder/script-utillib.lua b/tests/dragonfly/seeder/script-utillib.lua
new file mode 100644
index 000000000000..982d735de111
--- /dev/null
+++ b/tests/dragonfly/seeder/script-utillib.lua
@@ -0,0 +1,15 @@
+-- collect all keys of a specific type with a specific prefix into a table. Uses SCAN --
+local function LU_collect_keys(prefix, type)
+    local pattern = prefix .. "*"
+    local cursor = "0"
+    local keys = {}
+    repeat
+        local result = redis.call("SCAN", cursor, "COUNT", 500, "TYPE", type, "MATCH", pattern)
+        cursor = result[1]
+        local scan_keys = result[2]
+        for _, key in ipairs(scan_keys) do
+            table.insert(keys, key)
+        end
+    until cursor == "0"
+    return keys
+end
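As a reading aid for the hashing scripts above: a state hash is an XXH64 chain in which each result seeds the next call, so the same values hashed in a different order produce a different digest. This is why `script-hash.lua` sorts keys and the `LH_funcs` callbacks sort unordered collections. A minimal sketch (requires the `dragonfly.ihash` builtin from this PR):

```lua
-- chaining: each call's result becomes the next call's seed
local h1 = dragonfly.ihash(dragonfly.ihash(0, 'a'), 'b')
local h2 = dragonfly.ihash(dragonfly.ihash(0, 'b'), 'a')
-- h1 ~= h2: order matters, hence the table.sort calls in the scripts above
return h1 == h2
```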
diff --git a/tests/dragonfly/seeder_test.py b/tests/dragonfly/seeder_test.py
new file mode 100644
index 000000000000..70a6f196acf9
--- /dev/null
+++ b/tests/dragonfly/seeder_test.py
@@ -0,0 +1,69 @@
+import asyncio
+import async_timeout
+import string
+import random
+from redis import asyncio as aioredis
+from . import dfly_args
+from .seeder import Seeder
+
+
+@dfly_args({"proactor_threads": 4})
+async def test_seeder_key_target(async_client: aioredis.Redis):
+    """Ensure the seeder reaches its key targets"""
+    s = Seeder(units=random.randint(4, 12), key_target=5000)
+
+    # Ensure the test is not unreasonably slow
+    async with async_timeout.timeout(1 + 4):
+        # Fill with 5k keys, 1% deviation = 50
+        await s.run(async_client, target_deviation=0.01)
+        assert abs(await async_client.dbsize() - 5000) <= 50
+
+        # Run 1k ops, ensure the key count stays more or less the same
+        await s.run(async_client, target_ops=1000)
+        assert abs(await async_client.dbsize() - 5000) <= 100
+
+        # Run for one second until stopped
+        task = asyncio.create_task(s.run(async_client))
+        await asyncio.sleep(1.0)
+        await s.stop(async_client)
+        await task
+
+        # Change the key target, 100 is the actual minimum because "math breaks"
+        s.change_key_target(0)
+        await s.run(async_client, target_deviation=0.5)  # don't use low precision with low values
+        assert await async_client.dbsize() < 200
+
+
+@dfly_args({"proactor_threads": 4})
+async def test_seeder_capture(async_client: aioredis.Redis):
+    """Ensure the same data produces the same state hashes"""
+
+    async def set_data():
+        p = async_client.pipeline()
+        p.mset(mapping={f"string{i}": f"{i}" for i in range(100)})
+        # uncomment when the seeder supports more than strings
+        # p.lpush("list1", *list(string.ascii_letters))
+        # p.sadd("set1", *list(string.ascii_letters))
+        # p.hset("hash1", mapping={f"{i}": l for i, l in enumerate(string.ascii_letters)})
+        # p.zadd("zset1", mapping={l: i for i, l in enumerate(string.ascii_letters)})
+        # p.json().set("json1", ".", {"a": [1, 2, 3], "b": {"c": 1, "d": 2, "e": [5, 6]}})
+        await p.execute()
+
+    # Capture with filled data
+    await set_data()
+    c1 = await Seeder.capture(async_client)
+
+    # Check that the hashes are 0 without data
+    await async_client.flushall()
+    assert all(h == 0 for h in (await Seeder.capture(async_client)))
+
+    # Check that setting the same data results in the same hashes
+    await set_data()
+    c2 = await Seeder.capture(async_client)
+    assert c1 == c2
+
+    # Check that changing the data gives different hashes
+    # await async_client.lpush("list1", "NEW")
+    await async_client.append("string1", "MORE-DATA")
+    c3 = await Seeder.capture(async_client)
+    assert c1 != c3