From bbc348ccb55610975fd13ef17a229808fe87ef37 Mon Sep 17 00:00:00 2001 From: Kelvin Jayanoris Date: Wed, 9 Aug 2017 09:44:06 +0000 Subject: [PATCH 1/5] Deal with old exports Fixes: #1068 Create periodic tasks which: - Find old pending exports and mark them as failed. - Find old failed exports and delete them The 'oldness' is determined by a configurable setting. --- onadata/apps/viewer/tasks.py | 38 +++++++++++++++++++++++++ onadata/apps/viewer/tests/test_tasks.py | 10 +++++++ onadata/settings/common.py | 1 + 3 files changed, 49 insertions(+) diff --git a/onadata/apps/viewer/tasks.py b/onadata/apps/viewer/tasks.py index 355283ba8e..25735a1263 100644 --- a/onadata/apps/viewer/tasks.py +++ b/onadata/apps/viewer/tasks.py @@ -1,6 +1,8 @@ import sys +from datetime import datetime, timedelta from celery import task +from celery.task.schedules import crontab from django.conf import settings from django.shortcuts import get_object_or_404 from requests import ConnectionError @@ -349,3 +351,39 @@ def delete_export(export_id): export.delete() return True return False + + +@task.periodic_task( + run_every=(crontab(hour='*/7')), + ignore_result=True +) +def check_pending_exports(): + """ + Exports that have not completed within a set time should be marked as + failed + """ + h = settings.EXPORT_TASK_LIFESPAN + time_threshold = datetime.now() - timedelta(hours=h) + exports = Export.objects.filter(internal_status=Export.PENDING, + created_on__lt=time_threshold) + for export in exports: + export.internal_status = Export.FAILED + export.save() + return True + + +@task.periodic_task( + run_every=(crontab(hour='*/7')), + ignore_result=True +) +def delete_old_failed_exports(): + """ + Delete old failed exports + """ + h = settings.EXPORT_TASK_LIFESPAN + time_threshold = datetime.now() - timedelta(hours=h) + exports = Export.objects.filter(internal_status=Export.FAILED, + created_on__lt=time_threshold) + for export in exports: + delete_export.delay(export.id) + return True diff --git a/onadata/apps/viewer/tests/test_tasks.py b/onadata/apps/viewer/tests/test_tasks.py index 1143c1f522..50c6a55917 100644 --- a/onadata/apps/viewer/tests/test_tasks.py +++ b/onadata/apps/viewer/tests/test_tasks.py @@ -4,6 +4,8 @@ from onadata.apps.main.tests.test_base import TestBase from onadata.apps.viewer.models.export import Export from onadata.apps.viewer.tasks import create_async_export +from onadata.apps.viewer.tasks import check_pending_exports +from onadata.apps.viewer.tasks import delete_old_failed_exports class TestExportTasks(TestBase): @@ -38,3 +40,11 @@ def test_create_async(self): self.assertTrue(export.id) self.assertIn("username", options) self.assertEquals(options.get("id_string"), self.xform.id_string) + + def test_check_pending_exports(self): + result = check_pending_exports.delay() + self.assertTrue(result) + + def test_delete_old_failed_exports(self): + result = delete_old_failed_exports.delay() + self.assertTrue(result) diff --git a/onadata/settings/common.py b/onadata/settings/common.py index 38fa6b0f6f..acd50ca83a 100644 --- a/onadata/settings/common.py +++ b/onadata/settings/common.py @@ -435,6 +435,7 @@ def configure_logging(logger, **kwargs): # number of records on export or CSV import before a progress update EXPORT_TASK_PROGRESS_UPDATE_BATCH = 1000 +EXPORT_TASK_LIFESPAN = 6 # six hours # default content length for submission requests DEFAULT_CONTENT_LENGTH = 10000000 From 9cf49cc77180a5f58f58693f7201d6b581e59be7 Mon Sep 17 00:00:00 2001 From: Kelvin Jayanoris Date: Thu, 10 Aug 2017 05:59:09 +0000 Subject: [PATCH 2/5] Write more robust tests Improve tests to not only test that the celery tasks below are working: - onadata.apps.viewer.tasks.check_pending_exports - onadata.apps.viewer.tasks.delete_old_failed_exports But also test that they have the desired effect on pending and failed exports, respectively. --- onadata/apps/viewer/tasks.py | 14 ++++--- onadata/apps/viewer/tests/test_tasks.py | 50 ++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/onadata/apps/viewer/tasks.py b/onadata/apps/viewer/tasks.py index 25735a1263..beb0c1a4d8 100644 --- a/onadata/apps/viewer/tasks.py +++ b/onadata/apps/viewer/tasks.py @@ -1,11 +1,12 @@ import sys -from datetime import datetime, timedelta +from datetime import timedelta +from requests import ConnectionError from celery import task from celery.task.schedules import crontab from django.conf import settings from django.shortcuts import get_object_or_404 -from requests import ConnectionError +from django.utils import timezone from onadata.apps.viewer.models.export import Export from onadata.libs.exceptions import NoRecordsFoundError @@ -362,8 +363,8 @@ def check_pending_exports(): Exports that have not completed within a set time should be marked as failed """ - h = settings.EXPORT_TASK_LIFESPAN - time_threshold = datetime.now() - timedelta(hours=h) + task_lifespan = settings.EXPORT_TASK_LIFESPAN + time_threshold = timezone.now() - timedelta(hours=task_lifespan) exports = Export.objects.filter(internal_status=Export.PENDING, created_on__lt=time_threshold) for export in exports: @@ -380,10 +381,11 @@ def delete_old_failed_exports(): """ Delete old failed exports """ - h = settings.EXPORT_TASK_LIFESPAN - time_threshold = datetime.now() - timedelta(hours=h) + task_lifespan = settings.EXPORT_TASK_LIFESPAN + time_threshold = timezone.now() - timedelta(hours=task_lifespan) exports = Export.objects.filter(internal_status=Export.FAILED, created_on__lt=time_threshold) + # import pdb; pdb.set_trace() for export in exports: delete_export.delay(export.id) return True diff --git a/onadata/apps/viewer/tests/test_tasks.py b/onadata/apps/viewer/tests/test_tasks.py index 50c6a55917..9e33e797c4 100644 --- a/onadata/apps/viewer/tests/test_tasks.py +++ b/onadata/apps/viewer/tests/test_tasks.py @@ -1,5 +1,9 @@ +from datetime import timedelta + from celery import current_app from django.conf import settings +from django.utils import timezone +from django.core.files.storage import get_storage_class from onadata.apps.main.tests.test_base import TestBase from onadata.apps.viewer.models.export import Export @@ -15,6 +19,11 @@ def setUp(self): settings.CELERY_ALWAYS_EAGER = True current_app.conf.CELERY_ALWAYS_EAGER = True + def delete_export_file(self, filepath): + storage = get_storage_class()() + if filepath and storage.exists(filepath): + storage.delete(filepath) + def test_create_async(self): self._publish_transportation_form_and_submit_instance() @@ -35,16 +44,47 @@ def test_create_async(self): for export_type, extra_options in export_types: result = create_async_export( self.xform, export_type, None, False, options) - export = result[0] self.assertTrue(export.id) self.assertIn("username", options) self.assertEquals(options.get("id_string"), self.xform.id_string) def test_check_pending_exports(self): - result = check_pending_exports.delay() - self.assertTrue(result) + self._publish_transportation_form_and_submit_instance() + options = {"group_delimiter": "/", + "remove_group_name": False, + "split_select_multiples": True} + result = create_async_export( + self.xform, Export.CSV_EXPORT, None, False, options) + export = result[0] + filepath = export.filepath + export.filename = "" + over_threshold = settings.EXPORT_TASK_LIFESPAN + 2 + export.internal_status = Export.PENDING + export.created_on = timezone.now() - timedelta(hours=over_threshold) + export.save() + final_result = check_pending_exports.delay() + self.delete_export_file(filepath) + self.assertTrue(final_result) + export = Export.objects.filter(pk=export.pk).first() + self.assertEquals(export.internal_status, Export.FAILED) def test_delete_old_failed_exports(self): - result = delete_old_failed_exports.delay() - self.assertTrue(result) + self._publish_transportation_form_and_submit_instance() + options = {"group_delimiter": "/", + "remove_group_name": False, + "split_select_multiples": True} + result = create_async_export( + self.xform, Export.CSV_EXPORT, None, False, options) + export = result[0] + pk = export.pk + filepath = export.filepath + export.filename = "" + over_threshold = settings.EXPORT_TASK_LIFESPAN + 2 + export.internal_status = Export.FAILED + export.created_on = timezone.now() - timedelta(hours=over_threshold) + export.save() + final_result = delete_old_failed_exports.delay() + self.delete_export_file(filepath) + self.assertTrue(final_result) + self.assertEquals(Export.objects.filter(pk=pk).first(), None) From 57c29a237652cebd80ea7c58d600e0f13b7dde53 Mon Sep 17 00:00:00 2001 From: Kelvin Jayanoris Date: Thu, 10 Aug 2017 06:04:55 +0000 Subject: [PATCH 3/5] Add celerybeat.pid to .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7d1092e50a..c5ab0954d5 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,7 @@ build *~ *orig celerybeat-schedule +celerybeat.pid *.swp .venv *.ropeproject @@ -72,4 +73,4 @@ bob # visual studio code .vscode/ -tags \ No newline at end of file +tags From c86a47e0f15a61b751028a1d45663ded98f35f5a Mon Sep 17 00:00:00 2001 From: Kelvin Jayanoris Date: Thu, 10 Aug 2017 11:38:08 +0000 Subject: [PATCH 4/5] Code cleanup --- onadata/apps/viewer/tasks.py | 14 +++----- onadata/apps/viewer/tests/test_tasks.py | 48 ++++++++----------------- 2 files changed, 20 insertions(+), 42 deletions(-) diff --git a/onadata/apps/viewer/tasks.py b/onadata/apps/viewer/tasks.py index beb0c1a4d8..fd3a66ce3b 100644 --- a/onadata/apps/viewer/tasks.py +++ b/onadata/apps/viewer/tasks.py @@ -1,12 +1,12 @@ import sys from datetime import timedelta -from requests import ConnectionError from celery import task from celery.task.schedules import crontab from django.conf import settings from django.shortcuts import get_object_or_404 from django.utils import timezone +from requests import ConnectionError from onadata.apps.viewer.models.export import Export from onadata.libs.exceptions import NoRecordsFoundError @@ -358,19 +358,17 @@ def delete_export(export_id): run_every=(crontab(hour='*/7')), ignore_result=True ) -def check_pending_exports(): +def mark_expired_pending_exports_as_failed(): """ Exports that have not completed within a set time should be marked as failed """ + # import pdb; pdb.set_trace() task_lifespan = settings.EXPORT_TASK_LIFESPAN time_threshold = timezone.now() - timedelta(hours=task_lifespan) exports = Export.objects.filter(internal_status=Export.PENDING, created_on__lt=time_threshold) - for export in exports: - export.internal_status = Export.FAILED - export.save() - return True + exports.update(internal_status=Export.FAILED) @task.periodic_task( @@ -385,7 +383,5 @@ def delete_old_failed_exports(): time_threshold = timezone.now() - timedelta(hours=task_lifespan) exports = Export.objects.filter(internal_status=Export.FAILED, created_on__lt=time_threshold) - # import pdb; pdb.set_trace() for export in exports: - delete_export.delay(export.id) - return True + export.delete() diff --git a/onadata/apps/viewer/tests/test_tasks.py b/onadata/apps/viewer/tests/test_tasks.py index 9e33e797c4..32ccb18efe 100644 --- a/onadata/apps/viewer/tests/test_tasks.py +++ b/onadata/apps/viewer/tests/test_tasks.py @@ -3,12 +3,11 @@ from celery import current_app from django.conf import settings from django.utils import timezone -from django.core.files.storage import get_storage_class from onadata.apps.main.tests.test_base import TestBase from onadata.apps.viewer.models.export import Export from onadata.apps.viewer.tasks import create_async_export -from onadata.apps.viewer.tasks import check_pending_exports +from onadata.apps.viewer.tasks import mark_expired_pending_exports_as_failed from onadata.apps.viewer.tasks import delete_old_failed_exports @@ -19,11 +18,6 @@ def setUp(self): settings.CELERY_ALWAYS_EAGER = True current_app.conf.CELERY_ALWAYS_EAGER = True - def delete_export_file(self, filepath): - storage = get_storage_class()() - if filepath and storage.exists(filepath): - storage.delete(filepath) - def test_create_async(self): self._publish_transportation_form_and_submit_instance() @@ -49,42 +43,30 @@ def test_create_async(self): self.assertIn("username", options) self.assertEquals(options.get("id_string"), self.xform.id_string) - def test_check_pending_exports(self): + def test_mark_expired_pending_exports_as_failed(self): self._publish_transportation_form_and_submit_instance() - options = {"group_delimiter": "/", - "remove_group_name": False, - "split_select_multiples": True} - result = create_async_export( - self.xform, Export.CSV_EXPORT, None, False, options) - export = result[0] - filepath = export.filepath - export.filename = "" over_threshold = settings.EXPORT_TASK_LIFESPAN + 2 - export.internal_status = Export.PENDING + export = Export.objects.create(xform=self.xform, + export_type=Export.CSV_EXPORT, + internal_status=Export.PENDING, + filename="") + # we set created_on here because Export.objects.create() overrides it export.created_on = timezone.now() - timedelta(hours=over_threshold) export.save() - final_result = check_pending_exports.delay() - self.delete_export_file(filepath) - self.assertTrue(final_result) + mark_expired_pending_exports_as_failed() export = Export.objects.filter(pk=export.pk).first() self.assertEquals(export.internal_status, Export.FAILED) def test_delete_old_failed_exports(self): self._publish_transportation_form_and_submit_instance() - options = {"group_delimiter": "/", - "remove_group_name": False, - "split_select_multiples": True} - result = create_async_export( - self.xform, Export.CSV_EXPORT, None, False, options) - export = result[0] - pk = export.pk - filepath = export.filepath - export.filename = "" over_threshold = settings.EXPORT_TASK_LIFESPAN + 2 - export.internal_status = Export.FAILED + export = Export.objects.create(xform=self.xform, + export_type=Export.CSV_EXPORT, + internal_status=Export.FAILED, + filename="") + # we set created_on here because Export.objects.create() overrides it export.created_on = timezone.now() - timedelta(hours=over_threshold) export.save() - final_result = delete_old_failed_exports.delay() - self.delete_export_file(filepath) - self.assertTrue(final_result) + pk = export.pk + delete_old_failed_exports() self.assertEquals(Export.objects.filter(pk=pk).first(), None) From a4ded1ee5bf5d11bab0c1c38de79f88e49658395 Mon Sep 17 00:00:00 2001 From: Kelvin Jayanoris Date: Fri, 11 Aug 2017 06:48:30 +0000 Subject: [PATCH 5/5] Code cleanup --- onadata/apps/viewer/tasks.py | 5 ++--- onadata/apps/viewer/tests/test_tasks.py | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/onadata/apps/viewer/tasks.py b/onadata/apps/viewer/tasks.py index fd3a66ce3b..dc48e9aaa1 100644 --- a/onadata/apps/viewer/tasks.py +++ b/onadata/apps/viewer/tasks.py @@ -375,7 +375,7 @@ def mark_expired_pending_exports_as_failed(): run_every=(crontab(hour='*/7')), ignore_result=True ) -def delete_old_failed_exports(): +def delete_expired_failed_exports(): """ Delete old failed exports """ @@ -383,5 +383,4 @@ def delete_old_failed_exports(): time_threshold = timezone.now() - timedelta(hours=task_lifespan) exports = Export.objects.filter(internal_status=Export.FAILED, created_on__lt=time_threshold) - for export in exports: - export.delete() + exports.delete() diff --git a/onadata/apps/viewer/tests/test_tasks.py b/onadata/apps/viewer/tests/test_tasks.py index 32ccb18efe..ddccb33dd6 100644 --- a/onadata/apps/viewer/tests/test_tasks.py +++ b/onadata/apps/viewer/tests/test_tasks.py @@ -8,7 +8,7 @@ from onadata.apps.viewer.models.export import Export from onadata.apps.viewer.tasks import create_async_export from onadata.apps.viewer.tasks import mark_expired_pending_exports_as_failed -from onadata.apps.viewer.tasks import delete_old_failed_exports +from onadata.apps.viewer.tasks import delete_expired_failed_exports class TestExportTasks(TestBase): @@ -57,7 +57,7 @@ def test_mark_expired_pending_exports_as_failed(self): export = Export.objects.filter(pk=export.pk).first() self.assertEquals(export.internal_status, Export.FAILED) - def test_delete_old_failed_exports(self): + def test_delete_expired_failed_exports(self): self._publish_transportation_form_and_submit_instance() over_threshold = settings.EXPORT_TASK_LIFESPAN + 2 export = Export.objects.create(xform=self.xform, @@ -68,5 +68,5 @@ def test_delete_old_failed_exports(self): export.created_on = timezone.now() - timedelta(hours=over_threshold) export.save() pk = export.pk - delete_old_failed_exports() + delete_expired_failed_exports() self.assertEquals(Export.objects.filter(pk=pk).first(), None)