Skip to content

Commit

Permalink
Rename to_delete to to_cancel in TriggerRunner (#20658)
Browse files Browse the repository at this point in the history
The queue's purpose is to track triggers that need to be canceled. The language `to_delete` was a bit confusing because for one it does not actually delete them but cancel them.  The deletion work is actually in `cleanup_finished_triggers`.  It seems that this method will usually not do anything and it's only for cancelling triggers that are currently running but for whatever reason no longer should be.  E.g. when a task is killed and therefore the trigger is no longer needed, or some multi-triggerer scenarios.  So putting cancel in the name also highlights that this is about stopping running triggers, not e.g. purging completed ones.
  • Loading branch information
dstandish authored Jan 5, 2022
1 parent 05b9f3d commit c20ad79
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions airflow/jobs/triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
to_create: Deque[Tuple[int, BaseTrigger]]

# Inbound queue of deleted triggers
to_delete: Deque[int]
to_cancel: Deque[int]

# Outbound queue of events
events: Deque[Tuple[int, TriggerEvent]]
Expand All @@ -221,7 +221,7 @@ def __init__(self):
self.triggers = {}
self.trigger_cache = {}
self.to_create = deque()
self.to_delete = deque()
self.to_cancel = deque()
self.events = deque()
self.failed_triggers = deque()

Expand All @@ -241,7 +241,7 @@ async def arun(self):
while not self.stop:
# Run core logic
await self.create_triggers()
await self.delete_triggers()
await self.cancel_triggers()
await self.cleanup_finished_triggers()
# Sleep for a bit
await asyncio.sleep(1)
Expand Down Expand Up @@ -269,13 +269,13 @@ async def create_triggers(self):
self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
await asyncio.sleep(0)

async def delete_triggers(self):
async def cancel_triggers(self):
"""
Drain the to_delete queue and ensure all triggers that are not in the
Drain the to_cancel queue and ensure all triggers that are not in the
DB are cancelled, so the cleanup job deletes them.
"""
while self.to_delete:
trigger_id = self.to_delete.popleft()
while self.to_cancel:
trigger_id = self.to_cancel.popleft()
if trigger_id in self.triggers:
# We only delete if it did not exit already
self.triggers[trigger_id]["task"].cancel()
Expand Down Expand Up @@ -383,7 +383,7 @@ def update_triggers(self, requested_trigger_ids: Set[int]):
current_trigger_ids = set(self.triggers.keys())
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids.difference(current_trigger_ids)
old_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
cancel_trigger_ids = current_trigger_ids.difference(requested_trigger_ids)
# Bulk-fetch new trigger records
new_triggers = Trigger.bulk_fetch(new_trigger_ids)
# Add in new triggers
Expand All @@ -400,9 +400,9 @@ def update_triggers(self, requested_trigger_ids: Set[int]):
self.failed_triggers.append(new_id)
continue
self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
# Remove old triggers
for old_id in old_trigger_ids:
self.to_delete.append(old_id)
# Enqueue orphaned triggers for cancellation
for old_id in cancel_trigger_ids:
self.to_cancel.append(old_id)

def get_trigger_by_classpath(self, classpath: str) -> Type[BaseTrigger]:
"""
Expand Down

0 comments on commit c20ad79

Please sign in to comment.