From c33882ec4a1b72ad4045e625ed221de868dc584b Mon Sep 17 00:00:00 2001
From: adviti
Date: Mon, 4 Dec 2023 22:17:03 -0500
Subject: [PATCH 1/7] Attempted to fix race condition by using locks. Added 2
 test cases to simulate the race condition in worker.py

---
 .../core/tasks/test/taskrunner/test_worker.py | 55 +++++++++++++++++++
 kolibri/core/tasks/worker.py                  | 46 +++++++++++-----
 2 files changed, 88 insertions(+), 13 deletions(-)

diff --git a/kolibri/core/tasks/test/taskrunner/test_worker.py b/kolibri/core/tasks/test/taskrunner/test_worker.py
index a6e8e7170d9..201757a51f6 100644
--- a/kolibri/core/tasks/test/taskrunner/test_worker.py
+++ b/kolibri/core/tasks/test/taskrunner/test_worker.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import threading
 import time

 import pytest
@@ -49,6 +50,60 @@ def worker():
         b.storage.clear(force=True)
         b.shutdown()

+def test_keyerror_prevention(worker):
+    # Create a job with the same ID as the one in worker.enqueue_job_runs_job
+    job = Job(id, args=(9,))
+    worker.storage.enqueue_job(job, QUEUE)
+
+    # Simulate a race condition by having another thread try to delete the future
+    # while the job is running
+    def delete_future():
+        time.sleep(0.5)  # Wait for the job to start
+        del worker.future_job_mapping[job.job_id]
+
+    # Start the delete_future thread
+    delete_thread = threading.Thread(target=delete_future)
+    delete_thread.start()
+
+    while job.state != "COMPLETED":
+        job = worker.storage.get_job(job.job_id)
+        time.sleep(0.1)
+
+    assert job.state == "COMPLETED"
+
+def test_keyerror_prevention_multiple_jobs(worker):
+    # Create multiple jobs with the same ID to trigger the race condition
+    job1 = Job(id, args=(9,))
+    job2 = Job(id, args=(9,))
+
+    # Enqueue the first job
+    worker.storage.enqueue_job(job1, QUEUE)
+
+    # Simulate a race condition by having another thread try to delete the future
+    # while the first job is running
+    def delete_future():
+        time.sleep(0.5)  # Wait for the first job to start
+        del worker.future_job_mapping[job1.job_id]
+
+    # Start the delete_future thread
+    delete_thread = threading.Thread(target=delete_future)
+    delete_thread.start()
+
+    # Enqueue the second job
+    worker.storage.enqueue_job(job2, QUEUE)
+
+    while job1.state != "COMPLETED":
+        job1 = worker.storage.get_job(job1.job_id)
+        time.sleep(0.1)
+
+    assert job1.state == "COMPLETED"
+
+    # Wait for the second job to complete
+    while job2.state != "COMPLETED":
+        job2 = worker.storage.get_job(job2.job_id)
+        time.sleep(0.1)
+
+    assert job2.state == "COMPLETED"

 @pytest.mark.django_db
 class TestWorker:
diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py
index 3d52d5fef8a..be88318f501 100644
--- a/kolibri/core/tasks/worker.py
+++ b/kolibri/core/tasks/worker.py
@@ -1,3 +1,4 @@
+import threading
 import logging

 from concurrent.futures import CancelledError
@@ -45,6 +46,10 @@ def __init__(self, connection, regular_workers=2, high_workers=1):
         # Key: job_id, Value: future object
         self.future_job_mapping = {}

+        # Locks to synchronize access to dictionaries
+        self.job_future_mapping_lock = threading.Lock()
+        self.future_job_mapping_lock = threading.Lock()
+
         self.storage = Storage(connection)

         self.requeue_stalled_jobs()
@@ -81,16 +86,23 @@ def start_workers(self):

     def handle_finished_future(self, future):
         # get back the job assigned to the future
-        job = self.job_future_mapping[future]
-
-        # Clean up tracking of this job and its future
-        del self.job_future_mapping[future]
-        del self.future_job_mapping[job.job_id]
-
-        try:
-            future.result()
-        except CancelledError:
-            self.storage.mark_job_as_canceled(job.job_id)
+        job = None
+
+        # Acquire locks before accessing dictionaries
+        with self.job_future_mapping_lock:
+            with self.future_job_mapping_lock:
+                if future in self.job_future_mapping:
+                    # get back the job assigned to the future
+                    job = self.job_future_mapping[future]
+                    # Clean up tracking of this job and its future
+                    del self.job_future_mapping[future]
+                    del self.future_job_mapping[job.job_id]
+
+        if job:
+            try:
+                future.result()
+            except CancelledError:
+                self.storage.mark_job_as_canceled(job.job_id)

     def shutdown(self, wait=True):
         logger.info("Asking job schedulers to shut down.")
@@ -170,9 +182,17 @@ def start_next_job(self, job):
             job_id=job.job_id,
         )

-        # assign the futures to a dict, mapping them to a job
-        self.job_future_mapping[future] = job
-        self.future_job_mapping[job.job_id] = future
+        # Acquire locks before modifying dictionaries
+        with self.job_future_mapping_lock:
+            with self.future_job_mapping_lock:
+                # Check if the job ID already exists in the future_job_mapping dictionary
+                if job.job_id in self.future_job_mapping:
+                    logger.error(f"Job ID {job.job_id} is already in future_job_mapping.")
+                    raise ValueError(f"Duplicate job ID: {job.job_id}")
+
+                # assign the futures to a dict, mapping them to a job
+                self.job_future_mapping[future] = job
+                self.future_job_mapping[job.job_id] = future

         # callback for when the future is now!
         future.add_done_callback(self.handle_finished_future)

From 1c8514b3307a14a2e86990462069038ceb7eba32 Mon Sep 17 00:00:00 2001
From: Adviti Mishra
Date: Mon, 4 Dec 2023 22:22:29 -0500
Subject: [PATCH 2/7] Create test_pr_1150_keyerror.yml

Workflow for task_runner tests in multiprocessing mode in the macOS
environment to simulate conditions for the KeyError issue
---
 .github/workflows/test_pr_1150_keyerror.yml | 52 +++++++++++++++++++++
 1 file changed, 52 insertions(+)
 create mode 100644 .github/workflows/test_pr_1150_keyerror.yml

diff --git a/.github/workflows/test_pr_1150_keyerror.yml b/.github/workflows/test_pr_1150_keyerror.yml
new file mode 100644
index 00000000000..49791e951fa
--- /dev/null
+++ b/.github/workflows/test_pr_1150_keyerror.yml
@@ -0,0 +1,52 @@
+name: KeyError Issue
+
+on:
+  push:
+    branches:
+      - develop
+      - 'release-v**'
+  pull_request:
+    branches:
+      - develop
+      - 'release-v**'
+
+jobs:
+  replicate_issue:
+    runs-on: macos-latest
+
+    steps:
+    - name: Checkout code
+      uses: actions/checkout@v2
+
+    - name: Set up Python
+      uses: actions/setup-python@v2
+      with:
+        python-version: '3.9'
+
+    - name: Install Node.js and Yarn
+      run: |
+        curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.1/install.sh | bash
+        export NVM_DIR="$HOME/.nvm"
+        [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"
+        nvm install 16.20.0
+        npm install -g yarn
+
+    - name: Create and activate virtual environment
+      run: |
+        python -m venv venv
+        source venv/bin/activate
+
+    - name: Install Python dependencies
+      run: |
+        pip install -r requirements.txt --upgrade
+        pip install -r requirements/dev.txt --upgrade
+        pip install -e .
+ pip install -r requirements/docs.txt + pip install -r requirements/build.txt + pip install pytest + pip install mock + + - name: Run tests in multiprocessing mode + run: | + export KOLIBRI_USE_WORKER_MULTIPROCESSING=True + pytest kolibri/core/tasks/test/taskrunner/ From a8c41f82a9aab952af3fa095073e3ed751b11ffa Mon Sep 17 00:00:00 2001 From: Adviti Mishra Date: Mon, 4 Dec 2023 22:30:55 -0500 Subject: [PATCH 3/7] Update test_pr_1150_keyerror.yml running specific test cases made to test the fix --- .github/workflows/test_pr_1150_keyerror.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_pr_1150_keyerror.yml b/.github/workflows/test_pr_1150_keyerror.yml index 49791e951fa..f2e9a89c8cd 100644 --- a/.github/workflows/test_pr_1150_keyerror.yml +++ b/.github/workflows/test_pr_1150_keyerror.yml @@ -49,4 +49,4 @@ jobs: - name: Run tests in multiprocessing mode run: | export KOLIBRI_USE_WORKER_MULTIPROCESSING=True - pytest kolibri/core/tasks/test/taskrunner/ + pytest kolibri/core/tasks/test/taskrunner/test_worker.py From 799e3f65d0a5f2fb521f5b9e5442c46c7f0df1dc Mon Sep 17 00:00:00 2001 From: adviti Date: Tue, 5 Dec 2023 17:16:07 -0500 Subject: [PATCH 4/7] Deleted our GitHub workflow that tests our fix because existing workflows already run all tests --- .github/workflows/test_pr_1150_keyerror.yml | 52 --------------------- 1 file changed, 52 deletions(-) delete mode 100644 .github/workflows/test_pr_1150_keyerror.yml diff --git a/.github/workflows/test_pr_1150_keyerror.yml b/.github/workflows/test_pr_1150_keyerror.yml deleted file mode 100644 index f2e9a89c8cd..00000000000 --- a/.github/workflows/test_pr_1150_keyerror.yml +++ /dev/null @@ -1,52 +0,0 @@ -name: KeyError Issue - -on: - push: - branches: - - develop - - 'release-v**' - pull_request: - branches: - - develop - - 'release-v**' - -jobs: - replicate_issue: - runs-on: macos-latest - - steps: - - name: Checkout code - uses: actions/checkout@v2 - - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: '3.9' - - - name: Install Node.js and Yarn - run: | - curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.1/install.sh | bash - export NVM_DIR="$HOME/.nvm" - [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" - nvm install 16.20.0 - npm install -g yarn - - - name: Create and activate virtual environment - run: | - python -m venv venv - source venv/bin/activate - - - name: Install Python dependencies - run: | - pip install -r requirements.txt --upgrade - pip install -r requirements/dev.txt --upgrade - pip install -e . 
- pip install -r requirements/docs.txt - pip install -r requirements/build.txt - pip install pytest - pip install mock - - - name: Run tests in multiprocessing mode - run: | - export KOLIBRI_USE_WORKER_MULTIPROCESSING=True - pytest kolibri/core/tasks/test/taskrunner/test_worker.py From 6d34bd5b152bc31ae20dbec1117e81de94e4562a Mon Sep 17 00:00:00 2001 From: adviti Date: Tue, 5 Dec 2023 21:40:06 -0500 Subject: [PATCH 5/7] Made formatted string compatible with Python 2.7 --- kolibri/core/tasks/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py index be88318f501..b2aa32ab00c 100644 --- a/kolibri/core/tasks/worker.py +++ b/kolibri/core/tasks/worker.py @@ -187,8 +187,8 @@ def start_next_job(self, job): with self.future_job_mapping_lock: # Check if the job ID already exists in the future_job_mapping dictionary if job.job_id in self.future_job_mapping: - logger.error(f"Job ID {job.job_id} is already in future_job_mapping.") - raise ValueError(f"Duplicate job ID: {job.job_id}") + logger.error("Job ID {} is already in future_job_mapping.".format(job.job_id)) + raise ValueError("Duplicate job ID: {}".format(job.job_id)) # assign the futures to a dict, mapping them to a job self.job_future_mapping[future] = job From 13db9c601140c569176ecb5fc94c093d7a328e25 Mon Sep 17 00:00:00 2001 From: Richard Tibbles Date: Tue, 16 Jan 2024 17:33:09 -0800 Subject: [PATCH 6/7] Simplify code to minimize diff. --- kolibri/core/tasks/worker.py | 48 ++++++++++++++---------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py index b2aa32ab00c..336b814fbfd 100644 --- a/kolibri/core/tasks/worker.py +++ b/kolibri/core/tasks/worker.py @@ -1,4 +1,3 @@ -import threading import logging from concurrent.futures import CancelledError @@ -46,10 +45,6 @@ def __init__(self, connection, regular_workers=2, high_workers=1): # Key: job_id, Value: future object self.future_job_mapping = {} - # Locks to synchronize access to dictionaries - self.job_future_mapping_lock = threading.Lock() - self.future_job_mapping_lock = threading.Lock() - self.storage = Storage(connection) self.requeue_stalled_jobs() @@ -85,24 +80,19 @@ def start_workers(self): return pool def handle_finished_future(self, future): - # get back the job assigned to the future - job = None - - # Acquire locks before accessing dictionaries - with self.job_future_mapping_lock: - with self.future_job_mapping_lock: - if future in self.job_future_mapping: - # get back the job assigned to the future - job = self.job_future_mapping[future] - # Clean up tracking of this job and its future - del self.job_future_mapping[future] - del self.future_job_mapping[job.job_id] - - if job: + try: + # get back the job assigned to the future + job = self.job_future_mapping[future] + # Clean up tracking of this job and its future + del self.job_future_mapping[future] + del self.future_job_mapping[job.job_id] + try: future.result() except CancelledError: self.storage.mark_job_as_canceled(job.job_id) + except KeyError: + pass def shutdown(self, wait=True): logger.info("Asking job schedulers to shut down.") @@ -182,17 +172,15 @@ def start_next_job(self, job): job_id=job.job_id, ) - # Acquire locks before modifying dictionaries - with self.job_future_mapping_lock: - with self.future_job_mapping_lock: - # Check if the job ID already exists in the future_job_mapping dictionary - if job.job_id in self.future_job_mapping: - 
logger.error("Job ID {} is already in future_job_mapping.".format(job.job_id))
-                    raise ValueError("Duplicate job ID: {}".format(job.job_id))
-
-                # assign the futures to a dict, mapping them to a job
-                self.job_future_mapping[future] = job
-                self.future_job_mapping[job.job_id] = future
+        # Check if the job ID already exists in the future_job_mapping dictionary
+        if job.job_id in self.future_job_mapping:
+            logger.warn(
+                "Job id {} is already in future_job_mapping.".format(job.job_id)
+            )
+
+        # assign the futures to a dict, mapping them to a job
+        self.job_future_mapping[future] = job
+        self.future_job_mapping[job.job_id] = future

         # callback for when the future is now!
         future.add_done_callback(self.handle_finished_future)

From 6053e075f172519e0811dd4881d78ccac2512fd4 Mon Sep 17 00:00:00 2001
From: Richard Tibbles
Date: Tue, 16 Jan 2024 17:40:55 -0800
Subject: [PATCH 7/7] Fix linting.

---
 kolibri/core/tasks/test/taskrunner/test_worker.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/kolibri/core/tasks/test/taskrunner/test_worker.py b/kolibri/core/tasks/test/taskrunner/test_worker.py
index 201757a51f6..9a361d39088 100644
--- a/kolibri/core/tasks/test/taskrunner/test_worker.py
+++ b/kolibri/core/tasks/test/taskrunner/test_worker.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-import threading
+import threading
 import time

 import pytest
@@ -50,6 +50,7 @@ def worker():
         b.storage.clear(force=True)
         b.shutdown()

+
 def test_keyerror_prevention(worker):
     # Create a job with the same ID as the one in worker.enqueue_job_runs_job
     job = Job(id, args=(9,))
@@ -71,6 +72,7 @@ def delete_future():

     assert job.state == "COMPLETED"

+
 def test_keyerror_prevention_multiple_jobs(worker):
     # Create multiple jobs with the same ID to trigger the race condition
     job1 = Job(id, args=(9,))
@@ -105,6 +107,7 @@ def delete_future():

     assert job2.state == "COMPLETED"

+
 @pytest.mark.django_db
 class TestWorker:
     def test_enqueue_job_runs_job(self, worker):
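The series ends up replacing the lock-based guard from PATCH 1 with the simpler EAFP cleanup in PATCH 6, where handle_finished_future just attempts the lookup and delete and treats a missing key as "another callback already cleaned this up". The standalone sketch below (illustrative only, not Kolibri code; the dict and function names are stand-ins) shows why an unguarded check-then-delete on a dict shared between threads can raise KeyError, and how the try/except KeyError pattern tolerates the missing key:

    import threading

    future_job_mapping = {"job-1": "future"}  # stand-in for the worker's job_id -> future dict

    def racy_cleanup(job_id):
        # Check-then-delete: two threads can both pass the `in` check before
        # either performs the `del`, so the loser of the race hits a KeyError.
        if job_id in future_job_mapping:
            del future_job_mapping[job_id]

    def eafp_cleanup(job_id):
        # Attempt the delete and treat a missing key as "already cleaned up",
        # mirroring the shape of the final handle_finished_future.
        try:
            del future_job_mapping[job_id]
        except KeyError:
            pass

    # Running the EAFP version from two threads never raises, whichever thread wins.
    threads = [threading.Thread(target=eafp_cleanup, args=("job-1",)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert "job-1" not in future_job_mapping

An equivalent one-liner inside eafp_cleanup would be future_job_mapping.pop(job_id, None), which also ignores a missing key. To exercise these code paths the way the (later removed) workflow from PATCH 2 did, the taskrunner tests can be run with the KOLIBRI_USE_WORKER_MULTIPROCESSING=True environment variable set, as in PATCH 3.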