diff --git a/geonode/harvesting/admin.py b/geonode/harvesting/admin.py
index 73fc1dd50f0..fd861ec5cdb 100644
--- a/geonode/harvesting/admin.py
+++ b/geonode/harvesting/admin.py
@@ -17,27 +17,23 @@
#
#########################################################################
-import functools
import json
import logging
-import typing
from django.contrib import (
admin,
messages,
)
-from django.db import transaction
from django.urls import reverse
from django.utils.html import (
format_html,
mark_safe,
)
+from django.utils.translation import gettext_lazy as _
from . import (
forms,
models,
- tasks,
- utils,
)
logger = logging.getLogger(__name__)
@@ -55,9 +51,10 @@ class HarvesterAdmin(admin.ModelAdmin):
"remote_available",
"get_num_harvestable_resources",
"get_num_harvestable_resources_selected",
- "get_worker_specific_configuration",
"show_link_to_selected_harvestable_resources",
"show_link_to_latest_harvesting_session",
+ "show_link_to_latest_refresh_session",
+ "get_worker_specific_configuration",
)
list_filter = (
"status",
@@ -72,6 +69,7 @@ class HarvesterAdmin(admin.ModelAdmin):
"num_harvestable_resources",
"show_link_to_selected_harvestable_resources",
"show_link_to_latest_harvesting_session",
+ "show_link_to_latest_refresh_session",
)
list_editable = (
@@ -80,50 +78,24 @@ class HarvesterAdmin(admin.ModelAdmin):
actions = [
"update_harvester_availability",
- "update_harvestable_resources",
- "perform_harvesting"
+ "initiate_update_harvestable_resources",
+ "initiate_abort_update_harvestable_resources",
+ "initiate_perform_harvesting",
+ "initiate_abort_perform_harvesting",
]
- def save_model(self, request, obj: models.Harvester, form, change):
- # TODO: disallow changing the model if it is not ready
- with transaction.atomic():
- super().save_model(request, obj, form, change)
- available = utils.update_harvester_availability(obj)
- if available:
- partial_task = functools.partial(
- tasks.update_harvestable_resources.apply_async, args=(obj.pk,))
- # NOTE: below we are using transaction.on_commit in order to ensure
- # the harvester is already saved in the DB before we schedule the
- # celery task. This is needed in order to avoid the celery worker
- # picking up the task before it is saved in the DB. More info:
- #
- # https://docs.djangoproject.com/en/2.2/topics/db/transactions/#performing-actions-after-commit
- #
- if not change:
- transaction.on_commit(partial_task)
- message = (
- f"Updating harvestable resources asynchronously for {obj!r}...")
- self.message_user(request, message)
- logger.debug(message)
- elif _worker_config_changed(form):
- self.message_user(
- request,
- (
- "Harvester worker specific configuration has been changed. "
- "Updating list of this harvester's harvestable "
- "resources asynchronously. When this is done the harvester "
- "status will be set to `ready`. Refresh this page in order to monitor it."
- ),
- level=messages.WARNING
- )
- # models.HarvestableResource.objects.filter(harvester=obj).delete()
- transaction.on_commit(partial_task)
- else:
- self.message_user(
- request,
- f"Harvester {obj} is{'' if available else ' not'} available",
- messages.INFO if available else messages.WARNING
- )
+ def save_model(self, request, harvester: models.Harvester, form, change):
+ super().save_model(request, harvester, form, change)
+ if _worker_config_changed(form):
+ self.message_user(
+ request,
+ (
+ "Harvester worker specific configuration has been changed. "
+ "You should update the list of this harvester's harvestable "
+ "resources now in order to ensure consistency."
+ ),
+ level=messages.WARNING
+ )
def get_form(self, request, obj=None, change=False, **kwargs):
form = super().get_form(request, obj, change, **kwargs)
@@ -135,7 +107,7 @@ def update_harvester_availability(self, request, queryset):
updated_harvesters = []
non_available_harvesters = []
for harvester in queryset:
- available = utils.update_harvester_availability(harvester)
+ available = harvester.update_availability()
updated_harvesters.append(harvester)
if not available:
non_available_harvesters.append(harvester)
@@ -151,46 +123,90 @@ def update_harvester_availability(self, request, queryset):
messages.WARNING
)
- @admin.action(description="Update harvestable resources from selected harvesters")
- def update_harvestable_resources(self, request, queryset):
+ @admin.action(description="Update harvestable resources for selected harvesters")
+ def initiate_update_harvestable_resources(self, request, queryset):
being_updated = []
for harvester in queryset:
- should_continue, error_msg = _should_act(harvester)
- if should_continue:
- harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES
- harvester.save()
- tasks.update_harvestable_resources.apply_async(args=(harvester.pk,))
- being_updated.append(harvester)
- else:
- self.message_user(request, error_msg, level=messages.ERROR)
- continue
+ try:
+ if harvester.update_availability():
+ harvester.initiate_update_harvestable_resources()
+ being_updated.append(harvester)
+ else:
+ raise RuntimeError(f"Harvester {harvester!r} is not available")
+ except RuntimeError as exc:
+ self.message_user(request, str(exc), level=messages.ERROR)
if len(being_updated) > 0:
- self.message_user(
- request,
- (
- f"Updating harvestable resources asynchronously for {being_updated}. "
- f"This operation can take a while to complete. Check the harvesters' "
- f"status for when it becomes `ready`"
- )
+ message = (
+ f"Updating harvestable resources asynchronously for {being_updated}. "
+ f"This operation can take a while to complete. Check the harvesters' "
+ f"status for when it becomes `ready` or inspect its latest refresh "
+ f"session and monitor the reported progress"
+ )
+ else:
+ message = _("No ready harvesters have been selected, skipping...")
+ self.message_user(request, message)
+
+ @admin.action(description="Abort on-going update of harvestable resources for selected harvesters")
+ def initiate_abort_update_harvestable_resources(self, request, queryset):
+ being_aborted = []
+ for harvester in queryset:
+ try:
+ if harvester.update_availability():
+ harvester.initiate_abort_update_harvestable_resources()
+ being_aborted.append(harvester)
+ else:
+ raise RuntimeError(f"Harvester {harvester!r} is not available")
+ except RuntimeError as exc:
+ self.message_user(request, str(exc), level=messages.ERROR)
+ if len(being_aborted) > 0:
+ message = (
+ f"Aborting update of harvestable resources for {being_aborted}. "
+ f"This operation can take a while to complete. Check the harvesters' "
+ f"status for when it becomes `ready`"
)
+ else:
+ message = _("No active refresh sessions have been found for the selected harvesters. Skipping...")
+ self.message_user(request, message)
@admin.action(description="Perform harvesting on selected harvesters")
- def perform_harvesting(self, request, queryset):
+ def initiate_perform_harvesting(self, request, queryset):
being_harvested = []
for harvester in queryset:
- should_continue, error_msg = _should_act(harvester)
- if should_continue:
- harvester.status = harvester.STATUS_PERFORMING_HARVESTING
- harvester.save()
- harvesting_session = models.HarvestingSession.objects.create(harvester=harvester)
- tasks.harvesting_dispatcher.apply_async(args=(harvester.pk, harvesting_session.pk))
- being_harvested.append(harvester)
- else:
- self.message_user(request, error_msg, level=messages.ERROR)
- continue
+ try:
+ if harvester.update_availability():
+ harvester.initiate_perform_harvesting()
+ being_harvested.append(harvester)
+ else:
+ raise RuntimeError(f"Harvester {harvester!r} is not available")
+ except RuntimeError as exc:
+ self.message_user(request, str(exc), level=messages.ERROR)
if len(being_harvested) > 0:
- self.message_user(
- request, f"Performing harvesting asynchronously for {being_harvested}")
+ message = f"Performing harvesting asynchronously for {being_harvested}..."
+ else:
+ message = _("No ready harvesters have been selected, skipping...")
+ self.message_user(request, message)
+
+ @admin.action(description="Abort on-going harvesting sessions for selected harvesters")
+ def initiate_abort_perform_harvesting(self, request, queryset):
+ being_aborted = []
+ for harvester in queryset:
+ try:
+ if harvester.update_availability():
+ harvester.initiate_abort_perform_harvesting()
+ being_aborted.append(harvester)
+ else:
+ raise RuntimeError(f"Harvester {harvester!r} is not available")
+ except RuntimeError as exc:
+ self.message_user(request, str(exc), level=messages.ERROR)
+ if len(being_aborted) > 0:
+ message = (
+ f"Aborting current harvesting sessions for {being_aborted}. "
+ f"This operation can take a while to complete. Check the harvesters' "
+ f"status for when it becomes `ready`"
+ )
+ else:
+ message = _("No active harvesting sessions have been found for the selected harvesters. Skipping...")
+ self.message_user(request, message)
@admin.display(description="Number of selected resources to harvest")
def get_num_harvestable_resources_selected(self, harvester: models.Harvester):
@@ -227,8 +243,22 @@ def show_link_to_selected_harvestable_resources(self, harvester: models.Harveste
@admin.display(description="Go to latest harvesting session")
def show_link_to_latest_harvesting_session(self, harvester: models.Harvester):
- latest = models.HarvestingSession.objects.filter(harvester=harvester).latest("started")
- changelist_uri = reverse("admin:harvesting_harvestingsession_change", args=(latest.id,))
+ changelist_uri = reverse(
+ "admin:harvesting_asynchronousharvestingsession_change",
+ args=(harvester.latest_harvesting_session.id,)
+ )
+ return mark_safe(
+ format_html(
+ f'Go'
+ )
+ )
+
+ @admin.display(description="Go to latest refresh session")
+ def show_link_to_latest_refresh_session(self, harvester: models.Harvester):
+ changelist_uri = reverse(
+ "admin:harvesting_asynchronousharvestingsession_change",
+ args=(harvester.latest_refresh_session.id,)
+ )
return mark_safe(
format_html(
f'Go'
@@ -236,46 +266,37 @@ def show_link_to_latest_harvesting_session(self, harvester: models.Harvester):
)
-@admin.register(models.HarvestingSession)
-class HarvestingSessionAdmin(admin.ModelAdmin):
+@admin.register(models.AsynchronousHarvestingSession)
+class AsynchronousHarvestingSessionAdmin(admin.ModelAdmin):
list_display = (
"id",
+ "session_type",
"status",
"started",
"updated",
"ended",
- "records_to_harvest",
- "records_harvested",
- "calculate_harvesting_progress",
"harvester",
+ "total_records_to_process",
+ "records_done",
+ "get_progress_percentage",
)
readonly_fields = (
"id",
+ "session_type",
"status",
"started",
"updated",
"ended",
- "records_to_harvest",
- "records_harvested",
- "calculate_harvesting_progress",
"harvester",
- "session_details",
+ "total_records_to_process",
+ "records_done",
+ "get_progress_percentage",
+ "details",
)
- def has_change_permission(self, request, obj=None):
- return False
-
def has_add_permission(self, request):
return False
- @admin.display(description="progress(%)")
- def calculate_harvesting_progress(self, harvesting_session: models.HarvestingSession):
- if harvesting_session.records_to_harvest == 0:
- result = 0
- else:
- result = int((harvesting_session.records_harvested / harvesting_session.records_to_harvest) * 100)
- return result
-
@admin.register(models.HarvestableResource)
class HarvestableResourceAdmin(admin.ModelAdmin):
@@ -317,7 +338,7 @@ class HarvestableResourceAdmin(admin.ModelAdmin):
actions = [
"toggle_should_be_harvested",
- "harvest_selected_resources",
+ "initiate_harvest_selected_resources",
]
def delete_queryset(self, request, queryset):
@@ -351,28 +372,24 @@ def toggle_should_be_harvested(self, request, queryset):
request, "Toggled harvestable resources' `should_be_harvested` attribute")
@admin.action(description="Harvest selected resources")
- def harvest_selected_resources(self, request, queryset):
+ def initiate_harvest_selected_resources(self, request, queryset):
selected_harvestable_resources = {}
for harvestable_resource in queryset:
- harvester_resources = selected_harvestable_resources.setdefault(harvestable_resource.harvester, [])
+ harvester_resources = selected_harvestable_resources.setdefault(
+ harvestable_resource.harvester, [])
harvester_resources.append(harvestable_resource.id)
for harvester, harvestable_resource_ids in selected_harvestable_resources.items():
- should_continue, error_msg = _should_act(harvester)
- if should_continue:
- harvester.status = models.Harvester.STATUS_PERFORMING_HARVESTING
- harvester.save()
- harvesting_session = models.HarvestingSession.objects.create(harvester=harvester)
- tasks.harvest_resources.apply_async(args=(harvestable_resource_ids, harvesting_session.pk))
- self.message_user(
- request,
- f"Harvesting {len(harvestable_resource_ids)} resources from {harvester.name!r} harvester..."
- )
- else:
- self.message_user(
- request,
- error_msg,
- level=messages.ERROR
- )
+ try:
+ if harvester.update_availability():
+ harvester.initiate_perform_harvesting(harvestable_resource_ids)
+ self.message_user(
+ request,
+ f"Harvesting {len(harvestable_resource_ids)} resources from {harvester.name!r} harvester..."
+ )
+ else:
+ raise RuntimeError(f"Harvester {harvester!r} is not available")
+ except RuntimeError as exc:
+ self.message_user(request, str(exc), level=messages.ERROR)
@admin.display(description="harvester")
def show_link_to_harvester(self, harvestable_resource: models.HarvestableResource):
@@ -385,27 +402,13 @@ def show_link_to_harvester(self, harvestable_resource: models.HarvestableResourc
)
-def _should_act(harvester: models.Harvester) -> typing.Tuple[bool, str]:
- if harvester.status != harvester.STATUS_READY:
- error_message = (
- f"Harvester {harvester!r} is currently busy. Please wait until its status "
- f"is {harvester.STATUS_READY!r} before retrying"
- )
- result = False
- else:
- available = utils.update_harvester_availability(harvester)
- if not available:
- error_message = (
- f"harvester {harvester!r} is not available, skipping harvesting...")
- result = False
- else:
- result = True
- error_message = ""
- return result, error_message
-
-
def _worker_config_changed(form) -> bool:
field_name = "harvester_type_specific_configuration"
- original = json.loads(form.data[f"initial-{field_name}"])
- cleaned = form.cleaned_data.get(field_name)
- return original != cleaned
+ try:
+ original = json.loads(form.data[f"initial-{field_name}"])
+ except KeyError:
+ result = True
+ else:
+ cleaned = form.cleaned_data.get(field_name)
+ result = original != cleaned
+ return result
diff --git a/geonode/harvesting/api/serializers.py b/geonode/harvesting/api/serializers.py
index 7f3a6ce0348..72ba2f5c843 100644
--- a/geonode/harvesting/api/serializers.py
+++ b/geonode/harvesting/api/serializers.py
@@ -40,7 +40,6 @@
from .. import (
models,
tasks,
- utils,
)
logger = logging.getLogger(__name__)
@@ -141,7 +140,7 @@ def validate(self, data):
worker_config_field, getattr(self.instance, worker_config_field, None))
if worker_type is not None and worker_config is not None:
try:
- utils.validate_worker_configuration(worker_type, worker_config)
+ models.validate_worker_configuration(worker_type, worker_config)
except jsonschema.exceptions.ValidationError:
raise serializers.ValidationError(
f"Invalid {worker_config_field!r} configuration")
@@ -162,7 +161,7 @@ def create(self, validated_data):
f"value of {models.Harvester.STATUS_READY!r}"
)
harvester = super().create(validated_data)
- available = utils.update_harvester_availability(harvester)
+ available = harvester.update_availability()
if available:
harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES
harvester.save()
@@ -208,12 +207,17 @@ def update(self, instance: models.Harvester, validated_data):
f"This status can only be set by the server, when appropriate."
)
elif desired_status == models.Harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES:
- post_update_task = tasks.update_harvestable_resources.signature(
- args=(instance.id,))
+ session = models.AsynchronousHarvestingSession.objects.create(
+ harvester=instance,
+ session_type=models.AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES
+ )
+ post_update_task = tasks.update_harvestable_resources.signature(args=(session.pk,))
elif desired_status == models.Harvester.STATUS_PERFORMING_HARVESTING:
- harvesting_session = models.HarvestingSession.objects.create(harvester=instance)
- post_update_task = tasks.harvesting_dispatcher.signature(
- args=(instance.pk, harvesting_session.pk))
+ session = models.AsynchronousHarvestingSession.objects.create(
+ harvester=instance,
+ session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING
+ )
+ post_update_task = tasks.harvesting_dispatcher.signature(args=(session.pk,))
elif desired_status == models.Harvester.STATUS_CHECKING_AVAILABILITY:
post_update_task = tasks.check_harvester_available.signature(
args=(instance.id,))
@@ -239,15 +243,16 @@ def update(self, instance: models.Harvester, validated_data):
return updated_instance
-class BriefHarvestingSessionSerializer(DynamicModelSerializer):
+class BriefAsynchronousHarvestingSessionSerializer(DynamicModelSerializer):
class Meta:
- model = models.HarvestingSession
+ model = models.AsynchronousHarvestingSession
fields = (
"id",
"started",
"updated",
"ended",
- "records_harvested",
+ "total_records_to_process",
+ "records_done",
)
diff --git a/geonode/harvesting/api/urls.py b/geonode/harvesting/api/urls.py
index 9b838106865..2b78c92039a 100644
--- a/geonode/harvesting/api/urls.py
+++ b/geonode/harvesting/api/urls.py
@@ -31,6 +31,6 @@
basename='harvestable-resources',
parents_query_lookups=['harvester_id']
)
-router.register('harvesting-sessions', views.HarvestingSessionViewSet)
+router.register('harvesting-sessions', views.AsynchronousHarvestingSessionViewSet)
urlpatterns = router.urls
diff --git a/geonode/harvesting/api/views.py b/geonode/harvesting/api/views.py
index 7cb340b3471..9427076fbbe 100644
--- a/geonode/harvesting/api/views.py
+++ b/geonode/harvesting/api/views.py
@@ -94,7 +94,7 @@ def get_serializer_context(self):
return context
-class HarvestingSessionViewSet(WithDynamicViewSetMixin, viewsets.ReadOnlyModelViewSet):
- queryset = models.HarvestingSession.objects.all()
- serializer_class = serializers.BriefHarvestingSessionSerializer
+class AsynchronousHarvestingSessionViewSet(WithDynamicViewSetMixin, viewsets.ReadOnlyModelViewSet):
+ queryset = models.AsynchronousHarvestingSession.objects.all()
+ serializer_class = serializers.BriefAsynchronousHarvestingSessionSerializer
pagination_class = GeoNodeApiPagination
diff --git a/geonode/harvesting/harvesters/base.py b/geonode/harvesting/harvesters/base.py
index a6869112170..af937c267ff 100644
--- a/geonode/harvesting/harvesters/base.py
+++ b/geonode/harvesting/harvesters/base.py
@@ -124,7 +124,6 @@ def get_geonode_resource_type(self, remote_resource_type: str) -> ResourceBase:
def get_resource(
self,
harvestable_resource: "HarvestableResource", # noqa
- harvesting_session_id: int
) -> typing.Optional[HarvestedResourceInfo]:
"""Harvest a single resource from the remote service.
@@ -155,7 +154,6 @@ def finalize_resource_update(
geonode_resource: ResourceBase,
harvested_info: HarvestedResourceInfo,
harvestable_resource: "HarvestableResource", # noqa
- harvesting_session_id: int
) -> ResourceBase:
"""Perform additional actions just after having created/updated a local GeoNode resource.
@@ -262,7 +260,6 @@ def update_geonode_resource(
self,
harvested_info: HarvestedResourceInfo,
harvestable_resource: "HarvestableResource", # noqa
- harvesting_session_id: int,
):
"""Create or update a local GeoNode resource with the input harvested information.
@@ -297,7 +294,6 @@ def update_geonode_resource(
geonode_resource,
harvested_info,
harvestable_resource,
- harvesting_session_id
)
def _create_new_geonode_resource(self, geonode_resource_type, defaults: typing.Dict):
diff --git a/geonode/harvesting/harvesters/geonodeharvester.py b/geonode/harvesting/harvesters/geonodeharvester.py
index cad40bdf496..d5bac6d1b0b 100644
--- a/geonode/harvesting/harvesters/geonodeharvester.py
+++ b/geonode/harvesting/harvesters/geonodeharvester.py
@@ -279,7 +279,6 @@ def get_geonode_resource_type(self, remote_resource_type: str) -> typing.Type[ty
def get_resource(
self,
harvestable_resource: models.HarvestableResource,
- harvesting_session_id: int
) -> typing.Optional[base.HarvestedResourceInfo]:
resource_unique_identifier = harvestable_resource.unique_identifier
local_resource_type = self.get_geonode_resource_type(harvestable_resource.remote_resource_type)
diff --git a/geonode/harvesting/harvesters/wms.py b/geonode/harvesting/harvesters/wms.py
index f8e06a1feed..9799ac1f19a 100644
--- a/geonode/harvesting/harvesters/wms.py
+++ b/geonode/harvesting/harvesters/wms.py
@@ -161,7 +161,6 @@ def get_geonode_resource_type(self, remote_resource_type: str) -> ResourceBase:
def get_resource(
self,
harvestable_resource: "HarvestableResource", # noqa
- harvesting_session_id: int
) -> typing.Optional[base.HarvestedResourceInfo]:
resource_unique_identifier = harvestable_resource.unique_identifier
data = self._get_data()
diff --git a/geonode/harvesting/migrations/0037_alter_harvester_status.py b/geonode/harvesting/migrations/0037_alter_harvester_status.py
new file mode 100644
index 00000000000..5e05991028b
--- /dev/null
+++ b/geonode/harvesting/migrations/0037_alter_harvester_status.py
@@ -0,0 +1,29 @@
+# Generated by Django 3.2.4 on 2021-09-24 09:42
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('harvesting', '0036_alter_harvester_harvester_type'),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name='harvester',
+ name='status',
+ field=models.CharField(
+ choices=[
+ ('ready', 'ready'),
+ ('updating-harvestable-resources', 'updating-harvestable-resources'),
+ ('aborting-update-harvestable-resources', 'aborting-update-harvestable-resources'),
+ ('harvesting-resources', 'harvesting-resources'),
+ ('aborting-harvesting-resources', 'aborting-harvesting-resources'),
+ ('checking-availability', 'checking-availability')
+ ],
+ default='ready',
+ max_length=50
+ ),
+ ),
+ ]
diff --git a/geonode/harvesting/migrations/0038_auto_20210927_1455.py b/geonode/harvesting/migrations/0038_auto_20210927_1455.py
new file mode 100644
index 00000000000..2110903424c
--- /dev/null
+++ b/geonode/harvesting/migrations/0038_auto_20210927_1455.py
@@ -0,0 +1,34 @@
+# Generated by Django 3.2.4 on 2021-09-27 14:55
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('harvesting', '0037_alter_harvester_status'),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name='harvestingsession',
+ name='status',
+ field=models.CharField(choices=[('pending', 'pending'), ('on-going', 'on-going'), ('finished-all-ok', 'finished-all-ok'), ('finished-all-failed', 'finished-all-failed'), ('finished-some-failed', 'finished-some-failed'), ('aborting', 'aborting'), ('aborted', 'aborted')], default='pending', editable=False, max_length=50),
+ ),
+ migrations.CreateModel(
+ name='AsynchronousHarvestingSession',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('session_type', models.CharField(choices=[('harvesting', 'harvesting'), ('discover-harvestable-resources', 'discover-harvestable-resources')], editable=False, max_length=50)),
+ ('started', models.DateTimeField(auto_now_add=True)),
+ ('updated', models.DateTimeField(auto_now=True)),
+ ('ended', models.DateTimeField(blank=True, null=True)),
+ ('status', models.CharField(choices=[('pending', 'pending'), ('on-going', 'on-going'), ('finished-all-ok', 'finished-all-ok'), ('finished-all-failed', 'finished-all-failed'), ('finished-some-failed', 'finished-some-failed'), ('aborting', 'aborting'), ('aborted', 'aborted')], default='pending', editable=False, max_length=50)),
+ ('details', models.TextField(blank=True)),
+ ('total_records_to_process', models.IntegerField(default=0, editable=False, help_text='Number of records being processed in this session')),
+ ('records_done', models.IntegerField(default=0, help_text='Number of records that have already been processed')),
+ ('harvester', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='sessions', to='harvesting.harvester')),
+ ],
+ ),
+ ]
diff --git a/geonode/harvesting/migrations/0039_delete_harvestingsession.py b/geonode/harvesting/migrations/0039_delete_harvestingsession.py
new file mode 100644
index 00000000000..d82efa58d54
--- /dev/null
+++ b/geonode/harvesting/migrations/0039_delete_harvestingsession.py
@@ -0,0 +1,16 @@
+# Generated by Django 3.2.4 on 2021-09-27 15:01
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('harvesting', '0038_auto_20210927_1455'),
+ ]
+
+ operations = [
+ migrations.DeleteModel(
+ name='HarvestingSession',
+ ),
+ ]
diff --git a/geonode/harvesting/models.py b/geonode/harvesting/models.py
index af2c71ccc1c..45168622792 100644
--- a/geonode/harvesting/models.py
+++ b/geonode/harvesting/models.py
@@ -19,9 +19,12 @@
import json
import logging
-import jsonschema.exceptions
+import typing
+import jsonschema
+import jsonschema.exceptions
from django.conf import settings
+from django.contrib import admin
from django.core.exceptions import ValidationError
from django.db import models
from django.utils import timezone
@@ -32,7 +35,8 @@
PeriodicTask,
)
-from . import utils
+from geonode import celery_app
+
from .config import get_setting
logger = logging.getLogger(__name__)
@@ -41,12 +45,16 @@
class Harvester(models.Model):
STATUS_READY = "ready"
STATUS_UPDATING_HARVESTABLE_RESOURCES = "updating-harvestable-resources"
+ STATUS_ABORTING_UPDATE_HARVESTABLE_RESOURCES = "aborting-update-harvestable-resources"
STATUS_PERFORMING_HARVESTING = "harvesting-resources"
+ STATUS_ABORTING_PERFORMING_HARVESTING = "aborting-harvesting-resources"
STATUS_CHECKING_AVAILABILITY = "checking-availability"
STATUS_CHOICES = [
(STATUS_READY, _("ready")),
(STATUS_UPDATING_HARVESTABLE_RESOURCES, _("updating-harvestable-resources")),
+ (STATUS_ABORTING_UPDATE_HARVESTABLE_RESOURCES, _("aborting-update-harvestable-resources")),
(STATUS_PERFORMING_HARVESTING, _("harvesting-resources")),
+ (STATUS_ABORTING_PERFORMING_HARVESTING, _("aborting-harvesting-resources")),
(STATUS_CHECKING_AVAILABILITY, _("checking-availability")),
]
@@ -170,6 +178,18 @@ class Meta:
def __str__(self):
return f"{self.name}({self.id})"
+ @property
+ def latest_refresh_session(self):
+ return self.sessions.filter(
+ session_type=AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES
+ ).latest("started")
+
+ @property
+ def latest_harvesting_session(self):
+ return self.sessions.filter(
+ session_type=AsynchronousHarvestingSession.TYPE_HARVESTING
+ ).latest("started")
+
def clean(self):
"""Perform model validation by inspecting fields that depend on each other.
@@ -179,8 +199,8 @@ def clean(self):
"""
try:
- utils.validate_worker_configuration(
- self.harvester_type, self.harvester_type_specific_configuration)
+ validate_worker_configuration(self.harvester_type, self.harvester_type_specific_configuration)
+ # self.validate_worker_configuration()
except jsonschema.exceptions.ValidationError as exc:
raise ValidationError(str(exc))
@@ -224,33 +244,122 @@ def setup_periodic_tasks(self) -> None:
)
self.save()
+ def update_availability(
+ self,
+ timeout_seconds: typing.Optional[int] = 5
+ ):
+ """Use the harvesting worker to check if the remote service is available"""
+ worker = self.get_harvester_worker()
+ self.last_checked_availability = timezone.now()
+ available = worker.check_availability(timeout_seconds=timeout_seconds)
+ self.remote_available = available
+ self.save()
+ return available
+
+ def initiate_update_harvestable_resources(self):
+ should_continue, error_msg = self.worker_can_perform_action()
+ if should_continue:
+ self.status = self.STATUS_UPDATING_HARVESTABLE_RESOURCES
+ self.save()
+ refresh_session = AsynchronousHarvestingSession.objects.create(
+ harvester=self,
+ session_type=AsynchronousHarvestingSession.TYPE_DISCOVER_HARVESTABLE_RESOURCES
+ )
+ refresh_session.initiate()
+ else:
+ raise RuntimeError(error_msg)
+
+ def initiate_perform_harvesting(
+ self,
+ harvestable_resource_ids: typing.Optional[typing.List[int]] = None
+ ):
+ should_continue, error_msg = self.worker_can_perform_action()
+ if should_continue:
+ self.status = self.STATUS_PERFORMING_HARVESTING
+ self.save()
+ harvesting_session = AsynchronousHarvestingSession.objects.create(
+ harvester=self,
+ session_type=AsynchronousHarvestingSession.TYPE_HARVESTING
+ )
+ harvesting_session.initiate(harvestable_resource_ids)
+ else:
+ raise RuntimeError(error_msg)
+
+ def initiate_abort_update_harvestable_resources(self):
+ should_continue, error_msg = self.worker_can_perform_action(
+ self.STATUS_UPDATING_HARVESTABLE_RESOURCES)
+ if should_continue:
+ self.status = self.STATUS_ABORTING_UPDATE_HARVESTABLE_RESOURCES
+ self.save()
+ self.latest_refresh_session.abort()
+ else:
+ raise RuntimeError(error_msg)
+
+ def initiate_abort_perform_harvesting(self):
+ should_continue, error_msg = self.worker_can_perform_action(
+ self.STATUS_PERFORMING_HARVESTING)
+ if should_continue:
+ self.status = self.STATUS_ABORTING_PERFORMING_HARVESTING
+ self.save()
+ self.latest_harvesting_session.abort()
+ else:
+ raise RuntimeError(error_msg)
+
def get_harvester_worker(self) -> "BaseHarvesterWorker": # noqa
worker_class = import_string(self.harvester_type)
return worker_class.from_django_record(self)
+ def worker_can_perform_action(
+ self,
+ target_status: typing.Optional[str] = STATUS_READY,
+ ) -> typing.Tuple[bool, str]:
+ if self.status != target_status:
+ error_message = (
+ f"Harvester {self!r} cannot currently perform the desired action. Please wait until its status "
+ f"is reported as {target_status!r} before retrying."
+ )
+ result = False
+ else:
+ result = True
+ error_message = ""
+ return result, error_message
-class HarvestingSession(models.Model):
+
+class AsynchronousHarvestingSession(models.Model):
STATUS_PENDING = "pending"
STATUS_ON_GOING = "on-going"
STATUS_FINISHED_ALL_OK = "finished-all-ok"
STATUS_FINISHED_ALL_FAILED = "finished-all-failed"
STATUS_FINISHED_SOME_FAILED = "finished-some-failed"
+ STATUS_ABORTING = "aborting"
+ STATUS_ABORTED = "aborted"
STATUS_CHOICES = [
(STATUS_PENDING, _("pending")),
(STATUS_ON_GOING, _("on-going")),
(STATUS_FINISHED_ALL_OK, _("finished-all-ok")),
(STATUS_FINISHED_ALL_FAILED, _("finished-all-failed")),
(STATUS_FINISHED_SOME_FAILED, _("finished-some-failed")),
+ (STATUS_ABORTING, _("aborting")),
+ (STATUS_ABORTED, _("aborted")),
+ ]
+ TYPE_HARVESTING = "harvesting"
+ TYPE_DISCOVER_HARVESTABLE_RESOURCES = "discover-harvestable-resources"
+ TYPE_CHOICES = [
+ (TYPE_HARVESTING, _("harvesting")),
+ (TYPE_DISCOVER_HARVESTABLE_RESOURCES, _("discover-harvestable-resources")),
]
+ session_type = models.CharField(
+ max_length=50,
+ choices=TYPE_CHOICES,
+ editable=False
+ )
started = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
ended = models.DateTimeField(null=True, blank=True)
- records_to_harvest = models.IntegerField(default=0, editable=False)
- records_harvested = models.IntegerField(default=0)
harvester = models.ForeignKey(
Harvester,
on_delete=models.CASCADE,
- related_name="harvesting_sessions"
+ related_name="sessions"
)
status = models.CharField(
max_length=50,
@@ -258,11 +367,77 @@ class HarvestingSession(models.Model):
default=STATUS_PENDING,
editable=False,
)
- session_details = models.TextField(
- blank=True,
- help_text=_("Details about the harvesting session")
+ details = models.TextField(blank=True,)
+ total_records_to_process = models.IntegerField(
+ default=0,
+ editable=False,
+ help_text=_("Number of records being processed in this session")
+ )
+ records_done = models.IntegerField(
+ default=0,
+ help_text=_("Number of records that have already been processed")
)
+ @admin.display(description="Progress (%)")
+ def get_progress_percentage(self) -> int:
+ try:
+ result = (self.records_done / self.total_records_to_process) * 100
+ except ZeroDivisionError:
+ result = 0
+ return result
+
+ def initiate(self, harvestable_resource_ids: typing.Optional[typing.List[int]] = None):
+ """Initiate the asynchronous process that performs the work related to this session."""
+ # NOTE: below we are calling celery tasks using the method of creating a
+ # signature from the main celery app object in order to avoid having
+ # to import the `tasks` module, which would create circular
+ # dependency issues - this is a common celery pattern, described here:
+ #
+ # https://docs.celeryproject.org/en/stable/faq.html#can-i-call-a-task-by-name
+ #
+ # Also note that using `app.send_task()` does not seem to work in this case (which
+ # is mysterious, since the celery docs say it should work)
+ if self.session_type == self.TYPE_DISCOVER_HARVESTABLE_RESOURCES:
+ task_signature = celery_app.app.signature(
+ "geonode.harvesting.tasks.update_harvestable_resources",
+ args=(self.pk,)
+ )
+ elif self.session_type == self.TYPE_HARVESTING:
+ if harvestable_resource_ids is None:
+ task_signature = celery_app.app.signature(
+ "geonode.harvesting.tasks.harvesting_dispatcher",
+ args=(self.pk,)
+ )
+ else:
+ task_signature = celery_app.app.signature(
+ "geonode.harvesting.tasks.harvest_resources",
+ args=(self.pk, harvestable_resource_ids or [])
+ )
+ else:
+ raise RuntimeError("Invalid selection")
+ task_signature.apply_async()
+ self.status = self.STATUS_PENDING
+ self.save()
+
+ def abort(self):
+ """Abort a pending or on-going session."""
+
+ # NOTE: We do not use celery's task revoke feature when aborting a session. This
+ # is an explicit design choice. The main reason being that we keep track of a session's
+ # state and want to know when it has finished. This is done by leveraging celery's `chord`
+ # feature, whereby the async tasks are executed in parallel and there is a final
+ # synchronization step when they are done that updates the session's state in the DB.
+ # Maintaining this synchronization step working OK together with revoking tasks would be
+ # harder to address.
+ if self.status == self.STATUS_PENDING:
+ self.status = self.STATUS_ABORTED
+ self.session_details = "Aborted"
+ elif self.status == self.STATUS_ON_GOING:
+ self.status = self.STATUS_ABORTING
+ else:
+ logger.debug("Session is not currently in an state that can be aborted, skipping...")
+ self.save()
+
class HarvestableResource(models.Model):
STATUS_READY = "ready"
@@ -326,3 +501,16 @@ def delete(self, using=None, keep_parents=False):
if delete_orphan_resource:
worker.finalize_harvestable_resource_deletion(self)
return super().delete(using, keep_parents)
+
+
+def validate_worker_configuration(
+ worker_type: "BaseHarvesterWorker", # noqa
+ worker_config: typing.Dict
+):
+ worker_class = import_string(worker_type)
+ schema = worker_class.get_extra_config_schema()
+ if schema is not None:
+ try:
+ jsonschema.validate(worker_config, schema)
+ except jsonschema.exceptions.SchemaError as exc:
+ raise RuntimeError(f"Invalid schema: {exc}")
diff --git a/geonode/harvesting/tasks.py b/geonode/harvesting/tasks.py
index 3778c857580..4cdd1f0d231 100644
--- a/geonode/harvesting/tasks.py
+++ b/geonode/harvesting/tasks.py
@@ -28,13 +28,10 @@
Value,
)
from django.db.models.functions import Concat
-from django.utils.timezone import now
+from django.utils import timezone
from geonode.celery_app import app
-from . import (
- models,
- utils,
-)
+from . import models
from .harvesters import base
logger = logging.getLogger(__name__)
@@ -48,7 +45,6 @@
)
def harvesting_dispatcher(
self,
- harvester_id: int,
harvesting_session_id: int
):
"""Perform harvesting asynchronously.
@@ -71,17 +67,19 @@ def harvesting_dispatcher(
"""
- harvester = models.Harvester.objects.get(pk=harvester_id)
+ session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)
+ harvester = session.harvester
harvestable_resources = list(harvester.harvestable_resources.filter(
should_be_harvested=True).values_list("id", flat=True))
if len(harvestable_resources) > 0:
harvest_resources.apply_async(args=(harvestable_resources, harvesting_session_id))
else:
- logger.debug("harvesting_dispatcher - nothing to do...")
- finish_harvesting_session(
+ message = "harvesting_dispatcher - Nothing to do"
+ logger.debug(message)
+ finish_asynchronous_session(
harvesting_session_id,
- models.HarvestingSession.STATUS_FINISHED_ALL_OK,
- "nothing to do"
+ models.AsynchronousHarvestingSession.STATUS_FINISHED_ALL_OK,
+ final_details=message
)
@@ -97,34 +95,16 @@ def harvest_resources(
harvesting_session_id: int
):
"""Harvest a list of remote resources that all belong to the same harvester."""
- if len(harvestable_resource_ids) == 0:
- logger.debug("harvest_resources - nothing to do...")
- finish_harvesting_session(
- harvesting_session_id,
- models.HarvestingSession.STATUS_FINISHED_ALL_OK,
- "nothing to do"
- )
- else:
- sample_id = harvestable_resource_ids[0]
- try:
- sample_harvestable_resource = models.HarvestableResource.objects.get(pk=sample_id)
- except models.HarvestableResource.DoesNotExist:
- logger.warning(f"harvestable resource {sample_id!r} does not exist.")
- finish_harvesting_session(
- harvesting_session_id,
- models.HarvestingSession.STATUS_FINISHED_ALL_FAILED,
- "Could not retrieve first harvestable resource from the GeoNode db"
- )
- else:
- harvester = sample_harvestable_resource.harvester
- available = utils.update_harvester_availability(harvester)
- if available:
+ session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)
+ if session.status != session.STATUS_ABORTED:
+ if len(harvestable_resource_ids) > 0:
+ harvester = session.harvester
+ if harvester.update_availability():
harvester.status = harvester.STATUS_PERFORMING_HARVESTING
harvester.save()
- models.HarvestingSession.objects.filter(pk=harvesting_session_id).update(
- status=models.HarvestingSession.STATUS_ON_GOING,
- records_to_harvest=len(harvestable_resource_ids)
- )
+ session.status = session.STATUS_ON_GOING
+ session.total_records_to_process = len(harvestable_resource_ids)
+ session.save()
resource_tasks = []
for harvestable_resource_id in harvestable_resource_ids:
resource_tasks.append(
@@ -133,12 +113,11 @@ def harvest_resources(
)
)
harvesting_finalizer = _finish_harvesting.signature(
- args=(harvester.id, harvesting_session_id),
+ args=(harvesting_session_id,),
immutable=True
).on_error(
_handle_harvesting_error.signature(
kwargs={
- "harvester_id": harvester.id,
"harvesting_session_id": harvesting_session_id,
}
)
@@ -146,15 +125,26 @@ def harvest_resources(
harvesting_workflow = chord(resource_tasks, body=harvesting_finalizer)
harvesting_workflow.apply_async()
else:
- logger.warning(
+ message = (
f"Skipping harvesting for harvester {harvester.name!r} because the "
f"remote {harvester.remote_url!r} seems to be unavailable"
)
- finish_harvesting_session(
+ logger.warning(message)
+ finish_asynchronous_session(
harvesting_session_id,
- models.HarvestingSession.STATUS_FINISHED_ALL_FAILED,
- f"Remote url {harvester.remote_url} seems to be unavailable"
+ session.STATUS_FINISHED_ALL_FAILED,
+ final_details=message
)
+ else:
+ message = "harvest_resources - Nothing to do..."
+ logger.debug(message)
+ finish_asynchronous_session(
+ harvesting_session_id,
+ models.AsynchronousHarvestingSession.STATUS_FINISHED_ALL_OK,
+ final_details=message
+ )
+ else:
+ logger.debug("Session has been aborted, skipping...")
@app.task(
@@ -169,40 +159,48 @@ def _harvest_resource(
harvesting_session_id: int
):
"""Harvest a single resource from the input harvestable resource id"""
- harvestable_resource = models.HarvestableResource.objects.get(pk=harvestable_resource_id)
- worker: base.BaseHarvesterWorker = harvestable_resource.harvester.get_harvester_worker()
- harvested_resource_info = worker.get_resource(harvestable_resource, harvesting_session_id)
- now_ = now()
- if harvested_resource_info is not None:
- if worker.should_copy_resource(harvestable_resource):
- copied_path = worker.copy_resource(harvestable_resource, harvested_resource_info)
- if copied_path is not None:
- harvested_resource_info.copied_resources.append(copied_path)
- try:
- worker.update_geonode_resource(
- harvested_resource_info,
- harvestable_resource,
+ session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)
+ if session.status != session.STATUS_ABORTING:
+ harvestable_resource = models.HarvestableResource.objects.get(pk=harvestable_resource_id)
+ worker: base.BaseHarvesterWorker = harvestable_resource.harvester.get_harvester_worker()
+ harvested_resource_info = worker.get_resource(harvestable_resource)
+ now_ = timezone.now()
+ if harvested_resource_info is not None:
+ if worker.should_copy_resource(harvestable_resource):
+ copied_path = worker.copy_resource(harvestable_resource, harvested_resource_info)
+ if copied_path is not None:
+ harvested_resource_info.copied_resources.append(copied_path)
+ try:
+ worker.update_geonode_resource(
+ harvested_resource_info,
+ harvestable_resource,
+ )
+ result = True
+ details = ""
+ except (RuntimeError, ValidationError) as exc:
+ logger.error(msg="Unable to update geonode resource")
+ result = False
+ details = str(exc)
+ harvesting_message = f"{harvestable_resource.title}({harvestable_resource_id}) - {'Success' if result else details}"
+ update_asynchronous_session(
harvesting_session_id,
+ additional_processed_records=1 if result else 0,
+ additional_details=harvesting_message
)
- result = True
- details = ""
- except (RuntimeError, ValidationError) as exc:
- logger.error(msg="Unable to update geonode resource")
- result = False
- details = str(exc)
- harvesting_message = f"{harvestable_resource.title}({harvestable_resource_id}) - {'Success' if result else details}"
- update_harvesting_session(
- harvesting_session_id,
- additional_harvested_records=1 if result else 0,
- additional_details=harvesting_message
- )
- harvestable_resource.last_harvesting_message = f"{now_} - {harvesting_message}"
- harvestable_resource.last_harvesting_succeeded = result
+ harvestable_resource.last_harvesting_message = f"{now_} - {harvesting_message}"
+ harvestable_resource.last_harvesting_succeeded = result
+ else:
+ harvestable_resource.last_harvesting_message = f"{now_}Harvesting failed"
+ harvestable_resource.last_harvesting_succeeded = False
+ harvestable_resource.last_harvested = now_
+ harvestable_resource.save()
else:
- harvestable_resource.last_harvesting_message = f"{now_}Harvesting failed"
- harvestable_resource.last_harvesting_succeeded = False
- harvestable_resource.last_harvested = now_
- harvestable_resource.save()
+ message = (
+ f"Skipping harvesting of resource {harvestable_resource_id} since the "
+ f"session has been aborted"
+ )
+ update_asynchronous_session(harvesting_session_id, additional_details=message)
+ logger.debug(message)
@app.task(
@@ -211,16 +209,26 @@ def _harvest_resource(
acks_late=False,
ignore_result=False,
)
-def _finish_harvesting(self, harvester_id: int, harvesting_session_id: int):
- harvester = models.Harvester.objects.get(pk=harvester_id)
- harvester.status = harvester.STATUS_READY
- harvester.save()
- finish_harvesting_session(
- harvesting_session_id, final_status=models.HarvestingSession.STATUS_FINISHED_ALL_OK)
+def _finish_harvesting(self, harvesting_session_id: int):
+ session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)
+ harvester = session.harvester
+ if session.status == session.STATUS_ABORTING:
+ message = "Harvesting session aborted by user"
+ final_status = session.STATUS_ABORTED
+ else:
+ message = "Harvesting completed successfully!"
+ final_status = session.STATUS_FINISHED_ALL_OK
+ finish_asynchronous_session(
+ harvesting_session_id,
+ final_status=final_status,
+ final_details=message
+ )
logger.debug(
- f"(harvester: {harvester_id!r} - session: {harvesting_session_id!r}) "
- f"Harvesting completed successfully! "
+ f"(harvester: {harvester.pk!r} - session: {harvesting_session_id!r}) "
+ f"{message}"
)
+ harvester.status = harvester.STATUS_READY
+ harvester.save()
@app.task(
@@ -257,13 +265,14 @@ def _handle_harvesting_error(self, task_id, *args, **kwargs):
logger.debug(f"result: {result.result}")
logger.debug(f"traceback: {result.traceback}")
logger.debug(f"harvesting task with kwargs: {kwargs} has failed")
- harvester = models.Harvester.objects.get(pk=kwargs["harvester_id"])
+ session = models.AsynchronousHarvestingSession.objects.get(pk=kwargs["harvesting_session_id"])
+ harvester = session.harvester
harvester.status = harvester.STATUS_READY
harvester.save()
details = f"state: {result.state}\nresult: {result.result}\ntraceback: {result.traceback}"
- finish_harvesting_session(
+ finish_asynchronous_session(
kwargs["harvesting_session_id"],
- final_status=models.HarvestingSession.STATUS_FINISHED_SOME_FAILED,
+ session.STATUS_FINISHED_SOME_FAILED,
final_details=details
)
@@ -276,7 +285,7 @@ def _handle_harvesting_error(self, task_id, *args, **kwargs):
)
def check_harvester_available(self, harvester_id: int):
harvester = models.Harvester.objects.get(pk=harvester_id)
- available = utils.update_harvester_availability(harvester)
+ available = harvester.update_availability()
logger.info(
f"Harvester {harvester!r}: remote server is "
f"{'' if available else 'not '}available"
@@ -289,42 +298,57 @@ def check_harvester_available(self, harvester_id: int):
acks_late=False,
ignore_result=False,
)
-def update_harvestable_resources(self, harvester_id: int):
+def update_harvestable_resources(self, refresh_session_id: int):
# NOTE: we are able to implement batch discovery of existing harvestable resources
# because we want to know about all of them. We are not able to batch harvesting
# of resources because these have potentially been individually selected by the
# user, which means we are not interested in all of them
- harvester = models.Harvester.objects.get(pk=harvester_id)
- harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES
- harvester.save()
- worker = harvester.get_harvester_worker()
- try:
- num_resources = worker.get_num_available_resources()
- except (NotImplementedError, base.HarvestingException) as exc:
- _handle_harvestable_resources_update_error(
- self.request.id, harvester_id=harvester_id, raised_exception=exc)
- else:
- harvester.num_harvestable_resources = num_resources
- harvester.save()
- page_size = 10
- total_pages = math.ceil(num_resources / page_size)
- batches = []
- for page in range(total_pages):
- batches.append(
- _update_harvestable_resources_batch.signature(
- args=(harvester_id, page, page_size),
+ session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id)
+ if session.status != session.STATUS_ABORTED:
+ session.status = session.STATUS_ON_GOING
+ session.save()
+ harvester = session.harvester
+ if harvester.update_availability():
+ harvester.status = harvester.STATUS_UPDATING_HARVESTABLE_RESOURCES
+ harvester.save()
+ worker = harvester.get_harvester_worker()
+ try:
+ num_resources = worker.get_num_available_resources()
+ except (NotImplementedError, base.HarvestingException) as exc:
+ _handle_harvestable_resources_update_error(
+ self.request.id, refresh_session_id=refresh_session_id, raised_exception=exc)
+ else:
+ harvester.num_harvestable_resources = num_resources
+ harvester.save()
+ session.total_records_to_process = num_resources
+ session.save()
+ page_size = 10
+ total_pages = math.ceil(num_resources / page_size)
+ batches = []
+ for page in range(total_pages):
+ batches.append(
+ _update_harvestable_resources_batch.signature(
+ args=(refresh_session_id, page, page_size),
+ )
+ )
+ update_finalizer = _finish_harvestable_resources_update.signature(
+ args=(refresh_session_id,),
+ immutable=True
+ ).on_error(
+ _handle_harvestable_resources_update_error.signature(
+ kwargs={"refresh_session_id": refresh_session_id}
+ )
)
+ update_workflow = chord(batches, body=update_finalizer)
+ update_workflow.apply_async()
+ else:
+ finish_asynchronous_session(
+ refresh_session_id,
+ session.STATUS_FINISHED_ALL_FAILED,
+ final_details="Harvester is not available"
)
- update_finalizer = _finish_harvestable_resources_update.signature(
- args=(harvester_id,),
- immutable=True
- ).on_error(
- _handle_harvestable_resources_update_error.signature(
- kwargs={"harvester_id": harvester_id}
- )
- )
- update_workflow = chord(batches, body=update_finalizer)
- update_workflow.apply_async()
+ else:
+ logger.debug("Session has been aborted, skipping...")
@app.task(
@@ -334,31 +358,42 @@ def update_harvestable_resources(self, harvester_id: int):
ignore_result=False,
)
def _update_harvestable_resources_batch(
- self, harvester_id: int, page: int, page_size: int):
- harvester = models.Harvester.objects.get(pk=harvester_id)
- worker = harvester.get_harvester_worker()
- offset = page * page_size
- try:
- found_resources = worker.list_resources(offset)
- except base.HarvestingException:
- logger.exception("Could not retrieve list of remote resources.")
+ self,
+ refresh_session_id: int,
+ page: int,
+ page_size: int
+):
+ session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id)
+ if session.status == session.STATUS_ON_GOING:
+ harvester = session.harvester
+ worker = harvester.get_harvester_worker()
+ offset = page * page_size
+ try:
+ found_resources = worker.list_resources(offset)
+ except base.HarvestingException:
+ logger.exception("Could not retrieve list of remote resources.")
+ else:
+ processed = 0
+ for remote_resource in found_resources:
+ resource, created = models.HarvestableResource.objects.get_or_create(
+ harvester=harvester,
+ unique_identifier=remote_resource.unique_identifier,
+ title=remote_resource.title,
+ defaults={
+ "should_be_harvested": harvester.harvest_new_resources_by_default,
+ "remote_resource_type": remote_resource.resource_type,
+ "last_refreshed": timezone.now()
+ }
+ )
+ processed += 1
+ # NOTE: make sure to save the resource because we need to have its
+ # `last_updated` property be refreshed - this is done in order to be able
+ # to compare when a resource has been found
+ resource.last_refreshed = timezone.now()
+ resource.save()
+ update_asynchronous_session(refresh_session_id, additional_processed_records=processed)
else:
- for remote_resource in found_resources:
- resource, created = models.HarvestableResource.objects.get_or_create(
- harvester=harvester,
- unique_identifier=remote_resource.unique_identifier,
- title=remote_resource.title,
- defaults={
- "should_be_harvested": harvester.harvest_new_resources_by_default,
- "remote_resource_type": remote_resource.resource_type,
- "last_refreshed": now()
- }
- )
- # NOTE: make sure to save the resource because we need to have its
- # `last_updated` property be refreshed - this is done in order to be able
- # to compare when a resource has been found
- resource.last_refreshed = now()
- resource.save()
+ logger.info("The refresh session has been asked to abort, so skipping...")
@app.task(
@@ -367,15 +402,23 @@ def _update_harvestable_resources_batch(
acks_late=False,
ignore_result=False,
)
-def _finish_harvestable_resources_update(self, harvester_id: int):
- harvester = models.Harvester.objects.get(pk=harvester_id)
- if harvester.last_checked_harvestable_resources is not None:
- _delete_stale_harvestable_resources(harvester)
+def _finish_harvestable_resources_update(self, refresh_session_id: int):
+ session = models.AsynchronousHarvestingSession.objects.get(pk=refresh_session_id)
+ harvester = session.harvester
+ if session.status == session.STATUS_ABORTING:
+ message = "Refresh session aborted by user"
+ finish_asynchronous_session(
+ refresh_session_id, session.STATUS_ABORTED, final_details=message)
+ else:
+ message = "Harvestable resources successfully refreshed"
+ finish_asynchronous_session(
+ refresh_session_id, session.STATUS_FINISHED_ALL_OK, final_details=message)
+ if harvester.last_checked_harvestable_resources is not None:
+ _delete_stale_harvestable_resources(harvester)
harvester.status = harvester.STATUS_READY
- now_ = now()
+ now_ = timezone.now()
harvester.last_checked_harvestable_resources = now_
- harvester.last_check_harvestable_resources_message = (
- f"{now_} - Harvestable resources successfully checked")
+ harvester.last_check_harvestable_resources_message = f"{now_} - {message}"
harvester.save()
@@ -386,14 +429,17 @@ def _finish_harvestable_resources_update(self, harvester_id: int):
ignore_result=False,
)
def _handle_harvestable_resources_update_error(self, task_id, *args, **kwargs):
+ logger.debug("Inside _handle_harvestable_resources_update_error -----------------------------------------")
+ result = self.app.AsyncResult(str(task_id))
+ logger.debug(f"locals: {locals()}")
+ logger.debug(f"state: {result.state}")
+ logger.debug(f"result: {result.result}")
+ logger.debug(f"traceback: {result.traceback}")
result = self.app.AsyncResult(str(task_id))
- print(f"locals: {locals()}")
- print(f"state: {result.state}")
- print(f"result: {result.result}")
- print(f"traceback: {result.traceback}")
- harvester = models.Harvester.objects.get(pk=kwargs["harvester_id"])
+ session = models.AsynchronousHarvestingSession.objects.get(pk=kwargs["refresh_session_id"])
+ harvester = session.harvester
harvester.status = harvester.STATUS_READY
- now_ = now()
+ now_ = timezone.now()
harvester.last_checked_harvestable_resources = now_
harvester.last_check_harvestable_resources_message = (
f"{now_} - There was an error retrieving information on available "
@@ -401,6 +447,12 @@ def _handle_harvestable_resources_update_error(self, task_id, *args, **kwargs):
f"Please check the logs"
)
harvester.save()
+ details = f"state: {result.state}\nresult: {result.result}\ntraceback: {result.traceback}"
+ finish_asynchronous_session(
+ kwargs["refresh_session_id"],
+ final_status=models.AsynchronousHarvestingSession.STATUS_FINISHED_SOME_FAILED,
+ final_details=details
+ )
def _delete_stale_harvestable_resources(harvester: models.Harvester):
@@ -422,7 +474,7 @@ def _delete_stale_harvestable_resources(harvester: models.Harvester):
previously_checked_at = harvester.last_checked_harvestable_resources
logger.debug(f"last checked at: {previously_checked_at}")
- logger.debug(f"now: {now()}")
+ logger.debug(f"now: {timezone.now()}")
to_remove = models.HarvestableResource.objects.filter(
harvester=harvester, last_refreshed__lte=previously_checked_at)
for harvestable_resource in to_remove:
@@ -435,38 +487,34 @@ def _delete_stale_harvestable_resources(harvester: models.Harvester):
harvestable_resource.delete()
-def update_harvesting_session(
- session_id: int,
- total_records_found: typing.Optional[int] = None,
- additional_harvested_records: typing.Optional[int] = None,
- additional_details: typing.Optional[str] = None,
-) -> None:
- """Update the input harvesting session."""
- update_kwargs = {}
- if total_records_found is not None:
- update_kwargs["total_records_found"] = total_records_found
- if additional_harvested_records is not None:
- update_kwargs["records_harvested"] = (
- F("records_harvested") + additional_harvested_records)
- if additional_details is not None:
- update_kwargs["session_details"] = Concat("session_details", Value(f"\n{additional_details}"))
- models.HarvestingSession.objects.filter(id=session_id).update(**update_kwargs)
-
-
-def finish_harvesting_session(
+def finish_asynchronous_session(
session_id: int,
final_status: str,
final_details: typing.Optional[str] = None,
- additional_harvested_records: typing.Optional[int] = None,
+ additional_processed_records: typing.Optional[int] = None
) -> None:
- """Finish the input harvesting session"""
update_kwargs = {
- "ended": now(),
+ "ended": timezone.now(),
"status": final_status,
}
- if additional_harvested_records is not None:
- update_kwargs["records_harvested"] = (
- F("records_harvested") + additional_harvested_records)
+ if additional_processed_records is not None:
+ update_kwargs["records_done"] = F("records_done") + additional_processed_records
if final_details is not None:
- update_kwargs["session_details"] = Concat("session_details", Value(f"\n{final_details}"))
- models.HarvestingSession.objects.filter(id=session_id).update(**update_kwargs)
+ update_kwargs["details"] = Concat("details", Value(f"\n{final_details}"))
+ models.AsynchronousHarvestingSession.objects.filter(id=session_id).update(**update_kwargs)
+
+
+def update_asynchronous_session(
+ session_id: int,
+ total_records_to_process: typing.Optional[int] = None,
+ additional_processed_records: typing.Optional[int] = None,
+ additional_details: typing.Optional[str] = None,
+) -> None:
+ update_kwargs = {}
+ if total_records_to_process is not None:
+ update_kwargs["total_records_to_process"] = total_records_to_process
+ if additional_processed_records is not None:
+ update_kwargs["records_done"] = F("records_done") + additional_processed_records
+ if additional_details is not None:
+ update_kwargs["details"] = Concat("details", Value(f"\n{additional_details}"))
+ models.AsynchronousHarvestingSession.objects.filter(id=session_id).update(**update_kwargs)
diff --git a/geonode/harvesting/tests/test_admin.py b/geonode/harvesting/tests/test_admin.py
index e2633e1ab8b..4934ebc3d68 100644
--- a/geonode/harvesting/tests/test_admin.py
+++ b/geonode/harvesting/tests/test_admin.py
@@ -1,18 +1,24 @@
from unittest import mock
+from django.contrib.admin.sites import AdminSite
from django.urls import reverse
from django.contrib.auth import get_user_model
+from django.test import RequestFactory
from rest_framework import status
from geonode.tests.base import GeoNodeBaseTestSupport
-from .. import models
+from .. import (
+ admin,
+ models
+)
class HarvesterAdminTestCase(GeoNodeBaseTestSupport):
harvester_type = 'geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester'
def setUp(self):
+ self.factory = RequestFactory()
self.user = get_user_model().objects.get(username='admin')
self.client.login(username="admin", password="admin")
@@ -23,10 +29,14 @@ def setUp(self):
harvester_type=self.harvester_type
)
- @mock.patch(
- "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester.check_availability")
- def test_add_harvester(self, mock_check_availability):
- mock_check_availability.return_value = True
+ def test_get_form_returns_current_user(self):
+ request = self.factory.post(reverse("admin:harvesting_harvester_add"))
+ request.user = self.user
+ model_admin = admin.HarvesterAdmin(model=models.Harvester, admin_site=AdminSite())
+ form = model_admin.get_form(request)
+ self.assertEqual(form.base_fields["default_owner"].initial.username, self.user.username)
+
+ def test_add_harvester(self):
data = {
'remote_url': "http://fake.com",
'name': 'harvester',
@@ -45,29 +55,99 @@ def test_add_harvester(self, mock_check_availability):
self.assertEqual(harvester.name, data['name'])
self.assertEqual(harvester.remote_url, data['remote_url'])
self.assertEqual(harvester.status, models.Harvester.STATUS_READY)
- self.assertEqual(harvester.remote_available, True)
-
- @mock.patch(
- "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester.check_availability")
- def test_update_harvester_availability(self, mock_check_availability):
- mock_check_availability.return_value = True
- data = {'action': 'update_harvester_availability',
- '_selected_action': [self.harvester.pk]}
- response = self.client.post(reverse('admin:harvesting_harvester_changelist'), data)
- self.assertEqual(response.status_code, status.HTTP_302_FOUND) # response from admin
- self.harvester.refresh_from_db()
- self.assertEqual(self.harvester.remote_available, True)
-
- @mock.patch(
- "geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester.check_availability")
- def test_perform_harvesting(self, mock_check_availability):
- mock_check_availability.return_value = True
- data = {'action': 'perform_harvesting',
- '_selected_action': [self.harvester.pk]}
- self.harvester.status = models.Harvester.STATUS_READY
- self.harvester.save()
-
- response = self.client.post(reverse('admin:harvesting_harvester_changelist'), data)
- self.assertEqual(response.status_code, status.HTTP_302_FOUND) # response from admin
- self.harvester.refresh_from_db()
- self.assertEqual(self.harvester.status, models.Harvester.STATUS_PERFORMING_HARVESTING)
+
+ def test_update_harvester_availability(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.update_harvester_availability(None, [mock_harvester])
+ mock_harvester.update_availability.assert_called()
+
+ def test_initiate_update_harvestable_resources_initiates_when_harvester_is_available(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ mock_harvester.update_availability.return_value = True
+ model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.initiate_update_harvestable_resources(None, [mock_harvester])
+ mock_harvester.update_availability.assert_called()
+ mock_harvester.initiate_update_harvestable_resources.assert_called()
+
+ def test_initiate_update_harvestable_resources_skips_when_harvester_not_available(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ mock_harvester.update_availability.return_value = False
+ model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.initiate_update_harvestable_resources(None, [mock_harvester])
+ mock_harvester.update_availability.assert_called()
+ mock_harvester.initiate_update_harvestable_resources.assert_not_called()
+
+ def test_initiate_perform_harvesting_initiates_when_harvester_is_available(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ mock_harvester.update_availability.return_value = True
+ model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.initiate_perform_harvesting(None, [mock_harvester])
+ mock_harvester.update_availability.assert_called()
+ mock_harvester.initiate_perform_harvesting.assert_called()
+
+ def test_initiate_perform_harvesting_skips_when_harvester_not_available(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ mock_harvester.update_availability.return_value = False
+ model_admin = admin.HarvesterAdmin(model=mock_harvester, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.initiate_perform_harvesting(None, [mock_harvester])
+ mock_harvester.update_availability.assert_called()
+ mock_harvester.initiate_perform_harvesting.assert_not_called()
+
+
+class AsynchronousHarvestingSessionAdminTestCase(GeoNodeBaseTestSupport):
+
+ def test_django_admin_does_not_allow_creating_new_session(self):
+ data = {
+ 'session_type': "harvesting",
+ 'harvester': 1000,
+ 'status': 'pending'
+ }
+ response = self.client.post(reverse('admin:harvesting_harvester_add'), data, follow=True)
+ print(response.redirect_chain)
+ self.assertRedirects(response, "/en/admin/login/?next=/en/admin/harvesting/harvester/add/")
+
+
+class HarvestableResourceAdminTestCase(GeoNodeBaseTestSupport):
+
+ def test_initiate_harvest_selected_resources_initiates_when_harvester_is_available(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ mock_harvester.update_availability.return_value = True
+ fake_resource_id = 1000
+ mock_resource = mock.MagicMock(
+ spec=models.HarvestableResource,
+ harvester=mock_harvester,
+ id=fake_resource_id
+ )
+ model_admin = admin.HarvestableResourceAdmin(model=mock_resource, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.initiate_harvest_selected_resources(None, [mock_resource])
+ mock_harvester.update_availability.assert_called()
+ mock_harvester.initiate_perform_harvesting.assert_called_with([fake_resource_id])
+
+ def test_initiate_harvest_selected_resources_skips_when_harvester_not_available(self):
+ mock_harvester_model = mock.MagicMock(spec=models.Harvester)
+ mock_harvester = mock_harvester_model.return_value
+ mock_harvester.update_availability.return_value = False
+ fake_resource_id = 1000
+ mock_resource = mock.MagicMock(
+ spec=models.HarvestableResource,
+ harvester=mock_harvester,
+ id=fake_resource_id
+ )
+ model_admin = admin.HarvestableResourceAdmin(model=mock_resource, admin_site=AdminSite())
+ with mock.patch.object(model_admin, "message_user"):
+ model_admin.initiate_harvest_selected_resources(None, [mock_resource])
+ mock_harvester.update_availability.assert_called()
+ mock_harvester.initiate_perform_harvesting.assert_not_called()
diff --git a/geonode/harvesting/tests/test_api_serializers.py b/geonode/harvesting/tests/test_api_serializers.py
index 98edbf3b69b..7bfb5a405b7 100644
--- a/geonode/harvesting/tests/test_api_serializers.py
+++ b/geonode/harvesting/tests/test_api_serializers.py
@@ -70,8 +70,8 @@ def test_serializer_is_able_to_serialize_model_instance(self):
self.assertEqual(urlparse(serialized["links"]["self"]).path, f"{api_endpoint}{self.harvester.pk}/")
self.assertIsNotNone(serialized["links"]["harvestable_resources"])
- @mock.patch("geonode.harvesting.api.serializers.utils")
- def test_validate_also_validates_worker_specific_config(self, mock_utils):
+ @mock.patch("geonode.harvesting.models.validate_worker_configuration")
+ def test_validate_also_validates_worker_specific_config(self, mock_validate_config):
input_data = {
"name": "phony",
"remote_url": "http://fake.com",
@@ -85,10 +85,9 @@ def test_validate_also_validates_worker_specific_config(self, mock_utils):
serializer = serializers.HarvesterSerializer(data=input_data, context={"request": request})
serializer.is_valid(raise_exception=True)
- mock_utils.validate_worker_configuration.assert_called()
+ mock_validate_config.assert_called()
- @mock.patch("geonode.harvesting.api.serializers.utils")
- def test_validate_does_not_allow_changing_status_and_worker_specific_config(self, mock_utils):
+ def test_validate_does_not_allow_changing_status_and_worker_specific_config(self):
input_data = {
"name": "phony",
"remote_url": "http://fake.com",
@@ -120,9 +119,7 @@ def test_create_does_not_allow_setting_custom_status(self):
with self.assertRaises(ValidationError):
serializer.save()
- @mock.patch("geonode.harvesting.api.serializers.tasks")
- @mock.patch("geonode.harvesting.api.serializers.utils")
- def test_create_checks_availability_of_remote_and_updates_harvestable_resources(self, mock_utils, mock_tasks):
+ def test_create(self):
input_data = {
"name": "phony",
"remote_url": "http://fake.com",
@@ -134,9 +131,8 @@ def test_create_checks_availability_of_remote_and_updates_harvestable_resources(
serializer = serializers.HarvesterSerializer(data=input_data, context={"request": request})
serializer.is_valid(raise_exception=True)
- serializer.save()
- mock_utils.update_harvester_availability.assert_called()
- mock_tasks.update_harvestable_resources.apply_async.assert_called()
+ harvester = serializer.save()
+ self.assertEqual(harvester.name, input_data["name"])
def test_update_errors_out_if_current_status_is_not_ready(self):
request = _REQUEST_FACTORY.patch(f"/api/v2/harvesters/{self.harvester.pk}")
@@ -178,7 +174,7 @@ def test_update_calls_update_harvestable_resources_task(self, mock_tasks):
)
serializer.is_valid(raise_exception=True)
serializer.save()
- mock_tasks.update_harvestable_resources.signature.assert_called_with(args=(self.harvester.pk,))
+ mock_tasks.update_harvestable_resources.signature.assert_called()
mock_tasks.update_harvestable_resources.signature.return_value.apply_async.assert_called()
@mock.patch("geonode.harvesting.api.serializers.tasks")
@@ -193,8 +189,10 @@ def test_update_calls_harvesting_dispatcher_task(self, mock_tasks):
)
serializer.is_valid(raise_exception=True)
serializer.save()
- called_harvester_pk, _ = mock_tasks.harvesting_dispatcher.signature.call_args_list[0].kwargs["args"]
- self.assertEqual(called_harvester_pk, self.harvester.pk)
+ called_args = mock_tasks.harvesting_dispatcher.signature.call_args_list[0].kwargs["args"]
+ called_session_pk = called_args[0]
+ session = models.AsynchronousHarvestingSession.objects.get(pk=called_session_pk)
+ self.assertEqual(session.harvester.pk, self.harvester.pk)
mock_tasks.harvesting_dispatcher.signature.return_value.apply_async.assert_called()
@mock.patch("geonode.harvesting.api.serializers.tasks")
@@ -231,7 +229,8 @@ def test_update_updates_harvestable_resources_whenever_worker_config_changes(sel
mock_tasks.update_harvestable_resources.signature.return_value.apply_async.assert_called()
-class BriefHarvestingSessionSerializerTestCase(GeoNodeBaseTestSupport):
+class BriefAsynchronousHarvestingSessionSerializerTestCase(GeoNodeBaseTestSupport):
+ harvester: models.Harvester
@classmethod
def setUpTestData(cls):
@@ -245,14 +244,15 @@ def setUpTestData(cls):
default_owner=user,
harvester_type=harvester_type
)
- cls.harvesting_session = models.HarvestingSession.objects.create(
- harvester=cls.harvester
+ cls.harvesting_session = models.AsynchronousHarvestingSession.objects.create(
+ harvester=cls.harvester,
+ session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING
)
def test_serializer_is_able_to_serialize_model_instance(self):
api_endpoint = "/api/v2/harvesting-sessions/"
request = _REQUEST_FACTORY.get(api_endpoint)
- serializer = serializers.BriefHarvestingSessionSerializer(
+ serializer = serializers.BriefAsynchronousHarvestingSessionSerializer(
self.harvesting_session, context={"request": request})
serialized = serializer.data
self.assertIsNotNone(serialized["started"])
diff --git a/geonode/harvesting/tests/test_api_views.py b/geonode/harvesting/tests/test_api_views.py
index e400d516d60..5ebec358349 100644
--- a/geonode/harvesting/tests/test_api_views.py
+++ b/geonode/harvesting/tests/test_api_views.py
@@ -46,13 +46,15 @@ def setUpTestData(cls):
harvester2.id: harvester_resource2,
}
- session1 = models.HarvestingSession.objects.create(
+ session1 = models.AsynchronousHarvestingSession.objects.create(
harvester=harvester1,
- records_harvested=10
+ session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING,
+ records_done=10
)
- session2 = models.HarvestingSession.objects.create(
+ session2 = models.AsynchronousHarvestingSession.objects.create(
harvester=harvester2,
- records_harvested=5
+ session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING,
+ records_done=5
)
cls.sessions = [session1, session2]
@@ -99,5 +101,5 @@ def test_get_harvester_sessions(self):
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["total"], len(self.sessions))
for index, harvester in enumerate(self.sessions):
- self.assertEqual(response.data["harvesting_sessions"][index]["id"], self.sessions[index].pk)
- self.assertEqual(response.data["harvesting_sessions"][index]["records_harvested"], self.sessions[index].records_harvested)
+ self.assertEqual(response.data["asynchronous_harvesting_sessions"][index]["id"], self.sessions[index].pk)
+ self.assertEqual(response.data["asynchronous_harvesting_sessions"][index]["records_done"], self.sessions[index].records_done)
diff --git a/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py b/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py
index f6c0d5526b9..ee900898745 100644
--- a/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py
+++ b/geonode/harvesting/tests/test_harvester_worker_geonode_legacy.py
@@ -18,9 +18,13 @@
#########################################################################
import mock.mock
+from django.utils import timezone
+
from geonode.harvesting.harvesters import geonodeharvester
from geonode.tests.base import GeoNodeBaseSimpleTestSupport
+from .. import models
+
class GeoNodeHarvesterWorkerTestCase(GeoNodeBaseSimpleTestSupport):
@@ -35,6 +39,29 @@ def test_base_api_url(self):
worker = geonodeharvester.GeonodeLegacyHarvester(base_url, harvester_id)
self.assertEqual(worker.base_api_url, expected)
+ def test_that_copying_remote_resources_is_allowed(self):
+ worker = geonodeharvester.GeonodeLegacyHarvester("http://fake-url2/", "fake-id")
+ self.assertTrue(worker.allows_copying_resources)
+
+ def test_creation_from_harvester(self):
+ now = timezone.now()
+ keywords = ["keyword1", "keyword2"]
+ categories = ["category1", "category2"]
+ combinations = [
+ {
+ "harvest_documents": True, "harvest_datasets": True, "copy_datasets": True, "copy_documents": True,
+ "resource_title_filter": "something", "start_date_filter": now, "end_date_filter": now,
+ "keywords_filter": keywords, "categories_filter": categories
+ },
+ ]
+ for param_combination in combinations:
+ harvester = models.Harvester(
+ name="fake1",
+ harvester_type="geonode.harvesting.harvesters.geonodeharvester.GeonodeLegacyHarvester",
+ harvester_type_specific_configuration=param_combination
+ )
+ harvester.get_harvester_worker()
+
@mock.patch("geonode.harvesting.harvesters.geonodeharvester.requests.Session")
def test_check_availability_works_when_response_includes_layers_object(self, mock_requests_session):
mock_response = mock.MagicMock()
diff --git a/geonode/harvesting/tests/test_models.py b/geonode/harvesting/tests/test_models.py
index ab435fc034e..354b282f4cf 100644
--- a/geonode/harvesting/tests/test_models.py
+++ b/geonode/harvesting/tests/test_models.py
@@ -18,8 +18,10 @@
#########################################################################
import datetime
+from unittest import mock
from django.contrib.auth import get_user_model
+from django.test import SimpleTestCase
from geonode.tests.base import GeoNodeBaseTestSupport
from .. import models
@@ -52,8 +54,24 @@ def test_setup_periodic_tasks(self):
self.assertEqual(self.harvester.availability_check_task.name, f"Check availability of {self.name}")
self.assertEqual(self.harvester.availability_check_task.interval.every, self.harvester.check_availability_frequency)
+ @mock.patch("geonode.harvesting.models.jsonschema")
+ @mock.patch("geonode.harvesting.models.import_string")
+ def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema):
+ extra_config_schema = "fake_config_schema"
+ mock_worker_class = mock.MagicMock()
+ mock_worker_class.get_extra_config_schema.return_value = extra_config_schema
+ mock_import_string.return_value = mock_worker_class
-class HarvesterSessionTestCase(GeoNodeBaseTestSupport):
+ harvester_type = "fake_harvester_type"
+ configuration = {"fake_key": "fake_configuration"}
+ models.validate_worker_configuration(harvester_type, configuration)
+
+ mock_import_string.assert_called_with(harvester_type)
+ mock_worker_class.get_extra_config_schema.assert_called()
+ mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema)
+
+
+class AsynchronousHarvestingSessionTestCase(GeoNodeBaseTestSupport):
remote_url = 'test.com'
name = 'This is geonode harvester'
user = get_user_model().objects.get(username='AnonymousUser')
@@ -67,8 +85,9 @@ def setUp(self):
default_owner=self.user,
harvester_type=self.harvester_type
)
- self.harvesting_session = models.HarvestingSession.objects.create(
- harvester=self.harvester
+ self.harvesting_session = models.AsynchronousHarvestingSession.objects.create(
+ harvester=self.harvester,
+ session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING
)
def test_check_attributes(self):
@@ -77,7 +96,7 @@ def test_check_attributes(self):
"""
self.assertIsNotNone(self.harvesting_session.pk)
self.assertEqual(self.harvesting_session.harvester, self.harvester)
- self.assertEqual(self.harvesting_session.records_harvested, 0)
+ self.assertEqual(self.harvesting_session.records_done, 0)
class HarvestableResourceTestCase(GeoNodeBaseTestSupport):
@@ -110,3 +129,22 @@ def test_check_attributes(self):
self.assertEqual(self.harvestable_resource.unique_identifier, self.unique_identifier)
self.assertFalse(self.harvestable_resource.should_be_harvested)
self.assertEqual(self.harvestable_resource.status, models.HarvestableResource.STATUS_READY)
+
+
+class WorkerConfigValidationTestCase(SimpleTestCase):
+
+ @mock.patch("geonode.harvesting.models.jsonschema")
+ @mock.patch("geonode.harvesting.models.import_string")
+ def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema):
+ extra_config_schema = "fake_config_schema"
+ mock_worker_class = mock.MagicMock()
+ mock_worker_class.get_extra_config_schema.return_value = extra_config_schema
+ mock_import_string.return_value = mock_worker_class
+
+ harvester_type = "fake_harvester_type"
+ configuration = {"somekey": "fake_configuration"}
+ models.validate_worker_configuration(harvester_type, configuration)
+
+ mock_import_string.assert_called_with(harvester_type)
+ mock_worker_class.get_extra_config_schema.assert_called()
+ mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema)
diff --git a/geonode/harvesting/tests/test_tasks.py b/geonode/harvesting/tests/test_tasks.py
index 6e0f8003e4c..456312872af 100644
--- a/geonode/harvesting/tests/test_tasks.py
+++ b/geonode/harvesting/tests/test_tasks.py
@@ -45,7 +45,10 @@ def setUpTestData(cls):
default_owner=cls.harvester_owner,
harvester_type=cls.harvester_type,
)
- cls.harvesting_session = models.HarvestingSession.objects.create(harvester=cls.harvester)
+ cls.harvesting_session = models.AsynchronousHarvestingSession.objects.create(
+ harvester=cls.harvester,
+ session_type=models.AsynchronousHarvestingSession.TYPE_HARVESTING
+ )
for index in range(3):
models.HarvestableResource.objects.create(
unique_identifier=f"fake-identifier-{index}",
@@ -55,8 +58,8 @@ def setUpTestData(cls):
last_refreshed=now()
)
- @mock.patch("geonode.harvesting.tasks.update_harvesting_session")
- def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock_update_harvesting_session):
+ @mock.patch("geonode.harvesting.tasks.update_asynchronous_session")
+ def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock_update_asynchronous_session):
"""Test that `worker.get_resource()` is called by the `_harvest_resource()` task and that the related workflow is called too.
Verify that `worker.get_resource()` is always called. Then verify that if the result of `worker.get_resource()` is
@@ -78,7 +81,7 @@ def test_harvest_resource_updates_geonode_when_remote_resource_exists(self, mock
mock_models.HarvestableResource.objects.get.assert_called_with(pk=harvestable_resource_id)
mock_worker.get_resource.assert_called()
mock_worker.update_geonode_resource.assert_called()
- mock_update_harvesting_session.assert_called()
+ mock_update_asynchronous_session.assert_called()
def test_harvest_resource_does_not_update_geonode_when_remote_resource_does_not_exist(self):
"""Test that the worker does not try to update existing GeoNode resources when the remote resource cannot be harvested."""
@@ -99,7 +102,7 @@ def test_harvest_resource_does_not_update_geonode_when_remote_resource_does_not_
mock_worker.update_geonode_resource.assert_not_called()
def test_finish_harvesting_updates_harvester_status(self):
- tasks._finish_harvesting(self.harvester.id, self.harvesting_session.id)
+ tasks._finish_harvesting(self.harvesting_session.id)
self.harvester.refresh_from_db()
self.harvesting_session.refresh_from_db()
self.assertEqual(self.harvester.status, models.Harvester.STATUS_READY)
@@ -112,10 +115,12 @@ def test_handle_harvesting_error_cleans_up_harvest_execution(self):
self.assertEqual(self.harvester.status, models.Harvester.STATUS_READY)
self.assertIsNotNone(self.harvesting_session.ended)
- @mock.patch("geonode.harvesting.tasks.utils")
- def test_check_harvester_available(self, mock_harvesting_utils):
- tasks.check_harvester_available(self.harvester.id)
- mock_harvesting_utils.update_harvester_availability.assert_called_with(self.harvester)
+ @mock.patch("geonode.harvesting.tasks.models.Harvester")
+ def test_check_harvester_available(self, mock_harvester_model):
+ mock_harvester = mock.MagicMock(spec=models.Harvester).return_value
+ mock_harvester_model.objects.get.return_value = mock_harvester
+ tasks.check_harvester_available(1000)
+ mock_harvester.update_availability.assert_called()
@mock.patch("geonode.harvesting.tasks._handle_harvestable_resources_update_error")
@mock.patch("geonode.harvesting.tasks._finish_harvestable_resources_update")
diff --git a/geonode/harvesting/tests/test_utils.py b/geonode/harvesting/tests/test_utils.py
deleted file mode 100644
index 51d578adcc1..00000000000
--- a/geonode/harvesting/tests/test_utils.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from unittest import mock
-
-from django.test import SimpleTestCase
-
-from .. import utils
-
-
-class UtilsTestCase(SimpleTestCase):
-
- @mock.patch("geonode.harvesting.utils.jsonschema")
- @mock.patch("geonode.harvesting.utils.import_string")
- def test_validate_worker_configuration(self, mock_import_string, mock_jsonschema):
- extra_config_schema = "fake_config_schema"
- mock_worker_class = mock.MagicMock()
- mock_worker_class.get_extra_config_schema.return_value = extra_config_schema
- mock_import_string.return_value = mock_worker_class
-
- harvester_type = "fake_harvester_type"
- configuration = "fake_configuration"
- utils.validate_worker_configuration(harvester_type, configuration)
-
- mock_import_string.assert_called_with(harvester_type)
- mock_worker_class.get_extra_config_schema.assert_called()
- mock_jsonschema.validate.assert_called_with(configuration, extra_config_schema)
diff --git a/geonode/harvesting/utils.py b/geonode/harvesting/utils.py
index da1890e2947..f9730c1c792 100644
--- a/geonode/harvesting/utils.py
+++ b/geonode/harvesting/utils.py
@@ -19,9 +19,6 @@
import typing
-from django.utils.timezone import now
-from django.utils.module_loading import import_string
-import jsonschema
from lxml import etree
@@ -29,31 +26,6 @@
XML_PARSER: typing.Final = etree.XMLParser(resolve_entities=False)
-def update_harvester_availability(
- harvester: "Harvester", # noqa
- timeout_seconds: typing.Optional[int] = 5
-) -> bool:
- harvester.status = harvester.STATUS_CHECKING_AVAILABILITY
- harvester.save()
- worker = harvester.get_harvester_worker()
- harvester.last_checked_availability = now()
- available = worker.check_availability(timeout_seconds=timeout_seconds)
- harvester.remote_available = available
- harvester.status = harvester.STATUS_READY
- harvester.save()
- return available
-
-
-def validate_worker_configuration(harvester_type, configuration: typing.Dict):
- worker_class = import_string(harvester_type)
- schema = worker_class.get_extra_config_schema()
- if schema is not None:
- try:
- jsonschema.validate(configuration, schema)
- except jsonschema.exceptions.SchemaError as exc:
- raise RuntimeError(f"Invalid schema: {exc}")
-
-
def get_xpath_value(
element: etree.Element,
xpath_expression: str,