Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add websocket module to py-sdk #117

Merged
merged 12 commits into from
Sep 5, 2022
3 changes: 2 additions & 1 deletion nibiru/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@


import nibiru.common # noqa
import nibiru.msg # noqa
from nibiru.client import GrpcClient # noqa
from nibiru.common import Coin, Direction, PoolAsset, Side, TxConfig # noqa
from nibiru.common import Coin, Direction, PoolAsset, Side, TxConfig, TxType # noqa
from nibiru.network import Network # noqa
from nibiru.sdk import Sdk # noqa
from nibiru.transaction import Transaction # noqa
Expand Down
28 changes: 28 additions & 0 deletions nibiru/event_specs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dataclasses import dataclass
from enum import Enum


class Events(Enum):
"""
The events enum type shows the type of events available to parse from the nibiruwebsocket object.
"""

# Perp events
PositionChangedEvent = "nibiru.perp.v1.PositionChangedEvent"
PositionLiquidatedEvent = "nibiru.perp.v1.PositionLiquidatedEvent"
FundingRateChangedEvent = "nibiru.perp.v1.FundingRateChangedEvent"
PositionSettledEvent = "nibiru.perp.v1.PositionSettledEvent"

# Vpool events
ReserveSnapshotSavedEvent = "nibiru.vpool.v1.ReserveSnapshotSavedEvent"
SwapQuoteForBaseEvent = "nibiru.vpool.v1.SwapQuoteForBaseEvent"
MarkPriceChanged = "nibiru.vpool.v1.MarkPriceChanged"

# Bank
Transfer = "transfer"


@dataclass
class EventCaptured:
event_type: Events
payload: dict
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions nibiru/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Network:
chain_id: str
fee_denom: str
env: str
websocket_endpoint: str = None

def __post_init__(self):
"""
Expand Down Expand Up @@ -55,6 +56,7 @@ def devnet(cls) -> "Network":
return cls(
lcd_endpoint=f'http://{chain_config["HOST"]}:{chain_config["LCD_PORT"]}',
grpc_endpoint=f'{chain_config["HOST"]}:{chain_config["GRPC_PORT"]}',
websocket_endpoint=os.getenv("WEBSOCKET_ENDPOINT"),
chain_id=chain_config["CHAIN_ID"],
fee_denom='unibi',
env="devnet",
Expand Down
75 changes: 74 additions & 1 deletion nibiru/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from datetime import datetime
from typing import Any, Callable, Union
from functools import reduce
from typing import Any, Callable, Dict, Iterable, List, Union

from deepmerge import Merger
from google.protobuf.timestamp_pb2 import Timestamp

# number of decimal places
PRECISION = 18
MERGER = Merger([(list, "override"), (dict, "merge")], ["override"], ["override"])


# reimplementation of cosmos-sdk/types/decimal.go
Expand Down Expand Up @@ -175,3 +178,73 @@ def toPbTimestamp(dt: datetime):
ts = Timestamp()
ts.FromDatetime(dt)
return ts


def merge_list(list_of_dict: Iterable[Dict]) -> Dict:
"""
Merge a list of dictionnary together.

```
>>> d1 = dict(x=0, y=dict(a=0, b=1), z=dict(x=1))
>>> d2 = dict(y=dict(c=2), z=dict(y=1))
>>> merge_list(d1, d2)

{
"x": 0,
"y": {"a": 0, "b": 1, "c": 2},
"z": {
"x": 1,
"y": 1,
},
}
```

Args:
list_of_dict (Iterable[Dict]): The list of dictionaries to merge

Returns:
Dict: Dictionaries merged together.
"""

return reduce(
lambda x, y: MERGER.merge(x, y),
list_of_dict,
)


def transform_java_dict_to_list(events: Dict) -> List[Dict]:
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
"""
Transfrom a java type dictionary to a python dictionary.

For example:

{
'nibiru.vpool.v1.SwapQuoteForBaseEvent.pair': ['ubtc:unusd'],
'tx.fee': ['170unibi'],
}

becomes:

[
{'nibiru': {'vpool': {'v1': {'MarkPriceChanged': {'pair': 'ubtc:unusd'}}}}},
{'tx': {'fee': '170unibi'}},
]
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved

Args:
events (Dict): Events coming from nibiru

Returns:
List[Dict]: List of nested dictionaries
"""
return [
reduce(
lambda x, y: {y: x},
[
[value.strip('"') for value in payload]
if len(payload) > 1
else payload[0].strip('"') # single element list is starred
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
]
+ event.split(".")[::-1],
)
for event, payload in events.items()
]
112 changes: 112 additions & 0 deletions nibiru/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import functools
import json
import operator
import threading
import time
from multiprocessing import Queue
from typing import List

from websocket import WebSocketApp

import nibiru.utils
from nibiru import Network
from nibiru.event_specs import EventCaptured, Events

ERROR_TIMEOUT_SLEEP = 3


class NibiruWebsocket:
queue: Queue = None
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
captured_events_type: List[List[str]]

def __init__(
self, network: Network, captured_events_type: List[Events] = [], verbose=False
):
"""
The nibiru listener provides an interface to easily connect and handle subscription to the events of a nibiru
chain.
"""

self.websocket_url = network.websocket_endpoint
self.captured_events_type = [
captured_event.value.split(".") for captured_event in captured_events_type
]
self.queue = Queue()

def start(self):
"""
Start the websocket and fill the queue with events.
"""

self._ws = WebSocketApp(
self.websocket_url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
)
threading.Thread(
target=self._ws.run_forever,
daemon=True,
name=f"Nibiru websocket @ {self.websocket_url}",
).start()

def _on_open(self, _: WebSocketApp):
print("WebSocket starting")
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
self._subscribe()

def _on_error(self, ws: WebSocketApp, error: Exception):
print(f"Closing websocket, error {error}")
ws.close()
time.sleep(ERROR_TIMEOUT_SLEEP)
ws.run_forever()

def _on_message(self, _: WebSocketApp, message: str):
"""
Parse the message and filter through them using the captured event type.
Put these filtered event in the queue.

Args:
_ (WebSocketApp): No idea what this is
message (str): The message in a utf-8 data received from the server
"""
log = json.loads(message).get("result")
if log is None:
return

events = log.get("events")
if events is None:
return

block_height = log["data"]["value"]["TxResult"]["height"]
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved

events = nibiru.utils.merge_list(
nibiru.utils.transform_java_dict_to_list(events)
)

for event_type in self.captured_events_type:
try:
event_payload = functools.reduce(operator.getitem, event_type, events)
except KeyError:
pass
else: # if no KeyError
event_payload["block_height"] = block_height
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
event_payload["timestamp"] = time.time_ns()
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved

self.queue.put(
EventCaptured(
event_type=".".join(event_type),
payload=event_payload,
)
)

def _subscribe(self):
self._ws.send(
json.dumps(
{
"jsonrpc": "2.0",
"method": "subscribe",
"id": 1,
"params": {"query": "tm.event='Tx'"},
}
)
)
45 changes: 37 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pytest = "^7.1.2"
pre-commit = "^2.20.0"
nibiru-proto = "^0.14.1"
shutup = "^0.2.0"
websocket-client = "^1.4.0"
deepmerge = "^1.0.1"

[tool.poetry.dev-dependencies]
black = "^22.6.0"
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def pytest_configure(config):
raise ValueError(f"Environment variable {var} is missing!")

pytest.VALIDATOR_MNEMONIC = os.getenv("VALIDATOR_MNEMONIC")
pytest.WEBSOCKET_ENDPOINT = os.getenv("WEBSOCKET_ENDPOINT")
pytest.ORACLE_MNEMONIC = os.getenv("ORACLE_MNEMONIC")
pytest.NETWORK_INSECURE = os.getenv("NETWORK_INSECURE") != "false"

Expand Down
Loading