Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature/neo retry logic #398

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 131 additions & 56 deletions neomodel/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import sys
import time
import warnings
import random
from threading import local

from neo4j.v1 import GraphDatabase, basic_auth, CypherError, SessionError, Node

from . import config
from .exceptions import UniqueProperty, ConstraintValidationFailed, ModelDefinitionMismatch
from .exceptions import (
UniqueProperty,
ConstraintValidationFailed,
ModelDefinitionMismatch,
)

if sys.version_info >= (3, 0):
from urllib.parse import urlparse
Expand All @@ -22,7 +27,7 @@
def ensure_connection(func):
def wrapper(self, *args, **kwargs):
# Sort out where to find url
if hasattr(self, 'db'):
if hasattr(self, "db"):
_db = self.db
else:
_db = self
Expand All @@ -35,7 +40,7 @@ def wrapper(self, *args, **kwargs):


def change_neo4j_password(db, new_password):
    """Set a new password for the current Neo4j user.

    Issues the ``dbms.changePassword`` procedure through the given
    database object's ``cypher_query`` method.
    """
    params = {"password": new_password}
    db.cypher_query("CALL dbms.changePassword({password})", params)


def clear_neo4j_database(db):
Expand All @@ -46,7 +51,7 @@ class Database(local):
"""
A singleton object via which all operations from neomodel to the Neo4j backend are handled with.
"""

def __init__(self):
"""
"""
Expand All @@ -67,17 +72,23 @@ def set_connection(self, url):
"""
u = urlparse(url)

if u.netloc.find('@') > -1 and (u.scheme == 'bolt' or u.scheme == 'bolt+routing'):
credentials, hostname = u.netloc.rsplit('@', 1)
username, password, = credentials.split(':')
if u.netloc.find("@") > -1 and (
u.scheme == "bolt" or u.scheme == "bolt+routing"
):
credentials, hostname = u.netloc.rsplit("@", 1)
username, password, = credentials.split(":")
else:
raise ValueError("Expecting url format: bolt://user:password@localhost:7687"
" got {}".format(url))

self.driver = GraphDatabase.driver(u.scheme + '://' + hostname,
auth=basic_auth(username, password),
encrypted=config.ENCRYPTED_CONNECTION,
max_pool_size=config.MAX_POOL_SIZE)
raise ValueError(
"Expecting url format: bolt://user:password@localhost:7687"
" got {}".format(url)
)

self.driver = GraphDatabase.driver(
u.scheme + "://" + hostname,
auth=basic_auth(username, password),
encrypted=config.ENCRYPTED_CONNECTION,
max_pool_size=config.MAX_POOL_SIZE,
)
self.url = url
self._pid = os.getpid()
self._active_transaction = None
Expand All @@ -92,10 +103,10 @@ def transaction(self):
@property
def write_transaction(self):
    """Return a fresh transaction proxy bound to WRITE access mode."""
    proxy = TransactionProxy(self, access_mode="WRITE")
    return proxy

@property
def read_transaction(self):
    """Return a fresh transaction proxy bound to READ access mode."""
    proxy = TransactionProxy(self, access_mode="READ")
    return proxy

@ensure_connection
def begin(self, access_mode=None):
Expand All @@ -104,7 +115,9 @@ def begin(self, access_mode=None):
"""
if self._active_transaction:
raise SystemError("Transaction in progress")
self._active_transaction = self.driver.session(access_mode=access_mode).begin_transaction()
self._active_transaction = self.driver.session(
access_mode=access_mode
).begin_transaction()

@ensure_connection
def commit(self):
Expand Down Expand Up @@ -137,36 +150,49 @@ def _object_resolution(self, result_list):

:return: A list of instantiated objects.
"""

# Object resolution occurs in-place
for a_result_item in enumerate(result_list):
for a_result_attribute in enumerate(a_result_item[1]):
for a_result_attribute in enumerate(a_result_item[1]):
try:
# Primitive types should remain primitive types,
# Primitive types should remain primitive types,
# Nodes to be resolved to native objects
resolved_object = a_result_attribute[1]

if type(a_result_attribute[1]) is Node:
resolved_object = self._NODE_CLASS_REGISTRY[frozenset(a_result_attribute[1].labels)].inflate(
a_result_attribute[1])

resolved_object = self._NODE_CLASS_REGISTRY[
frozenset(a_result_attribute[1].labels)
].inflate(a_result_attribute[1])

if type(a_result_attribute[1]) is list:
resolved_object = self._object_resolution([a_result_attribute[1]])

result_list[a_result_item[0]][a_result_attribute[0]] = resolved_object

resolved_object = self._object_resolution(
[a_result_attribute[1]]
)

result_list[a_result_item[0]][
a_result_attribute[0]
] = resolved_object

except KeyError:
# Not being able to match the label set of a node with a known object results
# in a KeyError in the internal dictionary used for resolution. If it is impossible
# Not being able to match the label set of a node with a known object results
# in a KeyError in the internal dictionary used for resolution. If it is impossible
# to match, then raise an exception with more details about the error.
raise ModelDefinitionMismatch(a_result_attribute[1], self._NODE_CLASS_REGISTRY)

raise ModelDefinitionMismatch(
a_result_attribute[1], self._NODE_CLASS_REGISTRY
)

return result_list



@ensure_connection
def cypher_query(self, query, params=None, handle_unique=True, retry_on_session_expire=False, resolve_objects=False):
def cypher_query(
self,
query,
params=None,
handle_unique=True,
retry_on_session_expire=False,
resolve_objects=False,
max_retry_seconds=60,
):
"""
Runs a query on the database and returns a list of results and their headers.

Expand All @@ -181,7 +207,7 @@ def cypher_query(self, query, params=None, handle_unique=True, retry_on_session_
:param resolve_objects: Whether to attempt to resolve the returned nodes to data model objects automatically
:type: bool
"""

if self._pid != os.getpid():
self.set_connection(self.url)

Expand All @@ -193,17 +219,58 @@ def cypher_query(self, query, params=None, handle_unique=True, retry_on_session_
try:
# Retrieve the data
start = time.clock()
response = session.run(query, params)
results, meta = [list(r.values()) for r in response], response.keys()

delay = 1
while True:
last_exception = None
cts = time.clock()
mvanderkroon marked this conversation as resolved.
Show resolved Hide resolved

if cts - start > max_retry_seconds:
# keep in mind that if many
# different exceptions are
# thrown, we only show the
# one that occurred last
raise last_exception

try:
response = session.run(query, params)
results, meta = (
[list(r.values()) for r in response],
response.keys(),
)
break
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So - this is the fun bit :-) There are a variety of things to cope with here. Various exceptions can come back depending on where in its lifecycle the cluster currently is:

ServiceUnavailable("Unable to retrieve routing information") 
SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) 
self.Error("Failed to read from defunct connection {!r}".format(self.server.address)) 
AttributeError: 'NoneType' object has no attribute 'commit' 
AttributeError: 'NoneType' object has no attribute 'rollback' 
AttributeError: 'NoneType' object has no attribute 'protocol_version' 
self.Error("Failed to read from closed connection {!r}".format(self.server.address)) 
self.Error("Failed to write to closed connection {!r}".format(self.server.address)) 
ConnectionRefusedError: [Errno 111] Connection refused 
requests.exceptions.ConnectionError: HTTPConnectionPool(host='neo4j-0', port=7474): Max retries exceeded with url: /db/data/transaction/commit

(I think the AttributeErrors are coming from a TransactionProxy getting a SessionExpired exception, retrying successfully but setting db.transaction to None.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tricky: none (with perhaps the only exception being the first) are very clearly 'transient', and could just as well indicate issues that will not be resolved by simply waiting and retrying.

I think there are generally two distinct cases:

(1) There is an open/healthy connection to the neo4j cluster. However, the cluster is not in a valid state, e.g. it is currently electing a new leader. Any queries issued against the cluster whilst it being in this invalid state will cause 'transient' errors that can be mitigated by the retry logic as implemented above.

It is, however, not clear which concrete Exception the driver raises but the neobolt codebase has a few good candidates of which Neo.ClientError.Cluster.NotALeader seems very likely to indicate the above mentioned scenario, next to a few others that are clearly marked as transient.

(2) There are (potentially transient) network issues that prevent the driver to issue queries to the cluster over a previously healthy and open connection. In my experience, the above exceptions tend to occur most in this scenario and I think the solution here is different: not only should we retry issuing the query, but we should also attempt to reinitiate the connection.

@robertlagrant Does this description match your experience?

Copy link
Contributor

@robertlagrant robertlagrant Jan 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's right!

I think the neobolt NotALeader exception is a good pick for when the we're now talking to a follower. As a bonus, given how it's named I assume it also will be thrown when the node is in CANDIDATE status, before the cluster forms.

A good example of (2) is that the app has been directed (say) to node 1, which is the leader, and node 1 dies for some reason. Nodes 2 and 3 re-establish themselves as follower and leader, but while node 1 is physically coming back up it may be at a point where it's not even serving network requests, let alone have cluster info. So if the app tries to connect in that time, it almost certainly will get a connection timeout. When it tries to connect to the old node before it's got the cluster info, that's probably the service unavailable exception. I imagine that neobolt's ConnectionExpired and ServiceUnavailable exceptions will cover those cases.

In the case of ConnectionExpired, it could probably stop trying to connect to that server and connect afresh to the globally configured connection string instead, as that'll probably be a load balancer that will route the traffic to an alive node, re-get routing info and all will be well. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I hadn't thought about the CANDIDATE stage, but I think you're right!

Your example of scenario 2 seems entirely on point, it's consistent with what we see happen every once in a while.

I agree: connect afresh to the globally configured connection string and it should resolve. I see one possible risk: there might be a small window where the load balancer has not picked up the unavailability of node 1, so there is a (very) small chance that connecting afresh might end up at node 1 - I'm not sure of the likelihood of this happening (it also depends on health check interval settings and cluster size), but it might be prudent to ensure some retry logic for the reconnect with a configurable MAX_RETRY_SECONDS (which should then be set by the user to a value greater than the health check interval, with a reasonable default - 10 seconds?).

Copy link
Contributor

@robertlagrant robertlagrant Jan 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that's a good move. I'd probably suggest that if they choose to set it to 0 that the retry will be disabled, in case some people want to implement such things in infrastructure instead. I can't think of an obvious example of this, but I imagine if people have a cluster-aware load balancer then they'd want to disable it to eliminate unnecessary moving parts.

But yeah, nice catch. Hadn't thought of that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertlagrant is there any chance you can try this to see if it resolves the issues?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# TODO: Exception is too broad, we
# should limit to only high-level,
# transient exceptions

# add a random jitter between
# 0 and 1 second to mitigate
# risk of all clients reconnecting
# at the same time in case of
# (transient) connectivity issues
jitter = random.uniform(0.0, 1.0)

# wait before retrying in hopes
# that the transient error has passed
# by now
time.sleep(delay + jitter)

# wait exponentially longer after
# each failure
delay = delay * 2

last_exception = e

end = time.clock()

if resolve_objects:
# Do any automatic resolution required
results = self._object_resolution(results)

except CypherError as ce:
if ce.code == u'Neo.ClientError.Schema.ConstraintValidationFailed':
if 'already exists with label' in ce.message and handle_unique:
if ce.code == u"Neo.ClientError.Schema.ConstraintValidationFailed":
if "already exists with label" in ce.message and handle_unique:
raise UniqueProperty(ce.message)

raise ConstraintValidationFailed(ce.message)
Expand All @@ -216,18 +283,26 @@ def cypher_query(self, query, params=None, handle_unique=True, retry_on_session_
except SessionError:
if retry_on_session_expire:
self.set_connection(self.url)
return self.cypher_query(query=query,
params=params,
handle_unique=handle_unique,
retry_on_session_expire=False)
return self.cypher_query(
query=query,
params=params,
handle_unique=handle_unique,
retry_on_session_expire=False,
)
raise

if os.environ.get('NEOMODEL_CYPHER_DEBUG', False):
logger.debug("query: " + query + "\nparams: " + repr(params) + "\ntook: %.2gs\n" % (end - start))
if os.environ.get("NEOMODEL_CYPHER_DEBUG", False):
logger.debug(
"query: "
+ query
+ "\nparams: "
+ repr(params)
+ "\ntook: %.2gs\n" % (end - start)
)

return results, meta


class TransactionProxy(object):
def __init__(self, db, access_mode=None):
self.db = db
Expand All @@ -243,12 +318,12 @@ def __exit__(self, exc_type, exc_value, traceback):
self.db.rollback()

if exc_type is CypherError:
if exc_value.code == u'Neo.ClientError.Schema.ConstraintValidationFailed':
if exc_value.code == u"Neo.ClientError.Schema.ConstraintValidationFailed":
raise UniqueProperty(exc_value.message)

if not exc_value:
self.db.commit()
self.db.commit()

def __call__(self, func):
def wrapper(*args, **kwargs):
with self:
Expand Down Expand Up @@ -285,7 +360,7 @@ def __get__(self, obj, type=None):
# Just used for error messages
class _UnsavedNode(object):
def __repr__(self):
return '<unsaved node>'
return "<unsaved node>"

def __str__(self):
return self.__repr__()
Expand All @@ -294,7 +369,7 @@ def __str__(self):
def _get_node_properties(node):
"""Get the properties from a neo4j.v1.types.graph.Node object."""
# 1.6.x and newer have it as `_properties`
if hasattr(node, '_properties'):
if hasattr(node, "_properties"):
return node._properties
# 1.5.x and older have it as `properties`
else:
Expand Down