fix(redis_replicaiton_test): fix compare set types (#929)
* fix(redis_replicaiton_test): fix compare set types

Signed-off-by: ashotland <[email protected]>
Co-authored-by: ashotland <[email protected]>
adiholden and ashotland authored Mar 13, 2023
1 parent a8c1e5c commit 8e528f1
Showing 3 changed files with 50 additions and 36 deletions.
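
In short: capture() now snapshots every logical db of the instance, and compare() checks each db of the target against its own initial capture instead of reusing one capture for all dbs; the SET-typed values that the specs below previously had to exclude are re-enabled as a result. A minimal, self-contained sketch of the new pattern (helper names and the use of DUMP are hypothetical, not the suite's actual internals):

import asyncio

import aioredis  # redis-py style async client, as used by the test suite


async def capture_db(port: int, db: int, keys: list) -> list:
    """Snapshot one logical database; DUMP stands in for the seeder's
    real per-entry capture."""
    client = aioredis.Redis(port=port, db=db)
    entries = [await client.dump(key) for key in keys]
    await client.connection_pool.disconnect()
    return entries


async def capture_all(port: int, dbcount: int, keys: list) -> list:
    """One capture per db, taken concurrently: the new capture() shape."""
    return await asyncio.gather(
        *(capture_db(port, db, keys) for db in range(dbcount)))


async def compare_all(initial_captures, port: int, dbcount: int, keys: list) -> bool:
    """Match each db against its own initial capture: the new compare() shape."""
    target_captures = await capture_all(port, dbcount, keys)
    for db, (initial, target) in enumerate(zip(initial_captures, target_captures)):
        if initial != target:
            print(f">>> Inconsistent data on port {port}, db {db}")
            return False
    return True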
6 changes: 3 additions & 3 deletions .github/workflows/docker-release.yml
@@ -119,6 +119,6 @@ jobs:
args: 'DragonflyDB version [${{ env.TAG_NAME }}](https://github.com/dragonflydb/dragonfly/releases/tag/${{ env.TAG_NAME }}) has been released 🎉'

- name: Re-build Docs
-        if: env.IS_PRERELEASE != 'true'
-        run: |
-          curl -s -X POST '${{ secrets.VERCEL_DOCS_WEBHOOK }}'
+        if: env.IS_PRERELEASE != 'true'
+        run: |
+          curl -s -X POST '${{ secrets.VERCEL_DOCS_WEBHOOK }}'
38 changes: 23 additions & 15 deletions tests/dragonfly/redis_replicaiton_test.py
@@ -5,6 +5,7 @@
import subprocess
from .utility import *


class RedisServer:
def __init__(self):
self.port = 5555
@@ -29,36 +30,41 @@ def stop(self):

# Checks that the master and the replica are synced by writing a random key
# to the master and waiting for it to appear in the replica, for each db in 0..dbcount-1.


async def await_synced(master_port, replica_port, dbcount=1):
rnd_str = "".join(random.choices(string.ascii_letters, k=10))
key = "sync_key/" + rnd_str
for db in range(dbcount):
c_master = aioredis.Redis(port=master_port, db=db)
await c_master.set(key, "dummy")
-        print (f"set {key} MASTER db = {db}")
+        print(f"set {key} MASTER db = {db}")
c_replica = aioredis.Redis(port=replica_port, db=db)
timeout = 30
while timeout > 0:
timeout -= 1
v = await c_replica.get(key)
-            print (f"get {key} from REPLICA db = {db} got {v}")
+            print(f"get {key} from REPLICA db = {db} got {v}")
if v is not None:
break
await asyncio.sleep(1)
await c_master.close()
await c_replica.close()
assert timeout > 0, "Timeout while waiting for replica to sync"


async def await_synced_all(c_master, c_replicas):
for c_replica in c_replicas:
await await_synced(c_master, c_replica)


async def check_data(seeder, replicas, c_replicas):
capture = await seeder.capture()
for (replica, c_replica) in zip(replicas, c_replicas):
await wait_available_async(c_replica)
assert await seeder.compare(capture, port=replica.port)


@pytest.fixture(scope="function")
def redis_server() -> RedisServer:
s = RedisServer()
@@ -68,15 +74,13 @@ def redis_server() -> RedisServer:
s.stop()



full_sync_replication_specs = [
([1], dict(keys=10_000, dbcount=1, unsupported_types=[ValueType.JSON])),
-    # REPRO for dbcount > 1 with ony ValueType.SET:
-    #([1], dict(keys=200, dbcount=2, unsupported_types=[ValueType.JSON, ValueType.LIST, ValueType.STRING, ValueType.HSET, ValueType.ZSET])),
-    # PASSES when excluding ValueType.SET:
-    ([1], dict(keys=5000, dbcount=2, unsupported_types=[ValueType.JSON, ValueType.SET])),
-    ([2], dict(keys=5000, dbcount=4, unsupported_types=[ValueType.JSON, ValueType.SET])),
+    ([1], dict(keys=100, dbcount=1, unsupported_types=[ValueType.JSON])),
+    ([1], dict(keys=5000, dbcount=2, unsupported_types=[ValueType.JSON])),
+    ([2], dict(keys=5000, dbcount=4, unsupported_types=[ValueType.JSON])),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", full_sync_replication_specs)
async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
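
Each (t_replicas, seeder_config) tuple in the spec lists above becomes one test case through pytest's parametrize decorator, as in this standalone illustration (spec values here are made up):

import pytest

# Hypothetical specs mirroring the shape used by the replication tests.
demo_specs = [
    ([1], dict(keys=100, dbcount=1)),
    ([2], dict(keys=5000, dbcount=4)),
]


@pytest.mark.parametrize("t_replicas, seeder_config", demo_specs)
def test_spec_shape(t_replicas, seeder_config):
    # pytest runs this once per tuple: t_replicas lists per-replica
    # proactor thread counts, seeder_config is passed to the data seeder.
    assert isinstance(t_replicas, list)
    assert seeder_config["dbcount"] >= 1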
@@ -87,7 +91,8 @@ async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_
seeder = df_seeder_factory.create(port=master.port, **seeder_config)
await seeder.run(target_deviation=0.1)

-    replica = df_local_factory.create(port=master.port + 1, proactor_threads=t_replicas[0])
+    replica = df_local_factory.create(
+        port=master.port + 1, proactor_threads=t_replicas[0])
replica.start()
c_replica = aioredis.Redis(port=replica.port)
assert await c_replica.ping()
@@ -101,12 +106,13 @@ async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_

stable_sync_replication_specs = [
([1], dict(keys=100, dbcount=1, unsupported_types=[ValueType.JSON])),
-    ([1], dict(keys=10_000, dbcount=2, unsupported_types=[ValueType.JSON, ValueType.SET])),
+    ([1], dict(keys=10_000, dbcount=2, unsupported_types=[ValueType.JSON])),
    ([2], dict(keys=10_000, dbcount=1, unsupported_types=[ValueType.JSON])),
-    ([2], dict(keys=10_000, dbcount=2, unsupported_types=[ValueType.JSON, ValueType.SET])),
-    ([8], dict(keys=10_000, dbcount=4, unsupported_types=[ValueType.JSON, ValueType.SET])),
+    ([2], dict(keys=10_000, dbcount=2, unsupported_types=[ValueType.JSON])),
+    ([8], dict(keys=10_000, dbcount=4, unsupported_types=[ValueType.JSON])),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", stable_sync_replication_specs)
@pytest.mark.skip(reason="Skipping until we fix replication from redis")
@@ -115,7 +121,8 @@ async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redi
c_master = aioredis.Redis(port=master.port)
assert await c_master.ping()

-    replica = df_local_factory.create(port=master.port + 1, proactor_threads=t_replicas[0])
+    replica = df_local_factory.create(
+        port=master.port + 1, proactor_threads=t_replicas[0])
replica.start()
c_replica = aioredis.Redis(port=replica.port)
assert await c_replica.ping()
@@ -141,6 +148,8 @@ async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redi
([1] * 8, dict(keys=500, dbcount=2, unsupported_types=[ValueType.JSON])),
([1], dict(keys=100, dbcount=2, unsupported_types=[ValueType.JSON])),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", replication_specs)
@pytest.mark.skip(reason="Skipping until we fix replication from redis")
@@ -172,7 +181,6 @@ async def run_replication(c_replica):
await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
await wait_available_async(c_replica)


await asyncio.gather(*(asyncio.create_task(run_replication(c))
for c in c_replicas))

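Taken together, the tests above share one flow: seed a stock Redis master, attach a Dragonfly replica with REPLICAOF, poll until a marker key propagates, then compare captures. A condensed, hypothetical sketch of that flow (the marker key and the 30-attempt budget mirror await_synced above; this is not the suite's actual harness):

import asyncio

import aioredis


async def attach_and_await_sync(master_port: int, replica_port: int) -> bool:
    """Point a replica at a master and poll until a marker key replicates."""
    c_master = aioredis.Redis(port=master_port)
    c_replica = aioredis.Redis(port=replica_port)
    assert await c_master.ping() and await c_replica.ping()

    await c_replica.execute_command(f"REPLICAOF localhost {master_port}")

    # Same idea as await_synced(): write to the master, poll the replica.
    await c_master.set("sync_key/marker", "dummy")
    for _ in range(30):
        if await c_replica.get("sync_key/marker") is not None:
            return True
        await asyncio.sleep(1)
    return False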
42 changes: 24 additions & 18 deletions tests/dragonfly/utility.py
@@ -183,7 +183,7 @@ def gen_shrink_cmd(self):
return f"PEXPIRE k{key} {random.randint(0, 50)}", -1
else:
keys_gen = (self.randomize_key(pop=True)
-            for _ in range(random.randint(1, self.max_multikey)))
+                        for _ in range(random.randint(1, self.max_multikey)))
keys = [f"k{k}" for k, _ in keys_gen if k is not None]

if len(keys) == 0:
@@ -385,35 +385,40 @@ def reset(self):
""" Reset internal state. Needs to be called after flush or restart"""
self.gen.reset()

-    async def capture(self, port=None, target_db=0, keys=None):
-        """Create DataCapture for selected db"""
+    async def capture(self, port=None):
+        """Create DataCapture for all dbs"""

if port is None:
port = self.port
+        keys = sorted(list(self.gen.keys_and_types()))

-        if keys is None:
-            keys = sorted(list(self.gen.keys_and_types()))

-        client = aioredis.Redis(port=port, db=target_db)
-        capture = DataCapture(await self._capture_entries(client, keys))
-        await client.connection_pool.disconnect()
-        return capture
+        captures = await asyncio.gather(*(
+            self._capture_db(port=port, target_db=db, keys=keys) for db in range(self.dbcount)
+        ))
+        return captures

-    async def compare(self, initial_capture, port=6379):
-        print(f"comparing capture to {port}")
-        keys = sorted(list(self.gen.keys_and_types()))
-        captures = await asyncio.gather(*(
-            self.capture(port=port, target_db=db, keys=keys) for db in range(self.dbcount)
-        ))
-        for db, capture in zip(range(self.dbcount), captures):
-            if not initial_capture.compare(capture):
+    async def compare(self, initial_captures, port=6379):
+        """Compare data capture with all dbs of instance and return True if all dbs are correct"""
+        target_captures = await self.capture(port=port)
+
+        for db, target_capture, initial_capture in zip(range(self.dbcount), target_captures, initial_captures):
+            print(f"comparing capture to {port}, db: {db}")
+            if not initial_capture.compare(target_capture):
eprint(f">>> Inconsistent data on port {port}, db {db}")
return False
return True

def target(self, key_cnt):
self.gen.key_cnt_target = key_cnt

+    async def _capture_db(self, port, target_db, keys):
+        eprint(f"Capture data on port {port}, db {target_db}")
+        client = aioredis.Redis(port=port, db=target_db)
+        capture = DataCapture(await self._capture_entries(client, keys))
+        await client.connection_pool.disconnect()
+        return capture

async def _generator_task(self, queues, target_ops=None, target_deviation=None):
cpu_time = 0
submitted = 0
@@ -452,7 +457,8 @@ def stringify_cmd(cmd):

if file is not None:
pattern = "MULTI\n{}\nEXEC\n" if is_multi_transaction else "{}\n"
-            file.write(pattern.format('\n'.join(stringify_cmd(cmd) for cmd in blob)))
+            file.write(pattern.format('\n'.join(stringify_cmd(cmd)
+                                                for cmd in blob)))

print('.', end='', flush=True)
await asyncio.sleep(0.0)
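With the new contract, a caller snapshots the master once and hands the whole per-db list to compare(); a short usage sketch, assuming a seeder constructed elsewhere in the suite:

async def verify_replica(seeder, master_port: int, replica_port: int) -> None:
    # capture() now returns one DataCapture per logical db (0..dbcount-1),
    # gathered concurrently from the master.
    initial_captures = await seeder.capture(port=master_port)

    # compare() re-captures every db on the replica and checks each one
    # against its initial counterpart, returning False on the first mismatch.
    assert await seeder.compare(initial_captures, port=replica_port)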
