Skip to content

Commit

Permalink
[API] Updates, fixes and Adding of 'stream'
Browse files Browse the repository at this point in the history
  • Loading branch information
xeroc committed Jun 6, 2016
1 parent be78e07 commit ce651dc
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 180 deletions.
2 changes: 1 addition & 1 deletion docs/howto-exchanges-detailed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ still support exchanges that require more confirmations for deposits.

We provide a so called *delayed* full node which accepts two additional
parameters for the configuration besides those already available with the
standard daemon (read :doc:`full`).
standard daemon.

* `trusted-node` RPC endpoint of a trusted validating node (required)
* `delay-block-count` Number of blocks to delay before advancing chain state (required)
Expand Down
31 changes: 31 additions & 0 deletions docs/howto-monitor-operations.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
***************************************************
Howto Monitor the blockchain for certain operations
***************************************************

Operations in blocks can be monitored relatively easy by using the
`block_stream` (for entire blocks) for `stream` (for specific
operations) generators.

The following example will only show ``transfer`` operations on the
blockchain:

.. code-block:: python
from grapheneapi.grapheneclient import GrapheneClient
from pprint import pprint
class Config():
witness_url = "ws://testnet.bitshares.eu/ws"
if __name__ == '__main__':
client = GrapheneClient(Config)
for b in client.ws.stream("transfer"):
pprint(b)
Note that you can define a starting block and instead of waiting for
sufficient confirmations (irreversible blocks), you can also consider
the real *head* block with:

.. code-block:: python
for b in client.ws.stream("transfer", start=199924, mode="head"):
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Tutorials
.. toctree::
:maxdepth: 1

howto-monitor-operations
howto-exchanges
howto-exchanges-detailed

Expand Down
3 changes: 1 addition & 2 deletions grapheneapi/grapheneapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ def rpcexec(self, payload):
rtype: json
raises RPCConnection: if no connction can be made
raises UnauthorizedError: if the user is not authorized
raise ValueError: if the API returns a non-JSON formated
answer
raise ValueError: if the API returns a non-JSON formated answer
It is not recommended to use this method directly, unless
you know what you are doing. All calls available to the API
Expand Down
73 changes: 26 additions & 47 deletions grapheneapi/grapheneclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,6 @@
import logging
log = logging.getLogger("grapheneapi.grapheneclient")

#: max number of objects to chache
max_cache_objects = 50


class LimitedSizeDict(OrderedDict):
""" This class limits the size of the objectMap
"""

def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", max_cache_objects)
OrderedDict.__init__(self, *args, **kwds)
self._check_size_limit()

def __setitem__(self, key, value):
OrderedDict.__setitem__(self, key, value)
self._check_size_limit()

def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)

def __getitem__(self, key):
""" keep the element longer in the memory by moving it to the end
"""
self.move_to_end(key)
return OrderedDict.__getitem__(self, key)


class ExampleConfig() :
""" The behavior of your program (e.g. reactions on messages) can be
Expand Down Expand Up @@ -495,6 +467,30 @@ def getChainInfo(self):
"core_symbol" : core_asset["symbol"],
"chain_id" : chain_id}

def getObject(self, oid):
""" Get an Object either from Websocket store (if available) or
from RPC connection.
"""
if self.ws :
[_instance, _type, _id] = oid.split(".")
if (not (oid in self.ws.objectMap) or
_instance == "1" and _type == "7"): # force refresh orders
data = self.ws.get_object(oid)
self.ws.objectMap[oid] = data
else:
data = self.ws.objectMap[oid]
if len(data) == 1 :
return data[0]
else:
return data
else :
return self.rpc.get_object(oid)[0]

def get_object(self, oid):
""" Identical to ``getObject``
"""
return self.getObject(oid)

""" Forward these calls to Websocket API
"""
def setEventCallbacks(self, callbacks):
Expand All @@ -518,6 +514,8 @@ def setMarketCallBack(self, markets):
"""
self.ws.setMarketCallBack(markets)

""" Connect to Websocket and run asynchronously
"""
def connect(self):
""" Only *connect* to the websocket server. Does **not** run the
subsystem.
Expand All @@ -534,22 +532,3 @@ def run(self):
""" Connect to Websocket server **and** run the subsystem """
self.connect()
self.run_forever()

def getObject(self, oid):
""" Get an Object either from Websocket store (if available) or
from RPC connection.
"""
if self.ws :
[_instance, _type, _id] = oid.split(".")
if (not (oid in self.ws.objectMap) or
_instance == "1" and _type == "7"): # force refresh orders
data = self.ws.get_object(oid)
self.ws.objectMap[oid] = data
else:
data = self.ws.objectMap[oid]
if len(data) == 1 :
return data[0]
else:
return data
else :
return self.rpc.get_object(oid)[0]
162 changes: 95 additions & 67 deletions grapheneapi/graphenews.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@
log = logging.getLogger("grapheneapi.graphenews")

#: max number of objects to chache
max_cache_objects = 50


class LimitedSizeDict(OrderedDict):
""" This class limits the size of the objectMap
""" This class limits the size of the objectMap to
``max_cache_objects`` (default_ 50).
All objects received are stored in the objectMap and get_object
calls will lookup most objects from this structure
"""

max_cache_objects = 50

def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", max_cache_objects)
if "max_cache_objects" in kwds:
self.max_cache_objects = kwds["max_cache_objects"]
self.size_limit = kwds.pop("size_limit", self.max_cache_objects)
OrderedDict.__init__(self, *args, **kwds)
self._check_size_limit()

Expand All @@ -41,6 +48,12 @@ def _check_size_limit(self):
while len(self) > self.size_limit:
self.popitem(last=False)

def __getitem__(self, key):
""" keep the element longer in the memory by moving it to the end
"""
self.move_to_end(key)
return OrderedDict.__getitem__(self, key)


class GrapheneWebsocket(GrapheneWebsocketRPC):
""" This class serves as a management layer for the websocket
Expand All @@ -54,13 +67,25 @@ class GrapheneWebsocket(GrapheneWebsocketRPC):
`autobahn.asyncio.websocket`.
"""

def __init__(self, url, username, password,
def __init__(self, url, username="", password="",
proto=GrapheneWebsocketProtocol):
""" Open A GrapheneWebsocket connection that can handle
notifications though asynchronous calls.
:param str url: Url to the websocket server
:param str username: Username for login
:param str password: Password for login
:param GrapheneWebsocketProtocol proto: (optional) Protocol that inherits ``GrapheneWebsocketProtocol``
"""
ssl, host, port, resource, path, params = parseWsUrl(url)
GrapheneWebsocketRPC.__init__(self, url, username, password)
self.url = url
self.username = username
self.password = password

# Open another RPC connection to execute calls
GrapheneWebsocketRPC.__init__(self, url, username, password)

# Parameters for another connection for asynchronous notifications
self.ssl = ssl
self.host = host
self.port = port
Expand All @@ -71,68 +96,6 @@ def __init__(self, url, username, password,
self.proto.objectMap = self.objectMap # this is a reference
self.factory = None

def setObjectCallbacks(self, callbacks) :
""" Define Callbacks on Objects for websocket connections
:param json callbacks: A object/callback json structur to
register object updates with a
callback
The object/callback structure looks as follows:
.. code-block: json
{
"2.0.0" : print,
"object-id": fnt-callback
}
"""
self.proto.database_callbacks = callbacks

def setAccountsDispatcher(self, accounts, callback) :
""" Subscribe to Full Account Updates
:param accounts: Accounts to subscribe to
:type accounts: array of account IDs
:param fnt callback: function to be called on notifications
"""
self.proto.accounts = accounts
self.proto.accounts_callback = callback

def setEventCallbacks(self, callbacks) :
""" Set Event Callbacks of the subsystem
:param json callbacks: event/fnt json object
Available events:
* ``connection-init``
* ``connection-opened``
* ``connection-closed``
* ``registered-database``
* ``registered-history``
* ``registered-network-broadcast``
* ``registered-network-node``
"""
for key in callbacks :
self.proto.onEventCallbacks[key] = callbacks[key]

def setMarketCallBack(self, markets) :
""" Define Callbacks on Market Events for websocket connections
:param markets: Array of market pairs to register to
:type markets: array of asset pairs
Example
.. code-block:: python
setMarketCallBack(["USD:EUR", "GOLD:USD"])
"""
self.proto.markets = markets

def get_object(self, oid):
""" Get_Object as a passthrough from get_objects([array])
Attention: This call requires GrapheneAPI because it is a non-blocking
Expand All @@ -143,6 +106,9 @@ def get_object(self, oid):
return self.get_objects([oid])[0]

def getObject(self, oid):
""" Lookup objects from the object storage and if not available,
request object from the API
"""
if self.objectMap is not None and oid in self.objectMap:
return self.objectMap[oid]
else:
Expand Down Expand Up @@ -197,3 +163,65 @@ def run_forever(self) :

log.info("Good bye!")
loop.close()

def setObjectCallbacks(self, callbacks) :
""" Define Callbacks on Objects for websocket connections
:param json callbacks: A object/callback json structur to
register object updates with a
callback
The object/callback structure looks as follows:
.. code-block: json
{
"2.0.0" : print,
"object-id": fnt-callback
}
"""
self.proto.database_callbacks = callbacks

def setAccountsDispatcher(self, accounts, callback) :
""" Subscribe to Full Account Updates
:param accounts: Accounts to subscribe to
:type accounts: array of account IDs
:param fnt callback: function to be called on notifications
"""
self.proto.accounts = accounts
self.proto.accounts_callback = callback

def setEventCallbacks(self, callbacks) :
""" Set Event Callbacks of the subsystem
:param json callbacks: event/fnt json object
Available events:
* ``connection-init``
* ``connection-opened``
* ``connection-closed``
* ``registered-database``
* ``registered-history``
* ``registered-network-broadcast``
* ``registered-network-node``
"""
for key in callbacks :
self.proto.onEventCallbacks[key] = callbacks[key]

def setMarketCallBack(self, markets) :
""" Define Callbacks on Market Events for websocket connections
:param markets: Array of market pairs to register to
:type markets: array of asset pairs
Example
.. code-block:: python
setMarketCallBack(["USD:EUR", "GOLD:USD"])
"""
self.proto.markets = markets
Loading

0 comments on commit ce651dc

Please sign in to comment.