Skip to content

Commit

Permalink
Updated test scripts and CI/CD pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
fire committed Jun 21, 2024
1 parent 4bded65 commit 433a818
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 189 deletions.
2 changes: 1 addition & 1 deletion cicd_test.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ $BLENDER_DIR = "blender-3.3.19-windows-x64"

# Run the tests and generate XML reports
# & "$BLENDER_DIR/3.3/python/bin/python.exe" -m xmlrunner discover --verbose tests.vrtist -o logs/tests
& "$BLENDER_DIR/3.3/python/bin/python.exe" -m pytest --timeout=3 tests/broadcaster/test_server.py
& "$BLENDER_DIR/3.3/python/bin/python.exe" -m xmlrunner discover --verbose tests.broadcaster -o logs/tests
# & "$BLENDER_DIR/3.3/python/bin/python.exe" -m xmlrunner discover --verbose tests.blender -o logs/tests
349 changes: 161 additions & 188 deletions tests/broadcaster/test_server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import unittest
import threading
import time
import pytest

import itertools
from mixer.broadcaster.apps.server import Server
from mixer.broadcaster.client import Client
import mixer.broadcaster.common as common
Expand Down Expand Up @@ -39,194 +38,168 @@ def network_consumer(client, delegate):
delegate.update_clients_attributes(clients_attributes)


port_iter = itertools.count(start=common.DEFAULT_PORT)


@pytest.fixture
def server():
_delegate = Delegate()
_server = Server()

while True:
try:
port = next(port_iter)
server_thread = threading.Thread(target=_server.run, args=(port,))
server_thread.start()

time.sleep(0.5)

yield _server, port

_server.shutdown()
@unittest.skip("")
class TestServer(unittest.TestCase):
def setUp(self):
self._delegate = Delegate()
self._server = Server()
server_thread = threading.Thread(None, self._server.run)
server_thread.start()

def tearDown(self):
self._server.shutdown()
self.delay()

def delay(self):
time.sleep(0.2)

def test_connect(self):
delay = self.delay
server = self._server

client1 = Client()
delay()
self.assertTrue(client1.is_connected())
self.assertEqual(server.client_count(), (0, 1))

client1.disconnect()
delay()
self.assertFalse(client1.is_connected())
self.assertEqual(server.client_count(), (0, 0))

#
client2 = Client()
delay()
self.assertTrue(client2.is_connected())
self.assertEqual(server.client_count(), (0, 1))

client3 = Client()
delay()
self.assertTrue(client3.is_connected())
self.assertEqual(server.client_count(), (0, 2))

client2.disconnect()
delay()
self.assertFalse(client2.is_connected())
self.assertTrue(client3.is_connected())
self.assertEqual(server.client_count(), (0, 1))

client2.disconnect()
delay()
self.assertFalse(client2.is_connected())
self.assertTrue(client3.is_connected())
self.assertEqual(server.client_count(), (0, 1))

client3.disconnect()
delay()
self.assertFalse(client3.is_connected())
self.assertEqual(server.client_count(), (0, 0))

def test_join_one_room_one_client(self):
delay = self.delay
server = self._server

c0_name = "c0_name"
c0_room = "c0_room"

d0 = Delegate()
c0 = Client()
delay()
self.assertEqual(server.client_count(), (0, 1))

c0.set_client_attributes({common.ClientAttributes.USERNAME: c0_name})
c0.join_room(c0_room, "ignored", "ignored", True, True)
delay()
network_consumer(c0, self._delegate)
expected = (c0_name, c0_room)
self.assertEqual(server.client_count(), (1, 0))
self.assertEqual(len(d0.name_room), 1)
self.assertIn(expected, d0.name_room)

def test_join_one_room_two_clients(self):
delay = self.delay
server = self._server

c0_name = "c0_name"
c0_room = "c0_room"

c1_name = "c1_name"
c1_room = c0_room

d0 = Delegate()
c0 = Client()
c0.join_room(c0_room, "ignored", "ignored", True, True)
c0.set_client_attributes({common.ClientAttributes.USERNAME: c0_name})

d1 = Delegate()
c1 = Client()
c1.join_room(c1_room, "ignored", "ignored", True, True)
c1.set_client_attributes({common.ClientAttributes.USERNAME: c1_name})

delay()

network_consumer(c0, self._delegate)
network_consumer(c1, self._delegate)
expected = [(c0_name, c0_room), (c1_name, c1_room)]
self.assertEqual(server.client_count(), (2, 0))
self.assertEqual(len(d0.name_room), 2)
self.assertEqual(len(d1.name_room), 2)
self.assertCountEqual(d0.name_room, expected)
self.assertCountEqual(d1.name_room, expected)

def test_join_one_room_two_clients_leave(self):
delay = self.delay
server = self._server

c0_name = "c0_name"
c0_room = "c0_room"

c1_name = "c1_name"
c1_room = c0_room

d0 = Delegate()
c0 = Client()
c0.join_room(c0_room, "ignored", "ignored", True, True)
c0.set_client_attributes({common.ClientAttributes.USERNAME: c0_name})

d1 = Delegate()
c1 = Client()
c1.join_room(c1_room, "ignored", "ignored", True, True)
c1.set_client_attributes({common.ClientAttributes.USERNAME: c1_name})

c1.leave_room(c1_room)

delay()
network_consumer(c0, self._delegate)
network_consumer(c1, self._delegate)
expected = [(c0_name, c0_room)]
self.assertEqual(server.client_count(), (1, 1))
self.assertEqual(len(d0.name_room), 1)
self.assertCountEqual(d0.name_room, expected)
self.assertListEqual(d0.name_room, d1.name_room)


class TestClient(unittest.TestCase):
def setUp(self):
pass

def tearDown(self):
pass

def test_client_is_disconnected_when_server_process_is_killed(self):
server_process = ServerProcess()
server_process.start()

with Client(server_process.host, server_process.port) as client:
self.assertTrue(client.is_connected())
client.fetch_commands()

break
except Exception as e:
print(f"Failed to start server on port {port}: {e}")
continue
server_process.kill()

self.assertRaises(common.ClientDisconnectedException, client.fetch_commands)

@pytest.mark.skip(reason="Temporarily disabled")
def test_connect(server):
def delay():
return time.sleep(0.2)

_server, port = server
self.assertTrue(not client.is_connected())

client1 = Client(port=port)
delay()
assert not client1.is_connected()
assert _server.client_count() == 0

client1.connect()
delay()
assert client1.is_connected()
assert _server.client_count() == 1

client1.disconnect()
delay()
assert not client1.is_connected()
assert _server.client_count() == 0

#
client2 = Client(port=port)
client2.connect()
delay()
assert client2.is_connected()
assert _server.client_count() == 1

client3 = Client(port=port)
client3.connect()
delay()
assert client3.is_connected()
assert _server.client_count() == 2

client2.disconnect()
delay()
assert not client2.is_connected()
assert client3.is_connected()
assert _server.client_count() == 1

client2.disconnect()
delay()
assert not client2.is_connected()
assert client3.is_connected()
assert _server.client_count() == 1

client3.disconnect()
delay()
assert not client3.is_connected()
assert _server.client_count() == 0


def test_join_one_room_one_client(server):
def delay():
return time.sleep(0.2)

_server, port = server
c0_name = "c0_name"
c0_room = "c0_room"

d0 = Delegate()
c0 = Client()
delay()

c0.connect()
c0.set_client_attributes({common.ClientAttributes.USERNAME: c0_name})
c0.join_room(c0_room, "ignored", "ignored", True, True)
delay()
network_consumer(c0, d0)
assert len(_server._rooms) == 1
assert c0_room in _server._rooms
# assert len(d0.name_room) == 1
# assert expected in d0.name_room


@pytest.mark.skip(reason="Temporarily disabled")
def test_join_one_room_two_clients(server):
def delay():
return time.sleep(0.2)

_server, port = server

c0_name = "c0_name"
c0_room = "c0_room"

c1_name = "c1_name"
c1_room = c0_room

d0 = Delegate()
c0 = Client(port=port)
c0.connect()
delay()
c0.join_room(c0_room, "ignored", "ignored", True, True)
c0.set_client_attributes({common.ClientAttributes.USERNAME: c0_name})

d1 = Delegate()
c1 = Client(port=port)
c1.connect()
delay()
c1.join_room(c1_room, "ignored", "ignored", True, True)
c1.set_client_attributes({common.ClientAttributes.USERNAME: c1_name})

delay()

network_consumer(c0, d0)
network_consumer(c1, d1)
expected = [(c0_name, c0_room), (c1_name, c1_room)]
assert _server.client_count() == 2
assert len(d0.name_room) == 2
assert len(d1.name_room) == 2
assert set(d0.name_room) == set(expected)
assert set(d1.name_room) == set(expected)


@pytest.mark.skip(reason="Temporarily disabled")
def test_join_one_room_two_clients_leave(server):
def delay():
return time.sleep(0.2)

c0_name = "c0_name"
c0_room = "c0_room"

c1_name = "c1_name"
c1_room = c0_room

d0 = Delegate()
c0 = Client()
c0.connect()
c0.join_room(c0_room, "ignored", "ignored", True, True)
c0.set_client_attributes({common.ClientAttributes.USERNAME: c0_name})

d1 = Delegate()
c1 = Client()
c1.connect()
c1.join_room(c1_room, "ignored", "ignored", True, True)
c1.set_client_attributes({common.ClientAttributes.USERNAME: c1_name})

c1.leave_room(c1_room)

delay()
network_consumer(c0, d0)
network_consumer(c1, d1)
expected = [(c0_name, c0_room)]
assert server.client_count() == (1, 1)
assert len(d0.name_room) == 1
assert set(d0.name_room) == set(expected)
assert d0.name_room == d1.name_room


@pytest.mark.skip(reason="Temporarily disabled")
def test_client_is_disconnected_when_server_process_is_killed():
server_process = ServerProcess()
server_process.start()

with Client(server_process.host, server_process.port) as client:
assert client.is_connected()
client.fetch_commands()

server_process.kill()

with pytest.raises(common.ClientDisconnectedException):
client.fetch_commands()

assert not client.is_connected()
if __name__ == "__main__":
unittest.main()

0 comments on commit 433a818

Please sign in to comment.