diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 2890544cd4484..58adf96118fdb 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -75,6 +75,17 @@ def _prepare_app(broker_url=None, execute=None): patch_app = mock.patch('airflow.executors.celery_executor.app', test_app) patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute) + backend = test_app.backend + + if hasattr(backend, 'ResultSession'): + # Pre-create the database tables now, otherwise SQLA vis Celery has a + # race condition where it one of the subprocesses can die with "Table + # already exists" error, because SQLA checks for which tables exist, + # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT + # EXISTS + session = backend.ResultSession() + session.close() + with patch_app, patch_execute: try: yield test_app @@ -165,6 +176,7 @@ def fake_execute_command(): self.assertEqual(1, len(executor.queued_tasks)) self.assertEqual(executor.queued_tasks[key], value_tuple) + @pytest.mark.backend("mysql", "postgres") def test_exception_propagation(self): with _prepare_app(), self.assertLogs(celery_executor.log) as cm: