-
Notifications
You must be signed in to change notification settings - Fork 999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pytest): Gen2 seeder, part 1 #2556
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,3 +23,4 @@ redis-om==0.2.1 | |
pytest-emoji==0.2.0 | ||
pytest-icdiff==0.8 | ||
pytest-timeout==2.2.0 | ||
asyncio==3.4.3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
## Seeder library | ||
|
||
Please use the testing frameworks factories to obtain proper seeder instances! | ||
|
||
### 1. Filling data | ||
|
||
The seeder tries to maintain a specific number of keys, quickly filling or emptying the instance to reach the target. Once reached, it will issue also modification commands, trying to maintain an equilibrium with mixed load | ||
|
||
```python | ||
# Configure how many keys we want | ||
s = Seeder(key_target=10_000) | ||
|
||
# Fill instance with keys until it's 10k +- 1% | ||
# Will create many new keys with data and reach equilibrium | ||
await s.run(client, target_deviation=0.01) | ||
assert abs(client.dbsize() - 10_000) <= 100 | ||
|
||
# Run 5k operations, balanced mix of create/delete/modify | ||
await s.run(client, target_ops=5000) | ||
|
||
# Now we want only 500 keys, issue many deletes | ||
s.change_key_target(500) | ||
await s.run(client, target_deviation=0.01) | ||
``` | ||
|
||
### 2. Checking consistency | ||
|
||
Use `Seeder.capture()` to calculate a "state hashes" based on all the data inside an instance. Equal data produces equal hashes (equal hashes don't guarantee equal data but what are the odds...). | ||
|
||
```python | ||
# Fill master with 10k (+- 1%) keys | ||
s = Seeder(key_target=10_000) | ||
await seeder.run(master, target_deviation=0.01) | ||
|
||
# "Replicate" or other operations | ||
replicate(master, replica) | ||
|
||
# Ensure master and replica have same state hashes | ||
master_hashes, replica_hashes = asyncio.gather( | ||
Seeder.capture(master), # note it's a static method | ||
Seeder.capture(replica) | ||
) | ||
assert master_hashes == replica_hashes | ||
``` | ||
|
||
### 3. Working with load | ||
|
||
A seeders `run(client)` can be called without any target. It can only be stopped with | ||
|
||
```python | ||
# Fill instance with keys | ||
s = Seeder() | ||
await seeder.run(client, target_deviation=0.01) | ||
|
||
# Start seeder without target | ||
# Because the instance reached its key target, the seeder | ||
# will issue a balanced mix of modifications/additions/deletions | ||
seeding_task = asyncio.create_task(s.run(client)) | ||
|
||
# Do operations under fuzzy load | ||
save(client) | ||
|
||
await s.stop(client) # request stop, no immediate effect | ||
await seeding_task # wait for actual stop and cleanup | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import asyncio | ||
import random | ||
import re | ||
import typing | ||
import redis.asyncio as aioredis | ||
from dataclasses import dataclass | ||
|
||
try: | ||
from importlib import resources as impresources | ||
except ImportError: | ||
# CI runs on python < 3.8 | ||
import importlib_resources as impresources | ||
|
||
|
||
class SeederBase: | ||
UID_COUNTER = 1 # multiple generators should not conflict on keys | ||
CACHED_SCRIPTS = {} | ||
TYPES = ["string"] | ||
|
||
@classmethod | ||
async def capture(clz, client: aioredis.Redis) -> typing.List[int]: | ||
"""Generate hash capture for all data stored in instance pointed by client""" | ||
|
||
sha = await client.script_load(clz._load_script("hash")) | ||
return await asyncio.gather(*(client.evalsha(sha, 0, data_type) for data_type in clz.TYPES)) | ||
|
||
@classmethod | ||
def _next_id(clz): | ||
clz.UID_COUNTER += 1 | ||
return clz.UID_COUNTER | ||
|
||
@staticmethod | ||
def _read_file(fname): | ||
try: | ||
script_file = impresources.files(__package__) / fname | ||
with script_file.open("rt") as f: | ||
return f.read() | ||
except AttributeError: | ||
return impresources.read_text(__package__, fname) | ||
|
||
@classmethod | ||
def _load_script(clz, fname): | ||
if fname in clz.CACHED_SCRIPTS: | ||
return clz.CACHED_SCRIPTS[fname] | ||
|
||
script = clz._read_file(f"script-{fname}.lua") | ||
requested = re.findall(r"-- import:(.*?) --", script) | ||
for request in requested: | ||
lib = clz._read_file(f"script-{request}.lua") | ||
script = script.replace(f"-- import:{request} --", lib) | ||
|
||
clz.CACHED_SCRIPTS[fname] = script | ||
return script | ||
|
||
|
||
class Seeder(SeederBase): | ||
@dataclass | ||
class Unit: | ||
prefix: str | ||
type: str | ||
counter: int | ||
stop_key: str | ||
|
||
units: typing.List[Unit] | ||
|
||
def __init__(self, units=10, key_target=10_000, data_size=10): | ||
self.uid = Seeder._next_id() | ||
self.key_target = key_target | ||
self.data_size = data_size | ||
self.units = [ | ||
Seeder.Unit( | ||
prefix=f"k-s{self.uid}u{i}-", | ||
type=random.choice(Seeder.TYPES), | ||
counter=0, | ||
stop_key=f"_s{self.uid}u{i}-stop", | ||
) | ||
for i in range(units) | ||
] | ||
|
||
async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=None): | ||
"""Run seeder until one of the targets or until stopped if none are set""" | ||
|
||
using_stopkey = target_ops is None and target_deviation is None | ||
args = [ | ||
self.key_target / len(self.units), | ||
target_ops if target_ops is not None else 0, | ||
target_deviation if target_deviation is not None else -1, | ||
self.data_size, | ||
] | ||
|
||
sha = await client.script_load(Seeder._load_script("generate")) | ||
await asyncio.gather( | ||
*(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units) | ||
) | ||
|
||
async def stop(self, client: aioredis.Redis): | ||
"""Reqeust seeder seeder if it's running without a target, future returned from start() must still be awaited""" | ||
|
||
await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units)) | ||
|
||
def change_key_target(self, target: int): | ||
"""Change key target, applied only on succeeding runs""" | ||
|
||
self.key_target = max(target, 100) # math breaks with low values | ||
|
||
@staticmethod | ||
async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey, args): | ||
await client.delete(unit.stop_key) | ||
|
||
args = [ | ||
unit.prefix, | ||
unit.type, | ||
unit.counter, | ||
unit.stop_key if using_stopkey else "", | ||
] + args | ||
|
||
unit.counter = await client.evalsha(sha, 0, *args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
#!lua flags=disable-atomicity | ||
|
||
--[[ | ||
Script for quickly generating various data | ||
]] -- | ||
-- import:genlib -- | ||
-- import:utillib -- | ||
|
||
-- inputs: unit identifiers | ||
local prefix = ARGV[1] | ||
local type = ARGV[2] | ||
local key_counter = tonumber(ARGV[3]) | ||
local stop_key = ARGV[4] | ||
|
||
-- inputs: task specific | ||
local key_target = tonumber(ARGV[5]) | ||
local total_ops = tonumber(ARGV[6]) | ||
local min_dev = tonumber(ARGV[7]) | ||
local data_size = tonumber(ARGV[8]) | ||
|
||
-- collect all keys belonging to this script | ||
-- assumes exclusive ownership | ||
local keys = LU_collect_keys(prefix, type) | ||
|
||
local addfunc = LG_funcs['add_' .. type] | ||
local modfunc = LG_funcs['mod_' .. type] | ||
|
||
local function action_add() | ||
local key = prefix .. tostring(key_counter) | ||
key_counter = key_counter + 1 | ||
table.insert(keys, key) | ||
addfunc(key, data_size) | ||
end | ||
|
||
local function action_mod() | ||
local key = keys[math.random(#keys)] | ||
modfunc(key, data_size) | ||
end | ||
|
||
local function action_del() | ||
local key_idx = math.random(#keys) | ||
keys[key_idx], keys[#keys] = keys[#keys], keys[key_idx] | ||
|
||
local key = table.remove(keys) | ||
redis.acall('DEL', key) | ||
end | ||
|
||
-- set equilibrium point as key target, see intensity calculations below | ||
local real_target = key_target | ||
key_target = key_target / 0.956 | ||
|
||
-- accumulative probabilities: [add, add + delete, modify = 1-( add + delete) ] | ||
local p_add = 0 | ||
local p_del = 0 | ||
|
||
local counter = 0 | ||
while true do | ||
counter = counter + 1 | ||
|
||
-- break if we reached target ops | ||
if total_ops > 0 and counter > total_ops then | ||
break | ||
end | ||
|
||
if key_target < 100 and min_dev > 0 then | ||
print(real_target, key_target, math.abs(#keys - real_target) / real_target) | ||
print() | ||
end | ||
|
||
-- break if we reached our target deviation | ||
if min_dev > 0 and math.abs(#keys - real_target) / real_target < min_dev then | ||
break | ||
end | ||
|
||
-- break if stop key was set (every 100 ops to not slow down) | ||
if stop_key ~= '' and counter % 100 == 0 and redis.call('EXISTS', stop_key) then | ||
break | ||
end | ||
|
||
-- fast path, if we have less than half of the target, always grow | ||
if #keys * 2 < key_target then | ||
action_add() | ||
goto continue | ||
end | ||
|
||
-- update probability only every 10 iterations | ||
if counter % 10 == 0 then | ||
-- calculate intensity (not normalized probabilities) | ||
-- please see attached plots in PR to undertand convergence | ||
|
||
-- the add intensity is monotonically decreasing with keycount growing, | ||
-- the delete intensity is monotonically increasing with keycount growing, | ||
-- the point where the intensities are equal is the equilibrium point, | ||
-- based on the formulas it's ~0.82 * key_target | ||
local i_add = math.max(0, 1 - (#keys / key_target) ^ 16) | ||
local i_del = (#keys / key_target) ^ 16 | ||
|
||
-- we are only interested in large amounts of modification commands when we are in an | ||
-- equilibrium, where there are no low intensities | ||
local i_mod = math.max(0, 7 * math.min(i_add, i_del) ^ 3) | ||
|
||
-- transform intensities to [0, 1] probability ranges | ||
local sum = i_add + i_del + i_mod | ||
p_add = i_add / sum | ||
p_del = p_add + i_del / sum | ||
end | ||
|
||
-- generate random action | ||
local p = math.random() | ||
if p < p_add then | ||
action_add() | ||
elseif p < p_del then | ||
action_del() | ||
else | ||
action_mod() | ||
end | ||
|
||
::continue:: | ||
end | ||
|
||
-- clear stop key | ||
if stop_key ~= '' then | ||
redis.call('DEL', stop_key) | ||
end | ||
|
||
return key_counter |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't we missing
await
in this statement?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yes, we are, will add in next PR