Skip to content

Commit

Permalink
Working asyncio websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
bitphage committed May 19, 2019
1 parent c060ab8 commit 757e7c2
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 15 deletions.
43 changes: 28 additions & 15 deletions grapheneasync/rpc.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
# -*- coding: utf-8 -*-
import json
import asyncio
import websockets
import logging

from grapheneapi.rpc import Rpc as OriginalRpc

class Rpc:
log = logging.getLogger(__name__)


class Rpc(OriginalRpc):
def __init__(self, url, *args, **kwargs):
self.url = url
self.api_id = {}
self._request_id = 0
self.url = url

def get_request_id(self):
self._request_id += 1
return self._request_id

async def connect(self):
self.websocket = await websockets.connect(self.url)

async def disconnect(self):
pass

async def rpcexec(self, payload):
await self.websocket.send(json.dumps(payload))
return await self.websocket.recv()

def __getattr__(self, name):
""" Map all methods to RPC calls and pass through the arguments
Expand All @@ -31,14 +26,32 @@ def __getattr__(self, name):
"""

async def method(*args, **kwargs):
api_id = kwargs.get("api_id", kwargs.get("api", 0))

# Sepcify the api to talk to
if "api_id" not in kwargs: # pragma: no cover
if "api" in kwargs:
if kwargs["api"] in self.api_id and self.api_id[kwargs["api"]]:
api_id = self.api_id[kwargs["api"]]
else:
api_id = kwargs["api"]
else:
api_id = 0
else: # pragma: no cover
api_id = kwargs["api_id"]

# let's be able to define the num_retries per query
self.num_retries = kwargs.get("num_retries", self.num_retries)

query = {
"method": "call",
"params": [api_id, name, list(args)],
"jsonrpc": "2.0",
"id": self.get_request_id(),
}
# Need to await here!
return await self.rpcexec(query)
r = await self.rpcexec(query)
message = self.parse_response(r)

return message

return method
31 changes: 31 additions & 0 deletions grapheneasync/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import websockets
import logging
import json

from .rpc import Rpc

log = logging.getLogger(__name__)


class Websocket(Rpc):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ws = None
self.loop = kwargs.get('loop')

async def connect(self):
self.ws = await websockets.connect(self.url, ssl=True, loop=self.loop)

async def disconnect(self):
await self.ws.close()

async def rpcexec(self, payload):
if not self.ws:
await self.connect()

log.debug(json.dumps(payload))

await self.ws.send(json.dumps(payload))

return await self.ws.recv()
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ dateutils==0.6.6
# Unit testing
pytest==4.3.1
pytest-mock==1.10.2
pytest-asyncio==0.10.0
coverage==4.5.3
mock==2.0.0

Expand Down
23 changes: 23 additions & 0 deletions tests/grapheneasync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest
import asyncio
import logging

from grapheneasync.websocket import Websocket

logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)


@pytest.mark.asyncio
async def test_loop(event_loop):
await asyncio.sleep(1)


@pytest.mark.asyncio
async def test_rpc(event_loop):
ws = Websocket('wss://eu.nodes.bitshares.ws')
props = await ws.get_dynamic_global_properties()
await ws.disconnect()
logger.info(props)
assert isinstance(props, dict)
assert props['head_block_number'] > 0

0 comments on commit 757e7c2

Please sign in to comment.