Skip to content

Commit

Permalink
[EWT-499] Airflow Upgrade to 1.10.12 [CP] from 1.10.4+twtr : 6162402
Browse files Browse the repository at this point in the history
[TWTTR] Don't enqueue tasks again if already queued for K8sExecutor(twitter-forks#60)

Basically reverting commit 87fcc1c  and making changes specifically into the Celery Executor class only.
  • Loading branch information
msumit authored and Ayush Sethi committed Nov 9, 2020
1 parent 335f2c7 commit 57481cf
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
4 changes: 2 additions & 2 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None):
key = simple_task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
else:
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
self.log.error("could not queue task %s", key)

def queue_task_instance(
self,
Expand Down
8 changes: 8 additions & 0 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ def start(self):
self._sync_parallelism
)

def queue_command(self, simple_task_instance, command, priority=1, queue=None):
key = simple_task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
else:
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)

def _num_tasks_per_send_process(self, to_send_count):
"""
How many Celery tasks should each worker process send.
Expand Down
1 change: 1 addition & 0 deletions airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@

version = '1.10.12'


0 comments on commit 57481cf

Please sign in to comment.