Skip to content

Commit

Permalink
[AIRFLOW-3060] DAG context manager fails to exit properly in certain …
Browse files Browse the repository at this point in the history
…circumstances
  • Loading branch information
newtonle authored and Alice Berard committed Jan 3, 2019
1 parent d721bd9 commit 5a8bf85
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
9 changes: 7 additions & 2 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3292,6 +3292,8 @@ def __init__(
self.on_success_callback = on_success_callback
self.on_failure_callback = on_failure_callback

self._context_manager_set = False

self._comps = {
'dag_id',
'task_ids',
Expand Down Expand Up @@ -3339,13 +3341,16 @@ def __hash__(self):

def __enter__(self):
global _CONTEXT_MANAGER_DAG
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
_CONTEXT_MANAGER_DAG = self
if not self._context_manager_set:
self._old_context_manager_dag = _CONTEXT_MANAGER_DAG
_CONTEXT_MANAGER_DAG = self
self._context_manager_set = True
return self

def __exit__(self, _type, _value, _tb):
global _CONTEXT_MANAGER_DAG
_CONTEXT_MANAGER_DAG = self._old_context_manager_dag
self._context_manager_set = False

# /Context Manager ----------------------------------------------

Expand Down
9 changes: 9 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ def test_dag_as_context_manager(self):
self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
self.assertEqual(dag.tasks[0].task_id, 'op6')

with dag:
with dag:
op7 = DummyOperator(task_id='op7')
op8 = DummyOperator(task_id='op8')
op8.dag = dag2

self.assertEqual(op7.dag, dag)
self.assertEqual(op8.dag, dag2)

def test_dag_topological_sort(self):
dag = DAG(
'dag',
Expand Down

0 comments on commit 5a8bf85

Please sign in to comment.