Skip to content

Commit

Permalink
RPC updates to ensure lock is released
Browse files Browse the repository at this point in the history
  • Loading branch information
xeroc committed Oct 15, 2018
1 parent 265e2ab commit a433aed
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
30 changes: 20 additions & 10 deletions grapheneapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,22 @@ def chain_params(self):
def get_network(self):
return self.get_chain_properties()

def updated_connection(self):
if self.url[:2] == "ws":
return Websocket(self.url, **self._kwargs)
elif self.url[:4] == "http":
return Http(self.url, **self._kwargs)
else:
raise ValueError("Only support http(s) and ws(s) connections!")

@property
def connection(self):
if self._active_url != self.url:
if self.url[:2] == "ws":
self._active_connection = Websocket(self.url, **self._kwargs)
elif self.url[:4] == "http":
self._active_connection = Http(self.url, **self._kwargs)
else:
raise ValueError("Only support http(s) and ws(s) connections!")
log.debug("Updating connection from {} to {}".format(
self._active_url, self.url
))
self._active_connection = self.updated_connection()
self._active_url = self.url
return self._active_connection

def connect(self):
Expand All @@ -71,7 +78,6 @@ def connect(self):
log.warning(str(e))
self.error_url()
self.next()
self._active_url = self.url
self.register_apis()

def find_next(self):
Expand Down Expand Up @@ -144,7 +150,8 @@ def api_id(self):
self.api_id["database"] = self.database(api_id=1)
self.api_id["history"] = self.history(api_id=1)
self.api_id["network_broadcast"] = self.network_broadcast(api_id=1)
self.api_id["network_broadcast"] = self.network_broadcast(
api_id=1)
"""
return self.connection.api_id
Expand All @@ -160,8 +167,8 @@ def register_apis(self): # pragma: no cover
def __getattr__(self, name):
def func(*args, **kwargs):
while True:
func = self.connection.__getattr__(name)
try:
func = self.connection.__getattr__(name)
r = func(*args, **kwargs)
self.reset_counter()
break
Expand All @@ -175,7 +182,10 @@ def func(*args, **kwargs):
# break
break # pragma: no cover
except IOError as e: # pragma: no cover
log.critical("Connection was closed remotely. Retrying")
import traceback
log.debug(traceback.format_exc())
log.warning("Connection was closed remotely.")
log.warning("Reconnecting ...")
self.error_url()
self.next()
except Exception as e: # pragma: no cover
Expand Down
17 changes: 10 additions & 7 deletions grapheneapi/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, *args, **kwargs):

def connect(self):
log.debug("Trying to connect to node %s" % self.url)
self._request_id = 0
if self.url[:3] == "wss":
ssl_defaults = ssl.get_default_verify_paths()
sslopt_ca_certs = {'ca_certs': ssl_defaults.cafile}
Expand Down Expand Up @@ -57,13 +58,15 @@ def rpcexec(self, payload):
self.__lock.acquire()

# Send over websocket
self.ws.send(
json.dumps(payload, ensure_ascii=False).encode('utf8')
)
# Receive from websocket
ret = self.ws.recv()
try:
self.ws.send(
json.dumps(payload, ensure_ascii=False).encode('utf8')
)
# Receive from websocket
ret = self.ws.recv()

# Release lock
self.__lock.release()
finally:
# Release lock
self.__lock.release()

return ret

0 comments on commit a433aed

Please sign in to comment.