diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py index 1c8642986cb81..b04faf37b275b 100644 --- a/airflow/jobs/triggerer_job.py +++ b/airflow/jobs/triggerer_job.py @@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin): to_create: Deque[Tuple[int, BaseTrigger]] # Inbound queue of deleted triggers - to_delete: Deque[int] + to_cancel: Deque[int] # Outbound queue of events events: Deque[Tuple[int, TriggerEvent]] @@ -221,7 +221,7 @@ def __init__(self): self.triggers = {} self.trigger_cache = {} self.to_create = deque() - self.to_delete = deque() + self.to_cancel = deque() self.events = deque() self.failed_triggers = deque() @@ -241,7 +241,7 @@ async def arun(self): while not self.stop: # Run core logic await self.create_triggers() - await self.delete_triggers() + await self.cancel_triggers() await self.cleanup_finished_triggers() # Sleep for a bit await asyncio.sleep(1) @@ -269,13 +269,13 @@ async def create_triggers(self): self.log.warning("Trigger %s had insertion attempted twice", trigger_id) await asyncio.sleep(0) - async def delete_triggers(self): + async def cancel_triggers(self): """ - Drain the to_delete queue and ensure all triggers that are not in the + Drain the to_cancel queue and ensure all triggers that are not in the DB are cancelled, so the cleanup job deletes them. """ - while self.to_delete: - trigger_id = self.to_delete.popleft() + while self.to_cancel: + trigger_id = self.to_cancel.popleft() if trigger_id in self.triggers: # We only delete if it did not exit already self.triggers[trigger_id]["task"].cancel() @@ -383,7 +383,7 @@ def update_triggers(self, requested_trigger_ids: Set[int]): current_trigger_ids = set(self.triggers.keys()) # Work out the two difference sets new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids) - old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids) + cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids) # Bulk-fetch new trigger records new_triggers = Trigger.bulk_fetch(new_trigger_ids) # Add in new triggers @@ -400,9 +400,9 @@ def update_triggers(self, requested_trigger_ids: Set[int]): self.failed_triggers.append(new_id) continue self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs))) - # Remove old triggers - for old_id in old_trigger_ids: - self.to_delete.append(old_id) + # Enqueue orphaned triggers for cancellation + for old_id in cancel_trigger_ids: + self.to_cancel.append(old_id) def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]: """