_query_lock implementation causes "Future attached to a different loop" error when gather() is used with global Database(force_rollback=True) instance in app tests #157

Closed
rafalp opened this issue Nov 3, 2019 · 1 comment

Comments

@rafalp
Member

rafalp commented Nov 3, 2019

This is tangentially related to #125

When writing tests for my project, I found that following both Starlette's database usage docs and the Databases testing docs in an app results in a setup that makes it impossible to test application logic that uses gather to run multiple queries at the same time.

Reproduction

Starlette's database example directs developers to create a single Database instance for their application:

database = databases.Database(DATABASE_URL)

app = Starlette()

app.add_event_handler("startup", database.connect)
app.add_event_handler("shutdown", database.disconnect)

In tests, however, you want some isolation between test cases. An easy way to achieve this is to run each test inside a transaction that is rolled back afterwards, hence the force_rollback flag on Database. A developer who updates their implementation accordingly will end up with something like this:

if TESTING:
    database = Database(TEST_DATABASE_URL, force_rollback=True)
else:
    database = Database(DATABASE_URL)

app = Starlette()

app.add_event_handler("startup", database.connect)
app.add_event_handler("shutdown", database.disconnect)

This code will run fine until we try to use gather somewhere in the logic and write a test for it:

from asyncio import gather

import pytest

from .database import database


async def logic_using_gather():
    return await gather(
        database.fetch_one("SELECT pg_sleep(1)"),
        database.fetch_one("SELECT pg_sleep(1)"),
    )


@pytest.fixture(scope="function")
async def db():
    async with database:
        yield


@pytest.mark.asyncio
async def test_my_logic(db):
    await logic_using_gather()

We will get an error:

RuntimeError: Task <Task pending coro=<Database.execute() running at /usr/local/lib/python3.7/site-packages/databases/core.py:122> cb=[gather.<locals>._done_callback() at /usr/local/lib/python3.7/asyncio/tasks.py:691]> got Future <Future pending> attached to a different loop

The issue

When Database is initialized with force_rollback=True, it internally creates a single "global connection" object, which in turn creates an asyncio.Lock instance for itself.

asyncio.Lock can be initialized with a loop argument, but if none is provided, it will try to get the currently running event loop, or create a new event loop if none exists. The former is the case when the app is running behind one of the ASGI servers that provide an event loop, while the latter is the case in test setups.

The error is the result of the event loop provided by the test runner running a task that tries to await an asyncio.Lock() that was initialized with its own event loop. This is not the case when queries are run one after another, because the first query releases the lock before the next one is sent to the event loop for execution:

await database.fetch_one("SELECT pg_sleep(1)")
await database.fetch_one("SELECT pg_sleep(1)")

However, when gather is used, both tasks are sent to the event loop at the same time, and because _query_lock._locked is already set to True, await _query_lock causes asyncio.Lock to create a waiter future on its own event loop. That future belongs to a different event loop than the one running the rest of the logic, which produces the error above.
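
The mechanism can be shown without Databases at all. The following is a minimal sketch that uses only asyncio; the names shared_lock, guarded_sleep and demo are made up for illustration. It assumes, as in the Python 3.7 report above, that the lock binds to a loop when it is constructed. To my knowledge, newer interpreters (3.10 and later) defer the binding to the first contended acquire and word the error differently, so the sketch runs the contended case under two separate loops to hit the mismatch either way.

```python
import asyncio

# Created once at import time, outside any running loop, like the lock owned
# by the global connection of a module-level Database(force_rollback=True).
shared_lock = asyncio.Lock()


async def guarded_sleep():
    # Stands in for a query that must hold the lock while it runs.
    async with shared_lock:
        await asyncio.sleep(0.01)


async def sequential():
    # The lock is always free when acquired, so no waiter future is created.
    await guarded_sleep()
    await guarded_sleep()
    return "ok"


async def contended():
    # The second coroutine finds the lock held and has to wait on a future.
    await asyncio.gather(guarded_sleep(), guarded_sleep())


def demo():
    # Every asyncio.run() call creates a fresh event loop, similar to a test
    # runner that gives each test its own loop.
    results = {"sequential": [], "errors": []}
    for _ in range(2):
        results["sequential"].append(asyncio.run(sequential()))
    for _ in range(2):
        try:
            asyncio.run(contended())
        except RuntimeError as exc:
            results["errors"].append(str(exc))
    return results


if __name__ == "__main__":
    print(demo())
```

The sequential calls succeed under every loop, while at least one of the contended runs raises RuntimeError, which matches the behaviour described above.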

Reduced example

Going back to the discussion in #125, it was pointed out that there's a test in databases for using gather to await multiple queries, but this test checks an unrealistic use of Database together with gather:

@pytest.mark.parametrize("database_url", DATABASE_URLS)
@async_adapter
async def test_concurrent_access_on_single_connection(database_url):
    async with Database(database_url, force_rollback=True) as database:

        async def db_lookup():
            if str(database_url).startswith("postgresql"):
                await database.fetch_one("SELECT pg_sleep(1)")

        await asyncio.gather(db_lookup(), db_lookup())

A more realistic (and failing) usage would look like this:

@pytest.mark.parametrize("database_url", DATABASE_URLS)
def test_concurrent_access_on_single_connection(database_url):
    database = Database(database_url, force_rollback=True)

    @async_adapter
    async def run_db_lookups():
        async with database:
            async def db_lookup():
                if str(database_url).startswith("postgresql"):
                    await database.fetch_one("SELECT pg_sleep(1)")

            await asyncio.gather(db_lookup(), db_lookup())
    
    run_db_lookups()

The test above fails with the same error because database and async_adapter now use two different event loops for their logic.

@rafalp
Member Author

rafalp commented Nov 3, 2019

Initially I was contemplating whether the solution to this would be a loop option on database.connect, but now I am thinking that there shouldn't be any global connection/transaction object that gets initialized at the same time as Database does. Instead, the force_rollback flag should be kept on Database and passed on to any connection initialized by such an object. The connection could then use this flag to know that all queries executed from it should be wrapped in a transaction that is rolled back when the connection is closed.
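
To make the idea concrete, here is a rough sketch of a flag that lives on the database object and is handed to each connection, which then rolls back on close. It is synchronous and uses the standard library sqlite3 module purely for brevity. SketchDatabase, SketchConnection and demo are hypothetical names and not part of Databases.

```python
import os
import sqlite3
import tempfile


class SketchConnection:
    """Hypothetical connection that receives force_rollback from its parent."""

    def __init__(self, path, force_rollback=False):
        # isolation_level=None puts sqlite3 in autocommit mode, so the
        # BEGIN / ROLLBACK statements below are issued explicitly.
        self._raw = sqlite3.connect(path, isolation_level=None)
        self._force_rollback = force_rollback
        if force_rollback:
            self._raw.execute("BEGIN")

    def execute(self, sql, params=()):
        return self._raw.execute(sql, params).fetchall()

    def close(self):
        # Discard everything this connection did when the flag is set.
        if self._force_rollback:
            self._raw.execute("ROLLBACK")
        self._raw.close()


class SketchDatabase:
    """Hypothetical database object that only stores the flag."""

    def __init__(self, path, force_rollback=False):
        self._path = path
        self._force_rollback = force_rollback

    def connection(self):
        # No global connection is created in __init__; the flag is passed on.
        return SketchConnection(self._path, self._force_rollback)


def demo():
    fd, path = tempfile.mkstemp(suffix=".sqlite3")
    os.close(fd)
    try:
        conn = SketchDatabase(path).connection()
        conn.execute("CREATE TABLE notes (text TEXT)")
        conn.close()

        conn = SketchDatabase(path, force_rollback=True).connection()
        conn.execute("INSERT INTO notes VALUES (?)", ("temporary",))
        seen_inside = conn.execute("SELECT COUNT(*) FROM notes")[0][0]
        conn.close()  # rolls the insert back

        conn = SketchDatabase(path).connection()
        seen_after = conn.execute("SELECT COUNT(*) FROM notes")[0][0]
        conn.close()
        return seen_inside, seen_after
    finally:
        os.remove(path)
```

The row is visible while the force_rollback connection is open and is gone once it has been closed.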

I've actually moved global connection initialization from Database.__init__ to Database.connect and tests for logic using gather started passing again ;)
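
A stripped-down illustration of why that move helps, using a hypothetical LazyLockResource class rather than the real Database code: the lock is created inside connect(), which runs in the caller's event loop, so every loop gets a lock that belongs to it.

```python
import asyncio


class LazyLockResource:
    """Hypothetical stand-in for an object that owns a global connection."""

    def __init__(self):
        # Nothing loop-bound is created at construction time.
        self._query_lock = None

    async def connect(self):
        # Runs inside the caller's event loop, so the lock belongs to it.
        self._query_lock = asyncio.Lock()

    async def disconnect(self):
        self._query_lock = None

    async def fetch(self, value):
        async with self._query_lock:
            await asyncio.sleep(0.01)  # stands in for a real query
            return value


# Module-level instance, as in the Starlette example.
resource = LazyLockResource()


async def use_with_gather():
    await resource.connect()
    try:
        return await asyncio.gather(resource.fetch(1), resource.fetch(2))
    finally:
        await resource.disconnect()


def run_in_two_loops():
    # Each asyncio.run() call creates a fresh event loop, like a test runner
    # that gives every test its own loop.
    return [asyncio.run(use_with_gather()) for _ in range(2)]


if __name__ == "__main__":
    print(run_in_two_loops())
```

Both runs complete even though the same module-level object is reused across loops, because the lock is recreated on every connect().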

I also can't help but feel that using _query_lock somewhat defeats the point of using async to speed up IO for a single client (e.g. in GraphQL APIs). I understand that it keeps the API simple while preventing possible race conditions in transactions, but I am thinking that in this case the Transaction itself should implement execute (and friends), requiring developers to use it explicitly to run DB queries:

async with database.transaction() as transaction:
    user = await transaction.fetch_one(...)
    await transaction.execute(...)

...but I think I am getting too fixated on how the library behaves in tests vs how it will behave in the real world, where there will be multiple connections running a single client's queries in parallel. ;)
