feat(pytest): Gen2 seeder, part 1 #2556

Merged 3 commits on Feb 9, 2024
48 changes: 48 additions & 0 deletions src/core/interpreter.cc
@@ -4,11 +4,13 @@

#include "core/interpreter.h"

#include <absl/base/casts.h>
#include <absl/container/fixed_array.h>
#include <absl/strings/str_cat.h>
#include <absl/time/clock.h>
#include <mimalloc.h>
#include <openssl/evp.h>
#include <xxhash.h>

#include <cstring>
#include <optional>
@@ -303,6 +305,41 @@ void ToHex(const uint8_t* src, char* dest) {
dest[40] = '\0';
}

int DragonflyHashCommand(lua_State* lua) {
int argc = lua_gettop(lua);
if (argc != 2) {
lua_pushstring(lua, "wrong number of arguments");
return lua_error(lua);
}

XXH64_hash_t hash = absl::bit_cast<XXH64_hash_t>(lua_tointeger(lua, 1));
auto update_hash = [&hash](string_view sv) { hash = XXH64(sv.data(), sv.length(), hash); };

auto digest_value = [&hash, &update_hash, lua](int pos) {
if (int type = lua_type(lua, pos); type == LUA_TSTRING) {
const char* str = lua_tostring(lua, pos);
update_hash(string_view{str, strlen(str)});
} else {
CHECK_EQ(type, LUA_TNUMBER) << "Only strings and integers can be hashed";
update_hash(to_string(lua_tointeger(lua, pos)));
}
};

if (lua_type(lua, 2) == LUA_TTABLE) {
lua_pushnil(lua);
while (lua_next(lua, 2) != 0) {
digest_value(-2); // key, included for correct hashing
digest_value(-1); // value
lua_pop(lua, 1);
}
} else {
digest_value(2);
}

lua_pushinteger(lua, absl::bit_cast<lua_Integer>(hash));
return 1;
}

int RedisSha1Command(lua_State* lua) {
int argc = lua_gettop(lua);
if (argc != 1) {
@@ -387,6 +424,17 @@ Interpreter::Interpreter() {
*ptr = this;
// SaveOnRegistry(lua_, kInstanceKey, this);

/* Register the dragonfly commands table and fields */
lua_newtable(lua_);

lua_pushstring(lua_, "ihash");
lua_pushcfunction(lua_, DragonflyHashCommand);
lua_settable(lua_, -3);

/* Finally set the table as 'dragonfly' global var. */
lua_setglobal(lua_, "dragonfly");
CHECK(lua_checkstack(lua_, 64));

/* Register the redis commands table and fields */
lua_newtable(lua_);

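For illustration only, a minimal sketch of calling the new `dragonfly.ihash` helper from an async Python test via `EVAL`, assuming a client connected to a Dragonfly build that includes this change (the wrapper function and its name are hypothetical):

```python
import redis.asyncio as aioredis


async def ihash_of(client: aioredis.Redis, value: str) -> int:
    # dragonfly.ihash(seed, value) hashes strings, integers or tables with xxhash
    return await client.eval("return dragonfly.ihash(0, ARGV[1])", 0, value)
```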
2 changes: 1 addition & 1 deletion tests/dragonfly/instance.py
@@ -319,7 +319,7 @@ def create(self, existing_port=None, **kwargs) -> DflyInstance:
args = {**self.args, **kwargs}
args.setdefault("dbfilename", "")
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1"
args.setdefault("vmodule", vmod)
# args.setdefault("vmodule", vmod)

for k, v in args.items():
args[k] = v.format(**self.params.env) if isinstance(v, str) else v
1 change: 1 addition & 0 deletions tests/dragonfly/requirements.txt
@@ -23,3 +23,4 @@ redis-om==0.2.1
pytest-emoji==0.2.0
pytest-icdiff==0.8
pytest-timeout==2.2.0
asyncio==3.4.3
65 changes: 65 additions & 0 deletions tests/dragonfly/seeder/README.md
@@ -0,0 +1,65 @@
## Seeder library

Please use the testing framework's factories to obtain proper seeder instances!

### 1. Filling data

The seeder tries to maintain a specific number of keys, quickly filling or emptying the instance to reach the target. Once the target is reached, it also issues modification commands, trying to maintain an equilibrium with a mixed load.

```python
# Configure how many keys we want
s = Seeder(key_target=10_000)

# Fill instance with keys until it's 10k +- 1%
# Will create many new keys with data and reach equilibrium
await s.run(client, target_deviation=0.01)
assert abs(await client.dbsize() - 10_000) <= 100

# Run 5k operations, balanced mix of create/delete/modify
await s.run(client, target_ops=5000)

# Now we want only 500 keys, issue many deletes
s.change_key_target(500)
await s.run(client, target_deviation=0.01)
```
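The constructor also exposes the number of parallel generation units and the approximate value size (see `Seeder.__init__` further below); the parameter values here are arbitrary examples:

```python
# smaller dataset with larger values, spread over fewer units
s = Seeder(units=4, key_target=2_000, data_size=64)
```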

### 2. Checking consistency

Use `Seeder.capture()` to calculate "state hashes" based on all the data inside an instance. Equal data produces equal hashes (equal hashes don't guarantee equal data, but what are the odds...).

```python
# Fill master with 10k (+- 1%) keys
s = Seeder(key_target=10_000)
await s.run(master, target_deviation=0.01)

# "Replicate" or other operations
replicate(master, replica)

# Ensure master and replica have same state hashes
master_hashes, replica_hashes = asyncio.gather(
    Seeder.capture(master),  # note it's a static method
    Seeder.capture(replica)
)
assert master_hashes == replica_hashes
```

> **Review comment (Collaborator):** Aren't we missing `await` in this statement?
>
> **Reply (Contributor, Author):** oh yes, we are, will add in next PR
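As the review comment above points out, the `asyncio.gather(...)` call is missing an `await`; the corrected form (to be added in a follow-up PR) would look like:

```python
master_hashes, replica_hashes = await asyncio.gather(
    Seeder.capture(master),
    Seeder.capture(replica),
)
```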

### 3. Working with load

A seeder's `run(client)` can be called without any target; it will then keep running until explicitly stopped:

```python
# Fill instance with keys
s = Seeder()
await s.run(client, target_deviation=0.01)

# Start seeder without target
# Because the instance reached its key target, the seeder
# will issue a balanced mix of modifications/additions/deletions
seeding_task = asyncio.create_task(s.run(client))

# Do operations under fuzzy load
save(client)

await s.stop(client) # request stop, no immediate effect
await seeding_task # wait for actual stop and cleanup
```
117 changes: 117 additions & 0 deletions tests/dragonfly/seeder/__init__.py
@@ -0,0 +1,117 @@
import asyncio
import random
import re
import typing
import redis.asyncio as aioredis
from dataclasses import dataclass

try:
    from importlib import resources as impresources
except ImportError:
    # CI runs on python < 3.8
    import importlib_resources as impresources


class SeederBase:
    UID_COUNTER = 1  # multiple generators should not conflict on keys
    CACHED_SCRIPTS = {}
    TYPES = ["string"]

    @classmethod
    async def capture(clz, client: aioredis.Redis) -> typing.List[int]:
        """Generate a hash capture of all data stored in the instance pointed to by client"""

        sha = await client.script_load(clz._load_script("hash"))
        return await asyncio.gather(*(client.evalsha(sha, 0, data_type) for data_type in clz.TYPES))

    @classmethod
    def _next_id(clz):
        clz.UID_COUNTER += 1
        return clz.UID_COUNTER

    @staticmethod
    def _read_file(fname):
        try:
            script_file = impresources.files(__package__) / fname
            with script_file.open("rt") as f:
                return f.read()
        except AttributeError:
            return impresources.read_text(__package__, fname)

    @classmethod
    def _load_script(clz, fname):
        if fname in clz.CACHED_SCRIPTS:
            return clz.CACHED_SCRIPTS[fname]

        script = clz._read_file(f"script-{fname}.lua")
        # inline library scripts referenced by '-- import:<name> --' markers
        requested = re.findall(r"-- import:(.*?) --", script)
        for request in requested:
            lib = clz._read_file(f"script-{request}.lua")
            script = script.replace(f"-- import:{request} --", lib)

        clz.CACHED_SCRIPTS[fname] = script
        return script


class Seeder(SeederBase):
    @dataclass
    class Unit:
        prefix: str
        type: str
        counter: int
        stop_key: str

    units: typing.List[Unit]

    def __init__(self, units=10, key_target=10_000, data_size=10):
        self.uid = Seeder._next_id()
        self.key_target = key_target
        self.data_size = data_size
        self.units = [
            Seeder.Unit(
                prefix=f"k-s{self.uid}u{i}-",
                type=random.choice(Seeder.TYPES),
                counter=0,
                stop_key=f"_s{self.uid}u{i}-stop",
            )
            for i in range(units)
        ]

    async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=None):
        """Run seeder until one of the targets is reached, or until stopped if none are set"""

        using_stopkey = target_ops is None and target_deviation is None
        args = [
            self.key_target / len(self.units),
            target_ops if target_ops is not None else 0,
            target_deviation if target_deviation is not None else -1,
            self.data_size,
        ]

        sha = await client.script_load(Seeder._load_script("generate"))
        await asyncio.gather(
            *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
        )

    async def stop(self, client: aioredis.Redis):
        """Request the seeder to stop if it's running without a target; the future returned from run() must still be awaited"""

        await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units))

    def change_key_target(self, target: int):
        """Change key target, applied only on succeeding runs"""

        self.key_target = max(target, 100)  # math breaks with low values

    @staticmethod
    async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey, args):
        await client.delete(unit.stop_key)

        args = [
            unit.prefix,
            unit.type,
            unit.counter,
            unit.stop_key if using_stopkey else "",
        ] + args

        unit.counter = await client.evalsha(sha, 0, *args)
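For reference, the argument vector that `_run_unit` passes to `EVALSHA` lines up with the `ARGV` reads at the top of `script-generate.lua` below; a summary of the layout:

```python
# ARGV layout consumed by script-generate.lua
#   ARGV[1] = unit.prefix              (key prefix owned by the unit)
#   ARGV[2] = unit.type                (data type to generate)
#   ARGV[3] = unit.counter             (key counter to resume from)
#   ARGV[4] = unit.stop_key or ""      (empty when running toward a target)
#   ARGV[5] = key_target / len(units)  (per-unit key target)
#   ARGV[6] = target_ops, or 0 when unset      (0 = no op budget)
#   ARGV[7] = target_deviation, or -1 when unset  (-1 = deviation check disabled)
#   ARGV[8] = data_size                (approximate value size)
```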
126 changes: 126 additions & 0 deletions tests/dragonfly/seeder/script-generate.lua
@@ -0,0 +1,126 @@
#!lua flags=disable-atomicity

--[[
Script for quickly generating various data
]] --
-- import:genlib --
-- import:utillib --

-- inputs: unit identifiers
local prefix = ARGV[1]
local type = ARGV[2]
local key_counter = tonumber(ARGV[3])
local stop_key = ARGV[4]

-- inputs: task specific
local key_target = tonumber(ARGV[5])
local total_ops = tonumber(ARGV[6])
local min_dev = tonumber(ARGV[7])
local data_size = tonumber(ARGV[8])

-- collect all keys belonging to this script
-- assumes exclusive ownership
local keys = LU_collect_keys(prefix, type)

local addfunc = LG_funcs['add_' .. type]
local modfunc = LG_funcs['mod_' .. type]

local function action_add()
local key = prefix .. tostring(key_counter)
key_counter = key_counter + 1
table.insert(keys, key)
addfunc(key, data_size)
end

local function action_mod()
local key = keys[math.random(#keys)]
modfunc(key, data_size)
end

local function action_del()
local key_idx = math.random(#keys)
keys[key_idx], keys[#keys] = keys[#keys], keys[key_idx]

local key = table.remove(keys)
redis.acall('DEL', key)
end

-- set equilibrium point as key target, see intensity calculations below
local real_target = key_target
key_target = key_target / 0.956

-- cumulative probabilities: [add, add + delete, modify = 1 - (add + delete)]
local p_add = 0
local p_del = 0

local counter = 0
while true do
counter = counter + 1

-- break if we reached target ops
if total_ops > 0 and counter > total_ops then
break
end

if key_target < 100 and min_dev > 0 then
print(real_target, key_target, math.abs(#keys - real_target) / real_target)
print()
end

-- break if we reached our target deviation
if min_dev > 0 and math.abs(#keys - real_target) / real_target < min_dev then
break
end

-- break if stop key was set (every 100 ops to not slow down)
if stop_key ~= '' and counter % 100 == 0 and redis.call('EXISTS', stop_key) then
break
end

-- fast path, if we have less than half of the target, always grow
if #keys * 2 < key_target then
action_add()
goto continue
end

-- update probability only every 10 iterations
if counter % 10 == 0 then
-- calculate intensities (not normalized probabilities)
-- please see attached plots in PR to understand convergence

-- the add intensity is monotonically decreasing as the keycount grows,
-- the delete intensity is monotonically increasing as the keycount grows,
-- the point where the intensities are equal is the equilibrium point;
-- based on the formulas below it's ~0.956 * key_target (hence the 1/0.956 scaling above)
local i_add = math.max(0, 1 - (#keys / key_target) ^ 16)
local i_del = (#keys / key_target) ^ 16
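-- (equilibrium derivation: setting i_add = i_del gives 1 - x^16 = x^16 for x = #keys / key_target,
--  so x = 0.5 ^ (1 / 16) ≈ 0.956, the constant used to scale key_target above)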

-- we only want a large share of modification commands near the equilibrium,
-- where neither the add nor the delete intensity is low
local i_mod = math.max(0, 7 * math.min(i_add, i_del) ^ 3)

-- transform intensities to [0, 1] probability ranges
local sum = i_add + i_del + i_mod
p_add = i_add / sum
p_del = p_add + i_del / sum
end

-- generate random action
local p = math.random()
if p < p_add then
action_add()
elseif p < p_del then
action_del()
else
action_mod()
end

::continue::
end

-- clear stop key
if stop_key ~= '' then
redis.call('DEL', stop_key)
end

return key_counter