Skip to content

Commit

Permalink
resolve issue 364 (#452)
Browse files Browse the repository at this point in the history
* attempt to fix issue 364

* enforce one cursor per connection

* PEP

* add new test

* bump version

* revert
  • Loading branch information
thehesiod authored and vir-mir committed Oct 16, 2018
1 parent 52876e3 commit 8cea23e
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ htmlcov
docs/_build
.tox
.cache
.pytest_cache
.pytest_cache
28 changes: 27 additions & 1 deletion aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,11 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
self._cancelling = False
self._cancellation_waiter = None
self._echo = echo
self._conn_cursor = None
self._notifies = asyncio.Queue(loop=loop)
self._weakref = weakref.ref(self)
self._loop.add_reader(self._fileno, self._ready, self._weakref)

if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

Expand Down Expand Up @@ -264,13 +266,29 @@ def cursor(self, name=None, cursor_factory=None,
*name*, *scrollable* and *withhold* parameters are not supported by
psycopg in asynchronous mode.
NOTE: as of [TODO] any previously created created cursor from this
connection will be closed
"""
self.close_cursor()

self._last_usage = self._loop.time()
coro = self._cursor(name=name, cursor_factory=cursor_factory,
scrollable=scrollable, withhold=withhold,
timeout=timeout)
return _ContextManager(coro)

def cursor_created(self, cursor):
if self._conn_cursor and not self._conn_cursor.closed:
raise Exception("You can only have one cursor per connection")

self._conn_cursor = cursor

def cursor_closed(self, cursor):
if cursor != self._conn_cursor:
raise Exception("You can only have one cursor per connection")

self._conn_cursor = None

@asyncio.coroutine
def _cursor(self, name=None, cursor_factory=None,
scrollable=None, withhold=False, timeout=None):
Expand All @@ -281,7 +299,8 @@ def _cursor(self, name=None, cursor_factory=None,
cursor_factory=cursor_factory,
scrollable=scrollable,
withhold=withhold)
return Cursor(self, impl, timeout, self._echo)
cursor = Cursor(self, impl, timeout, self._echo)
return cursor

@asyncio.coroutine
def _cursor_impl(self, name=None, cursor_factory=None,
Expand All @@ -303,7 +322,10 @@ def _close(self):
if self._writing:
self._writing = False
self._loop.remove_writer(self._fileno)

self.close_cursor()
self._conn.close()

if self._waiter is not None and not self._waiter.done():
self._waiter.set_exception(
psycopg2.OperationalError("Connection closed"))
Expand All @@ -314,6 +336,10 @@ def close(self):
ret.set_result(None)
return ret

def close_cursor(self):
if self._conn_cursor:
self._conn_cursor.close()

@property
def closed(self):
"""Connection status.
Expand Down
6 changes: 5 additions & 1 deletion aiopg/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def __init__(self, conn, impl, timeout, echo):
self._echo = echo
self._transaction = Transaction(self, IsolationLevel.repeatable_read)

conn.cursor_created(self)

@property
def echo(self):
"""Return echo mode status."""
Expand Down Expand Up @@ -48,7 +50,9 @@ def description(self):

def close(self):
"""Close the cursor now."""
self._impl.close()
if not self.closed:
self._impl.close()
self._conn.cursor_closed(self)

@property
def closed(self):
Expand Down
1 change: 1 addition & 0 deletions aiopg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ def release(self, conn):
if self._closing:
conn.close()
else:
conn.close_cursor() # there may be weak-refs to these cursors
self._free.append(conn)
fut = ensure_future(self._wakeup(), loop=self._loop)
return fut
Expand Down
4 changes: 1 addition & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


install_requires = ['psycopg2>=2.7.0']
extras_require = {'sa': ['sqlalchemy>=1.1']}

PY_VER = sys.version_info

Expand All @@ -16,9 +17,6 @@ def read(f):
return open(os.path.join(os.path.dirname(__file__), f)).read().strip()


extras_require = {'sa': ['sqlalchemy>=1.1'], }


def read_version():
regexp = re.compile(r"^__version__\W*=\W*'([\d.abrc]+)'")
init_py = os.path.join(os.path.dirname(__file__), 'aiopg', '__init__.py')
Expand Down
9 changes: 9 additions & 0 deletions tests/pep492/test_async_await.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ async def test_cursor_create_with_context_manager(make_connection):
assert cursor.closed


@asyncio.coroutine
async def test_two_cursor_create_with_context_manager(make_connection):
conn = await make_connection()

async with conn.cursor() as cursor1, conn.cursor() as cursor2:
assert cursor1.closed
assert not cursor2.closed


@asyncio.coroutine
async def test_pool_context_manager_timeout(pg_params, loop):
async with aiopg.create_pool(loop=loop, **pg_params, minsize=1,
Expand Down
22 changes: 14 additions & 8 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from aiopg.connection import Connection, TIMEOUT
from aiopg.cursor import Cursor
from aiopg.utils import ensure_future, create_future
from aiopg.utils import ensure_future
from unittest import mock


Expand Down Expand Up @@ -332,6 +332,7 @@ def inner(future, cursor):
fut = asyncio.Future(loop=loop)
conn = yield from connect()
cur = yield from conn.cursor()

task = ensure_future(inner(fut, cur), loop=loop)
yield from fut
yield from asyncio.sleep(0.1, loop=loop)
Expand All @@ -345,7 +346,7 @@ def inner(future, cursor):
else:
assert False, "Connection did not start cancelling"

cur = yield from conn.cursor()
# cur = yield from conn.cursor()
with pytest.raises(RuntimeError) as e:
yield from cur.execute('SELECT 1')
assert str(e.value) == ('cursor.execute() called while connection '
Expand Down Expand Up @@ -480,15 +481,17 @@ def go():
def test_execute_twice(connect):
conn = yield from connect()
cur1 = yield from conn.cursor()
cur2 = yield from conn.cursor()
# cur2 = yield from conn.cursor()
coro1 = cur1.execute('SELECT 1')
fut1 = next(coro1)
assert isinstance(fut1, asyncio.Future)
coro2 = cur2.execute('SELECT 2')
coro2 = cur1.execute('SELECT 2')

with pytest.raises(RuntimeError):
next(coro2)

yield from conn.cancel()


@asyncio.coroutine
def test_connect_to_unsupported_port(unused_port, loop, pg_params):
Expand All @@ -512,20 +515,23 @@ def test_binary_protocol_error(connect):

@asyncio.coroutine
def test_closing_in_separate_task(connect, loop):
event = create_future(loop)
closed_event = asyncio.Event(loop=loop)
exec_created = asyncio.Event(loop=loop)

@asyncio.coroutine
def waiter(conn):
cur = yield from conn.cursor()
fut = cur.execute("SELECT pg_sleep(1000)")
event.set_result(None)
with pytest.raises(psycopg2.OperationalError):
exec_created.set()
yield from closed_event.wait()
with pytest.raises(psycopg2.InterfaceError):
yield from fut

@asyncio.coroutine
def closer(conn):
yield from event
yield from exec_created.wait()
yield from conn.close()
closed_event.set()

conn = yield from connect()
yield from asyncio.gather(waiter(conn), closer(conn),
Expand Down
1 change: 1 addition & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ def sleep(conn):
yield from sleep(conn)

assert 1 == pool.freesize

with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
Expand Down

0 comments on commit 8cea23e

Please sign in to comment.