Skip to content

Commit

Permalink
[API] Websocket Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
xeroc committed Dec 31, 2015
1 parent 40c7a27 commit 823857e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 26 deletions.
5 changes: 4 additions & 1 deletion grapheneapi/graphenews.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class GrapheneWebsocket(GrapheneAPI):
def __init__(self, url, username, password,
proto=GrapheneWebsocketProtocol):
ssl, host, port, resource, path, params = parseWsUrl(url)
super().__init__(host, port, username, password)
GrapheneAPI.__init__(self, host, port, username, password)
self.url = url
self.username = username
self.password = password
Expand All @@ -27,6 +27,7 @@ def __init__(self, url, username, password,
self.proto = proto
self.proto.username = self.username
self.proto.password = self.password
self.factory = None

""" Get an object_id by name
"""
Expand Down Expand Up @@ -87,6 +88,8 @@ def setMarketCallBack(self, markets) :
self.proto.markets = markets

""" Get_Object as a passthrough from get_objects([array])
Attention: This call requires GrapheneAPI because it is a non-blocking
JSON query
"""
def get_object(self, oid):
return self.get_objects([oid])[0]
Expand Down
85 changes: 60 additions & 25 deletions grapheneapi/graphenewsprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class GrapheneWebsocketProtocol(WebSocketClientProtocol):
def __init__(self):
pass

"""
API Calls
"""
def wsexec(self, params, callback=None):
request = {"request" : {}, "callback" : None}
self.request_id += 1
Expand All @@ -39,11 +42,20 @@ def wsexec(self, params, callback=None):
# print(request["request"])
self.sendMessage(json.dumps(request["request"]).encode('utf8'))

"""
Callback management
"""
def eventcallback(self, name):
if (name in self.onEventCallbacks and
callable(self.onEventCallbacks[name])):
self.onEventCallbacks[name](self)

"""
Registration to Graphene-APIs
"""
def register_api(self, name):
self.wsexec([1, name, []], [partial(self._set_api_id, name)])

def _set_api_id(self, name, data):
self.api_ids.update({name : data})
if name == "database":
Expand All @@ -58,20 +70,9 @@ def _set_api_id(self, name, data):
def _login(self):
self.wsexec([1, "login", [self.username, self.password]])

def getObjectcb(self, oid, callback, *args):
if oid in self.objectMap and callable(callback):
callback(self.objectMap[oid])
else:
handles = [partial(self.setObject, oid)]
if callback and callable(callback):
handles.append(callback)
self.wsexec([self.api_ids["database"],
"get_objects",
[[oid]]], handles)

def setObject(self, oid, data):
self.objectMap[oid] = data

"""
Subscriptions
"""
def subscribe_to_accounts(self, account_ids, *args):
self.wsexec([0, "get_full_accounts", [account_ids, True]])

Expand All @@ -97,20 +98,39 @@ def subscribe_to_objects(self, *args):
"set_subscribe_callback",
[self.request_id, False]], handles)

""" Get History
"""
def getAccountHistory(self, account_id, callback,
start="1.11.0", stop="1.11.0", limit=100):
if account_id[0:4] == "1.2." :
self.wsexec([self.api_ids["history"],
"get_account_history",
[account_id, start, 100, stop]],
callback)
else :
raise Exception("getAccountHistory expects an account" +
"id of the form '1.2.x'!")

"""
Main Message Dispatcher
"""
def dispatchNotice(self, notice):
if "id" not in notice:
return
oid = notice["id"]
[inst, _type, _id] = oid.split(".")
account_ids = []
for a in self.accounts :
account_ids.append("2.6.%s" % a.split(".")[2])
account_ids.append("1.2.%s" % a.split(".")[2])
try:
if (oid in self.database_callbacks_ids and
callable(self.database_callbacks_ids[oid])):
self.database_callbacks_ids[oid](self, notice)

" Account Notifications "
if (callable(self.accounts_callback) and
(("2.6.%s" % _id in self.accounts) or
("1.2.%s" % _id in self.accounts))):
oid in account_ids):
self.accounts_callback(notice)

" Market notifications "
Expand All @@ -125,13 +145,29 @@ def dispatchNotice(self, notice):

except Exception as e:
print('Error dispatching notice: %s' % str(e))
import traceback
traceback.print_exc()

def register_api(self, name):
self.wsexec([1, name, []], [partial(self._set_api_id, name)])
"""
Object Management
"""
def getObjectcb(self, oid, callback, *args):
if oid in self.objectMap and callable(callback):
callback(self.objectMap[oid])
else:
handles = [partial(self.setObject, oid)]
if callback and callable(callback):
handles.append(callback)
self.wsexec([self.api_ids["database"],
"get_objects",
[[oid]]], handles)

def setObject(self, oid, data):
self.objectMap[oid] = data

################
# Websocket API
################
"""
Websocket API via Autobahn
"""
def onConnect(self, response):
self.request_id = 1
print("Server connected: {0}".format(response.peer))
Expand All @@ -151,13 +187,13 @@ def onOpen(self):
# self.register_api("network_node")
self.register_api("network_broadcast")

" main websocket message dispatcher "
def onMessage(self, payload, isBinary):
res = json.loads(payload.decode('utf8'))
# print("\n\nServer: " + json.dumps(res,indent=1))
# print("\n\nServer: " + str(res))
if "error" not in res:
""" Resolve answers from RPC calls
"""
" Resolve answers from RPC calls "
if "id" in res:
if res["id"] not in self.requests:
print("Received answer to an unknown request?!")
Expand All @@ -169,8 +205,7 @@ def onMessage(self, payload, isBinary):
for callback in callbacks:
callback(res["result"])
elif "method" in res:
""" Run registered call backs for individual object notices
"""
" Run registered call backs for individual object notices "
if res["method"] == "notice":
[self.setObject(notice["id"], notice)
for notice in res["params"][1][0] if "id" in notice]
Expand Down

0 comments on commit 823857e

Please sign in to comment.