Make job scheduler local to task process (#674)
* Make job scheduler local to task process
NajmudheenCT authored Aug 28, 2021
1 parent baa386e commit 903138a
Showing 23 changed files with 808 additions and 451 deletions.
10 changes: 10 additions & 0 deletions delfin/cmd/task.py
@@ -20,6 +20,7 @@
"""Starter script for delfin task service."""

import eventlet

eventlet.monkey_patch()

import sys
@@ -45,9 +46,18 @@ def main():
task_server = service.TaskService.create(binary='delfin-task',
coordination=True)
leader_election = service.LeaderElectionService.create()
metrics_task_server = service.TaskService.create(
    binary='delfin-task',
    topic=CONF.host,
    manager='delfin.task_manager.metrics_manager.MetricsTaskManager',
    coordination=True)

service.serve(task_server)
service.serve(leader_election)
service.serve(metrics_task_server)

service.wait()

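Note: the new metrics_task_server listens on a host-specific topic (topic=CONF.host), which is what makes the job scheduler local to each task process: a cast addressed to a host's topic is consumed only by the MetricsTaskManager on that host. A minimal sketch of that routing, using the delfin.rpc helpers this commit relies on (the function name cast_to_local_executor is illustrative, not part of the commit):

import oslo_messaging as messaging
from oslo_config import cfg

from delfin import rpc

CONF = cfg.CONF


def cast_to_local_executor(context, task_id):
    # The topic is this host's name, so only the local MetricsTaskManager
    # (started above with topic=CONF.host) consumes the cast.
    target = messaging.Target(topic=CONF.host, version='1.0')
    client = rpc.get_client(target, version_cap='1.0')
    call_context = client.prepare(topic=CONF.host, version='1.0',
                                  fanout=True)
    call_context.cast(context, 'assign_job', task_id=task_id)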
2 changes: 2 additions & 0 deletions delfin/db/sqlalchemy/models.py
@@ -268,6 +268,7 @@ class Task(BASE, DelfinBase):
args = Column(JsonEncodedDict)
last_run_time = Column(Integer)
job_id = Column(String(36))
executor = Column(String(255))
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)

@@ -285,6 +286,7 @@ class FailedTask(BASE, DelfinBase):
method = Column(String(255))
result = Column(String(255))
job_id = Column(String(36))
executor = Column(String(255))
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)

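Both tables gain an executor column recording which host's metrics manager owns the scheduled job; the distributors added below read it back to route remove/assign casts. Assuming the generic filters support used elsewhere in this commit (e.g. filters={'deleted': True}) extends to the new column, and that the admin-context helper exists as used in delfin's telemetry jobs, a host's tasks could be queried like this (illustrative only):

from delfin import context, db

ctx = context.get_admin_context()
# 'node-1' is a hypothetical host name; it would match the CONF.host of
# a running delfin-task process.
tasks = db.task_get_all(ctx, filters={'executor': 'node-1'})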
Empty file.
67 changes: 67 additions & 0 deletions delfin/leader_election/distributor/failed_task_distributor.py
@@ -0,0 +1,67 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import six
from oslo_config import cfg
from oslo_log import log

from delfin import db
from delfin.common.constants import TelemetryCollection
from delfin.task_manager import metrics_rpcapi as task_rpcapi

CONF = cfg.CONF
LOG = log.getLogger(__name__)


class FailedTaskDistributor(object):
def __init__(self, ctx):
# RPC client used to route jobs to their owning executors
self.task_rpcapi = task_rpcapi.TaskAPI()
self.ctx = ctx

def __call__(self):

try:
# Remove jobs from scheduler when marked for delete
filters = {'deleted': True}
failed_tasks = db.failed_task_get_all(self.ctx, filters=filters)
LOG.debug("Total failed_tasks found deleted "
"in this cycle:%s" % len(failed_tasks))
for failed_task in failed_tasks:
self.task_rpcapi.remove_failed_job(self.ctx, failed_task['id'],
failed_task['executor'])
except Exception as e:
LOG.error("Failed to remove periodic scheduling job , reason: %s.",
six.text_type(e))
try:
failed_tasks = db.failed_task_get_all(self.ctx)
for failed_task in failed_tasks:
# TODO: Get executor for the job
LOG.debug('Assigning failed task for id: '
'%s' % failed_task['id'])
self.task_rpcapi.assign_failed_job(self.ctx, failed_task['id'],
failed_task['executor'])

LOG.info('Assigned failed task for id: '
'%s ' % failed_task['id'])
except Exception as e:
LOG.error("Failed to schedule retry tasks for performance "
"collection, reason: %s", six.text_type(e))
else:
LOG.info("Schedule collection completed")

@classmethod
def job_interval(cls):
return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL
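FailedTaskDistributor is a plain callable with a class-level interval, which is exactly the shape SchedulerManager's boot-job loop expects. A hedged sketch of how such a callable gets wired into the APScheduler instance delfin uses (the real registration happens in schedule_manager.py via SCHEDULER_BOOT_JOBS; the standalone wiring below is illustrative):

from apscheduler.schedulers.background import BackgroundScheduler

from delfin import context
from delfin.leader_election.distributor.failed_task_distributor import \
    FailedTaskDistributor

ctx = context.get_admin_context()
scheduler = BackgroundScheduler()
# The distributor instance is the job callable; its class advertises
# the polling interval via job_interval().
scheduler.add_job(FailedTaskDistributor(ctx), 'interval',
                  seconds=FailedTaskDistributor.job_interval())
scheduler.start()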
79 changes: 79 additions & 0 deletions delfin/leader_election/distributor/task_distributor.py
@@ -0,0 +1,79 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import six
from oslo_config import cfg
from oslo_log import log

from delfin import db
from delfin.common.constants import TelemetryCollection
from delfin.task_manager import metrics_rpcapi as task_rpcapi

CONF = cfg.CONF
LOG = log.getLogger(__name__)


class TaskDistributor(object):
def __init__(self, ctx):
self.ctx = ctx
self.task_rpcapi = task_rpcapi.TaskAPI()

# Reset last run time of tasks to restart scheduling and
# start the failed task job
task_list = db.task_get_all(ctx)
for task in task_list:
db.task_update(ctx, task['id'], {'last_run_time': None})

def __call__(self):
""" Schedule the collection tasks based on interval """

try:
# Remove jobs from scheduler when marked for delete
filters = {'deleted': True}
tasks = db.task_get_all(self.ctx, filters=filters)
LOG.debug("Total tasks found deleted "
"in this cycle:%s" % len(tasks))
for task in tasks:
self.task_rpcapi.remove_job(self.ctx, task['id'],
task['executor'])
except Exception as e:
LOG.error("Failed to remove periodic scheduling job , reason: %s.",
six.text_type(e))

try:
filters = {'last_run_time': None}
tasks = db.task_get_all(self.ctx, filters=filters)
LOG.debug("Distributing performance collection jobs: total "
"jobs to be handled:%s" % len(tasks))
for task in tasks:
# TODO: Get executor for the job
executor = CONF.host
db.task_update(self.ctx, task['id'], {'executor': executor})
LOG.info('Assigning executor for collection job id: '
'%s' % task['id'])
self.task_rpcapi.assign_job(self.ctx, task['id'], executor)

LOG.debug('Periodic collection job assigned for id: '
'%s ' % task['id'])
except Exception as e:
LOG.error("Failed to distribute periodic collection, reason: %s.",
six.text_type(e))
else:
LOG.debug("Periodic job distribution completed.")

@classmethod
def job_interval(cls):
return TelemetryCollection.PERIODIC_JOB_INTERVAL
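TaskDistributor resets every task's last_run_time at construction, then on each cycle stamps unassigned tasks (last_run_time is None) with an executor before casting assign_job to that executor's topic. Illustrative usage, under the same admin-context assumption as above:

from delfin import context
from delfin.leader_election.distributor.task_distributor import TaskDistributor

ctx = context.get_admin_context()
distributor = TaskDistributor(ctx)  # resets last_run_time for all tasks
distributor()  # one distribution cycle; normally driven by the scheduler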
3 changes: 2 additions & 1 deletion delfin/leader_election/factory.py
@@ -36,9 +36,10 @@ def construct_elector(plugin, leader_key=None):
scheduler_mgr = SchedulerManager()

if plugin == "tooz":
scheduler_mgr.start()
# Create callback object
callback = ToozLeaderElectionCallback.register(
on_leading_callback=scheduler_mgr.start,
on_leading_callback=scheduler_mgr.schedule_boot_jobs,
on_stop_callback=scheduler_mgr.stop)

return Elector(callback, leader_election_key)
9 changes: 0 additions & 9 deletions delfin/task_manager/manager.py
@@ -43,15 +43,6 @@ def sync_storage_resource(self, context, storage_id, resource_task):
device_obj = cls(context, storage_id)
device_obj.sync()

def collect_telemetry(self, context, storage_id, telemetry_task,
args, start_time, end_time):
LOG.debug("Collecting resource metrics: {0} request for storage"
" id:{1}".format(args, storage_id))
cls = importutils.import_class(telemetry_task)
device_obj = cls()
return device_obj.collect(context, storage_id, args, start_time,
end_time)

def remove_storage_resource(self, context, storage_id, resource_task):
cls = importutils.import_class(resource_task)
device_obj = cls(context, storage_id)
55 changes: 55 additions & 0 deletions delfin/task_manager/metrics_manager.py
@@ -0,0 +1,55 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
periodical task manager for metric collection tasks**
"""
from oslo_log import log
from delfin import manager
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
import JobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
import FailedJobHandler
from delfin.task_manager.tasks import telemetry

LOG = log.getLogger(__name__)


class MetricsTaskManager(manager.Manager):
"""manage periodical tasks"""

RPC_API_VERSION = '1.0'

def __init__(self, service_name=None, *args, **kwargs):
self.telemetry_task = telemetry.TelemetryTask()
super(MetricsTaskManager, self).__init__(*args, **kwargs)
scheduler = schedule_manager.SchedulerManager()
scheduler.start()
JobHandler.schedule_boot_jobs()

def assign_job(self, context, task_id):
instance = JobHandler.get_instance(context, task_id)
instance.schedule_job(task_id)

def remove_job(self, context, task_id):
instance = JobHandler.get_instance(context, task_id)
instance.remove_job(task_id)

def assign_failed_job(self, context, failed_task_id):
instance = FailedJobHandler.get_instance(context, failed_task_id)
instance.schedule_failed_job(failed_task_id)

def remove_failed_job(self, context, failed_task_id):
instance = FailedJobHandler.get_instance(context, failed_task_id)
instance.remove_failed_job(failed_task_id)
75 changes: 75 additions & 0 deletions delfin/task_manager/metrics_rpcapi.py
@@ -0,0 +1,75 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Client side of the metrics task manager RPC API.
"""

import oslo_messaging as messaging
from oslo_config import cfg

from delfin import rpc

CONF = cfg.CONF


class TaskAPI(object):
"""Client side of the metrics task rpc API.
API version history:
1.0 - Initial version.
"""

RPC_API_VERSION = '1.0'

def __init__(self):
super(TaskAPI, self).__init__()
self.target = messaging.Target(topic=CONF.host,
version=self.RPC_API_VERSION)
self.client = rpc.get_client(self.target,
version_cap=self.RPC_API_VERSION)

def get_client(self, topic):
target = messaging.Target(topic=topic,
version=self.RPC_API_VERSION)
return rpc.get_client(target, version_cap=self.RPC_API_VERSION)

def assign_job(self, context, task_id, executor):
rpc_client = self.get_client(str(executor))
call_context = rpc_client.prepare(topic=str(executor), version='1.0',
fanout=True)
return call_context.cast(context, 'assign_job',
task_id=task_id)

def remove_job(self, context, task_id, executor):
rpc_client = self.get_client(str(executor))
call_context = rpc_client.prepare(topic=str(executor), version='1.0',
fanout=True)
return call_context.cast(context, 'remove_job',
task_id=task_id)

def assign_failed_job(self, context, failed_task_id, executor):
rpc_client = self.get_client(str(executor))
call_context = rpc_client.prepare(topic=str(executor), version='1.0',
fanout=True)
return call_context.cast(context, 'assign_failed_job',
failed_task_id=failed_task_id)

def remove_failed_job(self, context, failed_task_id, executor):
rpc_client = self.get_client(str(executor))
call_context = rpc_client.prepare(topic=str(executor), version='1.0',
fanout=True)
return call_context.cast(context, 'remove_failed_job',
failed_task_id=failed_task_id)
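Each method builds a fresh client against the executor's own topic, so the fanout cast is consumed only by the MetricsTaskManager whose CONF.host matches that executor. Illustrative usage ('node-1' and the task id are placeholders, and the admin-context helper is assumed as above):

from delfin import context
from delfin.task_manager import metrics_rpcapi

ctx = context.get_admin_context()
api = metrics_rpcapi.TaskAPI()
# Lands on the task process whose CONF.host == 'node-1'.
api.assign_job(ctx, task_id='some-task-uuid', executor='node-1')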
13 changes: 7 additions & 6 deletions delfin/task_manager/scheduler/schedule_manager.py
@@ -22,16 +22,16 @@

from delfin import context
from delfin import utils
from delfin.task_manager.scheduler.schedulers.telemetry.failed_telemetry_job \
import FailedTelemetryJob
from delfin.task_manager.scheduler.schedulers.telemetry.telemetry_job import \
TelemetryJob
from delfin.leader_election.distributor.failed_task_distributor\
import FailedTaskDistributor
from delfin.leader_election.distributor.task_distributor \
import TaskDistributor

LOG = log.getLogger(__name__)

SCHEDULER_BOOT_JOBS = [
TelemetryJob.__module__ + '.' + TelemetryJob.__name__,
FailedTelemetryJob.__module__ + '.' + FailedTelemetryJob.__name__
TaskDistributor.__module__ + '.' + TaskDistributor.__name__,
FailedTaskDistributor.__module__ + '.' + FailedTaskDistributor.__name__
]


@@ -54,6 +54,7 @@ def start(self):
self.scheduler.start()
self.scheduler_started = True

def schedule_boot_jobs(self):
if not self.boot_jobs_scheduled:
try:
for job in SCHEDULER_BOOT_JOBS:
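The split matters: start() now only boots the scheduler (every task process calls it), while schedule_boot_jobs() registers the two distributors and is invoked only on the elected leader (see factory.py above). The collapsed body of schedule_boot_jobs presumably follows the pattern its SCHEDULER_BOOT_JOBS list implies; a hedged reconstruction, whose details may differ from the actual commit:

from datetime import datetime

from oslo_utils import importutils

# Inside SchedulerManager.schedule_boot_jobs (reconstruction, not verbatim);
# `context` is imported at module top, as visible in the hunk above.
for job in SCHEDULER_BOOT_JOBS:
    job_class = importutils.import_class(job)
    # Each distributor is a callable scheduled at its own interval.
    self.scheduler.add_job(
        job_class(context.get_admin_context()), 'interval',
        seconds=job_class.job_interval(),
        next_run_time=datetime.now())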
@@ -77,14 +77,11 @@ def __call__(self):
% (self.storage_id, self.failed_task_id))
return

# Pull performance collection info
self.retry_count = self.retry_count + 1
try:
status = self.task_rpcapi.collect_telemetry(
self.ctx, self.storage_id,
PerformanceCollectionTask.__module__ + '.' +
PerformanceCollectionTask.__name__,
self.args, self.start_time, self.end_time)
telemetry = PerformanceCollectionTask()
status = telemetry.collect(self.ctx, self.storage_id, self.args,
self.start_time, self.end_time)

if not status:
raise exception.TelemetryTaskExecError()