chore: Improve client connection checking in pytests #967

Merged 1 commit on Mar 24, 2023
8 changes: 6 additions & 2 deletions src/facade/dragonfly_connection.cc
@@ -499,9 +499,13 @@ string Connection::GetClientInfo() const {
absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port());
absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_);
absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_);
absl::StrAppend(&res, " phase=", phase_, " ");
absl::StrAppend(&res, " phase=", phase_);

if (cc_) {
absl::StrAppend(&res, service_->GetContextInfo(cc_.get()));
string cc_info = service_->GetContextInfo(cc_.get());
if (!cc_info.empty()) {
absl::StrAppend(&res, " ", cc_info);
}
}

return res;
24 changes: 9 additions & 15 deletions tests/dragonfly/cluster_test.py
@@ -9,13 +9,13 @@

@dfly_args({})
class TestNotEmulated:
def test_cluster_commands_fails_when_not_emulate(self, client: redis.Redis):
with pytest.raises(redis.ResponseError) as respErr:
client.execute_command("CLUSTER HELP")
async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis):
with pytest.raises(aioredis.ResponseError) as respErr:
await async_client.execute_command("CLUSTER HELP")
assert "cluster_mode" in str(respErr.value)

with pytest.raises(redis.ResponseError) as respErr:
client.execute_command("CLUSTER SLOTS")
with pytest.raises(aioredis.ResponseError) as respErr:
await async_client.execute_command("CLUSTER SLOTS")
assert "emulated" in str(respErr.value)


@@ -76,7 +76,6 @@ def is_local_host(ip: str) -> bool:


@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"})
@pytest.mark.asyncio
async def test_cluster_slots_in_replicas(df_local_factory):
master = df_local_factory.create(port=BASE_PORT)
replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True)
@@ -107,11 +106,8 @@ async def test_cluster_slots_in_replicas(df_local_factory):


@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"})
@pytest.mark.asyncio
async def test_cluster_info(async_pool):
conn = aioredis.Redis(connection_pool=async_pool)

res = await conn.execute_command("CLUSTER INFO")
async def test_cluster_info(async_client):
res = await async_client.execute_command("CLUSTER INFO")
assert len(res) == 16
assert res == {'cluster_current_epoch': '1',
'cluster_known_nodes': '1',
@@ -134,10 +130,8 @@ async def test_cluster_info(async_pool):

@dfly_args({"cluster_mode": "emulated", "cluster_announce_ip": "127.0.0.2"})
@pytest.mark.asyncio
async def test_cluster_nodes(async_pool):
conn = aioredis.Redis(connection_pool=async_pool)

res = await conn.execute_command("CLUSTER NODES")
async def test_cluster_nodes(async_client):
res = await async_client.execute_command("CLUSTER NODES")
assert len(res) == 1
info = res['127.0.0.2:6379@6379']
assert res is not None
37 changes: 20 additions & 17 deletions tests/dragonfly/conftest.py
@@ -4,6 +4,7 @@

import os
import sys
from time import sleep
import pytest
import pytest_asyncio
import redis
@@ -88,38 +89,39 @@ def df_server(df_factory: DflyInstanceFactory) -> DflyInstance:
instance.start()

yield instance

clients_left = None
try:
client = redis.Redis(port=instance.port)
clients_left = client.execute_command("INFO")['connected_clients']
client.client_setname("mgr")
sleep(0.1)
clients_left = [x for x in client.client_list() if x["name"] != "mgr"]
except Exception as e:
print(e, file=sys.stderr)

instance.stop()
assert clients_left == 1
assert clients_left == []


@pytest.fixture(scope="class")
def connection(df_server: DflyInstance):
return redis.Connection(port=df_server.port)


@pytest.fixture(scope="class")
def sync_pool(df_server: DflyInstance):
pool = redis.ConnectionPool(decode_responses=True, port=df_server.port)
yield pool
pool.disconnect()
# @pytest.fixture(scope="class")
# def sync_pool(df_server: DflyInstance):
# pool = redis.ConnectionPool(decode_responses=True, port=df_server.port)
# yield pool
# pool.disconnect()


@pytest.fixture(scope="class")
def client(sync_pool):
"""
Return a client to the default instance with all entries flushed.
"""
client = redis.Redis(connection_pool=sync_pool)
client.flushall()
return client
# @pytest.fixture(scope="class")
# def client(sync_pool):
# """
# Return a client to the default instance with all entries flushed.
# """
# client = redis.Redis(connection_pool=sync_pool)
# client.flushall()
# return client


@pytest.fixture(scope="function")
@@ -149,8 +151,9 @@ async def async_client(async_pool):
Return an async client to the default instance with all entries flushed.
"""
client = aioredis.Redis(connection_pool=async_pool)
await client.client_setname("test")
await client.flushall()
return client
yield client
Collaborator: why yield?

Author: It does not matter here, but in general yielding is better because it gives you a place to release resources if needed (after the yield).
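A minimal sketch of that pattern (the fixture name and client settings below are illustrative, not part of this PR): a yield fixture suspends at the yield while the test runs, and the code after the yield becomes a natural teardown spot.

import pytest_asyncio
import aioredis

@pytest_asyncio.fixture
async def example_client():
    # Setup: create and prepare the client.
    client = aioredis.Redis(port=6379, decode_responses=True)
    await client.flushall()
    # The test body runs while the fixture is suspended here.
    yield client
    # Teardown: release the resource after the test completes.
    await client.close()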



def pytest_addoption(parser):
5 changes: 2 additions & 3 deletions tests/dragonfly/connection_test.py
@@ -281,7 +281,7 @@ def generate(max):


@pytest.mark.asyncio
Collaborator: Why did you not remove the asyncio mark here?

Author: We have yet to do a proper clean-up of all the marks; we just found a way to skip them.
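Presumably the "way to skip them" refers to the asyncio_mode=auto setting added to tests/pytest.ini at the end of this PR; in pytest-asyncio's auto mode, async test functions run as asyncio tests without needing an explicit @pytest.mark.asyncio on each one. A minimal pytest.ini showing just that option:

[pytest]
asyncio_mode = auto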

async def test_subsribers_with_active_publisher(df_server: DflyInstance, max_connections=100):
async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_connections=100):
# TODO: I am not how to customize the max connections for the pool.
async_pool = aioredis.ConnectionPool(host="localhost", port=df_server.port,
db=0, decode_responses=True, max_connections=max_connections)
@@ -316,7 +316,6 @@ async def subscribe_worker():
await async_pool.disconnect()


@pytest.mark.asyncio
async def test_big_command(df_server, size=8 * 1024):
reader, writer = await asyncio.open_connection('127.0.0.1', df_server.port)

@@ -328,7 +327,7 @@ async def test_big_command(df_server, size=8 * 1024):
writer.close()
await writer.wait_closed()

@pytest.mark.asyncio

async def test_subscribe_pipelined(async_client: aioredis.Redis):
pipe = async_client.pipeline(transaction=False)
pipe.execute_command('subscribe channel').execute_command('subscribe channel')
2 changes: 0 additions & 2 deletions tests/dragonfly/eval_test.py
@@ -93,7 +93,6 @@ def DJANGO_CACHEOPS_SCHEMA(vs): return {

@dfly_multi_test_args({'default_lua_config': 'allow-undeclared-keys', 'proactor_threads': 4},
{'default_lua_config': 'allow-undeclared-keys disable-atomicity', 'proactor_threads': 4})
@pytest.mark.asyncio
async def test_django_cacheops_script(async_client, num_keys=500):
script = async_client.register_script(DJANGO_CACHEOPS_SCRIPT)

@@ -161,7 +160,6 @@ async def test_django_cacheops_script(async_client, num_keys=500):

@dfly_multi_test_args({'default_lua_config': 'allow-undeclared-keys', 'proactor_threads': 4},
{'default_lua_config': 'allow-undeclared-keys disable-atomicity', 'proactor_threads': 4})
@pytest.mark.asyncio
async def test_golang_asynq_script(async_pool, num_queues=10, num_tasks=100):
async def enqueue_worker(queue):
client = aioredis.Redis(connection_pool=async_pool)
11 changes: 5 additions & 6 deletions tests/dragonfly/generic_test.py
@@ -7,12 +7,12 @@

@dfly_multi_test_args({'keys_output_limit': 512}, {'keys_output_limit': 1024})
class TestKeys:
def test_max_keys(self, client, df_server):
async def test_max_keys(self, async_client: aioredis.Redis, df_server):
max_keys = df_server['keys_output_limit']

batch_fill_data(client, gen_test_data(max_keys * 3))

keys = client.keys()
pipe = async_client.pipeline()
batch_fill_data(pipe, gen_test_data(max_keys * 3))
await pipe.execute()
keys = await async_client.keys()
assert len(keys) in range(max_keys, max_keys+512)

@pytest.fixture(scope="function")
Expand All @@ -22,7 +22,6 @@ def export_dfly_password() -> str:
yield pwd
del os.environ['DFLY_PASSWORD']

@pytest.mark.asyncio
async def test_password(df_local_factory, export_dfly_password):
dfly = df_local_factory.create()
dfly.start()
4 changes: 0 additions & 4 deletions tests/dragonfly/redis_replicaiton_test.py
@@ -85,7 +85,6 @@ def redis_server() -> RedisServer:
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", full_sync_replication_specs)
async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
master = redis_server
@@ -117,7 +116,6 @@ async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", stable_sync_replication_specs)
async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
master = redis_server
@@ -153,7 +151,6 @@ async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redi
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", replication_specs)
async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
master = redis_server
@@ -210,7 +207,6 @@ async def run_replication(c_replica):
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, t_disconnect, seeder_config", master_disconnect_cases)
async def test_disconnect_master(df_local_factory, df_seeder_factory, redis_server, t_replicas, t_disconnect, seeder_config):

7 changes: 4 additions & 3 deletions tests/dragonfly/replication_test.py
@@ -7,7 +7,7 @@
import re

from .utility import *
from . import dfly_args
from . import DflyInstanceFactory, dfly_args


BASE_PORT = 1111
@@ -147,7 +147,7 @@ async def check_data(seeder, replicas, c_replicas):

@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases)
async def test_disconnect_replica(df_local_factory, df_seeder_factory, t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys):
async def test_disconnect_replica(df_local_factory: DflyInstanceFactory, df_seeder_factory, t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys):
master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master)
replicas = [
(df_local_factory.create(
@@ -252,7 +252,7 @@ async def disconnect(replica, c_replica, crash_type):

# Check master survived all disconnects
assert await c_master.ping()

await c_master.close()
Collaborator: All the replication tests have this problem, not only this one. I believe there is a bug in the aioredis lib version we use where close() does not really close, so we need to use connection_pool.disconnect() instead. I will open a PR with this change.
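A rough sketch of the suggested workaround (illustrative only, not part of this diff): after closing an aioredis client created from a pool, also disconnect the pool so its connections are actually released.

import aioredis

async def shutdown_client(client: aioredis.Redis):
    # Hypothetical helper: close the client, then force the underlying
    # pool to drop its connections, since close() alone may leave them open.
    await client.close()
    await client.connection_pool.disconnect()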


"""
Test stopping master during different phases.
@@ -381,6 +381,7 @@ async def test_rotating_masters(df_local_factory, df_seeder_factory, t_replica,
fill_seeder.stop()
fill_task.cancel()


"""
Test flushall command. Set data to master send flashall and set more data.
Check replica keys at the end.
33 changes: 14 additions & 19 deletions tests/dragonfly/server_family_test.py
@@ -12,7 +12,6 @@ def test_quit(connection):


def test_quit_after_sub(connection):
connection = redis.Connection()
connection.send_command("SUBSCRIBE", "foo")
connection.read_response()

@@ -23,11 +22,11 @@ def test_quit_after_sub(connection):
connection.read_response()


def test_multi_exec(client):
pipeline = client.pipeline()
async def test_multi_exec(async_client: aioredis.Redis):
pipeline = async_client.pipeline()
pipeline.set("foo", "bar")
pipeline.get("foo")
val = pipeline.execute()
val = await pipeline.execute()
assert val == [True, "bar"]


@@ -42,43 +41,39 @@ def test_multi_exec(client):
'''


def test_multi_eval(client):
async def test_multi_eval(async_client: aioredis.Redis):
try:
pipeline = client.pipeline()
pipeline = async_client.pipeline()
pipeline.set("foo", "bar")
pipeline.get("foo")
pipeline.eval("return 43", 0)
assert True, "This part should not executed due to issue #457"

val = pipeline.execute()
val = await pipeline.execute()
assert val == "foo"
except Exception as e:
msg = str(e)
assert "Dragonfly does not allow execution of" in msg


def test_connection_name(client):
name = client.execute_command("CLIENT GETNAME")
assert not name
client.execute_command("CLIENT SETNAME test_conn_name")
name = client.execute_command("CLIENT GETNAME")
async def test_connection_name(async_client: aioredis.Redis):
name = await async_client.execute_command("CLIENT GETNAME")
assert name == "test"
await async_client.execute_command("CLIENT SETNAME test_conn_name")
name = await async_client.execute_command("CLIENT GETNAME")
assert name == "test_conn_name"


'''
make sure that the scan command is working with python
'''


def test_scan(client):
async def test_scan(async_client: aioredis.Redis):
def gen_test_data():
for i in range(10):
yield f"key-{i}", f"value-{i}"

for key, val in gen_test_data():
res = client.set(key, val)
res = await async_client.set(key, val)
assert res is not None
cur, keys = client.scan(cursor=0, match=key, count=2)
cur, keys = await async_client.scan(cursor=0, match=key, count=2)
assert cur == 0
assert len(keys) == 1
assert keys[0] == key
1 change: 1 addition & 0 deletions tests/pytest.ini
@@ -4,3 +4,4 @@ log_cli_format = [%(asctime)s.%(msecs)03d %(levelname)s] %(message)s
log_date_format = %Y-%m-%d %H:%M:%S
log_file_level=DEBUG
log_cli = true
asyncio_mode=auto