diff --git a/delfin/cmd/task.py b/delfin/cmd/task.py index f0e2f3ca2..f15b7aae3 100644 --- a/delfin/cmd/task.py +++ b/delfin/cmd/task.py @@ -20,6 +20,7 @@ """Starter script for delfin task service.""" import eventlet + eventlet.monkey_patch() import sys @@ -45,9 +46,18 @@ def main(): task_server = service.TaskService.create(binary='delfin-task', coordination=True) leader_election = service.LeaderElectionService.create() + metrics_task_server = service. \ + TaskService.create(binary='delfin-task', + topic=CONF.host, + manager='delfin.' + 'task_manager.' + 'metrics_manager.' + 'MetricsTaskManager', + coordination=True) service.serve(task_server) service.serve(leader_election) + service.serve(metrics_task_server) service.wait() diff --git a/delfin/db/sqlalchemy/models.py b/delfin/db/sqlalchemy/models.py index bf1d77a7c..cc8b65eff 100644 --- a/delfin/db/sqlalchemy/models.py +++ b/delfin/db/sqlalchemy/models.py @@ -268,6 +268,7 @@ class Task(BASE, DelfinBase): args = Column(JsonEncodedDict) last_run_time = Column(Integer) job_id = Column(String(36)) + executor = Column(String(255)) deleted_at = Column(DateTime) deleted = Column(Boolean, default=False) @@ -285,6 +286,7 @@ class FailedTask(BASE, DelfinBase): method = Column(String(255)) result = Column(String(255)) job_id = Column(String(36)) + executor = Column(String(255)) deleted_at = Column(DateTime) deleted = Column(Boolean, default=False) diff --git a/delfin/leader_election/distributor/__init__.py b/delfin/leader_election/distributor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/delfin/leader_election/distributor/failed_task_distributor.py b/delfin/leader_election/distributor/failed_task_distributor.py new file mode 100644 index 000000000..4b83720e0 --- /dev/null +++ b/delfin/leader_election/distributor/failed_task_distributor.py @@ -0,0 +1,67 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import six +from oslo_config import cfg +from oslo_log import log + +from delfin import db +from delfin.common.constants import TelemetryCollection +from delfin.task_manager import metrics_rpcapi as task_rpcapi + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class FailedTaskDistributor(object): + def __init__(self, ctx): + # create the object of periodic scheduler + self.task_rpcapi = task_rpcapi.TaskAPI() + self.ctx = ctx + + def __call__(self): + + try: + # Remove jobs from scheduler when marked for delete + filters = {'deleted': True} + failed_tasks = db.failed_task_get_all(self.ctx, filters=filters) + LOG.debug("Total failed_tasks found deleted " + "in this cycle:%s" % len(failed_tasks)) + for failed_task in failed_tasks: + self.task_rpcapi.remove_failed_job(self.ctx, failed_task['id'], + failed_task['executor']) + except Exception as e: + LOG.error("Failed to remove periodic scheduling job , reason: %s.", + six.text_type(e)) + try: + failed_tasks = db.failed_task_get_all(self.ctx) + for failed_task in failed_tasks: + # Todo Get executor for the job + LOG.debug('Assigning failed task for for id: ' + '%s' % failed_task['id']) + self.task_rpcapi.assign_failed_job(self.ctx, failed_task['id'], + failed_task['executor']) + + LOG.info('Assigned failed task for id: ' + '%s ' % failed_task['id']) + except Exception as e: + LOG.error("Failed to schedule retry tasks for performance " + "collection, reason: %s", six.text_type(e)) + else: + LOG.info("Schedule collection completed") + + @classmethod + def job_interval(cls): + return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL diff --git a/delfin/leader_election/distributor/task_distributor.py b/delfin/leader_election/distributor/task_distributor.py new file mode 100644 index 000000000..5b48ea4dc --- /dev/null +++ b/delfin/leader_election/distributor/task_distributor.py @@ -0,0 +1,79 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import six +from oslo_config import cfg +from oslo_log import log + +from delfin import db +from delfin.common.constants import TelemetryCollection +from delfin.task_manager import metrics_rpcapi as task_rpcapi + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class TaskDistributor(object): + def __init__(self, ctx): + self.ctx = ctx + self.task_rpcapi = task_rpcapi.TaskAPI() + + # Reset last run time of tasks to restart scheduling and + # start the failed task job + task_list = db.task_get_all(ctx) + for task in task_list: + db.task_update(ctx, task['id'], {'last_run_time': None}) + + def __call__(self): + """ Schedule the collection tasks based on interval """ + + try: + # Remove jobs from scheduler when marked for delete + filters = {'deleted': True} + tasks = db.task_get_all(self.ctx, filters=filters) + LOG.debug("Total tasks found deleted " + "in this cycle:%s" % len(tasks)) + for task in tasks: + self.task_rpcapi.remove_job(self.ctx, task['id'], + task['executor']) + except Exception as e: + LOG.error("Failed to remove periodic scheduling job , reason: %s.", + six.text_type(e)) + + try: + + filters = {'last_run_time': None} + tasks = db.task_get_all(self.ctx, filters=filters) + LOG.debug("Distributing performance collection jobs: total " + "jobs to be handled:%s" % len(tasks)) + for task in tasks: + # Todo Get executor for the job + executor = CONF.host + db.task_update(self.ctx, task['id'], {'executor': executor}) + LOG.info('Assigning executor for collection job for id: ' + '%s' % task['id']) + self.task_rpcapi.assign_job(self.ctx, task['id'], executor) + + LOG.debug('Periodic collection job assigned for id: ' + '%s ' % task['id']) + except Exception as e: + LOG.error("Failed to distribute periodic collection, reason: %s.", + six.text_type(e)) + else: + LOG.debug("Periodic job distribution completed.") + + @classmethod + def job_interval(cls): + return TelemetryCollection.PERIODIC_JOB_INTERVAL diff --git a/delfin/leader_election/factory.py b/delfin/leader_election/factory.py index cc80bfaf6..8e3cac86b 100644 --- a/delfin/leader_election/factory.py +++ b/delfin/leader_election/factory.py @@ -36,9 +36,10 @@ def construct_elector(plugin, leader_key=None): scheduler_mgr = SchedulerManager() if plugin == "tooz": + scheduler_mgr.start() # Create callback object callback = ToozLeaderElectionCallback.register( - on_leading_callback=scheduler_mgr.start, + on_leading_callback=scheduler_mgr.schedule_boot_jobs, on_stop_callback=scheduler_mgr.stop) return Elector(callback, leader_election_key) diff --git a/delfin/task_manager/manager.py b/delfin/task_manager/manager.py index 63d36db89..ce98d1204 100644 --- a/delfin/task_manager/manager.py +++ b/delfin/task_manager/manager.py @@ -43,15 +43,6 @@ def sync_storage_resource(self, context, storage_id, resource_task): device_obj = cls(context, storage_id) device_obj.sync() - def collect_telemetry(self, context, storage_id, telemetry_task, - args, start_time, end_time): - LOG.debug("Collecting resource metrics: {0} request for storage" - " id:{1}".format(args, storage_id)) - cls = importutils.import_class(telemetry_task) - device_obj = cls() - return device_obj.collect(context, storage_id, args, start_time, - end_time) - def remove_storage_resource(self, context, storage_id, resource_task): cls = importutils.import_class(resource_task) device_obj = cls(context, storage_id) diff --git a/delfin/task_manager/metrics_manager.py b/delfin/task_manager/metrics_manager.py new file mode 100644 index 000000000..ffaaa6e06 --- /dev/null +++ b/delfin/task_manager/metrics_manager.py @@ -0,0 +1,55 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +periodical task manager for metric collection tasks** +""" +from oslo_log import log +from delfin import manager +from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\ + import JobHandler +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\ + import FailedJobHandler +from delfin.task_manager.tasks import telemetry + +LOG = log.getLogger(__name__) + + +class MetricsTaskManager(manager.Manager): + """manage periodical tasks""" + + RPC_API_VERSION = '1.0' + + def __init__(self, service_name=None, *args, **kwargs): + self.telemetry_task = telemetry.TelemetryTask() + super(MetricsTaskManager, self).__init__(*args, **kwargs) + scheduler = schedule_manager.SchedulerManager() + scheduler.start() + JobHandler.schedule_boot_jobs() + + def assign_job(self, context, task_id): + instance = JobHandler.get_instance(context, task_id) + instance.schedule_job(task_id) + + def remove_job(self, context, task_id): + instance = JobHandler.get_instance(context, task_id) + instance.remove_job(task_id) + + def assign_failed_job(self, context, failed_task_id): + instance = FailedJobHandler.get_instance(context, failed_task_id) + instance.schedule_failed_job(failed_task_id) + + def remove_failed_job(self, context, failed_task_id): + instance = FailedJobHandler.get_instance(context, failed_task_id) + instance.remove_failed_job(failed_task_id) diff --git a/delfin/task_manager/metrics_rpcapi.py b/delfin/task_manager/metrics_rpcapi.py new file mode 100644 index 000000000..9019a2ef1 --- /dev/null +++ b/delfin/task_manager/metrics_rpcapi.py @@ -0,0 +1,75 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Client side of the metrics task manager RPC API. +""" + +import oslo_messaging as messaging +from oslo_config import cfg + +from delfin import rpc + +CONF = cfg.CONF + + +class TaskAPI(object): + """Client side of the metrics task rpc API. + + API version history: + + 1.0 - Initial version. + """ + + RPC_API_VERSION = '1.0' + + def __init__(self): + super(TaskAPI, self).__init__() + self.target = messaging.Target(topic=CONF.host, + version=self.RPC_API_VERSION) + self.client = rpc.get_client(self.target, + version_cap=self.RPC_API_VERSION) + + def get_client(self, topic): + target = messaging.Target(topic=topic, + version=self.RPC_API_VERSION) + return rpc.get_client(target, version_cap=self.RPC_API_VERSION) + + def assign_job(self, context, task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'assign_job', + task_id=task_id) + + def remove_job(self, context, task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'remove_job', + task_id=task_id) + + def assign_failed_job(self, context, failed_task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'assign_failed_job', + failed_task_id=failed_task_id) + + def remove_failed_job(self, context, failed_task_id, executor): + rpc_client = self.get_client(str(executor)) + call_context = rpc_client.prepare(topic=str(executor), version='1.0', + fanout=True) + return call_context.cast(context, 'remove_failed_job', + failed_task_id=failed_task_id) diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py index e495a5b84..6d2a2c37e 100644 --- a/delfin/task_manager/scheduler/schedule_manager.py +++ b/delfin/task_manager/scheduler/schedule_manager.py @@ -22,16 +22,16 @@ from delfin import context from delfin import utils -from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \ - import FailedTelemetryJob -from delfin.task_manager.scheduler.schedulers.telemetry.telemetry_job import \ - TelemetryJob +from delfin.leader_election.distributor.failed_task_distributor\ + import FailedTaskDistributor +from delfin.leader_election.distributor.task_distributor \ + import TaskDistributor LOG = log.getLogger(__name__) SCHEDULER_BOOT_JOBS = [ - TelemetryJob.__module__ + '.' + TelemetryJob.__name__, - FailedTelemetryJob.__module__ + '.' + FailedTelemetryJob.__name__ + TaskDistributor.__module__ + '.' + TaskDistributor.__name__, + FailedTaskDistributor.__module__ + '.' + FailedTaskDistributor.__name__ ] @@ -54,6 +54,7 @@ def start(self): self.scheduler.start() self.scheduler_started = True + def schedule_boot_jobs(self): if not self.boot_jobs_scheduled: try: for job in SCHEDULER_BOOT_JOBS: diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py index 86b1e5793..fd7970bc2 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py @@ -77,14 +77,11 @@ def __call__(self): % (self.storage_id, self.failed_task_id)) return - # Pull performance collection info self.retry_count = self.retry_count + 1 try: - status = self.task_rpcapi.collect_telemetry( - self.ctx, self.storage_id, - PerformanceCollectionTask.__module__ + '.' + - PerformanceCollectionTask.__name__, - self.args, self.start_time, self.end_time) + telemetry = PerformanceCollectionTask() + status = telemetry.collect(self.ctx, self.storage_id, self.args, + self.start_time, self.end_time) if not status: raise exception.TelemetryTaskExecError() diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py b/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py deleted file mode 100644 index 55945becf..000000000 --- a/delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py +++ /dev/null @@ -1,148 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime - -import six -from oslo_config import cfg -from oslo_log import log -from oslo_utils import importutils -from oslo_utils import uuidutils - -from delfin import db -from delfin.common.constants import TelemetryJobStatus, TelemetryCollection -from delfin.db.sqlalchemy.models import FailedTask -from delfin.exception import TaskNotFound -from delfin.task_manager.scheduler import schedule_manager - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - - -class FailedTelemetryJob(object): - def __init__(self, ctx): - # create the object of periodic scheduler - self.scheduler = schedule_manager.SchedulerManager().get_scheduler() - self.ctx = ctx - self.stopped = False - self.job_ids = set() - - def __call__(self): - """ - :return: - """ - - if self.stopped: - return - - try: - # Remove jobs from scheduler when marked for delete - filters = {'deleted': True} - failed_tasks = db.failed_task_get_all(self.ctx, filters=filters) - LOG.debug("Total failed_tasks found deleted " - "in this cycle:%s" % len(failed_tasks)) - for failed_task in failed_tasks: - job_id = failed_task['job_id'] - self.remove_scheduled_job(job_id) - db.failed_task_delete(self.ctx, failed_task['id']) - except Exception as e: - LOG.error("Failed to remove periodic scheduling job , reason: %s.", - six.text_type(e)) - try: - # Create the object of periodic scheduler - failed_tasks = db.failed_task_get_all(self.ctx) - - if not len(failed_tasks): - LOG.info("No failed task found for performance collection") - return - - LOG.debug("Schedule performance collection triggered: total " - "failed tasks:%s" % len(failed_tasks)) - - for failed_task in failed_tasks: - failed_task_id = failed_task[FailedTask.id.name] - LOG.info("Processing failed task : %s" % failed_task_id) - - # Get failed jobs, if retry count has reached max, - # remove job and delete db entry - retry_count = failed_task[FailedTask.retry_count.name] - result = failed_task[FailedTask.result.name] - job_id = failed_task[FailedTask.job_id.name] - if retry_count >= \ - TelemetryCollection.MAX_FAILED_JOB_RETRY_COUNT or \ - result == TelemetryJobStatus.FAILED_JOB_STATUS_SUCCESS: - LOG.info("Exiting Failure task processing for task [%d] " - "with result [%s] and retry count [%d] " - % (failed_task_id, result, retry_count)) - # task ID is same as job id - self._teardown_task(self.ctx, failed_task_id, job_id) - continue - - # If job already scheduled, skip - if job_id and self.scheduler.get_job(job_id): - continue - - try: - db.task_get(self.ctx, - failed_task[FailedTask.task_id.name]) - except TaskNotFound as e: - LOG.info("Removing failed telemetry job as parent job " - "do not exist: %s", six.text_type(e)) - # tear down if original task is not available - self._teardown_task(self.ctx, failed_task_id, - job_id) - continue - - if not job_id: - job_id = uuidutils.generate_uuid() - db.failed_task_update(self.ctx, failed_task_id, - {FailedTask.job_id.name: job_id}) - - collection_class = importutils.import_class( - failed_task[FailedTask.method.name]) - instance = \ - collection_class.get_instance(self.ctx, failed_task_id) - self.scheduler.add_job( - instance, 'interval', - seconds=failed_task[FailedTask.interval.name], - next_run_time=datetime.now(), id=job_id, - misfire_grace_time=int( - CONF.telemetry.performance_collection_interval / 2) - ) - self.job_ids.add(job_id) - - except Exception as e: - LOG.error("Failed to schedule retry tasks for performance " - "collection, reason: %s", six.text_type(e)) - else: - LOG.info("Schedule collection completed") - - def _teardown_task(self, ctx, failed_task_id, job_id): - db.failed_task_delete(ctx, failed_task_id) - self.remove_scheduled_job(job_id) - - def remove_scheduled_job(self, job_id): - if job_id in self.job_ids: - self.job_ids.remove(job_id) - if job_id and self.scheduler.get_job(job_id): - self.scheduler.remove_job(job_id) - - def stop(self): - self.stopped = True - for job_id in self.job_ids.copy(): - self.remove_scheduled_job(job_id) - - @classmethod - def job_interval(cls): - return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py new file mode 100644 index 000000000..55e75b93d --- /dev/null +++ b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py @@ -0,0 +1,236 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +import six +from oslo_config import cfg +from oslo_log import log +from oslo_utils import uuidutils, importutils + +from delfin import db, context +from delfin.common.constants import TelemetryCollection, TelemetryJobStatus +from delfin.exception import TaskNotFound +from delfin.task_manager import rpcapi as task_rpcapi +from delfin.task_manager.scheduler import schedule_manager + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class JobHandler(object): + def __init__(self, ctx, task_id, storage_id, args, interval): + # create an object of periodic task scheduler + self.ctx = ctx + self.task_id = task_id + self.storage_id = storage_id + self.args = args + self.interval = interval + self.task_rpcapi = task_rpcapi.TaskAPI() + self.scheduler = schedule_manager.SchedulerManager().get_scheduler() + self.stopped = False + self.job_ids = set() + + @staticmethod + def get_instance(ctx, task_id): + task = db.task_get(ctx, task_id) + return JobHandler(ctx, task_id, task['storage_id'], + task['args'], task['interval']) + + @staticmethod + def schedule_boot_jobs(): + """Schedule periodic collection if any task is currently assigned to + this executor """ + try: + + filters = {'executor': CONF.host} + ctxt = context.get_admin_context() + tasks = db.task_get_all(ctxt, filters=filters) + LOG.info("Scheduling boot time jobs for this executor: total " + "jobs to be handled :%s" % len(tasks)) + for task in tasks: + instance = JobHandler.get_instance(ctxt, task['id']) + instance.schedule_job(task['id']) + LOG.debug('Periodic collection job assigned for id: ' + '%s ' % task['id']) + except Exception as e: + LOG.error("Failed to schedule boot jobs for this executor " + "reason: %s.", + six.text_type(e)) + else: + LOG.debug("Boot job scheduling completed.") + + def schedule_job(self, task_id): + + if self.stopped: + # If Job is stopped return immediately + return + + LOG.info("JobHandler received A job %s to schedule" % task_id) + job = db.task_get(self.ctx, task_id) + collection_class = importutils.import_class( + job['method']) + instance = collection_class.get_instance(self.ctx, self.task_id) + current_time = int(datetime.now().timestamp()) + last_run_time = current_time + next_collection_time = last_run_time + job['interval'] + job_id = uuidutils.generate_uuid() + next_collection_time = datetime \ + .fromtimestamp(next_collection_time) \ + .strftime('%Y-%m-%d %H:%M:%S') + + existing_job_id = job['job_id'] + + scheduler_job = self.scheduler.get_job(existing_job_id) + + if not (existing_job_id and scheduler_job): + LOG.info('JobHandler scheduling a new job') + self.scheduler.add_job( + instance, 'interval', seconds=job['interval'], + next_run_time=next_collection_time, id=job_id, + misfire_grace_time=int( + CONF.telemetry.performance_collection_interval / 2)) + + update_task_dict = {'job_id': job_id, + 'last_run_time': last_run_time} + db.task_update(self.ctx, self.task_id, update_task_dict) + self.job_ids.add(job_id) + LOG.info('Periodic collection tasks scheduled for for job id: ' + '%s ' % self.task_id) + else: + LOG.info('Job already exists with this scheduler') + + def stop(self): + self.stopped = True + for job_id in self.job_ids.copy(): + self.remove_scheduled_job(job_id) + LOG.info("Stopping telemetry jobs") + + def remove_scheduled_job(self, job_id): + if job_id in self.job_ids: + self.job_ids.remove(job_id) + if job_id and self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + def remove_job(self, task_id): + try: + LOG.info("Received job %s to remove", task_id) + job = db.task_get(self.ctx, task_id) + job_id = job['job_id'] + self.remove_scheduled_job(job_id) + db.task_delete(self.ctx, job['id']) + LOG.info("Removed job %s ", job['id']) + except Exception as e: + LOG.error("Failed to remove periodic scheduling job , reason: %s.", + six.text_type(e)) + + +class FailedJobHandler(object): + def __init__(self, ctx): + # create an object of periodic failed task scheduler + self.scheduler = schedule_manager.SchedulerManager().get_scheduler() + self.ctx = ctx + self.stopped = False + self.job_ids = set() + + @staticmethod + def get_instance(ctx, failed_task_id): + return FailedJobHandler(ctx) + + def schedule_failed_job(self, failed_task_id): + + if self.stopped: + return + + try: + job = db.failed_task_get(self.ctx, failed_task_id) + retry_count = job['retry_count'] + result = job['result'] + job_id = job['job_id'] + if retry_count >= \ + TelemetryCollection.MAX_FAILED_JOB_RETRY_COUNT or \ + result == TelemetryJobStatus.FAILED_JOB_STATUS_SUCCESS: + LOG.info("Exiting Failure task processing for task [%d] " + "with result [%s] and retry count [%d] " + % (job['id'], result, retry_count)) + self._teardown_task(self.ctx, job['id'], job_id) + return + # If job already scheduled, skip + if job_id and self.scheduler.get_job(job_id): + return + + try: + db.task_get(self.ctx, job['task_id']) + except TaskNotFound as e: + LOG.info("Removing failed telemetry job as parent job " + "do not exist: %s", six.text_type(e)) + # tear down if original task is not available + self._teardown_task(self.ctx, job['id'], + job_id) + return + + if not (job_id and self.scheduler.get_job(job_id)): + job_id = uuidutils.generate_uuid() + db.failed_task_update(self.ctx, job['id'], + {'job_id': job_id}) + + collection_class = importutils.import_class( + job['method']) + instance = \ + collection_class.get_instance(self.ctx, job['id']) + self.scheduler.add_job( + instance, 'interval', + seconds=job['interval'], + next_run_time=datetime.now(), id=job_id, + misfire_grace_time=int( + CONF.telemetry.performance_collection_interval / 2) + ) + self.job_ids.add(job_id) + + except Exception as e: + LOG.error("Failed to schedule retry tasks for performance " + "collection, reason: %s", six.text_type(e)) + else: + LOG.info("Schedule collection completed") + + def _teardown_task(self, ctx, failed_task_id, job_id): + db.failed_task_delete(ctx, failed_task_id) + self.remove_scheduled_job(job_id) + + def remove_scheduled_job(self, job_id): + if job_id in self.job_ids: + self.job_ids.remove(job_id) + if job_id and self.scheduler.get_job(job_id): + self.scheduler.remove_job(job_id) + + def stop(self): + self.stopped = True + for job_id in self.job_ids.copy(): + self.remove_scheduled_job(job_id) + + def remove_failed_job(self, failed_task_id): + try: + LOG.info("Received failed job %s to remove", failed_task_id) + job = db.failed_task_get(self.ctx, failed_task_id) + job_id = job['job_id'] + self.remove_scheduled_job(job_id) + db.failed_task_delete(self.ctx, job['id']) + LOG.info("Removed failed_task entry %s ", job['id']) + except Exception as e: + LOG.error("Failed to remove periodic scheduling job , reason: %s.", + six.text_type(e)) + + @classmethod + def job_interval(cls): + return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py index d42b31ddf..f6b05890a 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py @@ -15,6 +15,9 @@ from datetime import datetime import six +from delfin.task_manager.scheduler.schedulers.telemetry. \ + failed_performance_collection_handler import \ + FailedPerformanceCollectionHandler from oslo_log import log from delfin import db @@ -22,28 +25,29 @@ from delfin.common.constants import TelemetryCollection from delfin.db.sqlalchemy.models import FailedTask from delfin.task_manager import rpcapi as task_rpcapi -from delfin.task_manager.scheduler.schedulers.telemetry. \ - failed_performance_collection_handler import \ - FailedPerformanceCollectionHandler -from delfin.task_manager.tasks import telemetry +from delfin.task_manager.scheduler import schedule_manager +from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask LOG = log.getLogger(__name__) class PerformanceCollectionHandler(object): - def __init__(self, ctx, task_id, storage_id, args, interval): + def __init__(self, ctx, task_id, storage_id, args, interval, executor): self.ctx = ctx self.task_id = task_id self.storage_id = storage_id self.args = args self.interval = interval self.task_rpcapi = task_rpcapi.TaskAPI() + self.executor = executor + self.scheduler = schedule_manager.SchedulerManager().get_scheduler() @staticmethod def get_instance(ctx, task_id): task = db.task_get(ctx, task_id) return PerformanceCollectionHandler(ctx, task_id, task['storage_id'], - task['args'], task['interval']) + task['args'], task['interval'], + task['executor']) def __call__(self): # Upon periodic job callback, if storage is already deleted or soft @@ -72,11 +76,9 @@ def __call__(self): # Times are epoch time in milliseconds end_time = current_time * 1000 start_time = end_time - (self.interval * 1000) - status = self.task_rpcapi. \ - collect_telemetry(self.ctx, self.storage_id, - telemetry.TelemetryTask.__module__ + '.' + - 'PerformanceCollectionTask', self.args, - start_time, end_time) + telemetry = PerformanceCollectionTask() + status = telemetry.collect(self.ctx, self.storage_id, self.args, + start_time, end_time) db.task_update(self.ctx, self.task_id, {'last_run_time': current_time}) @@ -103,5 +105,6 @@ def _handle_task_failure(self, start_time, end_time): FailedTask.method.name: FailedPerformanceCollectionHandler.__module__ + '.' + FailedPerformanceCollectionHandler.__name__, - FailedTask.retry_count.name: 0} + FailedTask.retry_count.name: 0, + FailedTask.executor.name: self.executor} db.failed_task_create(self.ctx, failed_task) diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py b/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py deleted file mode 100644 index cf66b5c3c..000000000 --- a/delfin/task_manager/scheduler/schedulers/telemetry/telemetry_job.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime - -import six -from oslo_config import cfg -from oslo_log import log -from oslo_utils import importutils -from oslo_utils import uuidutils - -from delfin import db -from delfin.common.constants import TelemetryCollection -from delfin.task_manager.scheduler import schedule_manager - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - - -class TelemetryJob(object): - def __init__(self, ctx): - self.ctx = ctx - self.scheduler = schedule_manager.SchedulerManager().get_scheduler() - - # Reset last run time of tasks to restart scheduling and - # start the failed task job - task_list = db.task_get_all(ctx) - for task in task_list: - db.task_update(ctx, task['id'], {'last_run_time': None}) - - self.stopped = False - self.job_ids = set() - - def __call__(self): - """ Schedule the collection tasks based on interval """ - - if self.stopped: - """If Job is stopped return immediately""" - return - - try: - # Remove jobs from scheduler when marked for delete - filters = {'deleted': True} - tasks = db.task_get_all(self.ctx, filters=filters) - LOG.debug("Total tasks found deleted " - "in this cycle:%s" % len(tasks)) - for task in tasks: - job_id = task['job_id'] - if job_id and self.scheduler.get_job(job_id): - self.remove_scheduled_job(job_id) - db.task_delete(self.ctx, task['id']) - except Exception as e: - LOG.error("Failed to remove periodic scheduling job , reason: %s.", - six.text_type(e)) - try: - - filters = {'last_run_time': None} - tasks = db.task_get_all(self.ctx, filters=filters) - LOG.debug("Schedule performance collection triggered: total " - "tasks to be handled:%s" % len(tasks)) - for task in tasks: - # Get current time in epoch format in seconds. Here method - # indicates the specific collection task to be triggered - current_time = int(datetime.now().timestamp()) - last_run_time = current_time - next_collection_time = last_run_time + task['interval'] - task_id = task['id'] - job_id = uuidutils.generate_uuid() - next_collection_time = datetime \ - .fromtimestamp(next_collection_time) \ - .strftime('%Y-%m-%d %H:%M:%S') - - collection_class = importutils.import_class(task['method']) - instance = collection_class.get_instance(self.ctx, task_id) - self.scheduler.add_job( - instance, 'interval', seconds=task['interval'], - next_run_time=next_collection_time, id=job_id, - misfire_grace_time=int( - CONF.telemetry.performance_collection_interval / 2)) - - # jobs book keeping - self.job_ids.add(job_id) - - update_task_dict = {'job_id': job_id, - 'last_run_time': last_run_time} - db.task_update(self.ctx, task_id, update_task_dict) - LOG.info('Periodic collection task triggered for for task id: ' - '%s ' % task['id']) - except Exception as e: - LOG.error("Failed to trigger periodic collection, reason: %s.", - six.text_type(e)) - else: - LOG.debug("Periodic collection task Scheduling completed.") - - def stop(self): - self.stopped = True - for job_id in self.job_ids.copy(): - self.remove_scheduled_job(job_id) - LOG.info("Stopping telemetry jobs") - - @classmethod - def job_interval(cls): - return TelemetryCollection.PERIODIC_JOB_INTERVAL - - def remove_scheduled_job(self, job_id): - if job_id in self.job_ids: - self.job_ids.remove(job_id) - if job_id and self.scheduler.get_job(job_id): - self.scheduler.remove_job(job_id) diff --git a/delfin/tests/unit/leader_election/__init__.py b/delfin/tests/unit/leader_election/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/delfin/tests/unit/leader_election/distributor/__init__.py b/delfin/tests/unit/leader_election/distributor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py b/delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py new file mode 100644 index 000000000..50fd9b3ce --- /dev/null +++ b/delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py @@ -0,0 +1,62 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from datetime import datetime + + +from oslo_utils import uuidutils + +from delfin import context +from delfin import db +from delfin import test +from delfin.db.sqlalchemy.models import FailedTask +from delfin.leader_election.distributor.failed_task_distributor import \ + FailedTaskDistributor + +fake_executor = 'node1' +fake_failed_job = { + FailedTask.id.name: 43, + FailedTask.retry_count.name: 0, + FailedTask.result.name: "Init", + FailedTask.job_id.name: "fake_job_id", + FailedTask.task_id.name: uuidutils.generate_uuid(), + FailedTask.start_time.name: int(datetime.now().timestamp()), + FailedTask.end_time.name: int(datetime.now().timestamp()) + 20, + FailedTask.interval.name: 20, + FailedTask.deleted.name: False, + FailedTask.executor.name: fake_executor, +} + +fake_failed_jobs = [ + fake_failed_job, +] + + +class TestFailedTaskDistributor(test.TestCase): + + @mock.patch.object(db, 'failed_task_get_all', + mock.Mock(return_value=fake_failed_jobs)) + @mock.patch.object(db, 'failed_task_update', + mock.Mock(return_value=fake_failed_job)) + @mock.patch.object(db, 'failed_task_get', + mock.Mock(return_value=fake_failed_job)) + @mock.patch( + 'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_failed_job') + def test_telemetry_failed_job_scheduling(self, mock_assign_job): + ctx = context.get_admin_context() + task_distributor = FailedTaskDistributor(ctx) + # call telemetry job scheduling + task_distributor() + self.assertEqual(mock_assign_job.call_count, 1) diff --git a/delfin/tests/unit/leader_election/distributor/test_task_distributor.py b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py new file mode 100644 index 000000000..206c0277d --- /dev/null +++ b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py @@ -0,0 +1,56 @@ +# Copyright 2021 The SODA Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from oslo_utils import uuidutils + +from delfin import context +from delfin import db +from delfin import test +from delfin.common import constants +from delfin.db.sqlalchemy.models import Task +from delfin.leader_election.distributor.task_distributor import TaskDistributor + +fake_telemetry_job = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.last_run_time.name: None, + Task.deleted.name: 0, +} + +fake_telemetry_jobs = [ + fake_telemetry_job, +] + + +class TestTaskDistributor(test.TestCase): + + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs)) + @mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job') + def test_telemetry_job_scheduling(self, mock_assign_job): + ctx = context.get_admin_context() + task_distributor = TaskDistributor(ctx) + # call telemetry job scheduling + task_distributor() + self.assertEqual(mock_assign_job.call_count, 1) diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py index 85e5a5329..d6437ee34 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_performance_collection_handler.py @@ -81,7 +81,8 @@ class TestFailedPerformanceCollectionHandler(test.TestCase): @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.pause_job') @mock.patch('delfin.db.failed_task_update') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_failed_job_success(self, mock_collect_telemetry, mock_failed_task_update, mock_pause_job): mock_collect_telemetry.return_value = TelemetryTaskStatus. \ diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py similarity index 51% rename from delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py rename to delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py index 3be557dc6..d65d1e43c 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_failed_telemetry_job.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_job_handler.py @@ -12,23 +12,69 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime from unittest import mock - +from datetime import datetime from oslo_utils import uuidutils from delfin import context from delfin import db from delfin import test -from delfin.common.constants import TelemetryCollection -from delfin.db.sqlalchemy.models import FailedTask +from delfin.common import constants from delfin.db.sqlalchemy.models import Task +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler import \ + JobHandler +from delfin.task_manager.scheduler.schedulers.telemetry.job_handler import \ + FailedJobHandler +from delfin.db.sqlalchemy.models import FailedTask from delfin.task_manager.scheduler.schedulers.telemetry. \ failed_performance_collection_handler import \ FailedPerformanceCollectionHandler -from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \ - import FailedTelemetryJob +from delfin.common.constants import TelemetryCollection + +fake_executor = 'node1' +fake_telemetry_job = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.job_id.name: None, + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.last_run_time.name: None, + Task.executor.name: fake_executor, +} + +fake_telemetry_jobs = [ + fake_telemetry_job, +] + +fake_telemetry_job_deleted = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.last_run_time.name: None, + Task.deleted.name: True, + Task.executor.name: fake_executor, +} + +fake_telemetry_jobs_deleted = [ + fake_telemetry_job_deleted, +] +# With method name as None +Incorrect_telemetry_job = { + Task.id.name: 2, + Task.storage_id.name: uuidutils.generate_uuid(), + Task.args.name: {}, + Task.interval.name: 10, + Task.method.name: None, + Task.last_run_time.name: None, + Task.executor.name: None, +} +Incorrect_telemetry_jobs = [ + Incorrect_telemetry_job, +] fake_failed_job = { FailedTask.id.name: 43, FailedTask.retry_count.name: 0, @@ -42,17 +88,67 @@ FailedTask.end_time.name: int(datetime.now().timestamp()) + 20, FailedTask.interval.name: 20, FailedTask.deleted.name: False, + FailedTask.executor.name: fake_executor, } fake_failed_jobs = [ fake_failed_job, ] -fake_telemetry_job = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, -} + +class TestTelemetryJob(test.TestCase): + + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs)) + @mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'apscheduler.schedulers.background.BackgroundScheduler.add_job') + def test_telemetry_job_scheduling(self, mock_add_job): + ctx = context.get_admin_context() + telemetry_job = JobHandler(ctx, fake_telemetry_job['id'], + fake_telemetry_job['storage_id'], + fake_telemetry_job['args'], + fake_telemetry_job['interval']) + # call telemetry job scheduling + telemetry_job.schedule_job(fake_telemetry_job['id']) + self.assertEqual(mock_add_job.call_count, 1) + + @mock.patch.object(db, 'task_delete', + mock.Mock()) + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs_deleted)) + @mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'apscheduler.schedulers.background.BackgroundScheduler.add_job', + mock.Mock()) + @mock.patch('logging.LoggerAdapter.error') + def test_telemetry_removal_success(self, mock_log_error): + ctx = context.get_admin_context() + telemetry_job = JobHandler(ctx, fake_telemetry_job['id'], + fake_telemetry_job['storage_id'], + fake_telemetry_job['args'], + fake_telemetry_job['interval']) + # call telemetry job scheduling + telemetry_job.remove_job(fake_telemetry_job['id']) + self.assertEqual(mock_log_error.call_count, 0) + + @mock.patch.object(db, 'task_get_all', + mock.Mock(return_value=fake_telemetry_jobs)) + @mock.patch.object(db, 'task_update', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch.object(db, 'task_get', + mock.Mock(return_value=fake_telemetry_job)) + @mock.patch( + 'apscheduler.schedulers.background.BackgroundScheduler.add_job') + def test_schedule_boot_jobs(self, mock_add_job): + JobHandler.schedule_boot_jobs() + self.assertEqual(mock_add_job.call_count, 1) class TestFailedTelemetryJob(test.TestCase): @@ -68,11 +164,13 @@ class TestFailedTelemetryJob(test.TestCase): @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.add_job') def test_failed_job_scheduling(self, mock_add_job): - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.schedule_failed_job(fake_failed_job['id']) self.assertEqual(mock_add_job.call_count, 1) + @mock.patch.object(db, 'failed_task_get', + mock.Mock(return_value=fake_failed_job)) @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.remove_job') @mock.patch( @@ -89,15 +187,15 @@ def test_failed_job_with_max_retry(self, mock_failed_get_all, TelemetryCollection.MAX_FAILED_JOB_RETRY_COUNT mock_failed_get_all.return_value = failed_jobs - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.schedule_failed_job(failed_jobs[0]) mock_get_job.return_value = True # entry get deleted and job get removed - self.assertEqual(mock_failed_task_delete.call_count, 2) - self.assertEqual(mock_remove_job.call_count, 2) + self.assertEqual(mock_failed_task_delete.call_count, 1) + self.assertEqual(mock_remove_job.call_count, 1) @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.get_job') @@ -114,13 +212,15 @@ def test_failed_job_with_job_already_scheduled(self, mock_failed_get_all, # configure to have job in scheduler mock_get_job.return_value = failed_jobs - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.remove_failed_job(fake_failed_job['id']) # the job will not be scheduled self.assertEqual(mock_add_job.call_count, 0) + @mock.patch.object(db, 'failed_task_get', + mock.Mock(return_value=fake_failed_job)) @mock.patch( 'apscheduler.schedulers.background.BackgroundScheduler.remove_job') @mock.patch.object(db, 'failed_task_delete') @@ -133,10 +233,10 @@ def test_failed_job_scheduling_with_no_task(self, mock_failed_get_all, failed_jobs[0][FailedTask.job_id.name] = uuidutils.generate_uuid() mock_failed_get_all.return_value = failed_jobs - failed_job = FailedTelemetryJob(context.get_admin_context()) + failed_job = FailedJobHandler(context.get_admin_context()) # call failed job scheduling - failed_job() + failed_job.remove_failed_job(fake_failed_job) # entry get deleted and job get removed - self.assertEqual(mock_failed_task_delete.call_count, 2) + self.assertEqual(mock_failed_task_delete.call_count, 1) self.assertEqual(mock_remove_job.call_count, 0) diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py index a90ed680d..4fd6e3e0b 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py @@ -28,6 +28,7 @@ PerformanceCollectionHandler fake_task_id = 43 +fake_executor = 'node1' fake_storage_id = '12c2d52f-01bc-41f5-b73f-7abf6f38a2a6' fake_telemetry_job = { Task.id.name: 2, @@ -35,7 +36,8 @@ Task.args.name: {}, Task.interval.name: 10, Task.deleted.name: False, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.executor.name: fake_executor } fake_deleted_telemetry_job = { @@ -44,7 +46,8 @@ Task.args.name: {}, Task.interval.name: 10, Task.deleted.name: True, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD + Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, + Task.executor.name: fake_executor } @@ -57,7 +60,8 @@ class TestPerformanceCollectionHandler(test.TestCase): @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_telemetry_job)) @mock.patch('delfin.db.task_update') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_performance_collection_success(self, mock_collect_telemetry, mock_task_update): mock_collect_telemetry.return_value = TelemetryTaskStatus. \ @@ -74,7 +78,8 @@ def test_performance_collection_success(self, mock_collect_telemetry, @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_telemetry_job)) @mock.patch('delfin.db.failed_task_create') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_performance_collection_failure(self, mock_collect_telemetry, mock_failed_task_create): mock_collect_telemetry.return_value = TelemetryTaskStatus. \ @@ -91,7 +96,8 @@ def test_performance_collection_failure(self, mock_collect_telemetry, @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_deleted_telemetry_job)) @mock.patch('delfin.db.task_update') - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_performance_collection_deleted_storage(self, mock_collect_telemetry, mock_task_update): @@ -108,14 +114,16 @@ def test_performance_collection_deleted_storage(self, self.assertEqual(mock_task_update.call_count, 0) @mock.patch('delfin.db.task_get', task_not_found_exception) - @mock.patch('delfin.task_manager.rpcapi.TaskAPI.collect_telemetry') + @mock.patch('delfin.task_manager.tasks.telemetry' + '.PerformanceCollectionTask.collect') def test_deleted_storage_exception(self, mock_collect_telemetry): ctx = context.get_admin_context() perf_collection_handler = PerformanceCollectionHandler(ctx, fake_task_id, fake_storage_id, - "", 100) + "", 100, + fake_executor) perf_collection_handler() # Verify that collect telemetry for deleted storage diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py deleted file mode 100644 index 54e6600e2..000000000 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_telemetry_job.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import mock - -from oslo_utils import uuidutils - -from delfin import context -from delfin import db -from delfin import test -from delfin.common import constants -from delfin.db.sqlalchemy.models import Task -from delfin.task_manager.scheduler.schedulers.telemetry.telemetry_job import \ - TelemetryJob - -fake_telemetry_job = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, - Task.interval.name: 10, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, - Task.last_run_time.name: None, -} - -fake_telemetry_jobs = [ - fake_telemetry_job, -] - -fake_telemetry_job_deleted = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, - Task.interval.name: 10, - Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD, - Task.last_run_time.name: None, - Task.deleted.name: True, -} - -fake_telemetry_jobs_deleted = [ - fake_telemetry_job_deleted, -] -# With method name as None -Incorrect_telemetry_job = { - Task.id.name: 2, - Task.storage_id.name: uuidutils.generate_uuid(), - Task.args.name: {}, - Task.interval.name: 10, - Task.method.name: None, - Task.last_run_time.name: None, -} - -Incorrect_telemetry_jobs = [ - Incorrect_telemetry_job, -] - - -class TestTelemetryJob(test.TestCase): - - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=fake_telemetry_jobs)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job') - def test_telemetry_job_scheduling(self, mock_add_job): - ctx = context.get_admin_context() - telemetry_job = TelemetryJob(ctx) - # call telemetry job scheduling - telemetry_job() - self.assertEqual(mock_add_job.call_count, 1) - - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=Incorrect_telemetry_jobs)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=Incorrect_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=Incorrect_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job', - mock.Mock()) - @mock.patch('logging.LoggerAdapter.error') - def test_telemetry_job_scheduling_exception(self, mock_log_error): - ctx = context.get_admin_context() - telemetry_job = TelemetryJob(ctx) - # call telemetry job scheduling - telemetry_job() - self.assertEqual(mock_log_error.call_count, 2) - - @mock.patch.object(db, 'task_delete', - mock.Mock()) - @mock.patch.object(db, 'task_get_all', - mock.Mock(return_value=fake_telemetry_jobs_deleted)) - @mock.patch.object(db, 'task_update', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch.object(db, 'task_get', - mock.Mock(return_value=fake_telemetry_job)) - @mock.patch( - 'apscheduler.schedulers.background.BackgroundScheduler.add_job', - mock.Mock()) - @mock.patch('logging.LoggerAdapter.error') - def test_telemetry_removal_success(self, mock_log_error): - ctx = context.get_admin_context() - telemetry_job = TelemetryJob(ctx) - # call telemetry job scheduling - telemetry_job() - self.assertEqual(mock_log_error.call_count, 1)