-
-
Notifications
You must be signed in to change notification settings - Fork 595
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
5122519
commit 9c4b5db
Showing
10 changed files
with
300 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
from functools import wraps | ||
import unittest | ||
import pytest | ||
import socketio | ||
from socketio.exceptions import ConnectionError | ||
from tests.web_server import SocketIOWebServer | ||
|
||
|
||
def with_instrumented_server(auth=False, **ikwargs): | ||
"""This decorator can be applied to test functions or methods so that they | ||
run with a Socket.IO server that has been instrumented for the official | ||
Admin UI project. The arguments passed to the decorator are passed directly | ||
to the ``instrument()`` method of the server. | ||
""" | ||
def decorator(f): | ||
@wraps(f) | ||
def wrapped(*args, **kwargs): | ||
sio = socketio.Server(async_mode='threading') | ||
instrumented_server = sio.instrument(auth=auth, **ikwargs) | ||
|
||
@sio.event | ||
def enter_room(sid, data): | ||
sio.enter_room(sid, data) | ||
|
||
@sio.event(namespace='/foo') | ||
def connect(sid, environ, auth): | ||
pass | ||
|
||
server = SocketIOWebServer(sio) | ||
server.start() | ||
|
||
# import logging | ||
# logging.getLogger('engineio.client').setLevel(logging.DEBUG) | ||
# logging.getLogger('socketio.client').setLevel(logging.DEBUG) | ||
# logging.getLogger('engineio.server').setLevel(logging.DEBUG) | ||
# logging.getLogger('socketio.server').setLevel(logging.DEBUG) | ||
|
||
try: | ||
ret = f(*args, **kwargs) | ||
finally: | ||
server.stop() | ||
instrumented_server.shutdown() | ||
instrumented_server.uninstrument() | ||
|
||
# import logging | ||
# logging.getLogger('engineio.client').setLevel(logging.NOTSET) | ||
# logging.getLogger('socketio.client').setLevel(logging.NOTSET) | ||
# logging.getLogger('engineio.server').setLevel(logging.NOTSET) | ||
# logging.getLogger('socketio.server').setLevel(logging.NOTSET) | ||
|
||
return ret | ||
return wrapped | ||
return decorator | ||
|
||
|
||
def custom_auth(auth): | ||
return auth == {'foo': 'bar'} | ||
|
||
|
||
class TestAdmin(unittest.TestCase): | ||
def test_missing_auth(self): | ||
sio = socketio.Server(async_mode='threading') | ||
with pytest.raises(ValueError): | ||
sio.instrument() | ||
|
||
@with_instrumented_server(auth=False) | ||
def test_admin_connect_with_no_auth(self): | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin') | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'bar'}) | ||
|
||
@with_instrumented_server(auth={'foo': 'bar'}) | ||
def test_admin_connect_with_dict_auth(self): | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'bar'}) | ||
with socketio.SimpleClient() as client: | ||
with pytest.raises(ConnectionError): | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'baz'}) | ||
with socketio.SimpleClient() as client: | ||
with pytest.raises(ConnectionError): | ||
client.connect('http://localhost:8900', namespace='/admin') | ||
|
||
@with_instrumented_server(auth=[{'foo': 'bar'}, | ||
{'u': 'admin', 'p': 'secret'}]) | ||
def test_admin_connect_with_list_auth(self): | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'bar'}) | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'u': 'admin', 'p': 'secret'}) | ||
with socketio.SimpleClient() as client: | ||
with pytest.raises(ConnectionError): | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'baz'}) | ||
with socketio.SimpleClient() as client: | ||
with pytest.raises(ConnectionError): | ||
client.connect('http://localhost:8900', namespace='/admin') | ||
|
||
@with_instrumented_server(auth=custom_auth) | ||
def test_admin_connect_with_function_auth(self): | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'bar'}) | ||
with socketio.SimpleClient() as client: | ||
with pytest.raises(ConnectionError): | ||
client.connect('http://localhost:8900', namespace='/admin', | ||
auth={'foo': 'baz'}) | ||
with socketio.SimpleClient() as client: | ||
with pytest.raises(ConnectionError): | ||
client.connect('http://localhost:8900', namespace='/admin') | ||
|
||
@with_instrumented_server() | ||
def test_admin_connect_only_admin(self): | ||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin') | ||
sid = client.sid | ||
expected = ['config', 'all_sockets', 'server_stats'] | ||
events = {} | ||
while expected: | ||
data = client.receive(timeout=5) | ||
if data[0] in expected: | ||
events[data[0]] = data[1] | ||
expected.remove(data[0]) | ||
assert 'supportedFeatures' in events['config'] | ||
assert len(events['all_sockets']) == 1 | ||
assert events['all_sockets'][0]['id'] == sid | ||
assert events['all_sockets'][0]['rooms'] == [sid] | ||
assert events['server_stats']['clientsCount'] == 1 | ||
assert events['server_stats']['pollingClientsCount'] == 0 | ||
assert len(events['server_stats']['namespaces']) == 3 | ||
assert {'name': '/', 'socketsCount': 0} in \ | ||
events['server_stats']['namespaces'] | ||
assert {'name': '/foo', 'socketsCount': 0} in \ | ||
events['server_stats']['namespaces'] | ||
assert {'name': '/admin', 'socketsCount': 1} in \ | ||
events['server_stats']['namespaces'] | ||
|
||
@with_instrumented_server() | ||
def test_admin_connect_with_others(self): | ||
client1 = socketio.SimpleClient() | ||
client1.connect('http://localhost:8900') | ||
client1.emit('enter_room', 'room') | ||
sid1 = client1.sid | ||
|
||
client2 = socketio.SimpleClient() | ||
client2.connect('http://localhost:8900', namespace='/foo') | ||
sid2 = client2.sid | ||
|
||
client3 = socketio.SimpleClient() | ||
client3.connect('http://localhost:8900', namespace='/admin') | ||
sid3 = client3.sid | ||
|
||
with socketio.SimpleClient() as client: | ||
client.connect('http://localhost:8900', namespace='/admin') | ||
sid = client.sid | ||
expected = ['config', 'all_sockets', 'server_stats'] | ||
events = {} | ||
while expected: | ||
data = client.receive(timeout=5) | ||
if data[0] in expected: | ||
events[data[0]] = data[1] | ||
expected.remove(data[0]) | ||
|
||
client3.disconnect() | ||
client2.disconnect() | ||
client1.disconnect() | ||
|
||
assert 'supportedFeatures' in events['config'] | ||
assert len(events['all_sockets']) == 4 | ||
assert events['server_stats']['clientsCount'] == 4 | ||
assert events['server_stats']['pollingClientsCount'] == 0 | ||
assert len(events['server_stats']['namespaces']) == 3 | ||
assert {'name': '/', 'socketsCount': 1} in \ | ||
events['server_stats']['namespaces'] | ||
assert {'name': '/foo', 'socketsCount': 1} in \ | ||
events['server_stats']['namespaces'] | ||
assert {'name': '/admin', 'socketsCount': 2} in \ | ||
events['server_stats']['namespaces'] | ||
|
||
for socket in events['all_sockets']: | ||
if socket['id'] == sid: | ||
assert socket['rooms'] == [sid] | ||
elif socket['id'] == sid1: | ||
assert socket['rooms'] == [sid1, 'room'] | ||
elif socket['id'] == sid2: | ||
assert socket['rooms'] == [sid2] | ||
elif socket['id'] == sid3: | ||
assert socket['rooms'] == [sid3] | ||
|
||
|
||
# instrument in production mode | ||
# instrument production and read-only | ||
# admin emit, join, leave, disconnect |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.