From 64db6e681428d0638fa1bd788846f38abc94a9df Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Tue, 19 May 2020 13:21:44 +0100 Subject: [PATCH] Fix race in Celery tests by pre-creating result tables (#8909) We noticed our Celery tests failing sometimes with > (psycopg2.errors.UniqueViolation) duplicate key value violates unique > constraint "pg_type_typname_nsp_index" > DETAIL: Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists It appears this is a race condition in SQLAlchemy's "create_all()" function, where it first checks which tables exist, builds up a list of `CREATE TABLE` statements, then issues them. Thus if two celery worker processes start at the same time, they will find the the table doesn't yet exist, and both try to create it. This is _probably_ a bug in SQLA, but this should be an easy enough fix here, to just ensure that the table exists before launching any Celery tasks. (cherry picked from commit bae5cc2f5ca32e0f61c3b92008fbd484184448ef) --- tests/executors/test_celery_executor.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index dcf9910e55999..d9a15c751d6f4 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -61,6 +61,17 @@ def _prepare_app(self, broker_url=None, execute=None): patch_app = mock.patch('airflow.executors.celery_executor.app', test_app) patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute) + backend = test_app.backend + + if hasattr(backend, 'ResultSession'): + # Pre-create the database tables now, otherwise SQLA vis Celery has a + # race condition where it one of the subprocesses can die with "Table + # already exists" error, because SQLA checks for which tables exist, + # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT + # EXISTS + session = backend.ResultSession() + session.close() + with patch_app, patch_execute: try: yield test_app @@ -140,6 +151,7 @@ def fake_execute_command(): self.assertEquals(1, len(executor.queued_tasks)) self.assertEquals(executor.queued_tasks['key'], value_tuple) + @pytest.mark.backend("mysql", "postgres") def test_exception_propagation(self): with self._prepare_app() as app: @app.task