diff --git a/grapheneasync/rpc.py b/grapheneasync/rpc.py index 3adb6d04..2a8fc1bc 100644 --- a/grapheneasync/rpc.py +++ b/grapheneasync/rpc.py @@ -1,27 +1,22 @@ # -*- coding: utf-8 -*- import json -import asyncio -import websockets +import logging +from grapheneapi.rpc import Rpc as OriginalRpc -class Rpc: +log = logging.getLogger(__name__) + + +class Rpc(OriginalRpc): def __init__(self, url, *args, **kwargs): - self.url = url + self.api_id = {} self._request_id = 0 + self.url = url def get_request_id(self): self._request_id += 1 return self._request_id - async def connect(self): - self.websocket = await websockets.connect(self.url) - - async def disconnect(self): - pass - - async def rpcexec(self, payload): - await self.websocket.send(json.dumps(payload)) - return await self.websocket.recv() def __getattr__(self, name): """ Map all methods to RPC calls and pass through the arguments @@ -31,7 +26,22 @@ def __getattr__(self, name): """ async def method(*args, **kwargs): - api_id = kwargs.get("api_id", kwargs.get("api", 0)) + + # Sepcify the api to talk to + if "api_id" not in kwargs: # pragma: no cover + if "api" in kwargs: + if kwargs["api"] in self.api_id and self.api_id[kwargs["api"]]: + api_id = self.api_id[kwargs["api"]] + else: + api_id = kwargs["api"] + else: + api_id = 0 + else: # pragma: no cover + api_id = kwargs["api_id"] + + # let's be able to define the num_retries per query + self.num_retries = kwargs.get("num_retries", self.num_retries) + query = { "method": "call", "params": [api_id, name, list(args)], @@ -39,6 +49,9 @@ async def method(*args, **kwargs): "id": self.get_request_id(), } # Need to await here! - return await self.rpcexec(query) + r = await self.rpcexec(query) + message = self.parse_response(r) + + return message return method diff --git a/grapheneasync/websocket.py b/grapheneasync/websocket.py new file mode 100644 index 00000000..d546b399 --- /dev/null +++ b/grapheneasync/websocket.py @@ -0,0 +1,31 @@ +import asyncio +import websockets +import logging +import json + +from .rpc import Rpc + +log = logging.getLogger(__name__) + + +class Websocket(Rpc): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.ws = None + self.loop = kwargs.get('loop') + + async def connect(self): + self.ws = await websockets.connect(self.url, ssl=True, loop=self.loop) + + async def disconnect(self): + await self.ws.close() + + async def rpcexec(self, payload): + if not self.ws: + await self.connect() + + log.debug(json.dumps(payload)) + + await self.ws.send(json.dumps(payload)) + + return await self.ws.recv() diff --git a/requirements-test.txt b/requirements-test.txt index 71a4af08..1d75a490 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -4,6 +4,7 @@ dateutils==0.6.6 # Unit testing pytest==4.3.1 pytest-mock==1.10.2 +pytest-asyncio==0.10.0 coverage==4.5.3 mock==2.0.0 diff --git a/tests/grapheneasync.py b/tests/grapheneasync.py new file mode 100644 index 00000000..a2cb1ef2 --- /dev/null +++ b/tests/grapheneasync.py @@ -0,0 +1,23 @@ +import pytest +import asyncio +import logging + +from grapheneasync.websocket import Websocket + +logger = logging.getLogger('websockets') +logger.setLevel(logging.DEBUG) + + +@pytest.mark.asyncio +async def test_loop(event_loop): + await asyncio.sleep(1) + + +@pytest.mark.asyncio +async def test_rpc(event_loop): + ws = Websocket('wss://eu.nodes.bitshares.ws') + props = await ws.get_dynamic_global_properties() + await ws.disconnect() + logger.info(props) + assert isinstance(props, dict) + assert props['head_block_number'] > 0