From 086764285b9c43933daa645718aa1fa7cccf511a Mon Sep 17 00:00:00 2001 From: Ricardo Garcia Silva Date: Fri, 1 Oct 2021 09:55:56 +0100 Subject: [PATCH] Allow aborting of on-going harvesting tasks (#8176) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Bump urllib3 from 1.26.2 to 1.26.3 (#6908) Bumps [urllib3](https://github.com/urllib3/urllib3) from 1.26.2 to 1.26.3. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/1.26.3/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/1.26.2...1.26.3) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Toni * [Fixes #6880] Circle CI upload tests fail irregulary (#6881) * [Fixes #6880] Circle CI upload tests fail irregulary * CircleCI test fix: sometimes expires due to upload timeout in the test environment * - Avoid infinite loop on upload testing * Revert "CircleCI test fix: sometimes expires due to upload timeout in the test environment" This reverts commit 66139fdbf0b7510a9829a3e01254f41782fb7e1d. Co-authored-by: Alessio Fabiani Co-authored-by: afabiani * [Fixes #6914] Remove "add to basket" tool for documents and maps (#6915) * Added malnajdi as contributor * [Fixes #6910] meaningful filename for document download (#6911) * get meaningful document filenames on download * - Strip extension from document title before slugify it (e.g.: image.jpg instead of imagejpg.jpg) Co-authored-by: afabiani Co-authored-by: Alessio Fabiani * - CircleCI Upload Tests: trying to reduce more the risk of infinite loop on "wait_for_progress" * [Fixes #6916] gsimporter.api.NotFound caused by missing trailing slash at the end of GEOSERVER_LOCATION (#6913) * [Fixes #6916] gsimporter.api.NotFound caused by missing trailing slash at the end of GEOSERVER_LOCATION * [Fixes #6916] unit test for GEOSERVER_LOCATION * Bump django-cors-headers from 3.6.0 to 3.7.0 (#6901) Bumps [django-cors-headers](https://github.com/adamchainz/django-cors-headers) from 3.6.0 to 3.7.0. - [Release notes](https://github.com/adamchainz/django-cors-headers/releases) - [Changelog](https://github.com/adamchainz/django-cors-headers/blob/master/HISTORY.rst) - [Commits](https://github.com/adamchainz/django-cors-headers/compare/3.6.0...3.7.0) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump amqp from 5.0.3 to 5.0.5 (#6905) Bumps [amqp](https://github.com/celery/py-amqp) from 5.0.3 to 5.0.5. - [Release notes](https://github.com/celery/py-amqp/releases) - [Changelog](https://github.com/celery/py-amqp/blob/master/Changelog) - [Commits](https://github.com/celery/py-amqp/compare/v5.0.3...v5.0.5) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump pip from 21.0 to 21.0.1 (#6900) Bumps [pip](https://github.com/pypa/pip) from 21.0 to 21.0.1. - [Release notes](https://github.com/pypa/pip/releases) - [Changelog](https://github.com/pypa/pip/blob/master/NEWS.rst) - [Commits](https://github.com/pypa/pip/compare/21.0...21.0.1) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump coverage from 5.3.1 to 5.4 (#6903) Bumps [coverage](https://github.com/nedbat/coveragepy) from 5.3.1 to 5.4. - [Release notes](https://github.com/nedbat/coveragepy/releases) - [Changelog](https://github.com/nedbat/coveragepy/blob/master/CHANGES.rst) - [Commits](https://github.com/nedbat/coveragepy/compare/coverage-5.3.1...coverage-5.4) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump pytest from 6.2.1 to 6.2.2 (#6907) Bumps [pytest](https://github.com/pytest-dev/pytest) from 6.2.1 to 6.2.2. - [Release notes](https://github.com/pytest-dev/pytest/releases) - [Changelog](https://github.com/pytest-dev/pytest/blob/master/CHANGELOG.rst) - [Commits](https://github.com/pytest-dev/pytest/compare/6.2.1...6.2.2) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump djangorestframework-gis from 0.16 to 0.17 (#6902) Bumps [djangorestframework-gis](https://github.com/openwisp/django-rest-framework-gis) from 0.16 to 0.17. - [Release notes](https://github.com/openwisp/django-rest-framework-gis/releases) - [Changelog](https://github.com/openwisp/django-rest-framework-gis/blob/master/CHANGES.rst) - [Commits](https://github.com/openwisp/django-rest-framework-gis/compare/v0.16.0...v0.17.0) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * - Algin setup.cfg to requirements.txt * [Fixes #6922][REST API v2] Expose the curated thumbnail URL if it has… (#6923) * [Fixes #6922][REST API v2] Expose the curated thumbnail URL if it has been uploaded * - Add REST APIs test suite to CircleCI * [Fixes #6918] Removal of QGIS support (#6919) * [Cleanup and Refactor] Remove QGIS server backend dependencies * [Cleanup and Refactor] Remove QGIS server backend dependencies * - Fix LGTM issues * allow Basic authenticated requests in LOCKDOWN mode * fix to avoid circular import * flake8 check fix * added tests * [Fixes #6880] Circle CI upload tests fail irregulary (#6881) * [Fixes #6880] Circle CI upload tests fail irregulary * CircleCI test fix: sometimes expires due to upload timeout in the test environment * - Avoid infinite loop on upload testing * Revert "CircleCI test fix: sometimes expires due to upload timeout in the test environment" This reverts commit 66139fdbf0b7510a9829a3e01254f41782fb7e1d. Co-authored-by: Alessio Fabiani Co-authored-by: afabiani * [Fixes #6914] Remove "add to basket" tool for documents and maps (#6915) * Added malnajdi as contributor * Bump pip from 21.0 to 21.0.1 (#6900) Bumps [pip](https://github.com/pypa/pip) from 21.0 to 21.0.1. - [Release notes](https://github.com/pypa/pip/releases) - [Changelog](https://github.com/pypa/pip/blob/master/NEWS.rst) - [Commits](https://github.com/pypa/pip/compare/21.0...21.0.1) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * - Algin setup.cfg to requirements.txt * [Fixes #6922][REST API v2] Expose the curated thumbnail URL if it has… (#6923) * [Fixes #6922][REST API v2] Expose the curated thumbnail URL if it has been uploaded * - Add REST APIs test suite to CircleCI * [Fixes #6918] Removal of QGIS support (#6919) * [Cleanup and Refactor] Remove QGIS server backend dependencies * [Cleanup and Refactor] Remove QGIS server backend dependencies * - Fix LGTM issues * allow Basic authenticated requests in LOCKDOWN mode * fix to avoid circular import * - Align to upstream master branch * [Fixes #7945] Ingest harvested layer data to geonode * Improve harvesting session and the admin * fix migration files conflict * Initial work for implementing stoppable harvesting sessions * Implement aborting of harvesting celery tasks Add the `AsynchronousHarvestingSession` model, which is used to implement sessions for both refreshing of a harvester's harvestable resources and for the harvesting of remote resources. Refactor the `admin`, `api` and `tasks` to use this new model. Moved some functions out of `harvesting.utils` module in order to avoid circular imports * fix tests * fix conflicts * Remove accidental duplication of code that crept in during conflict resolution * Add a couple more tests * Update signature of get_resource method in WMS harvester worker * Uncomment `settings.py` line that designates the GeoNode test runner as the one to be used Co-authored-by: Giovanni Allegri Co-authored-by: allyoucanmap Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Toni Co-authored-by: Alessio Fabiani Co-authored-by: afabiani Co-authored-by: Florian Hoedt Co-authored-by: Mohammed Y. Alnajdi Co-authored-by: biegan Co-authored-by: meomancer --- geonode/harvesting/admin.py | 289 ++++++------ geonode/harvesting/api/serializers.py | 27 +- geonode/harvesting/api/urls.py | 2 +- geonode/harvesting/api/views.py | 6 +- geonode/harvesting/harvesters/base.py | 4 - .../harvesting/harvesters/geonodeharvester.py | 1 - geonode/harvesting/harvesters/wms.py | 1 - .../migrations/0037_alter_harvester_status.py | 29 ++ .../migrations/0038_auto_20210927_1455.py | 34 ++ .../0039_delete_harvestingsession.py | 16 + geonode/harvesting/models.py | 210 ++++++++- geonode/harvesting/tasks.py | 410 ++++++++++-------- geonode/harvesting/tests/test_admin.py | 142 ++++-- .../harvesting/tests/test_api_serializers.py | 36 +- geonode/harvesting/tests/test_api_views.py | 14 +- .../test_harvester_worker_geonode_legacy.py | 27 ++ geonode/harvesting/tests/test_models.py | 46 +- geonode/harvesting/tests/test_tasks.py | 23 +- geonode/harvesting/tests/test_utils.py | 24 - geonode/harvesting/utils.py | 28 -- 20 files changed, 893 insertions(+), 476 deletions(-) create mode 100644 geonode/harvesting/migrations/0037_alter_harvester_status.py create mode 100644 geonode/harvesting/migrations/0038_auto_20210927_1455.py create mode 100644 geonode/harvesting/migrations/0039_delete_harvestingsession.py delete mode 100644 geonode/harvesting/tests/test_utils.py diff --git a/geonode/harvesting/admin.py b/geonode/harvesting/admin.py index 73fc1dd50f0..fd861ec5cdb 100644 --- a/geonode/harvesting/admin.py +++ b/geonode/harvesting/admin.py @@ -17,27 +17,23 @@ # ######################################################################### -import functools import json import logging -import typing from django.contrib import ( admin, messages, ) -from django.db import transaction from django.urls import reverse from django.utils.html import ( format_html, mark_safe, ) +from django.utils.translation import gettext_lazy as _ from . import ( forms, models, - tasks, - utils, ) logger = logging.getLogger(__name__) @@ -55,9 +51,10 @@ class HarvesterAdmin(admin.ModelAdmin): "remote_available", "get_num_harvestable_resources", "get_num_harvestable_resources_selected", - "get_worker_specific_configuration", "show_link_to_selected_harvestable_resources", "show_link_to_latest_harvesting_session", + "show_link_to_latest_refresh_session", + "get_worker_specific_configuration", ) list_filter = ( "status", @@ -72,6 +69,7 @@ class HarvesterAdmin(admin.ModelAdmin): "num_harvestable_resources", "show_link_to_selected_harvestable_resources", "show_link_to_latest_harvesting_session", + "show_link_to_latest_refresh_session", ) list_editable = ( @@ -80,50 +78,24 @@ class HarvesterAdmin(admin.ModelAdmin): actions = [ "update_harvester_availability", - "update_harvestable_resources", - "perform_harvesting" + "initiate_update_harvestable_resources", + "initiate_abort_update_harvestable_resources", + "initiate_perform_harvesting", + "initiate_abort_perform_harvesting", ] - def save_model(self, request, obj: models.Harvester, form, change): - # TODO: disallow changing the model if it is not ready - with transaction.atomic(): - super().save_model(request, obj, form, change) - available = utils.update_harvester_availability(obj) - if available: - partial_task = functools.partial( - tasks.update_harvestable_resources.apply_async, args=(obj.pk,)) - # NOTE: below we are using transaction.on_commit in order to ensure - # the harvester is already saved in the DB before we schedule the - # celery task. This is needed in order to avoid the celery worker - # picking up the task before it is saved in the DB. More info: - # - # https://docs.djangoproject.com/en/2.2/topics/db/transactions/#performing-actions-after-commit - # - if not change: - transaction.on_commit(partial_task) - message = ( - f"Updating harvestable resources asynchronously for {obj!r}...") - self.message_user(request, message) - logger.debug(message) - elif _worker_config_changed(form): - self.message_user( - request, - ( - "Harvester worker specific configuration has been changed. " - "Updating list of this harvester's harvestable " - "resources asynchronously. When this is done the harvester " - "status will be set to `ready`. Refresh this page in order to monitor it." - ), - level=messages.WARNING - ) - # models.HarvestableResource.objects.filter(harvester=obj).delete() - transaction.on_commit(partial_task) - else: - self.message_user( - request, - f"Harvester {obj} is{'' if available else ' not'} available", - messages.INFO if available else messages.WARNING - ) + def save_model(self, request, harvester: models.Harvester, form, change): + super().save_model(request, harvester, form, change) + if _worker_config_changed(form): + self.message_user( + request, + ( + "Harvester worker specific configuration has been changed. " + "You should update the list of this harvester's harvestable " + "resources now in order to ensure consistency." + ), + level=messages.WARNING + ) def get_form(self, request, obj=None, change=False, **kwargs): form = super().get_form(request, obj, change, **kwargs) @@ -135,7 +107,7 @@ def update_harvester_availability(self, request, queryset): updated_harvesters = [] non_available_harvesters = [] for harvester in queryset: - available = utils.update_harvester_availability(harvester) + available = harvester.update_availability() updated_harvesters.append(harvester) if not available: non_available_harvesters.append(harvester) @@ -151,46 +123,90 @@ def update_harvester_availability(self, request, queryset): messages.WARNING ) - @admin.action(description="Update harvestable resources from selected harvesters") - def update_harvestable_resources(self, request, queryset): + @admin.action(description="Update harvestable resources for selected harvesters") + def initiate_update_harvestable_resources(self, request, queryset): being_updated = [] for harvester in queryset: - should_continue, error_msg = _should_act(harvester) - if should_continue: - harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES - harvester.save() - tasks.update_harvestable_resources.apply_async(args=(harvester.pk,)) - being_updated.append(harvester) - else: - self.message_user(request, error_msg, level=messages.ERROR) - continue + try: + if harvester.update_availability(): + harvester.initiate_update_harvestable_resources() + being_updated.append(harvester) + else: + raise RuntimeError(f"Harvester {harvester!r} is not available") + except RuntimeError as exc: + self.message_user(request, str(exc), level=messages.ERROR) if len(being_updated) > 0: - self.message_user( - request, - ( - f"Updating harvestable resources asynchronously for {being_updated}. " - f"This operation can take a while to complete. Check the harvesters' " - f"status for when it becomes `ready`" - ) + message = ( + f"Updating harvestable resources asynchronously for {being_updated}. " + f"This operation can take a while to complete. Check the harvesters' " + f"status for when it becomes `ready` or inspect its latest refresh " + f"session and monitor the reported progress" + ) + else: + message = _("No ready harvesters have been selected, skipping...") + self.message_user(request, message) + + @admin.action(description="Abort on-going update of harvestable resources for selected harvesters") + def initiate_abort_update_harvestable_resources(self, request, queryset): + being_aborted = [] + for harvester in queryset: + try: + if harvester.update_availability(): + harvester.initiate_abort_update_harvestable_resources() + being_aborted.append(harvester) + else: + raise RuntimeError(f"Harvester {harvester!r} is not available") + except RuntimeError as exc: + self.message_user(request, str(exc), level=messages.ERROR) + if len(being_aborted) > 0: + message = ( + f"Aborting update of harvestable resources for {being_aborted}. " + f"This operation can take a while to complete. Check the harvesters' " + f"status for when it becomes `ready`" ) + else: + message = _("No active refresh sessions have been found for the selected harvesters. Skipping...") + self.message_user(request, message) @admin.action(description="Perform harvesting on selected harvesters") - def perform_harvesting(self, request, queryset): + def initiate_perform_harvesting(self, request, queryset): being_harvested = [] for harvester in queryset: - should_continue, error_msg = _should_act(harvester) - if should_continue: - harvester.status = harvester.STATUS_PERFORMING_HARVESTING - harvester.save() - harvesting_session = models.HarvestingSession.objects.create(harvester=harvester) - tasks.harvesting_dispatcher.apply_async(args=(harvester.pk, harvesting_session.pk)) - being_harvested.append(harvester) - else: - self.message_user(request, error_msg, level=messages.ERROR) - continue + try: + if harvester.update_availability(): + harvester.initiate_perform_harvesting() + being_harvested.append(harvester) + else: + raise RuntimeError(f"Harvester {harvester!r} is not available") + except RuntimeError as exc: + self.message_user(request, str(exc), level=messages.ERROR) if len(being_harvested) > 0: - self.message_user( - request, f"Performing harvesting asynchronously for {being_harvested}") + message = f"Performing harvesting asynchronously for {being_harvested}..." + else: + message = _("No ready harvesters have been selected, skipping...") + self.message_user(request, message) + + @admin.action(description="Abort on-going harvesting sessions for selected harvesters") + def initiate_abort_perform_harvesting(self, request, queryset): + being_aborted = [] + for harvester in queryset: + try: + if harvester.update_availability(): + harvester.initiate_abort_perform_harvesting() + being_aborted.append(harvester) + else: + raise RuntimeError(f"Harvester {harvester!r} is not available") + except RuntimeError as exc: + self.message_user(request, str(exc), level=messages.ERROR) + if len(being_aborted) > 0: + message = ( + f"Aborting current harvesting sessions for {being_aborted}. " + f"This operation can take a while to complete. Check the harvesters' " + f"status for when it becomes `ready`" + ) + else: + message = _("No active harvesting sessions have been found for the selected harvesters. Skipping...") + self.message_user(request, message) @admin.display(description="Number of selected resources to harvest") def get_num_harvestable_resources_selected(self, harvester: models.Harvester): @@ -227,8 +243,22 @@ def show_link_to_selected_harvestable_resources(self, harvester: models.Harveste @admin.display(description="Go to latest harvesting session") def show_link_to_latest_harvesting_session(self, harvester: models.Harvester): - latest = models.HarvestingSession.objects.filter(harvester=harvester).latest("started") - changelist_uri = reverse("admin:harvesting_harvestingsession_change", args=(latest.id,)) + changelist_uri = reverse( + "admin:harvesting_asynchronousharvestingsession_change", + args=(harvester.latest_harvesting_session.id,) + ) + return mark_safe( + format_html( + f'Go' + ) + ) + + @admin.display(description="Go to latest refresh session") + def show_link_to_latest_refresh_session(self, harvester: models.Harvester): + changelist_uri = reverse( + "admin:harvesting_asynchronousharvestingsession_change", + args=(harvester.latest_refresh_session.id,) + ) return mark_safe( format_html( f'Go' @@ -236,46 +266,37 @@ def show_link_to_latest_harvesting_session(self, harvester: models.Harvester): ) -@admin.register(models.HarvestingSession) -class HarvestingSessionAdmin(admin.ModelAdmin): +@admin.register(models.AsynchronousHarvestingSession) +class AsynchronousHarvestingSessionAdmin(admin.ModelAdmin): list_display = ( "id", + "session_type", "status", "started", "updated", "ended", - "records_to_harvest", - "records_harvested", - "calculate_harvesting_progress", "harvester", + "total_records_to_process", + "records_done", + "get_progress_percentage", ) readonly_fields = ( "id", + "session_type", "status", "started", "updated", "ended", - "records_to_harvest", - "records_harvested", - "calculate_harvesting_progress", "harvester", - "session_details", + "total_records_to_process", + "records_done", + "get_progress_percentage", + "details", ) - def has_change_permission(self, request, obj=None): - return False - def has_add_permission(self, request): return False - @admin.display(description="progress(%)") - def calculate_harvesting_progress(self, harvesting_session: models.HarvestingSession): - if harvesting_session.records_to_harvest == 0: - result = 0 - else: - result = int((harvesting_session.records_harvested / harvesting_session.records_to_harvest) * 100) - return result - @admin.register(models.HarvestableResource) class HarvestableResourceAdmin(admin.ModelAdmin): @@ -317,7 +338,7 @@ class HarvestableResourceAdmin(admin.ModelAdmin): actions = [ "toggle_should_be_harvested", - "harvest_selected_resources", + "initiate_harvest_selected_resources", ] def delete_queryset(self, request, queryset): @@ -351,28 +372,24 @@ def toggle_should_be_harvested(self, request, queryset): request, "Toggled harvestable resources' `should_be_harvested` attribute") @admin.action(description="Harvest selected resources") - def harvest_selected_resources(self, request, queryset): + def initiate_harvest_selected_resources(self, request, queryset): selected_harvestable_resources = {} for harvestable_resource in queryset: - harvester_resources = selected_harvestable_resources.setdefault(harvestable_resource.harvester, []) + harvester_resources = selected_harvestable_resources.setdefault( + harvestable_resource.harvester, []) harvester_resources.append(harvestable_resource.id) for harvester, harvestable_resource_ids in selected_harvestable_resources.items(): - should_continue, error_msg = _should_act(harvester) - if should_continue: - harvester.status = models.Harvester.STATUS_PERFORMING_HARVESTING - harvester.save() - harvesting_session = models.HarvestingSession.objects.create(harvester=harvester) - tasks.harvest_resources.apply_async(args=(harvestable_resource_ids, harvesting_session.pk)) - self.message_user( - request, - f"Harvesting {len(harvestable_resource_ids)} resources from {harvester.name!r} harvester..." - ) - else: - self.message_user( - request, - error_msg, - level=messages.ERROR - ) + try: + if harvester.update_availability(): + harvester.initiate_perform_harvesting(harvestable_resource_ids) + self.message_user( + request, + f"Harvesting {len(harvestable_resource_ids)} resources from {harvester.name!r} harvester..." + ) + else: + raise RuntimeError(f"Harvester {harvester!r} is not available") + except RuntimeError as exc: + self.message_user(request, str(exc), level=messages.ERROR) @admin.display(description="harvester") def show_link_to_harvester(self, harvestable_resource: models.HarvestableResource): @@ -385,27 +402,13 @@ def show_link_to_harvester(self, harvestable_resource: models.HarvestableResourc ) -def _should_act(harvester: models.Harvester) -> typing.Tuple[bool, str]: - if harvester.status != harvester.STATUS_READY: - error_message = ( - f"Harvester {harvester!r} is currently busy. Please wait until its status " - f"is {harvester.STATUS_READY!r} before retrying" - ) - result = False - else: - available = utils.update_harvester_availability(harvester) - if not available: - error_message = ( - f"harvester {harvester!r} is not available, skipping harvesting...") - result = False - else: - result = True - error_message = "" - return result, error_message - - def _worker_config_changed(form) -> bool: field_name = "harvester_type_specific_configuration" - original = json.loads(form.data[f"initial-{field_name}"]) - cleaned = form.cleaned_data.get(field_name) - return original != cleaned + try: + original = json.loads(form.data[f"initial-{field_name}"]) + except KeyError: + result = True + else: + cleaned = form.cleaned_data.get(field_name) + result = original != cleaned + return result diff --git a/geonode/harvesting/api/serializers.py b/geonode/harvesting/api/serializers.py index 7f3a6ce0348..72ba2f5c843 100644 --- a/geonode/harvesting/api/serializers.py +++ b/geonode/harvesting/api/serializers.py @@ -40,7 +40,6 @@ from .. import ( models, tasks, - utils, ) logger = logging.getLogger(__name__) @@ -141,7 +140,7 @@ def validate(self, data): worker_config_field, getattr(self.instance, worker_config_field, None)) if worker_type is not None and worker_config is not None: try: - utils.validate_worker_configuration(worker_type, worker_config) + models.validate_worker_configuration(worker_type, worker_config) except jsonschema.exceptions.ValidationError: raise serializers.ValidationError( f"Invalid {worker_config_field!r} configuration") @@ -162,7 +161,7 @@ def create(self, validated_data): f"value of {models.Harvester.STATUS_READY!r}" ) harvester = super().create(validated_data) - available = utils.update_harvester_availability(harvester) + available = harvester.update_availability() if available: harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES harvester.save() @@ -208,12 +207,17 @@ def update(self, instance: models.Harvester, validated_data): f"This status can only be set by the server, when appropriate." ) elif desired_status == models.Harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES: - post_update_task = tasks.update_harvestable_resources.signature( - args=(instance.id,)) + session = models.AsynchronousHarvestingSession.objects.create( + harvester=instance, + session_type=models.AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES + ) + post_update_task = tasks.update_harvestable_resources.signature(args=(session.pk,)) elif desired_status == models.Harvester.STATUS_PERFORMING_HARVESTING: - harvesting_session = models.HarvestingSession.objects.create(harvester=instance) - post_update_task = tasks.harvesting_dispatcher.signature( - args=(instance.pk, harvesting_session.pk)) + session = models.AsynchronousHarvestingSession.objects.create( + harvester=instance, + session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING + ) + post_update_task = tasks.harvesting_dispatcher.signature(args=(session.pk,)) elif desired_status == models.Harvester.STATUS_CHECKING_AVAILABILITY: post_update_task = tasks.check_harvester_available.signature( args=(instance.id,)) @@ -239,15 +243,16 @@ def update(self, instance: models.Harvester, validated_data): return updated_instance -class BriefHarvestingSessionSerializer(DynamicModelSerializer): +class BriefAsynchronousHarvestingSessionSerializer(DynamicModelSerializer): class Meta: - model = models.HarvestingSession + model = models.AsynchronousHarvestingSession fields = ( "id", "started", "updated", "ended", - "records_harvested", + "total_records_to_process", + "records_done", ) diff --git a/geonode/harvesting/api/urls.py b/geonode/harvesting/api/urls.py index 9b838106865..2b78c92039a 100644 --- a/geonode/harvesting/api/urls.py +++ b/geonode/harvesting/api/urls.py @@ -31,6 +31,6 @@ basename='harvestable-resources', parents_query_lookups=['harvester_id'] ) -router.register('harvesting-sessions', views.HarvestingSessionViewSet) +router.register('harvesting-sessions', views.AsynchronousHarvestingSessionViewSet) urlpatterns = router.urls diff --git a/geonode/harvesting/api/views.py b/geonode/harvesting/api/views.py index 7cb340b3471..9427076fbbe 100644 --- a/geonode/harvesting/api/views.py +++ b/geonode/harvesting/api/views.py @@ -94,7 +94,7 @@ def get_serializer_context(self): return context -class HarvestingSessionViewSet(WithDynamicViewSetMixin, viewsets.ReadOnlyModelViewSet): - queryset = models.HarvestingSession.objects.all() - serializer_class = serializers.BriefHarvestingSessionSerializer +class AsynchronousHarvestingSessionViewSet(WithDynamicViewSetMixin, viewsets.ReadOnlyModelViewSet): + queryset = models.AsynchronousHarvestingSession.objects.all() + serializer_class = serializers.BriefAsynchronousHarvestingSessionSerializer pagination_class = GeoNodeApiPagination diff --git a/geonode/harvesting/harvesters/base.py b/geonode/harvesting/harvesters/base.py index a6869112170..af937c267ff 100644 --- a/geonode/harvesting/harvesters/base.py +++ b/geonode/harvesting/harvesters/base.py @@ -124,7 +124,6 @@ def get_geonode_resource_type(self, remote_resource_type: str) -> ResourceBase: def get_resource( self, harvestable_resource: "HarvestableResource", # noqa - harvesting_session_id: int ) -> typing.Optional[HarvestedResourceInfo]: """Harvest a single resource from the remote service. @@ -155,7 +154,6 @@ def finalize_resource_update( geonode_resource: ResourceBase, harvested_info: HarvestedResourceInfo, harvestable_resource: "HarvestableResource", # noqa - harvesting_session_id: int ) -> ResourceBase: """Perform additional actions just after having created/updated a local GeoNode resource. @@ -262,7 +260,6 @@ def update_geonode_resource( self, harvested_info: HarvestedResourceInfo, harvestable_resource: "HarvestableResource", # noqa - harvesting_session_id: int, ): """Create or update a local GeoNode resource with the input harvested information. @@ -297,7 +294,6 @@ def update_geonode_resource( geonode_resource, harvested_info, harvestable_resource, - harvesting_session_id ) def _create_new_geonode_resource(self, geonode_resource_type, defaults: typing.Dict): diff --git a/geonode/harvesting/harvesters/geonodeharvester.py b/geonode/harvesting/harvesters/geonodeharvester.py index cad40bdf496..d5bac6d1b0b 100644 --- a/geonode/harvesting/harvesters/geonodeharvester.py +++ b/geonode/harvesting/harvesters/geonodeharvester.py @@ -279,7 +279,6 @@ def get_geonode_resource_type(self, remote_resource_type: str) -> typing.Type[ty def get_resource( self, harvestable_resource: models.HarvestableResource, - harvesting_session_id: int ) -> typing.Optional[base.HarvestedResourceInfo]: resource_unique_identifier = harvestable_resource.unique_identifier local_resource_type = self.get_geonode_resource_type(harvestable_resource.remote_resource_type) diff --git a/geonode/harvesting/harvesters/wms.py b/geonode/harvesting/harvesters/wms.py index f8e06a1feed..9799ac1f19a 100644 --- a/geonode/harvesting/harvesters/wms.py +++ b/geonode/harvesting/harvesters/wms.py @@ -161,7 +161,6 @@ def get_geonode_resource_type(self, remote_resource_type: str) -> ResourceBase: def get_resource( self, harvestable_resource: "HarvestableResource", # noqa - harvesting_session_id: int ) -> typing.Optional[base.HarvestedResourceInfo]: resource_unique_identifier = harvestable_resource.unique_identifier data = self._get_data() diff --git a/geonode/harvesting/migrations/0037_alter_harvester_status.py b/geonode/harvesting/migrations/0037_alter_harvester_status.py new file mode 100644 index 00000000000..5e05991028b --- /dev/null +++ b/geonode/harvesting/migrations/0037_alter_harvester_status.py @@ -0,0 +1,29 @@ +# Generated by Django 3.2.4 on 2021-09-24 09:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('harvesting', '0036_alter_harvester_harvester_type'), + ] + + operations = [ + migrations.AlterField( + model_name='harvester', + name='status', + field=models.CharField( + choices=[ + ('ready', 'ready'), + ('updating-harvestable-resources', 'updating-harvestable-resources'), + ('aborting-update-harvestable-resources', 'aborting-update-harvestable-resources'), + ('harvesting-resources', 'harvesting-resources'), + ('aborting-harvesting-resources', 'aborting-harvesting-resources'), + ('checking-availability', 'checking-availability') + ], + default='ready', + max_length=50 + ), + ), + ] diff --git a/geonode/harvesting/migrations/0038_auto_20210927_1455.py b/geonode/harvesting/migrations/0038_auto_20210927_1455.py new file mode 100644 index 00000000000..2110903424c --- /dev/null +++ b/geonode/harvesting/migrations/0038_auto_20210927_1455.py @@ -0,0 +1,34 @@ +# Generated by Django 3.2.4 on 2021-09-27 14:55 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('harvesting', '0037_alter_harvester_status'), + ] + + operations = [ + migrations.AlterField( + model_name='harvestingsession', + name='status', + field=models.CharField(choices=[('pending', 'pending'), ('on-going', 'on-going'), ('finished-all-ok', 'finished-all-ok'), ('finished-all-failed', 'finished-all-failed'), ('finished-some-failed', 'finished-some-failed'), ('aborting', 'aborting'), ('aborted', 'aborted')], default='pending', editable=False, max_length=50), + ), + migrations.CreateModel( + name='AsynchronousHarvestingSession', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('session_type', models.CharField(choices=[('harvesting', 'harvesting'), ('discover-harvestable-resources', 'discover-harvestable-resources')], editable=False, max_length=50)), + ('started', models.DateTimeField(auto_now_add=True)), + ('updated', models.DateTimeField(auto_now=True)), + ('ended', models.DateTimeField(blank=True, null=True)), + ('status', models.CharField(choices=[('pending', 'pending'), ('on-going', 'on-going'), ('finished-all-ok', 'finished-all-ok'), ('finished-all-failed', 'finished-all-failed'), ('finished-some-failed', 'finished-some-failed'), ('aborting', 'aborting'), ('aborted', 'aborted')], default='pending', editable=False, max_length=50)), + ('details', models.TextField(blank=True)), + ('total_records_to_process', models.IntegerField(default=0, editable=False, help_text='Number of records being processed in this session')), + ('records_done', models.IntegerField(default=0, help_text='Number of records that have already been processed')), + ('harvester', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='sessions', to='harvesting.harvester')), + ], + ), + ] diff --git a/geonode/harvesting/migrations/0039_delete_harvestingsession.py b/geonode/harvesting/migrations/0039_delete_harvestingsession.py new file mode 100644 index 00000000000..d82efa58d54 --- /dev/null +++ b/geonode/harvesting/migrations/0039_delete_harvestingsession.py @@ -0,0 +1,16 @@ +# Generated by Django 3.2.4 on 2021-09-27 15:01 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('harvesting', '0038_auto_20210927_1455'), + ] + + operations = [ + migrations.DeleteModel( + name='HarvestingSession', + ), + ] diff --git a/geonode/harvesting/models.py b/geonode/harvesting/models.py index af2c71ccc1c..45168622792 100644 --- a/geonode/harvesting/models.py +++ b/geonode/harvesting/models.py @@ -19,9 +19,12 @@ import json import logging -import jsonschema.exceptions +import typing +import jsonschema +import jsonschema.exceptions from django.conf import settings +from django.contrib import admin from django.core.exceptions import ValidationError from django.db import models from django.utils import timezone @@ -32,7 +35,8 @@ PeriodicTask, ) -from . import utils +from geonode import celery_app + from .config import get_setting logger = logging.getLogger(__name__) @@ -41,12 +45,16 @@ class Harvester(models.Model): STATUS_READY = "ready" STATUS_UPDATING_HARVESTABLE_RESOURCES = "updating-harvestable-resources" + STATUS_ABORTING_UPDATE_HARVESTABLE_RESOURCES = "aborting-update-harvestable-resources" STATUS_PERFORMING_HARVESTING = "harvesting-resources" + STATUS_ABORTING_PERFORMING_HARVESTING = "aborting-harvesting-resources" STATUS_CHECKING_AVAILABILITY = "checking-availability" STATUS_CHOICES = [ (STATUS_READY, _("ready")), (STATUS_UPDATING_HARVESTABLE_RESOURCES, _("updating-harvestable-resources")), + (STATUS_ABORTING_UPDATE_HARVESTABLE_RESOURCES, _("aborting-update-harvestable-resources")), (STATUS_PERFORMING_HARVESTING, _("harvesting-resources")), + (STATUS_ABORTING_PERFORMING_HARVESTING, _("aborting-harvesting-resources")), (STATUS_CHECKING_AVAILABILITY, _("checking-availability")), ] @@ -170,6 +178,18 @@ class Meta: def __str__(self): return f"{self.name}({self.id})" + @property + def latest_refresh_session(self): + return self.sessions.filter( + session_type=AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES + ).latest("started") + + @property + def latest_harvesting_session(self): + return self.sessions.filter( + session_type=AsynchronousHarvestingSession.TYPE_HARVESTING + ).latest("started") + def clean(self): """Perform model validation by inspecting fields that depend on each other. @@ -179,8 +199,8 @@ def clean(self): """ try: - utils.validate_worker_configuration( - self.harvester_type, self.harvester_type_specific_configuration) + validate_worker_configuration(self.harvester_type, self.harvester_type_specific_configuration) + # self.validate_worker_configuration() except jsonschema.exceptions.ValidationError as exc: raise ValidationError(str(exc)) @@ -224,33 +244,122 @@ def setup_periodic_tasks(self) -> None: ) self.save() + def update_availability( + self, + timeout_seconds: typing.Optional[int] = 5 + ): + """Use the harvesting worker to check if the remote service is available""" + worker = self.get_harvester_worker() + self.last_checked_availability = timezone.now() + available = worker.check_availability(timeout_seconds=timeout_seconds) + self.remote_available = available + self.save() + return available + + def initiate_update_harvestable_resources(self): + should_continue, error_msg = self.worker_can_perform_action() + if should_continue: + self.status = self.STATUS_UPDATING_HARVESTABLE_RESOURCES + self.save() + refresh_session = AsynchronousHarvestingSession.objects.create( + harvester=self, + session_type=AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES + ) + refresh_session.initiate() + else: + raise RuntimeError(error_msg) + + def initiate_perform_harvesting( + self, + harvestable_resource_ids: typing.Optional[typing.List[int]] = None + ): + should_continue, error_msg = self.worker_can_perform_action() + if should_continue: + self.status = self.STATUS_PERFORMING_HARVESTING + self.save() + harvesting_session = AsynchronousHarvestingSession.objects.create( + harvester=self, + session_type=AsynchronousHarvestingSession.TYPE_HARVESTING + ) + harvesting_session.initiate(harvestable_resource_ids) + else: + raise RuntimeError(error_msg) + + def initiate_abort_update_harvestable_resources(self): + should_continue, error_msg = self.worker_can_perform_action( + self.STATUS_UPDATING_HARVESTABLE_RESOURCES) + if should_continue: + self.status = self.STATUS_ABORTING_UPDATE_HARVESTABLE_RESOURCES + self.save() + self.latest_refresh_session.abort() + else: + raise RuntimeError(error_msg) + + def initiate_abort_perform_harvesting(self): + should_continue, error_msg = self.worker_can_perform_action( + self.STATUS_PERFORMING_HARVESTING) + if should_continue: + self.status = self.STATUS_ABORTING_PERFORMING_HARVESTING + self.save() + self.latest_harvesting_session.abort() + else: + raise RuntimeError(error_msg) + def get_harvester_worker(self) -> "BaseHarvesterWorker": # noqa worker_class = import_string(self.harvester_type) return worker_class.from_django_record(self) + def worker_can_perform_action( + self, + target_status: typing.Optional[str] = STATUS_READY, + ) -> typing.Tuple[bool, str]: + if self.status != target_status: + error_message = ( + f"Harvester {self!r} cannot currently perform the desired action. Please wait until its status " + f"is reported as {target_status!r} before retrying." + ) + result = False + else: + result = True + error_message = "" + return result, error_message -class HarvestingSession(models.Model): + +class AsynchronousHarvestingSession(models.Model): STATUS_PENDING = "pending" STATUS_ON_GOING = "on-going" STATUS_FINISHED_ALL_OK = "finished-all-ok" STATUS_FINISHED_ALL_FAILED = "finished-all-failed" STATUS_FINISHED_SOME_FAILED = "finished-some-failed" + STATUS_ABORTING = "aborting" + STATUS_ABORTED = "aborted" STATUS_CHOICES = [ (STATUS_PENDING, _("pending")), (STATUS_ON_GOING, _("on-going")), (STATUS_FINISHED_ALL_OK, _("finished-all-ok")), (STATUS_FINISHED_ALL_FAILED, _("finished-all-failed")), (STATUS_FINISHED_SOME_FAILED, _("finished-some-failed")), + (STATUS_ABORTING, _("aborting")), + (STATUS_ABORTED, _("aborted")), + ] + TYPE_HARVESTING = "harvesting" + TYPE_DISCOVER_HARVESTABLE_RESOURCES = "discover-harvestable-resources" + TYPE_CHOICES = [ + (TYPE_HARVESTING, _("harvesting")), + (TYPE_DISCOVER_HARVESTABLE_RESOURCES, _("discover-harvestable-resources")), ] + session_type = models.CharField( + max_length=50, + choices=TYPE_CHOICES, + editable=False + ) started = models.DateTimeField(auto_now_add=True) updated = models.DateTimeField(auto_now=True) ended = models.DateTimeField(null=True, blank=True) - records_to_harvest = models.IntegerField(default=0, editable=False) - records_harvested = models.IntegerField(default=0) harvester = models.ForeignKey( Harvester, on_delete=models.CASCADE, - related_name="harvesting_sessions" + related_name="sessions" ) status = models.CharField( max_length=50, @@ -258,11 +367,77 @@ class HarvestingSession(models.Model): default=STATUS_PENDING, editable=False, ) - session_details = models.TextField( - blank=True, - help_text=_("Details about the harvesting session") + details = models.TextField(blank=True,) + total_records_to_process = models.IntegerField( + default=0, + editable=False, + help_text=_("Number of records being processed in this session") + ) + records_done = models.IntegerField( + default=0, + help_text=_("Number of records that have already been processed") ) + @admin.display(description="Progress (%)") + def get_progress_percentage(self) -> int: + try: + result = (self.records_done / self.total_records_to_process) * 100 + except ZeroDivisionError: + result = 0 + return result + + def initiate(self, harvestable_resource_ids: typing.Optional[typing.List[int]] = None): + """Initiate the asynchronous process that performs the work related to this session.""" + # NOTE: below we are calling celery tasks using the method of creating a + # signature from the main celery app object in order to avoid having + # to import the `tasks` module, which would create circular + # dependency issues - this is a common celery pattern, described here: + # + # https://docs.celeryproject.org/en/stable/faq.html#can-i-call-a-task-by-name + # + # Also note that using `app.send_task()` does not seem to work in this case (which + # is mysterious, since the celery docs say it should work) + if self.session_type == self.TYPE_DISCOVER_HARVESTABLE_RESOURCES: + task_signature = celery_app.app.signature( + "geonode.harvesting.tasks.update_harvestable_resources", + args=(self.pk,) + ) + elif self.session_type == self.TYPE_HARVESTING: + if harvestable_resource_ids is None: + task_signature = celery_app.app.signature( + "geonode.harvesting.tasks.harvesting_dispatcher", + args=(self.pk,) + ) + else: + task_signature = celery_app.app.signature( + "geonode.harvesting.tasks.harvest_resources", + args=(self.pk, harvestable_resource_ids or []) + ) + else: + raise RuntimeError("Invalid selection") + task_signature.apply_async() + self.status = self.STATUS_PENDING + self.save() + + def abort(self): + """Abort a pending or on-going session.""" + + # NOTE: We do not use celery's task revoke feature when aborting a session. This + # is an explicit design choice. The main reason being that we keep track of a session's + # state and want to know when it has finished. This is done by leveraging celery's `chord` + # feature, whereby the async tasks are executed in parallel and there is a final + # synchronization step when they are done that updates the session's state in the DB. + # Maintaining this synchronization step working OK together with revoking tasks would be + # harder to address. + if self.status == self.STATUS_PENDING: + self.status = self.STATUS_ABORTED + self.session_details = "Aborted" + elif self.status == self.STATUS_ON_GOING: + self.status = self.STATUS_ABORTING + else: + logger.debug("Session is not currently in an state that can be aborted, skipping...") + self.save() + class HarvestableResource(models.Model): STATUS_READY = "ready" @@ -326,3 +501,16 @@ def delete(self, using=None, keep_parents=False): if delete_orphan_resource: worker.finalize_harvestable_resource_deletion(self) return super().delete(using, keep_parents) + + +def validate_worker_configuration( + worker_type: "BaseHarvesterWorker", # noqa + worker_config: typing.Dict +): + worker_class = import_string(worker_type) + schema = worker_class.get_extra_config_schema() + if schema is not None: + try: + jsonschema.validate(worker_config, schema) + except jsonschema.exceptions.SchemaError as exc: + raise RuntimeError(f"Invalid schema: {exc}") diff --git a/geonode/harvesting/tasks.py b/geonode/harvesting/tasks.py index 3778c857580..4cdd1f0d231 100644 --- a/geonode/harvesting/tasks.py +++ b/geonode/harvesting/tasks.py @@ -28,13 +28,10 @@ Value, ) from django.db.models.functions import Concat -from django.utils.timezone import now +from django.utils import timezone from geonode.celery_app import app -from . import ( - models, - utils, -) +from . import models from .harvesters import base logger = logging.getLogger(__name__) @@ -48,7 +45,6 @@ ) def harvesting_dispatcher( self, - harvester_id: int, harvesting_session_id: int ): """Perform harvesting asynchronously. @@ -71,17 +67,19 @@ def harvesting_dispatcher( """ - harvester = models.Harvester.objects.get(pk=harvester_id) + session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id) + harvester = session.harvester harvestable_resources = list(harvester.harvestable_resources.filter( should_be_harvested=True).values_list("id", flat=True)) if len(harvestable_resources) > 0: harvest_resources.apply_async(args=(harvestable_resources, harvesting_session_id)) else: - logger.debug("harvesting_dispatcher - nothing to do...") - finish_harvesting_session( + message = "harvesting_dispatcher - Nothing to do" + logger.debug(message) + finish_asynchronous_session( harvesting_session_id, - models.HarvestingSession.STATUS_FINISHED_ALL_OK, - "nothing to do" + models.AsynchronousHarvestingSession.STATUS_FINISHED_ALL_OK, + final_details=message ) @@ -97,34 +95,16 @@ def harvest_resources( harvesting_session_id: int ): """Harvest a list of remote resources that all belong to the same harvester.""" - if len(harvestable_resource_ids) == 0: - logger.debug("harvest_resources - nothing to do...") - finish_harvesting_session( - harvesting_session_id, - models.HarvestingSession.STATUS_FINISHED_ALL_OK, - "nothing to do" - ) - else: - sample_id = harvestable_resource_ids[0] - try: - sample_harvestable_resource = models.HarvestableResource.objects.get(pk=sample_id) - except models.HarvestableResource.DoesNotExist: - logger.warning(f"harvestable resource {sample_id!r} does not exist.") - finish_harvesting_session( - harvesting_session_id, - models.HarvestingSession.STATUS_FINISHED_ALL_FAILED, - "Could not retrieve first harvestable resource from the GeoNode db" - ) - else: - harvester = sample_harvestable_resource.harvester - available = utils.update_harvester_availability(harvester) - if available: + session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id) + if session.status != session.STATUS_ABORTED: + if len(harvestable_resource_ids) > 0: + harvester = session.harvester + if harvester.update_availability(): harvester.status = harvester.STATUS_PERFORMING_HARVESTING harvester.save() - models.HarvestingSession.objects.filter(pk=harvesting_session_id).update( - status=models.HarvestingSession.STATUS_ON_GOING, - records_to_harvest=len(harvestable_resource_ids) - ) + session.status = session.STATUS_ON_GOING + session.total_records_to_process = len(harvestable_resource_ids) + session.save() resource_tasks = [] for harvestable_resource_id in harvestable_resource_ids: resource_tasks.append( @@ -133,12 +113,11 @@ def harvest_resources( ) ) harvesting_finalizer = _finish_harvesting.signature( - args=(harvester.id, harvesting_session_id), + args=(harvesting_session_id,), immutable=True ).on_error( _handle_harvesting_error.signature( kwargs={ - "harvester_id": harvester.id, "harvesting_session_id": harvesting_session_id, } ) @@ -146,15 +125,26 @@ def harvest_resources( harvesting_workflow = chord(resource_tasks, body=harvesting_finalizer) harvesting_workflow.apply_async() else: - logger.warning( + message = ( f"Skipping harvesting for harvester {harvester.name!r} because the " f"remote {harvester.remote_url!r} seems to be unavailable" ) - finish_harvesting_session( + logger.warning(message) + finish_asynchronous_session( harvesting_session_id, - models.HarvestingSession.STATUS_FINISHED_ALL_FAILED, - f"Remote url {harvester.remote_url} seems to be unavailable" + session.STATUS_FINISHED_ALL_FAILED, + final_details=message ) + else: + message = "harvest_resources - Nothing to do..." + logger.debug(message) + finish_asynchronous_session( + harvesting_session_id, + models.AsynchronousHarvestingSession.STATUS_FINISHED_ALL_OK, + final_details=message + ) + else: + logger.debug("Session has been aborted, skipping...") @app.task( @@ -169,40 +159,48 @@ def _harvest_resource( harvesting_session_id: int ): """Harvest a single resource from the input harvestable resource id""" - harvestable_resource = models.HarvestableResource.objects.get(pk=harvestable_resource_id) - worker: base.BaseHarvesterWorker = harvestable_resource.harvester.get_harvester_worker() - harvested_resource_info = worker.get_resource(harvestable_resource, harvesting_session_id) - now_ = now() - if harvested_resource_info is not None: - if worker.should_copy_resource(harvestable_resource): - copied_path = worker.copy_resource(harvestable_resource, harvested_resource_info) - if copied_path is not None: - harvested_resource_info.copied_resources.append(copied_path) - try: - worker.update_geonode_resource( - harvested_resource_info, - harvestable_resource, + session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id) + if session.status != session.STATUS_ABORTING: + harvestable_resource = models.HarvestableResource.objects.get(pk=harvestable_resource_id) + worker: base.BaseHarvesterWorker = harvestable_resource.harvester.get_harvester_worker() + harvested_resource_info = worker.get_resource(harvestable_resource) + now_ = timezone.now() + if harvested_resource_info is not None: + if worker.should_copy_resource(harvestable_resource): + copied_path = worker.copy_resource(harvestable_resource, harvested_resource_info) + if copied_path is not None: + harvested_resource_info.copied_resources.append(copied_path) + try: + worker.update_geonode_resource( + harvested_resource_info, + harvestable_resource, + ) + result = True + details = "" + except (RuntimeError, ValidationError) as exc: + logger.error(msg="Unable to update geonode resource") + result = False + details = str(exc) + harvesting_message = f"{harvestable_resource.title}({harvestable_resource_id}) - {'Success' if result else details}" + update_asynchronous_session( harvesting_session_id, + additional_processed_records=1 if result else 0, + additional_details=harvesting_message ) - result = True - details = "" - except (RuntimeError, ValidationError) as exc: - logger.error(msg="Unable to update geonode resource") - result = False - details = str(exc) - harvesting_message = f"{harvestable_resource.title}({harvestable_resource_id}) - {'Success' if result else details}" - update_harvesting_session( - harvesting_session_id, - additional_harvested_records=1 if result else 0, - additional_details=harvesting_message - ) - harvestable_resource.last_harvesting_message = f"{now_} - {harvesting_message}" - harvestable_resource.last_harvesting_succeeded = result + harvestable_resource.last_harvesting_message = f"{now_} - {harvesting_message}" + harvestable_resource.last_harvesting_succeeded = result + else: + harvestable_resource.last_harvesting_message = f"{now_}Harvesting failed" + harvestable_resource.last_harvesting_succeeded = False + harvestable_resource.last_harvested = now_ + harvestable_resource.save() else: - harvestable_resource.last_harvesting_message = f"{now_}Harvesting failed" - harvestable_resource.last_harvesting_succeeded = False - harvestable_resource.last_harvested = now_ - harvestable_resource.save() + message = ( + f"Skipping harvesting of resource {harvestable_resource_id} since the " + f"session has been aborted" + ) + update_asynchronous_session(harvesting_session_id, additional_details=message) + logger.debug(message) @app.task( @@ -211,16 +209,26 @@ def _harvest_resource( acks_late=False, ignore_result=False, ) -def _finish_harvesting(self, harvester_id: int, harvesting_session_id: int): - harvester = models.Harvester.objects.get(pk=harvester_id) - harvester.status = harvester.STATUS_READY - harvester.save() - finish_harvesting_session( - harvesting_session_id, final_status=models.HarvestingSession.STATUS_FINISHED_ALL_OK) +def _finish_harvesting(self, harvesting_session_id: int): + session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id) + harvester = session.harvester + if session.status == session.STATUS_ABORTING: + message = "Harvesting session aborted by user" + final_status = session.STATUS_ABORTED + else: + message = "Harvesting completed successfully!" + final_status = session.STATUS_FINISHED_ALL_OK + finish_asynchronous_session( + harvesting_session_id, + final_status=final_status, + final_details=message + ) logger.debug( - f"(harvester: {harvester_id!r} - session: {harvesting_session_id!r}) " - f"Harvesting completed successfully! " + f"(harvester: {harvester.pk!r} - session: {harvesting_session_id!r}) " + f"{message}" ) + harvester.status = harvester.STATUS_READY + harvester.save() @app.task( @@ -257,13 +265,14 @@ def _handle_harvesting_error(self, task_id, *args, **kwargs): logger.debug(f"result: {result.result}") logger.debug(f"traceback: {result.traceback}") logger.debug(f"harvesting task with kwargs: {kwargs} has failed") - harvester = models.Harvester.objects.get(pk=kwargs["harvester_id"]) + session = models.AsynchronousHarvestingSession.objects.get(pk=kwargs["harvesting_session_id"]) + harvester = session.harvester harvester.status = harvester.STATUS_READY harvester.save() details = f"state: {result.state}\nresult: {result.result}\ntraceback: {result.traceback}" - finish_harvesting_session( + finish_asynchronous_session( kwargs["harvesting_session_id"], - final_status=models.HarvestingSession.STATUS_FINISHED_SOME_FAILED, + session.STATUS_FINISHED_SOME_FAILED, final_details=details ) @@ -276,7 +285,7 @@ def _handle_harvesting_error(self, task_id, *args, **kwargs): ) def check_harvester_available(self, harvester_id: int): harvester = models.Harvester.objects.get(pk=harvester_id) - available = utils.update_harvester_availability(harvester) + available = harvester.update_availability() logger.info( f"Harvester {harvester!r}: remote server is " f"{'' if available else 'not '}available" @@ -289,42 +298,57 @@ def check_harvester_available(self, harvester_id: int): acks_late=False, ignore_result=False, ) -def update_harvestable_resources(self, harvester_id: int): +def update_harvestable_resources(self, refresh_session_id: int): # NOTE: we are able to implement batch discovery of existing harvestable resources # because we want to know about all of them. We are not able to batch harvesting # of resources because these have potentially been individually selected by the # user, which means we are not interested in all of them - harvester = models.Harvester.objects.get(pk=harvester_id) - harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES - harvester.save() - worker = harvester.get_harvester_worker() - try: - num_resources = worker.get_num_available_resources() - except (NotImplementedError, base.HarvestingException) as exc: - _handle_harvestable_resources_update_error( - self.request.id, harvester_id=harvester_id, raised_exception=exc) - else: - harvester.num_harvestable_resources = num_resources - harvester.save() - page_size = 10 - total_pages = math.ceil(num_resources / page_size) - batches = [] - for page in range(total_pages): - batches.append( - _update_harvestable_resources_batch.signature( - args=(harvester_id, page, page_size), + session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id) + if session.status != session.STATUS_ABORTED: + session.status = session.STATUS_ON_GOING + session.save() + harvester = session.harvester + if harvester.update_availability(): + harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES + harvester.save() + worker = harvester.get_harvester_worker() + try: + num_resources = worker.get_num_available_resources() + except (NotImplementedError, base.HarvestingException) as exc: + _handle_harvestable_resources_update_error( + self.request.id, refresh_session_id=refresh_session_id, raised_exception=exc) + else: + harvester.num_harvestable_resources = num_resources + harvester.save() + session.total_records_to_process = num_resources + session.save() + page_size = 10 + total_pages = math.ceil(num_resources / page_size) + batches = [] + for page in range(total_pages): + batches.append( + _update_harvestable_resources_batch.signature( + args=(refresh_session_id, page, page_size), + ) + ) + update_finalizer = _finish_harvestable_resources_update.signature( + args=(refresh_session_id,), + immutable=True + ).on_error( + _handle_harvestable_resources_update_error.signature( + kwargs={"refresh_session_id": refresh_session_id} + ) ) + update_workflow = chord(batches, body=update_finalizer) + update_workflow.apply_async() + else: + finish_asynchronous_session( + refresh_session_id, + session.STATUS_FINISHED_ALL_FAILED, + final_details="Harvester is not available" ) - update_finalizer = _finish_harvestable_resources_update.signature( - args=(harvester_id,), - immutable=True - ).on_error( - _handle_harvestable_resources_update_error.signature( - kwargs={"harvester_id": harvester_id} - ) - ) - update_workflow = chord(batches, body=update_finalizer) - update_workflow.apply_async() + else: + logger.debug("Session has been aborted, skipping...") @app.task( @@ -334,31 +358,42 @@ def update_harvestable_resources(self, harvester_id: int): ignore_result=False, ) def _update_harvestable_resources_batch( - self, harvester_id: int, page: int, page_size: int): - harvester = models.Harvester.objects.get(pk=harvester_id) - worker = harvester.get_harvester_worker() - offset = page * page_size - try: - found_resources = worker.list_resources(offset) - except base.HarvestingException: - logger.exception("Could not retrieve list of remote resources.") + self, + refresh_session_id: int, + page: int, + page_size: int +): + session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id) + if session.status == session.STATUS_ON_GOING: + harvester = session.harvester + worker = harvester.get_harvester_worker() + offset = page * page_size + try: + found_resources = worker.list_resources(offset) + except base.HarvestingException: + logger.exception("Could not retrieve list of remote resources.") + else: + processed = 0 + for remote_resource in found_resources: + resource, created = models.HarvestableResource.objects.get_or_create( + harvester=harvester, + unique_identifier=remote_resource.unique_identifier, + title=remote_resource.title, + defaults={ + "should_be_harvested": harvester.harvest_new_resources_by_default, + "remote_resource_type": remote_resource.resource_type, + "last_refreshed": timezone.now() + } + ) + processed += 1 + # NOTE: make sure to save the resource because we need to have its + # `last_updated` property be refreshed - this is done in order to be able + # to compare when a resource has been found + resource.last_refreshed = timezone.now() + resource.save() + update_asynchronous_session(refresh_session_id, additional_processed_records=processed) else: - for remote_resource in found_resources: - resource, created = models.HarvestableResource.objects.get_or_create( - harvester=harvester, - unique_identifier=remote_resource.unique_identifier, - title=remote_resource.title, - defaults={ - "should_be_harvested": harvester.harvest_new_resources_by_default, - "remote_resource_type": remote_resource.resource_type, - "last_refreshed": now() - } - ) - # NOTE: make sure to save the resource because we need to have its - # `last_updated` property be refreshed - this is done in order to be able - # to compare when a resource has been found - resource.last_refreshed = now() - resource.save() + logger.info("The refresh session has been asked to abort, so skipping...") @app.task( @@ -367,15 +402,23 @@ def _update_harvestable_resources_batch( acks_late=False, ignore_result=False, ) -def _finish_harvestable_resources_update(self, harvester_id: int): - harvester = models.Harvester.objects.get(pk=harvester_id) - if harvester.last_checked_harvestable_resources is not None: - _delete_stale_harvestable_resources(harvester) +def _finish_harvestable_resources_update(self, refresh_session_id: int): + session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id) + harvester = session.harvester + if session.status == session.STATUS_ABORTING: + message = "Refresh session aborted by user" + finish_asynchronous_session( + refresh_session_id, session.STATUS_ABORTED, final_details=message) + else: + message = "Harvestable resources successfully refreshed" + finish_asynchronous_session( + refresh_session_id, session.STATUS_FINISHED_ALL_OK, final_details=message) + if harvester.last_checked_harvestable_resources is not None: + _delete_stale_harvestable_resources(harvester) harvester.status = harvester.STATUS_READY - now_ = now() + now_ = timezone.now() harvester.last_checked_harvestable_resources = now_ - harvester.last_check_harvestable_resources_message = ( - f"{now_} - Harvestable resources successfully checked") + harvester.last_check_harvestable_resources_message = f"{now_} - {message}" harvester.save() @@ -386,14 +429,17 @@ def _finish_harvestable_resources_update(self, harvester_id: int): ignore_result=False, ) def _handle_harvestable_resources_update_error(self, task_id, *args, **kwargs): + logger.debug("Inside _handle_harvestable_resources_update_error -----------------------------------------") + result = self.app.AsyncResult(str(task_id)) + logger.debug(f"locals: {locals()}") + logger.debug(f"state: {result.state}") + logger.debug(f"result: {result.result}") + logger.debug(f"traceback: {result.traceback}") result = self.app.AsyncResult(str(task_id)) - print(f"locals: {locals()}") - print(f"state: {result.state}") - print(f"result: {result.result}") - print(f"traceback: {result.traceback}") - harvester = models.Harvester.objects.get(pk=kwargs["harvester_id"]) + session = models.AsynchronousHarvestingSession.objects.get(pk=kwargs["refresh_session_id"]) + harvester = session.harvester harvester.status = harvester.STATUS_READY - now_ = now() + now_ = timezone.now() harvester.last_checked_harvestable_resources = now_ harvester.last_check_harvestable_resources_message = ( f"{now_} - There was an error retrieving information on available " @@ -401,6 +447,12 @@ def _handle_harvestable_resources_update_error(self, task_id, *args, **kwargs): f"Please check the logs" ) harvester.save() + details = f"state: {result.state}\nresult: {result.result}\ntraceback: {result.traceback}" + finish_asynchronous_session( + kwargs["refresh_session_id"], + final_status=models.AsynchronousHarvestingSession.STATUS_FINISHED_SOME_FAILED, + final_details=details + ) def _delete_stale_harvestable_resources(harvester: models.Harvester): @@ -422,7 +474,7 @@ def _delete_stale_harvestable_resources(harvester: models.Harvester): previously_checked_at = harvester.last_checked_harvestable_resources logger.debug(f"last checked at: {previously_checked_at}") - logger.debug(f"now: {now()}") + logger.debug(f"now: {timezone.now()}") to_remove = models.HarvestableResource.objects.filter( harvester=harvester, last_refreshed__lte=previously_checked_at) for harvestable_resource in to_remove: @@ -435,38 +487,34 @@ def _delete_stale_harvestable_resources(harvester: models.Harvester): harvestable_resource.delete() -def update_harvesting_session( - session_id: int, - total_records_found: typing.Optional[int] = None, - additional_harvested_records: typing.Optional[int] = None, - additional_details: typing.Optional[str] = None, -) -> None: - """Update the input harvesting session.""" - update_kwargs = {} - if total_records_found is not None: - update_kwargs["total_records_found"] = total_records_found - if additional_harvested_records is not None: - update_kwargs["records_harvested"] = ( - F("records_harvested") + additional_harvested_records) - if additional_details is not None: - update_kwargs["session_details"] = Concat("session_details", Value(f"\n{additional_details}")) - models.HarvestingSession.objects.filter(id=session_id).update(**update_kwargs) - - -def finish_harvesting_session( +def finish_asynchronous_session( session_id: int, final_status: str, final_details: typing.Optional[str] = None, - additional_harvested_records: typing.Optional[int] = None, + additional_processed_records: typing.Optional[int] = None ) -> None: - """Finish the input harvesting session""" update_kwargs = { - "ended": now(), + "ended": timezone.now(), "status": final_status, } - if additional_harvested_records is not None: - update_kwargs["records_harvested"] = ( - F("records_harvested") + additional_harvested_records) + if additional_processed_records is not None: + update_kwargs["records_done"] = F("records_done") + additional_processed_records if final_details is not None: - update_kwargs["session_details"] = Concat("session_details", Value(f"\n{final_details}")) - models.HarvestingSession.objects.filter(id=session_id).update(**update_kwargs) + update_kwargs["details"] = Concat("details", Value(f"\n{final_details}")) + models.AsynchronousHarvestingSession.objects.filter(id=session_id).update(**update_kwargs) + + +def update_asynchronous_session( + session_id: int, + total_records_to_process: typing.Optional[int] = None, + additional_processed_records: typing.Optional[int] = None, + additional_details: typing.Optional[str] = None, +) -> None: + update_kwargs = {} + if total_records_to_process is not None: + update_kwargs["total_records_to_process"] = total_records_to_process + if additional_processed_records is not None: + update_kwargs["records_done"] = F("records_done") + additional_processed_records + if additional_details is not None: + update_kwargs["details"] = Concat("details", Value(f"\n{additional_details}")) + models.AsynchronousHarvestingSession.objects.filter(id=session_id).update(**update_kwargs) diff --git a/geonode/harvesting/tests/test_admin.py b/geonode/harvesting/tests/test_admin.py index e2633e1ab8b..4934ebc3d68 100644 --- a/geonode/harvesting/tests/test_admin.py +++ b/geonode/harvesting/tests/test_admin.py @@ -1,18 +1,24 @@ from unittest import mock +from django.contrib.admin.sites import AdminSite from django.urls import reverse from django.contrib.auth import get_user_model +from django.test import RequestFactory from rest_framework import status from geonode.tests.base import GeoNodeBaseTestSupport -from .. import models +from .. import ( + admin, + models +) class HarvesterAdminTestCase(GeoNodeBaseTestSupport): harvester_type = 'geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester' def setUp(self): + self.factory = RequestFactory() self.user = get_user_model().objects.get(username='admin') self.client.login(username="admin", password="admin") @@ -23,10 +29,14 @@ def setUp(self): harvester_type=self.harvester_type ) - @mock.patch( - "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester.check_availability") - def test_add_harvester(self, mock_check_availability): - mock_check_availability.return_value = True + def test_get_form_returns_current_user(self): + request = self.factory.post(reverse("admin:harvesting_harvester_add")) + request.user = self.user + model_admin = admin.HarvesterAdmin(model=models.Harvester, admin_site=AdminSite()) + form = model_admin.get_form(request) + self.assertEqual(form.base_fields["default_owner"].initial.username, self.user.username) + + def test_add_harvester(self): data = { 'remote_url': "http://fake.com", 'name': 'harvester', @@ -45,29 +55,99 @@ def test_add_harvester(self, mock_check_availability): self.assertEqual(harvester.name, data['name']) self.assertEqual(harvester.remote_url, data['remote_url']) self.assertEqual(harvester.status, models.Harvester.STATUS_READY) - self.assertEqual(harvester.remote_available, True) - - @mock.patch( - "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester.check_availability") - def test_update_harvester_availability(self, mock_check_availability): - mock_check_availability.return_value = True - data = {'action': 'update_harvester_availability', - '_selected_action': [self.harvester.pk]} - response = self.client.post(reverse('admin:harvesting_harvester_changelist'), data) - self.assertEqual(response.status_code, status.HTTP_302_FOUND) # response from admin - self.harvester.refresh_from_db() - self.assertEqual(self.harvester.remote_available, True) - - @mock.patch( - "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester.check_availability") - def test_perform_harvesting(self, mock_check_availability): - mock_check_availability.return_value = True - data = {'action': 'perform_harvesting', - '_selected_action': [self.harvester.pk]} - self.harvester.status = models.Harvester.STATUS_READY - self.harvester.save() - - response = self.client.post(reverse('admin:harvesting_harvester_changelist'), data) - self.assertEqual(response.status_code, status.HTTP_302_FOUND) # response from admin - self.harvester.refresh_from_db() - self.assertEqual(self.harvester.status, models.Harvester.STATUS_PERFORMING_HARVESTING) + + def test_update_harvester_availability(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.update_harvester_availability(None, [mock_harvester]) + mock_harvester.update_availability.assert_called() + + def test_initiate_update_harvestable_resources_initiates_when_harvester_is_available(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + mock_harvester.update_availability.return_value = True + model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.initiate_update_harvestable_resources(None, [mock_harvester]) + mock_harvester.update_availability.assert_called() + mock_harvester.initiate_update_harvestable_resources.assert_called() + + def test_initiate_update_harvestable_resources_skips_when_harvester_not_available(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + mock_harvester.update_availability.return_value = False + model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.initiate_update_harvestable_resources(None, [mock_harvester]) + mock_harvester.update_availability.assert_called() + mock_harvester.initiate_update_harvestable_resources.assert_not_called() + + def test_initiate_perform_harvesting_initiates_when_harvester_is_available(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + mock_harvester.update_availability.return_value = True + model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.initiate_perform_harvesting(None, [mock_harvester]) + mock_harvester.update_availability.assert_called() + mock_harvester.initiate_perform_harvesting.assert_called() + + def test_initiate_perform_harvesting_skips_when_harvester_not_available(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + mock_harvester.update_availability.return_value = False + model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.initiate_perform_harvesting(None, [mock_harvester]) + mock_harvester.update_availability.assert_called() + mock_harvester.initiate_perform_harvesting.assert_not_called() + + +class AsynchronousHarvestingSessionAdminTestCase(GeoNodeBaseTestSupport): + + def test_django_admin_does_not_allow_creating_new_session(self): + data = { + 'session_type': "harvesting", + 'harvester': 1000, + 'status': 'pending' + } + response = self.client.post(reverse('admin:harvesting_harvester_add'), data, follow=True) + print(response.redirect_chain) + self.assertRedirects(response, "/en/admin/login/?next=/en/admin/harvesting/harvester/add/") + + +class HarvestableResourceAdminTestCase(GeoNodeBaseTestSupport): + + def test_initiate_harvest_selected_resources_initiates_when_harvester_is_available(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + mock_harvester.update_availability.return_value = True + fake_resource_id = 1000 + mock_resource = mock.MagicMock( + spec=models.HarvestableResource, + harvester=mock_harvester, + id=fake_resource_id + ) + model_admin = admin.HarvestableResourceAdmin(model=mock_resource, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.initiate_harvest_selected_resources(None, [mock_resource]) + mock_harvester.update_availability.assert_called() + mock_harvester.initiate_perform_harvesting.assert_called_with([fake_resource_id]) + + def test_initiate_harvest_selected_resources_skips_when_harvester_not_available(self): + mock_harvester_model = mock.MagicMock(spec=models.Harvester) + mock_harvester = mock_harvester_model.return_value + mock_harvester.update_availability.return_value = False + fake_resource_id = 1000 + mock_resource = mock.MagicMock( + spec=models.HarvestableResource, + harvester=mock_harvester, + id=fake_resource_id + ) + model_admin = admin.HarvestableResourceAdmin(model=mock_resource, admin_site=AdminSite()) + with mock.patch.object(model_admin, "message_user"): + model_admin.initiate_harvest_selected_resources(None, [mock_resource]) + mock_harvester.update_availability.assert_called() + mock_harvester.initiate_perform_harvesting.assert_not_called() diff --git a/geonode/harvesting/tests/test_api_serializers.py b/geonode/harvesting/tests/test_api_serializers.py index 98edbf3b69b..7bfb5a405b7 100644 --- a/geonode/harvesting/tests/test_api_serializers.py +++ b/geonode/harvesting/tests/test_api_serializers.py @@ -70,8 +70,8 @@ def test_serializer_is_able_to_serialize_model_instance(self): self.assertEqual(urlparse(serialized["links"]["self"]).path, f"{api_endpoint}{self.harvester.pk}/") self.assertIsNotNone(serialized["links"]["harvestable_resources"]) - @mock.patch("geonode.harvesting.api.serializers.utils") - def test_validate_also_validates_worker_specific_config(self, mock_utils): + @mock.patch("geonode.harvesting.models.validate_worker_configuration") + def test_validate_also_validates_worker_specific_config(self, mock_validate_config): input_data = { "name": "phony", "remote_url": "http://fake.com", @@ -85,10 +85,9 @@ def test_validate_also_validates_worker_specific_config(self, mock_utils): serializer = serializers.HarvesterSerializer(data=input_data, context={"request": request}) serializer.is_valid(raise_exception=True) - mock_utils.validate_worker_configuration.assert_called() + mock_validate_config.assert_called() - @mock.patch("geonode.harvesting.api.serializers.utils") - def test_validate_does_not_allow_changing_status_and_worker_specific_config(self, mock_utils): + def test_validate_does_not_allow_changing_status_and_worker_specific_config(self): input_data = { "name": "phony", "remote_url": "http://fake.com", @@ -120,9 +119,7 @@ def test_create_does_not_allow_setting_custom_status(self): with self.assertRaises(ValidationError): serializer.save() - @mock.patch("geonode.harvesting.api.serializers.tasks") - @mock.patch("geonode.harvesting.api.serializers.utils") - def test_create_checks_availability_of_remote_and_updates_harvestable_resources(self, mock_utils, mock_tasks): + def test_create(self): input_data = { "name": "phony", "remote_url": "http://fake.com", @@ -134,9 +131,8 @@ def test_create_checks_availability_of_remote_and_updates_harvestable_resources( serializer = serializers.HarvesterSerializer(data=input_data, context={"request": request}) serializer.is_valid(raise_exception=True) - serializer.save() - mock_utils.update_harvester_availability.assert_called() - mock_tasks.update_harvestable_resources.apply_async.assert_called() + harvester = serializer.save() + self.assertEqual(harvester.name, input_data["name"]) def test_update_errors_out_if_current_status_is_not_ready(self): request = _REQUEST_FACTORY.patch(f"/api/v2/harvesters/{self.harvester.pk}") @@ -178,7 +174,7 @@ def test_update_calls_update_harvestable_resources_task(self, mock_tasks): ) serializer.is_valid(raise_exception=True) serializer.save() - mock_tasks.update_harvestable_resources.signature.assert_called_with(args=(self.harvester.pk,)) + mock_tasks.update_harvestable_resources.signature.assert_called() mock_tasks.update_harvestable_resources.signature.return_value.apply_async.assert_called() @mock.patch("geonode.harvesting.api.serializers.tasks") @@ -193,8 +189,10 @@ def test_update_calls_harvesting_dispatcher_task(self, mock_tasks): ) serializer.is_valid(raise_exception=True) serializer.save() - called_harvester_pk, _ = mock_tasks.harvesting_dispatcher.signature.call_args_list[0].kwargs["args"] - self.assertEqual(called_harvester_pk, self.harvester.pk) + called_args = mock_tasks.harvesting_dispatcher.signature.call_args_list[0].kwargs["args"] + called_session_pk = called_args[0] + session = models.AsynchronousHarvestingSession.objects.get(pk=called_session_pk) + self.assertEqual(session.harvester.pk, self.harvester.pk) mock_tasks.harvesting_dispatcher.signature.return_value.apply_async.assert_called() @mock.patch("geonode.harvesting.api.serializers.tasks") @@ -231,7 +229,8 @@ def test_update_updates_harvestable_resources_whenever_worker_config_changes(sel mock_tasks.update_harvestable_resources.signature.return_value.apply_async.assert_called() -class BriefHarvestingSessionSerializerTestCase(GeoNodeBaseTestSupport): +class BriefAsynchronousHarvestingSessionSerializerTestCase(GeoNodeBaseTestSupport): + harvester: models.Harvester @classmethod def setUpTestData(cls): @@ -245,14 +244,15 @@ def setUpTestData(cls): default_owner=user, harvester_type=harvester_type ) - cls.harvesting_session = models.HarvestingSession.objects.create( - harvester=cls.harvester + cls.harvesting_session = models.AsynchronousHarvestingSession.objects.create( + harvester=cls.harvester, + session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING ) def test_serializer_is_able_to_serialize_model_instance(self): api_endpoint = "/api/v2/harvesting-sessions/" request = _REQUEST_FACTORY.get(api_endpoint) - serializer = serializers.BriefHarvestingSessionSerializer( + serializer = serializers.BriefAsynchronousHarvestingSessionSerializer( self.harvesting_session, context={"request": request}) serialized = serializer.data self.assertIsNotNone(serialized["started"]) diff --git a/geonode/harvesting/tests/test_api_views.py b/geonode/harvesting/tests/test_api_views.py index e400d516d60..5ebec358349 100644 --- a/geonode/harvesting/tests/test_api_views.py +++ b/geonode/harvesting/tests/test_api_views.py @@ -46,13 +46,15 @@ def setUpTestData(cls): harvester2.id: harvester_resource2, } - session1 = models.HarvestingSession.objects.create( + session1 = models.AsynchronousHarvestingSession.objects.create( harvester=harvester1, - records_harvested=10 + session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING, + records_done=10 ) - session2 = models.HarvestingSession.objects.create( + session2 = models.AsynchronousHarvestingSession.objects.create( harvester=harvester2, - records_harvested=5 + session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING, + records_done=5 ) cls.sessions = [session1, session2] @@ -99,5 +101,5 @@ def test_get_harvester_sessions(self): self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data["total"], len(self.sessions)) for index, harvester in enumerate(self.sessions): - self.assertEqual(response.data["harvesting_sessions"][index]["id"], self.sessions[index].pk) - self.assertEqual(response.data["harvesting_sessions"][index]["records_harvested"], self.sessions[index].records_harvested) + self.assertEqual(response.data["asynchronous_harvesting_sessions"][index]["id"], self.sessions[index].pk) + self.assertEqual(response.data["asynchronous_harvesting_sessions"][index]["records_done"], self.sessions[index].records_done) diff --git a/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py b/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py index f6c0d5526b9..ee900898745 100644 --- a/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py +++ b/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py @@ -18,9 +18,13 @@ ######################################################################### import mock.mock +from django.utils import timezone + from geonode.harvesting.harvesters import geonodeharvester from geonode.tests.base import GeoNodeBaseSimpleTestSupport +from .. import models + class GeoNodeHarvesterWorkerTestCase(GeoNodeBaseSimpleTestSupport): @@ -35,6 +39,29 @@ def test_base_api_url(self): worker = geonodeharvester.GeonodeLegacyHarvester(base_url, harvester_id) self.assertEqual(worker.base_api_url, expected) + def test_that_copying_remote_resources_is_allowed(self): + worker = geonodeharvester.GeonodeLegacyHarvester("http://fake-url2/", "fake-id") + self.assertTrue(worker.allows_copying_resources) + + def test_creation_from_harvester(self): + now = timezone.now() + keywords = ["keyword1", "keyword2"] + categories = ["category1", "category2"] + combinations = [ + { + "harvest_documents": True, "harvest_datasets": True, "copy_datasets": True, "copy_documents": True, + "resource_title_filter": "something", "start_date_filter": now, "end_date_filter": now, + "keywords_filter": keywords, "categories_filter": categories + }, + ] + for param_combination in combinations: + harvester = models.Harvester( + name="fake1", + harvester_type="geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester", + harvester_type_specific_configuration=param_combination + ) + harvester.get_harvester_worker() + @mock.patch("geonode.harvesting.harvesters.geonodeharvester.requests.Session") def test_check_availability_works_when_response_includes_layers_object(self, mock_requests_session): mock_response = mock.MagicMock() diff --git a/geonode/harvesting/tests/test_models.py b/geonode/harvesting/tests/test_models.py index ab435fc034e..354b282f4cf 100644 --- a/geonode/harvesting/tests/test_models.py +++ b/geonode/harvesting/tests/test_models.py @@ -18,8 +18,10 @@ ######################################################################### import datetime +from unittest import mock from django.contrib.auth import get_user_model +from django.test import SimpleTestCase from geonode.tests.base import GeoNodeBaseTestSupport from .. import models @@ -52,8 +54,24 @@ def test_setup_periodic_tasks(self): self.assertEqual(self.harvester.availability_check_task.name, f"Check availability of {self.name}") self.assertEqual(self.harvester.availability_check_task.interval.every, self.harvester.check_availability_frequency) + @mock.patch("geonode.harvesting.models.jsonschema") + @mock.patch("geonode.harvesting.models.import_string") + def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema): + extra_config_schema = "fake_config_schema" + mock_worker_class = mock.MagicMock() + mock_worker_class.get_extra_config_schema.return_value = extra_config_schema + mock_import_string.return_value = mock_worker_class -class HarvesterSessionTestCase(GeoNodeBaseTestSupport): + harvester_type = "fake_harvester_type" + configuration = {"fake_key": "fake_configuration"} + models.validate_worker_configuration(harvester_type, configuration) + + mock_import_string.assert_called_with(harvester_type) + mock_worker_class.get_extra_config_schema.assert_called() + mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema) + + +class AsynchronousHarvestingSessionTestCase(GeoNodeBaseTestSupport): remote_url = 'test.com' name = 'This is geonode harvester' user = get_user_model().objects.get(username='AnonymousUser') @@ -67,8 +85,9 @@ def setUp(self): default_owner=self.user, harvester_type=self.harvester_type ) - self.harvesting_session = models.HarvestingSession.objects.create( - harvester=self.harvester + self.harvesting_session = models.AsynchronousHarvestingSession.objects.create( + harvester=self.harvester, + session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING ) def test_check_attributes(self): @@ -77,7 +96,7 @@ def test_check_attributes(self): """ self.assertIsNotNone(self.harvesting_session.pk) self.assertEqual(self.harvesting_session.harvester, self.harvester) - self.assertEqual(self.harvesting_session.records_harvested, 0) + self.assertEqual(self.harvesting_session.records_done, 0) class HarvestableResourceTestCase(GeoNodeBaseTestSupport): @@ -110,3 +129,22 @@ def test_check_attributes(self): self.assertEqual(self.harvestable_resource.unique_identifier, self.unique_identifier) self.assertFalse(self.harvestable_resource.should_be_harvested) self.assertEqual(self.harvestable_resource.status, models.HarvestableResource.STATUS_READY) + + +class WorkerConfigValidationTestCase(SimpleTestCase): + + @mock.patch("geonode.harvesting.models.jsonschema") + @mock.patch("geonode.harvesting.models.import_string") + def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema): + extra_config_schema = "fake_config_schema" + mock_worker_class = mock.MagicMock() + mock_worker_class.get_extra_config_schema.return_value = extra_config_schema + mock_import_string.return_value = mock_worker_class + + harvester_type = "fake_harvester_type" + configuration = {"somekey": "fake_configuration"} + models.validate_worker_configuration(harvester_type, configuration) + + mock_import_string.assert_called_with(harvester_type) + mock_worker_class.get_extra_config_schema.assert_called() + mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema) diff --git a/geonode/harvesting/tests/test_tasks.py b/geonode/harvesting/tests/test_tasks.py index 6e0f8003e4c..456312872af 100644 --- a/geonode/harvesting/tests/test_tasks.py +++ b/geonode/harvesting/tests/test_tasks.py @@ -45,7 +45,10 @@ def setUpTestData(cls): default_owner=cls.harvester_owner, harvester_type=cls.harvester_type, ) - cls.harvesting_session = models.HarvestingSession.objects.create(harvester=cls.harvester) + cls.harvesting_session = models.AsynchronousHarvestingSession.objects.create( + harvester=cls.harvester, + session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING + ) for index in range(3): models.HarvestableResource.objects.create( unique_identifier=f"fake-identifier-{index}", @@ -55,8 +58,8 @@ def setUpTestData(cls): last_refreshed=now() ) - @mock.patch("geonode.harvesting.tasks.update_harvesting_session") - def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock_update_harvesting_session): + @mock.patch("geonode.harvesting.tasks.update_asynchronous_session") + def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock_update_asynchronous_session): """Test that `worker.get_resource()` is called by the `_harvest_resource()` task and that the related workflow is called too. Verify that `worker.get_resource()` is always called. Then verify that if the result of `worker.get_resource()` is @@ -78,7 +81,7 @@ def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock mock_models.HarvestableResource.objects.get.assert_called_with(pk=harvestable_resource_id) mock_worker.get_resource.assert_called() mock_worker.update_geonode_resource.assert_called() - mock_update_harvesting_session.assert_called() + mock_update_asynchronous_session.assert_called() def test_harvest_resource_does_not_update_geonode_when_remote_resource_does_not_exist(self): """Test that the worker does not try to update existing GeoNode resources when the remote resource cannot be harvested.""" @@ -99,7 +102,7 @@ def test_harvest_resource_does_not_update_geonode_when_remote_resource_does_not_ mock_worker.update_geonode_resource.assert_not_called() def test_finish_harvesting_updates_harvester_status(self): - tasks._finish_harvesting(self.harvester.id, self.harvesting_session.id) + tasks._finish_harvesting(self.harvesting_session.id) self.harvester.refresh_from_db() self.harvesting_session.refresh_from_db() self.assertEqual(self.harvester.status, models.Harvester.STATUS_READY) @@ -112,10 +115,12 @@ def test_handle_harvesting_error_cleans_up_harvest_execution(self): self.assertEqual(self.harvester.status, models.Harvester.STATUS_READY) self.assertIsNotNone(self.harvesting_session.ended) - @mock.patch("geonode.harvesting.tasks.utils") - def test_check_harvester_available(self, mock_harvesting_utils): - tasks.check_harvester_available(self.harvester.id) - mock_harvesting_utils.update_harvester_availability.assert_called_with(self.harvester) + @mock.patch("geonode.harvesting.tasks.models.Harvester") + def test_check_harvester_available(self, mock_harvester_model): + mock_harvester = mock.MagicMock(spec=models.Harvester).return_value + mock_harvester_model.objects.get.return_value = mock_harvester + tasks.check_harvester_available(1000) + mock_harvester.update_availability.assert_called() @mock.patch("geonode.harvesting.tasks._handle_harvestable_resources_update_error") @mock.patch("geonode.harvesting.tasks._finish_harvestable_resources_update") diff --git a/geonode/harvesting/tests/test_utils.py b/geonode/harvesting/tests/test_utils.py deleted file mode 100644 index 51d578adcc1..00000000000 --- a/geonode/harvesting/tests/test_utils.py +++ /dev/null @@ -1,24 +0,0 @@ -from unittest import mock - -from django.test import SimpleTestCase - -from .. import utils - - -class UtilsTestCase(SimpleTestCase): - - @mock.patch("geonode.harvesting.utils.jsonschema") - @mock.patch("geonode.harvesting.utils.import_string") - def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema): - extra_config_schema = "fake_config_schema" - mock_worker_class = mock.MagicMock() - mock_worker_class.get_extra_config_schema.return_value = extra_config_schema - mock_import_string.return_value = mock_worker_class - - harvester_type = "fake_harvester_type" - configuration = "fake_configuration" - utils.validate_worker_configuration(harvester_type, configuration) - - mock_import_string.assert_called_with(harvester_type) - mock_worker_class.get_extra_config_schema.assert_called() - mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema) diff --git a/geonode/harvesting/utils.py b/geonode/harvesting/utils.py index da1890e2947..f9730c1c792 100644 --- a/geonode/harvesting/utils.py +++ b/geonode/harvesting/utils.py @@ -19,9 +19,6 @@ import typing -from django.utils.timezone import now -from django.utils.module_loading import import_string -import jsonschema from lxml import etree @@ -29,31 +26,6 @@ XML_PARSER: typing.Final = etree.XMLParser(resolve_entities=False) -def update_harvester_availability( - harvester: "Harvester", # noqa - timeout_seconds: typing.Optional[int] = 5 -) -> bool: - harvester.status = harvester.STATUS_CHECKING_AVAILABILITY - harvester.save() - worker = harvester.get_harvester_worker() - harvester.last_checked_availability = now() - available = worker.check_availability(timeout_seconds=timeout_seconds) - harvester.remote_available = available - harvester.status = harvester.STATUS_READY - harvester.save() - return available - - -def validate_worker_configuration(harvester_type, configuration: typing.Dict): - worker_class = import_string(harvester_type) - schema = worker_class.get_extra_config_schema() - if schema is not None: - try: - jsonschema.validate(configuration, schema) - except jsonschema.exceptions.SchemaError as exc: - raise RuntimeError(f"Invalid schema: {exc}") - - def get_xpath_value( element: etree.Element, xpath_expression: str,