Skip to content

Commit

Permalink
Merge pull request #11591 from adviti-mishra/release-v0.16.x
Browse files Browse the repository at this point in the history
Fixed intermittent KeyError when handling a finished future in the task worker #11350
  • Loading branch information
rtibbles authored Jan 17, 2024
2 parents f24fdf3 + 6053e07 commit 9d9166e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
58 changes: 58 additions & 0 deletions kolibri/core/tasks/test/taskrunner/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import threading
import time

import pytest
Expand Down Expand Up @@ -50,6 +51,63 @@ def worker():
b.shutdown()


def test_keyerror_prevention(worker):
    """
    Check that a job still reaches COMPLETED when its entry in
    ``worker.future_job_mapping`` is removed by another thread while the
    worker is processing it.
    """
    # NOTE(review): `id` is presumably the builtin, used here as a trivial
    # job callable invoked with the argument 9 - confirm against Job's API.
    job = Job(id, args=(9,))
    job_id = job.job_id
    worker.storage.enqueue_job(job, QUEUE)

    # Simulate a race condition by having another thread remove the future
    # tracking entry while the job is in flight.
    def delete_future():
        time.sleep(0.5)  # Give the worker time to pick the job up
        # pop() with a default rather than del: if the worker has already
        # cleaned the entry up, this thread must not die with its own
        # unhandled KeyError.
        worker.future_job_mapping.pop(job_id, None)

    delete_thread = threading.Thread(target=delete_future)
    delete_thread.start()

    # Bound the wait so that a job which never completes fails the test
    # instead of hanging the whole test run.
    timeout_seconds = 30
    deadline = time.time() + timeout_seconds
    while job.state != "COMPLETED" and time.time() < deadline:
        job = worker.storage.get_job(job_id)
        time.sleep(0.1)

    # Do not leave the helper thread running past the end of the test.
    delete_thread.join()

    assert job.state == "COMPLETED"


def test_keyerror_prevention_multiple_jobs(worker):
    """
    Check that two queued jobs both reach COMPLETED when the first job's
    entry in ``worker.future_job_mapping`` is removed by another thread
    while the jobs are being processed.
    """
    # Two jobs built from the same callable and arguments.
    # NOTE(review): `id` is presumably the builtin, used as a trivial job
    # callable - confirm against Job's API.
    job1 = Job(id, args=(9,))
    job2 = Job(id, args=(9,))
    job1_id = job1.job_id

    def wait_until_completed(job, timeout_seconds=30):
        # Poll storage until the job is COMPLETED or the deadline passes, and
        # hand back the most recently fetched job. The deadline keeps a stuck
        # job from hanging the test run; the caller's assert reports it.
        deadline = time.time() + timeout_seconds
        while job.state != "COMPLETED" and time.time() < deadline:
            job = worker.storage.get_job(job.job_id)
            time.sleep(0.1)
        return job

    # Enqueue the first job
    worker.storage.enqueue_job(job1, QUEUE)

    # Simulate a race condition by having another thread remove the first
    # job's future tracking entry while it is in flight.
    def delete_future():
        time.sleep(0.5)  # Give the worker time to pick the first job up
        # pop() with a default rather than del: if the worker has already
        # cleaned the entry up, this thread must not die with its own
        # unhandled KeyError.
        worker.future_job_mapping.pop(job1_id, None)

    delete_thread = threading.Thread(target=delete_future)
    delete_thread.start()

    # Enqueue the second job
    worker.storage.enqueue_job(job2, QUEUE)

    job1 = wait_until_completed(job1)
    job2 = wait_until_completed(job2)

    # Do not leave the helper thread running past the end of the test.
    delete_thread.join()

    assert job1.state == "COMPLETED"
    assert job2.state == "COMPLETED"


@pytest.mark.django_db
class TestWorker:
def test_enqueue_job_runs_job(self, worker):
Expand Down
28 changes: 18 additions & 10 deletions kolibri/core/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,19 @@ def start_workers(self):
return pool

def handle_finished_future(self, future):
    """
    Stop tracking a finished future and, if it was cancelled, mark its job
    as canceled in storage.

    :param future: the finished future; it is the key under which the job
        was stored in ``self.job_future_mapping``.

    A future that is not present in ``self.job_future_mapping`` is ignored,
    since there is no job to act on. A missing entry in
    ``self.future_job_mapping`` is tolerated and does not prevent the
    cancellation check from running.

    Any exception other than ``CancelledError`` raised by
    ``future.result()`` propagates to the caller.
    """
    # Fetch and remove in a single step. A separate lookup followed by
    # ``del`` raises KeyError when the entry has already been removed.
    job = self.job_future_mapping.pop(future, None)
    if job is None:
        # Untracked future: no job id to clean up or mark as canceled.
        return

    # The reverse mapping entry may already be gone. Its absence must not
    # skip the result check below, otherwise a cancelled job would never be
    # marked as canceled.
    self.future_job_mapping.pop(job.job_id, None)

    try:
        future.result()
    except CancelledError:
        self.storage.mark_job_as_canceled(job.job_id)

def shutdown(self, wait=True):
logger.info("Asking job schedulers to shut down.")
Expand Down Expand Up @@ -198,6 +200,12 @@ def start_next_job(self, job):
job_id=job.job_id,
)

# Check if the job ID already exists in the future_job_mapping dictionary
if job.job_id in self.future_job_mapping:
logger.warn(
"Job id {} is already in future_job_mapping.".format(job.job_id)
)

# assign the futures to a dict, mapping them to a job
self.job_future_mapping[future] = job
self.future_job_mapping[job.job_id] = future
Expand Down

0 comments on commit 9d9166e

Please sign in to comment.