Skip to content

Commit

Permalink
Add generic task to run any function, #10573
Browse files Browse the repository at this point in the history
  • Loading branch information
njkim committed Feb 2, 2024
1 parent 66374c6 commit de27796
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions arches/app/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
import os
import logging
import shutil
Expand Down Expand Up @@ -352,6 +353,19 @@ def bulk_data_deletion(userid, load_id, graph_id, nodegroup_id, resourceids):
notify_completion(msg, user)


@shared_task
def run_task(module_name=None, class_name=None, method_to_run=None, **kwargs):
"""
this allows the user to run any method as a celery task
module_name, class_name, and method_to_run are required
pass any additional arguments to the method via the kwargs parameter
"""

theClass = getattr(importlib.import_module(module_name), class_name)
theMethod = getattr(theClass(), method_to_run)
theMethod(**kwargs)


@shared_task
def run_etl_task(**kwargs):
"""
Expand All @@ -369,8 +383,7 @@ def run_etl_task(**kwargs):
userid = kwargs.get("userid")

try:
import_class = vars(__import__(import_module, globals(), locals(), [import_class]))[import_class]
import_class().run_load_task(**kwargs)
run_task(module_name=import_module, class_name=import_class, method_to_run="run_load_task", **kwargs)

load_event = models.LoadEvent.objects.get(loadid=loadid)
status = _("Completed") if load_event.status == "indexed" else _("Failed")
Expand Down

0 comments on commit de27796

Please sign in to comment.