Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add websocket module to py-sdk #117

Merged
merged 12 commits into from
Sep 5, 2022
1 change: 1 addition & 0 deletions .github/workflows/pytests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
GRPC_PORT: ${{ secrets.GRPC_PORT }}
LCD_PORT: ${{ secrets.LCD_PORT }}
NETWORK_INSECURE: ${{ secrets.NETWORK_INSECURE }}
WEBSOCKET_ENDPOINT: ${{ secrets.WEBSOCKET_ENDPOINT }}
steps:
# ----------------------------------------------
# check-out repo and set-up python
Expand Down
3 changes: 2 additions & 1 deletion nibiru/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@


import nibiru.common # noqa
import nibiru.msg # noqa
from nibiru.client import GrpcClient # noqa
from nibiru.common import Coin, Direction, PoolAsset, Side, TxConfig # noqa
from nibiru.common import Coin, Direction, PoolAsset, Side, TxConfig, TxType # noqa
from nibiru.network import Network # noqa
from nibiru.sdk import Sdk # noqa
from nibiru.transaction import Transaction # noqa
Expand Down
28 changes: 28 additions & 0 deletions nibiru/event_specs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dataclasses import dataclass
from enum import Enum


class EventType(Enum):
"""
The events enum type shows the type of events available to parse from the nibiruwebsocket object.
"""

# Perp events
PositionChangedEvent = "nibiru.perp.v1.PositionChangedEvent"
PositionLiquidatedEvent = "nibiru.perp.v1.PositionLiquidatedEvent"
FundingRateChangedEvent = "nibiru.perp.v1.FundingRateChangedEvent"
PositionSettledEvent = "nibiru.perp.v1.PositionSettledEvent"

# Vpool events
ReserveSnapshotSavedEvent = "nibiru.vpool.v1.ReserveSnapshotSavedEvent"
SwapQuoteForBaseEvent = "nibiru.vpool.v1.SwapQuoteForBaseEvent"
MarkPriceChanged = "nibiru.vpool.v1.MarkPriceChanged"

# Bank
Transfer = "transfer"


@dataclass
class EventCaptured:
event_type: EventType
payload: dict
2 changes: 2 additions & 0 deletions nibiru/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Network:
chain_id: str
fee_denom: str
env: str
websocket_endpoint: str = None

def __post_init__(self):
"""
Expand Down Expand Up @@ -55,6 +56,7 @@ def devnet(cls) -> "Network":
return cls(
lcd_endpoint=f'http://{chain_config["HOST"]}:{chain_config["LCD_PORT"]}',
grpc_endpoint=f'{chain_config["HOST"]}:{chain_config["GRPC_PORT"]}',
websocket_endpoint=os.getenv("WEBSOCKET_ENDPOINT"),
chain_id=chain_config["CHAIN_ID"],
fee_denom='unibi',
env="devnet",
Expand Down
67 changes: 67 additions & 0 deletions nibiru/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging
import sys
from datetime import datetime
from typing import Any, Callable, Union

Expand Down Expand Up @@ -175,3 +177,68 @@ def toPbTimestamp(dt: datetime):
ts = Timestamp()
ts.FromDatetime(dt)
return ts


class ColoredFormatter(logging.Formatter):

fmt = "%(asctime)s|%(levelname)s|%(funcName)s| %(message)s"

white = "\x1b[97;20m"
grey = "\x1b[38;20m"
green = "\x1b[32;20m"
cyan = "\x1b[36;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"

FORMATS = {
logging.DEBUG: fmt.format(green, reset),
logging.INFO: fmt.format(cyan, reset),
logging.WARNING: fmt.format(yellow, reset),
logging.ERROR: fmt.format(red, reset),
logging.CRITICAL: fmt.format(bold_red, reset),
}

def format(self, record: logging.LogRecord):
"""Formats a record for the logging handler.

Args:
record (logging.LogRecord): Represents an instance of an event being
logged.
"""
log_format = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
return formatter.format(record=record)


def init_logger(name: str) -> logging.Logger:
"""
Simple logger to use throughout the test suite.

Examples:
```python
from nibiru.utils import init_logger
LOGGER = init_logger("test-logger")
LOGGER.info("successfully executed tx staking command")
LOGGER.debug("debugging error message")
```

Log levels include: [debug, info, warning, error, critical]

Args:
name (str): Name of the logger

Returns:
logging.Logger: The logger object
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)

# Logs to stdout so we can at least see logs in GHA.
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)

handler.setFormatter(fmt=ColoredFormatter())
logger.addHandler(handler)
return logger
115 changes: 115 additions & 0 deletions nibiru/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import json
import threading
import time
from multiprocessing import Queue
from typing import List

from websocket import WebSocketApp

from nibiru import Network
from nibiru.event_specs import EventCaptured, EventType
from nibiru.utils import init_logger

ERROR_TIMEOUT_SLEEP = 3


class NibiruWebsocket:
queue: Queue = None
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
captured_events_type: List[List[str]]

def __init__(
self,
network: Network,
captured_events_type: List[EventType] = [],
):
"""
The nibiru listener provides an interface to easily connect and handle subscription to the events of a nibiru
chain.
"""

self.websocket_url = network.websocket_endpoint
if self.websocket_url is None:
raise ValueError(
"No websocket endpoint provided. Construct the network object setting up the "
"`websocket_endpoint` endpoint"
)

self.captured_events_type: dict = {
captured_event.value: captured_event
for captured_event in captured_events_type
}
self.queue = Queue()
self.logger = init_logger("ws-logger")

def start(self):
"""
Start the websocket and fill the queue with events.
"""

self._ws = WebSocketApp(
self.websocket_url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
)
threading.Thread(
target=self._ws.run_forever,
daemon=True,
name=f"Nibiru websocket @ {self.websocket_url}",
).start()

def _on_open(self, _: WebSocketApp):
self.logger.info("WebSocket starting")
self._subscribe()

def _on_error(self, ws: WebSocketApp, error: Exception):
self.logger.error(f"Closing websocket, error {error}")
self.logger.exception(error)
ws.close()
time.sleep(ERROR_TIMEOUT_SLEEP)
ws.run_forever()

def _on_message(self, _: WebSocketApp, message: str):
"""
Parse the message and filter through them using the captured event type.
Put these filtered event in the queue.

Args:
_ (WebSocketApp): No idea what this is
message (str): The message in a utf-8 data received from the server
"""
log = json.loads(message).get("result")
if log is None:
return

events = log.get("events")
if events is None:
return

block_height = int(log["data"]["value"]["TxResult"]["height"])
tx_hash = events["tx.hash"][0]

events = json.loads(log["data"]["value"]["TxResult"]["result"]["log"])[0]

for event in events["events"]:
if event["type"] in self.captured_events_type:
event_payload = {
attribute["key"]: attribute["value"].strip('"')
for attribute in event["attributes"]
}
event_payload["block_height"] = block_height
matthiasmatt marked this conversation as resolved.
Show resolved Hide resolved
event_payload["tx_hash"] = tx_hash

self.queue.put(EventCaptured(event["type"], event_payload))

def _subscribe(self):
self._ws.send(
json.dumps(
{
"jsonrpc": "2.0",
"method": "subscribe",
"id": 1,
"params": {"query": "tm.event='Tx'"},
}
)
)
81 changes: 49 additions & 32 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading