Skip to content

Commit

Permalink
- Set RabbitMQ native expiration property on the message queues (#10674
Browse files Browse the repository at this point in the history
)
  • Loading branch information
afabiani authored Feb 22, 2023
1 parent 0e89afe commit 513659b
Show file tree
Hide file tree
Showing 19 changed files with 94 additions and 52 deletions.
12 changes: 6 additions & 6 deletions geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ def resource_service_permissions(self, request, pk):
"created": request_params.get("created", False),
},
)
resouce_service_dispatcher.apply_async((_exec_request.exec_id,))
resouce_service_dispatcher.apply_async(args=(_exec_request.exec_id,), expiration=30)
return Response(
{
"status": _exec_request.status,
Expand Down Expand Up @@ -841,7 +841,7 @@ def resource_service_ingest(self, request, resource_type: str = None):
"defaults": request_params.get("defaults", f'{{"owner":"{request.user.username}"}}'),
},
)
resouce_service_dispatcher.apply_async((_exec_request.exec_id,))
resouce_service_dispatcher.apply_async(args=(_exec_request.exec_id,), expiration=30)
return Response(
{
"status": _exec_request.status,
Expand Down Expand Up @@ -941,7 +941,7 @@ def resource_service_create(self, request, resource_type: str = None):
"defaults": request_params.get("defaults", f'{{"owner":"{request.user.username}"}}'),
},
)
resouce_service_dispatcher.apply_async((_exec_request.exec_id,))
resouce_service_dispatcher.apply_async(args=(_exec_request.exec_id,), expiration=30)
return Response(
{
"status": _exec_request.status,
Expand Down Expand Up @@ -1025,7 +1025,7 @@ def resource_service_delete(self, request, pk):
geonode_resource=resource,
input_params={"uuid": resource.uuid},
)
resouce_service_dispatcher.apply_async((_exec_request.exec_id,))
resouce_service_dispatcher.apply_async(args=(_exec_request.exec_id,), expiration=30)
return Response(
{
"status": _exec_request.status,
Expand Down Expand Up @@ -1146,7 +1146,7 @@ def resource_service_update(self, request, pk):
"notify": request_params.get("notify", True),
},
)
resouce_service_dispatcher.apply_async((_exec_request.exec_id,))
resouce_service_dispatcher.apply_async(args=(_exec_request.exec_id,), expiration=30)
return Response(
{
"status": _exec_request.status,
Expand Down Expand Up @@ -1257,7 +1257,7 @@ def resource_service_copy(self, request, pk):
"defaults": request_params.get("defaults", "{}"),
},
)
resouce_service_dispatcher.apply_async((_exec_request.exec_id,))
resouce_service_dispatcher.apply_async(args=(_exec_request.exec_id,), expiration=30)
return Response(
{
"status": _exec_request.status,
Expand Down
3 changes: 2 additions & 1 deletion geonode/base/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def user_and_group_permission(request, model):
_errors = True
else:
set_permissions.apply_async(
([permissions_names], resources_names, users_usernames, groups_names, delete_flag)
args=([permissions_names], resources_names, users_usernames, groups_names, delete_flag),
expiration=30,
)
if not _errors:
_message = f'The asyncronous permissions {form.cleaned_data.get("mode")} request for {", ".join(users_usernames or groups_names)} has been sent'
Expand Down
16 changes: 12 additions & 4 deletions geonode/br/management/commands/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ def execute_restore(self, **options):
except Exception as exception:
if notify:
restore_notification.apply_async(
(admin_emails, backup_file, backup_md5, str(exception)))
args=(admin_emails, backup_file, backup_md5, str(exception)),
expiration=30
)

print("...Sanity Checks on Folder failed. Please make sure that the current user has full WRITE access to the above folders (and sub-folders or files).") # noqa
print("Reason:")
Expand Down Expand Up @@ -321,7 +323,9 @@ def execute_restore(self, **options):
self.restore_geoserver_externals(config, settings, recovery_folder)
if notify:
restore_notification.apply_async(
(admin_emails, backup_file, backup_md5, str(exception)))
args=(admin_emails, backup_file, backup_md5, str(exception)),
expiration=30
)
raise exception
else:
print("Skipping geoserver backup restore")
Expand Down Expand Up @@ -503,7 +507,9 @@ def execute_restore(self, **options):
except Exception as exception:
if notify:
restore_notification.apply_async(
(admin_emails, backup_file, backup_md5, str(exception)))
args=(admin_emails, backup_file, backup_md5, str(exception)),
expiration=30
)

finally:
call_command('makemigrations', interactive=False)
Expand All @@ -512,7 +518,9 @@ def execute_restore(self, **options):

if notify:
restore_notification.apply_async(
(admin_emails, backup_file, backup_md5))
args=(admin_emails, backup_file, backup_md5),
expiration=30
)

print("HINT: If you migrated from another site, do not forget to run the command 'migrate_baseurl' to fix Links") # noqa
print(
Expand Down
2 changes: 1 addition & 1 deletion geonode/br/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
bind=True,
name="geonode.br.tasks.restore_notification",
queue="email",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down
6 changes: 3 additions & 3 deletions geonode/documents/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _get_filetype(self, filname):
bind=True,
name="geonode.documents.tasks.create_document_thumbnail",
queue="geonode",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -138,7 +138,7 @@ def create_document_thumbnail(self, object_id):
bind=True,
name="geonode.documents.tasks.delete_orphaned_document_files",
queue="cleanup",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand All @@ -157,7 +157,7 @@ def delete_orphaned_document_files(self):
bind=True,
name="geonode.documents.tasks.delete_orphaned_thumbnails",
queue="cleanup",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down
4 changes: 2 additions & 2 deletions geonode/geoserver/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def delete(self, uuid: str, /, instance: ResourceBase = None) -> int:
or _real_instance.remote_service is None
or _real_instance.remote_service.method == CASCADED
):
geoserver_cascading_delete.apply_async((_real_instance.alternate,))
geoserver_cascading_delete.apply_async(args=(_real_instance.alternate,), expiration=30)
elif isinstance(_real_instance, Map):
geoserver_delete_map.apply_async((_real_instance.id,))
geoserver_delete_map.apply_async(args=(_real_instance.id,), expiration=30)
except Exception as e:
logger.exception(e)

Expand Down
16 changes: 9 additions & 7 deletions geonode/geoserver/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def geoserver_delete(typename):
# cascading_delete should only be called if
# ogc_server_settings.BACKEND_WRITE_ENABLED == True
if getattr(ogc_server_settings, "BACKEND_WRITE_ENABLED", True):
geoserver_cascading_delete.apply_async((typename,))
geoserver_cascading_delete.apply_async(args=(typename,), expiration=30)


@on_ogc_backend(BACKEND_PACKAGE)
Expand All @@ -62,7 +62,7 @@ def geoserver_pre_delete(instance, sender, **kwargs):
or instance.remote_service.method == CASCADED
):
if instance.alternate:
geoserver_cascading_delete.apply_async((instance.alternate,))
geoserver_cascading_delete.apply_async(args=(instance.alternate,), expiration=30)


@on_ogc_backend(BACKEND_PACKAGE)
Expand All @@ -78,7 +78,7 @@ def geoserver_post_save_local(instance, *args, **kwargs):
* Metadata Links,
* Point of Contact name and url
"""
geoserver_post_save_datasets.apply_async((instance.id, args, kwargs))
geoserver_post_save_datasets.apply_async(args=(instance.id, args, kwargs), expiration=30)


@on_ogc_backend(BACKEND_PACKAGE)
Expand Down Expand Up @@ -117,11 +117,12 @@ def geoserver_post_save_map(instance, sender, created, **kwargs):
if not instance.thumbnail_url:
logger.debug(f"... Creating Thumbnail for Map [{instance.title}]")
geoserver_create_thumbnail.apply_async(
(
args=(
instance.id,
False,
True,
)
),
expiration=30,
)


Expand All @@ -143,11 +144,12 @@ def geoserver_set_thumbnail(instance, **kwargs):
_recreate_thumbnail = True
if _recreate_thumbnail:
geoserver_create_thumbnail.apply_async(
(
args=(
instance.id,
False,
True,
)
),
expiration=30,
)
else:
logger.debug(f"... Thumbnail for Dataset {instance.title} already exists: {instance.thumbnail_url}")
Expand Down
16 changes: 8 additions & 8 deletions geonode/geoserver/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
base=FaultTolerantTask,
name="geonode.geoserver.tasks.geoserver_update_datasets",
queue="geoserver.catalog",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -85,7 +85,7 @@ def geoserver_update_datasets(self, *args, **kwargs):
base=FaultTolerantTask,
name="geonode.geoserver.tasks.geoserver_set_style",
queue="geoserver.catalog",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -133,7 +133,7 @@ def geoserver_set_style(self, instance_id, base_file):
base=FaultTolerantTask,
name="geonode.geoserver.tasks.geoserver_create_style",
queue="geoserver.catalog",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -211,7 +211,7 @@ def geoserver_create_style(self, instance_id, name, sld_file, tempdir):
base=FaultTolerantTask,
name="geonode.geoserver.tasks.geoserver_post_save_datasets",
queue="geoserver.catalog",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -253,7 +253,7 @@ def geoserver_post_save_datasets(self, instance_id, *args, **kwargs):
base=FaultTolerantTask,
name="geonode.geoserver.tasks.geoserver_create_thumbnail",
queue="geoserver.events",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -299,7 +299,7 @@ def geoserver_create_thumbnail(self, instance_id, overwrite=True, check_bbox=Tru
base=FaultTolerantTask,
name="geonode.geoserver.tasks.geoserver_cascading_delete",
queue="cleanup",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand All @@ -325,7 +325,7 @@ def geoserver_cascading_delete(self, *args, **kwargs):
bind=True,
name="geonode.geoserver.tasks.geoserver_delete_map",
queue="cleanup",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down Expand Up @@ -359,7 +359,7 @@ def geoserver_delete_map(self, object_id):
bind=True,
name="geonode.security.tasks.synch_guardian",
queue="security",
expires=600,
expires=30,
time_limit=600,
acks_late=False,
autoretry_for=(Exception,),
Expand Down
2 changes: 1 addition & 1 deletion geonode/harvesting/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def update(self, instance: models.Harvester, validated_data):
post_update_task = None
updated_instance = super().update(instance, validated_data)
if post_update_task is not None:
post_update_task.apply_async()
post_update_task.apply_async(args=(), expiration=30)
return updated_instance


Expand Down
2 changes: 1 addition & 1 deletion geonode/harvesting/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def initiate(self, harvestable_resource_ids: typing.Optional[typing.List[int]] =
raise RuntimeError("Invalid selection")
self.status = self.STATUS_PENDING
self.save()
task_signature.apply_async()
task_signature.apply_async(args=(), expiration=30)

def abort(self):
"""Abort a pending or on-going session."""
Expand Down
Loading

0 comments on commit 513659b

Please sign in to comment.