Skip to content

Commit

Permalink
Merge branch 'feature/objectMap' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
xeroc committed Jan 22, 2016
2 parents 0389d95 + 0d25d59 commit b5f74f7
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 113 deletions.
46 changes: 34 additions & 12 deletions grapheneapi/grapheneclient.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
from .grapheneapi import GrapheneAPI
from .graphenews import GrapheneWebsocket
from collections import OrderedDict

import logging as log

#: max number of objects to chache
max_cache_objects = 50


class LimitedSizeDict(OrderedDict):
""" This class limits the size of the objectMap
"""

def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", max_cache_objects)
OrderedDict.__init__(self, *args, **kwds)
self._check_size_limit()

def __setitem__(self, key, value):
OrderedDict.__setitem__(self, key, value)
self._check_size_limit()

def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)


class ExampleConfig() :
""" The behavior of your program (e.g. reactions on messages) can be
Expand Down Expand Up @@ -474,17 +497,16 @@ def getObject(self, oid):
from RPC connection.
"""
if self.ws :
# [_instance, _type, _id] = oid.split(".")
# if (not (oid in self.ws.proto.objectMap) or
# _instance == "1" and _type == "7"): # force refresh orders
# data = self.rpc.get_object(oid)
# self.ws.proto.objectMap[oid] = data
# else:
# data = self.ws.proto.objectMap[oid]
# if len(data) == 1 :
# return data[0]
# else:
# return data
return self.ws.get_objects([oid])[0]
[_instance, _type, _id] = oid.split(".")
if (not (oid in self.ws.objectMap) or
_instance == "1" and _type == "7"): # force refresh orders
data = self.rpc.get_object(oid)
self.ws.objectMap[oid] = data
else:
data = self.ws.objectMap[oid]
if len(data) == 1 :
return data[0]
else:
return data
else :
return self.rpc.get_object(oid)[0]
39 changes: 39 additions & 0 deletions grapheneapi/graphenews.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import asyncio
import ssl
from collections import OrderedDict

try:
from autobahn.asyncio.websocket import WebSocketClientFactory
Expand All @@ -10,6 +11,28 @@
from .graphenewsprotocol import GrapheneWebsocketProtocol
from .graphenewsrpc import GrapheneWebsocketRPC

#: max number of objects to chache
max_cache_objects = 50


class LimitedSizeDict(OrderedDict):
""" This class limits the size of the objectMap
"""

def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", max_cache_objects)
OrderedDict.__init__(self, *args, **kwds)
self._check_size_limit()

def __setitem__(self, key, value):
OrderedDict.__setitem__(self, key, value)
self._check_size_limit()

def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)


class GrapheneWebsocket(GrapheneWebsocketRPC):
""" This class serves as a management layer for the websocket
Expand All @@ -36,6 +59,8 @@ def __init__(self, url, username, password,
self.proto = proto
self.proto.username = username
self.proto.password = password
self.objectMap = LimitedSizeDict()
self.proto.objectMap = self.objectMap # this is a reference
self.factory = None

def setObjectCallbacks(self, callbacks) :
Expand Down Expand Up @@ -109,6 +134,14 @@ def get_object(self, oid):
"""
return self.get_objects([oid])[0]

def getObject(self, oid):
if self.objectMap is not None and oid in self.objectMap:
return self.objectMap[oid]
else:
data = self.get_object(oid)
self.objectMap[oid] = data
return data

def connect(self) :
""" Create websocket factory by Autobahn
"""
Expand All @@ -121,6 +154,12 @@ def run_forever(self) :
This method will try to keep the connection alive and try an
autoreconnect if the connection closes.
"""
if not issubclass(self.factory.protocol, GrapheneWebsocketProtocol) :
raise Exception("When using run(), we need websocket " +
"notifications which requires the " +
"configuration/protocol to inherit " +
"'GrapheneWebsocketProtocol'")

loop = asyncio.get_event_loop()
# forward loop into protocol so that we can issue a reset from the
# protocol:
Expand Down
27 changes: 12 additions & 15 deletions grapheneapi/graphenewsprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
except ImportError:
raise ImportError("Missing dependency: python-autobahn")

"""
Graphene Websocket Protocol
"""


class GrapheneWebsocketProtocol(WebSocketClientProtocol):
""" Graphene Websocket Protocol is the class that will be used
Expand All @@ -35,7 +31,7 @@ class GrapheneWebsocketProtocol(WebSocketClientProtocol):
markets = []

#: Storage of Objects to reduce latency and load
objectMap = {}
objectMap = None

#: Event Callback registrations and fnts
onEventCallbacks = {}
Expand Down Expand Up @@ -234,20 +230,21 @@ def getObjectcb(self, oid, callback, *args):
:param object-id oid: Object ID to retrieve
:param fnt callback: Callback to call if object has been received
"""
# if oid in self.objectMap and callable(callback):
# callback(self.objectMap[oid])
# else:
handles = [partial(self.setObject, oid)]
if callback and callable(callback):
handles.append(callback)
self.wsexec([self.api_ids["database"],
"get_objects",
[[oid]]], handles)
if self.objectMap is not None and oid in self.objectMap and callable(callback):
callback(self.objectMap[oid])
else:
handles = [partial(self.setObject, oid)]
if callback and callable(callback):
handles.append(callback)
self.wsexec([self.api_ids["database"],
"get_objects",
[[oid]]], handles)

def setObject(self, oid, data):
""" Set Object in the internal Object Storage
"""
self.objectMap[oid] = data
if self.objectMap is not None:
self.objectMap[oid] = data

def onConnect(self, response):
""" Is executed on successful connect. Calls event
Expand Down
65 changes: 32 additions & 33 deletions scripts/monitor-deposits/monitor.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
import json
from grapheneapi import GrapheneWebsocket, GrapheneWebsocketProtocol
from graphenebase import Memo, PrivateKey, PublicKey
import config

""" PubKey Prefix
Productive network: BTS
Testnetwork: GPH """
#prefix = "GPH"
#: PubKey Prefix
#: * Productive network: BTS
#: * Testnetwork: GPH """
# prefix = "GPH"
prefix = "BTS"

""" Callback on event
This function will be triggered on a notification of the witness.
If you subsribe (see below) to 2.6.*, the witness node will notify you of
any chances regarding your account_balance """

class GrapheneMonitor(GrapheneWebsocketProtocol) :
last_op = "1.11.0"
account_id = "1"

def __init__(self) :
super().__init__()

def onAccountUpdate(self, data=None) :
if data :
opID = api.getObject(data["most_recent_op"])["operation_id"]
else :
if data :
opID = api.getObject(data["most_recent_op"])["operation_id"]
else :
opID = self.last_op
self.wsexec([self.api_ids["history"], "get_account_history", [self.account_id, self.last_op, 100, "1.11.0"]], self.process_operations)
if data : self.last_op = opID
if data :
self.last_op = opID

def process_operations(self, operations) :
for operation in operations[::-1] :
Expand All @@ -34,51 +32,52 @@ def process_operations(self, operations) :
op = operation["op"][1]

" Consider only Transfer operations "
if operation["op"][0] != 0: continue
if operation["op"][0] != 0:
continue

# Get assets involved in Fee and Transfer
fee_asset = api.get_object(op["fee"]["asset_id"])
amount_asset = api.get_object(op["amount"]["asset_id"])
fee_asset = api.getObject(op["fee"]["asset_id"])
amount_asset = api.getObject(op["amount"]["asset_id"])

# Amounts for fee and transfer
fee_amount = float(op["fee"]["amount"]) / float(10**int(fee_asset["precision"]))
amount_amount= float(op["amount"]["amount"]) / float(10**int(amount_asset["precision"]))
fee_amount = float(op["fee"]["amount"]) / float(10 ** int(fee_asset["precision"]))
amount_amount = float(op["amount"]["amount"]) / float(10 ** int(amount_asset["precision"]))

# Get accounts involved
from_account = api.get_object(op["from"])
to_account = api.get_object(op["to"])
from_account = api.getObject(op["from"])
to_account = api.getObject(op["to"])

# Decode the memo
memomsg = ""
if "memo" in op :
memo = op["memo"]
try : # if possible
try : # if possible
privkey = PrivateKey(config.memo_wif_key)
pubkey = PublicKey(memo["from"], prefix=prefix)
memomsg = Memo.decode_memo(privkey, pubkey, memo["nonce"], memo["message"])
except Exception as e: # if not possible
except Exception as e: # if not possible
memomsg = "--cannot decode-- %s" % str(e)
# Print out
print("last_op: %s | block:%s | from %s -> to: %s | fee: %f %s | amount: %f %s | memo: %s" % (
opID, block,
from_account["name"], to_account["name"],
fee_amount, fee_asset["symbol"],
amount_amount, amount_asset["symbol"],
memomsg))
opID, block,
from_account["name"], to_account["name"],
fee_amount, fee_asset["symbol"],
amount_amount, amount_asset["symbol"],
memomsg))

if __name__ == '__main__':
## Monitor definitions
# Monitor definitions
protocol = GrapheneMonitor
protocol.last_op = config.last_op ## last operation logged
protocol.account_id = "1.2.%s" % config.accountID.split(".")[2] ## account to monitor
protocol.last_op = config.last_op # last operation logged
protocol.account_id = "1.2.%s" % config.accountID.split(".")[2] # account to monitor

## Open Up Graphene Websocket API
# Open Up Graphene Websocket API
api = GrapheneWebsocket(config.url, config.user, config.password, protocol)

## Set Callback for object changes
# Set Callback for object changes
api.setObjectCallbacks({config.accountID : protocol.onAccountUpdate})
api.setEventCallbacks({"registered-history" : protocol.onAccountUpdate})

## Run the Websocket connection continuously
# Run the Websocket connection continuously
api.connect()
api.run_forever()
Loading

0 comments on commit b5f74f7

Please sign in to comment.