Skip to content

Commit

Permalink
Fix race in Celery tests by pre-creating result tables (#8909)
Browse files Browse the repository at this point in the history
We noticed our Celery tests failing sometimes with

> (psycopg2.errors.UniqueViolation) duplicate key value violates unique
> constraint "pg_type_typname_nsp_index"
> DETAIL:  Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists

It appears this is a race condition in SQLAlchemy's "create_all()"
function, where it first checks which tables exist, builds up a list of
`CREATE TABLE` statements, then issues them. Thus if two celery worker
processes start at the same time, they will find the the table doesn't
yet exist, and both try to create it.

This is _probably_ a bug in SQLA, but this should be an easy enough fix
here, to just ensure that the table exists before launching any Celery tasks.

(cherry picked from commit bae5cc2)
  • Loading branch information
ashb authored and kaxil committed Jul 1, 2020
1 parent b2a4032 commit 64db6e6
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ def _prepare_app(self, broker_url=None, execute=None):
patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)

backend = test_app.backend

if hasattr(backend, 'ResultSession'):
# Pre-create the database tables now, otherwise SQLA vis Celery has a
# race condition where it one of the subprocesses can die with "Table
# already exists" error, because SQLA checks for which tables exist,
# then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
# EXISTS
session = backend.ResultSession()
session.close()

with patch_app, patch_execute:
try:
yield test_app
Expand Down Expand Up @@ -140,6 +151,7 @@ def fake_execute_command():
self.assertEquals(1, len(executor.queued_tasks))
self.assertEquals(executor.queued_tasks['key'], value_tuple)

@pytest.mark.backend("mysql", "postgres")
def test_exception_propagation(self):
with self._prepare_app() as app:
@app.task
Expand Down

0 comments on commit 64db6e6

Please sign in to comment.