Skip to content

Commit

Permalink
FLEX: Support Interactive both in gsctl and coordinator side
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Nov 7, 2023
1 parent f16da8c commit 57e03b9
Show file tree
Hide file tree
Showing 20 changed files with 1,167 additions and 65 deletions.
2 changes: 2 additions & 0 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from graphscope.config import Config
from graphscope.proto import coordinator_service_pb2_grpc

from gscoordinator.servicer import init_interactive_service_servicer
from gscoordinator.servicer import init_graphscope_one_service_servicer
from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH

Expand Down Expand Up @@ -109,6 +110,7 @@ def get_servicer(config: Config):
"""Get servicer of specified solution under FLEX architecture"""
service_initializers = {
"GraphScope One": init_graphscope_one_service_servicer,
"Interactive": init_interactive_service_servicer,
}

initializer = service_initializers.get(config.solution)
Expand Down
250 changes: 250 additions & 0 deletions coordinator/gscoordinator/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import json
import time
from abc import ABCMeta
from abc import abstractmethod

import schedule
from graphscope.framework.utils.py import random_string
from schedule import CancelJob

from gscoordinator.stoppable_thread import StoppableThread
from gscoordinator.utils import decode_datetimestr


class Schedule(object):
"""Schedule class that wrapper dbader schedule
Repo: https://github.com/dbader/schedule.
"""

def __init__(self):
self._schedule = schedule.Scheduler()
self._run_pending_thread = StoppableThread(target=self.run_pending, args=())
self._run_pending_thread.daemon = True
self._run_pending_thread.start()

@property
def schedule(self):
return self._schedule

def run_pending(self):
"""Run all jobs that are scheduled to run."""
while True:
self._schedule.run_pending()
time.sleep(1)


schedule = Schedule().schedule # noqa: F811


class Scheduler(metaclass=ABCMeta):
"""
Objects instantiated by the :class:`Scheduler <Scheduler>` are
factories to create jobs, keep record of scheduled jobs and
handle their execution in the :method:`run` method.
"""

def __init__(self, at_time, repeat):
# scheduler id
self._scheduler_id = "Job-{0}".format(random_string(16)).upper()
# periodic job as used
self._job = None
# true will be run immediately
self._run_now = False
# time at which this job to schedule
self._at_time = self._decode_datetimestr(at_time)
# repeat every day or week, or run job once(no repeat)
# optional value "day", "week", "null"
self._repeat = repeat
# job running thread, note that:
# the last job should be end of execution at the beginning of the next job
self._running_thread = None
# tags
self._tags = []

# when the job actually scheduled, the following variables will be generated and overridden.
self._jobid = None
self._last_run = None

def _decode_datetimestr(self, datetime_str):
if datetime_str == "now":
self._run_now = True
return datetime.datetime.now()
return decode_datetimestr(datetime_str)

def __str__(self):
return "Scheduler(at_time={}, repeat={})".format(self._at_time, self._repeat)

@property
def monday(self):
return self._at_time.weekday() == 0

@property
def tuesday(self):
return self._at_time.weekday() == 1

@property
def wednesday(self):
return self._at_time.weekday() == 2

@property
def thursday(self):
return self._at_time.weekday() == 3

@property
def friday(self):
return self._at_time.weekday() == 4

@property
def saturday(self):
return self._at_time.weekday() == 5

@property
def sunday(self):
return self._at_time.weekday() == 6

@property
def timestr(self):
"""return str of the time object.
time([hour[, minute[, second[, microsecond[, tzinfo]]]]]) --> a time object
"""
return str(self._at_time.time())

@property
def job(self):
"""A periodic job managed by the dbader scheduler.
https://github.com/dbader/schedule.
"""
return self._job

@property
def jobid(self):
"""id for the last scheduled job"""
return self._jobid

@property
def schedulerid(self):
"""id for the scheduler"""
return self._scheduler_id

@property
def last_run(self):
"""datetime of the last run"""
return self._last_run

@property
def tags(self):
return self._tags

@property
def running_thread(self):
return self._running_thread

def run_once(self):
"""Run the job immediately."""
self.do_run()
return CancelJob

def waiting_until_to_run(self):
"""Run the job once at a specific time."""
if datetime.datetime.now() >= self._at_time:
return self.run_once()

def do_run(self):
"""Start a thread for the job."""
# overwrite for each scheduled job
self._jobid = "job-{0}".format(random_string(16)).upper()
self._last_run = datetime.datetime.now()
# schedule in a thread
self._running_thread = StoppableThread(target=self.run, args=())
self._running_thread.daemon = True
self._running_thread.start()

def submit(self):
if not self._run_now and self._repeat not in ["week", "day", "null", None]:
raise RuntimeError(
"Submit schedule job failed: at_time is '{0}', repeat is '{1}'".format(
self._at_time, self._repeat
)
)

if self._run_now:
self._job = schedule.every().seconds.do(self.run_once)

if not self._run_now and self._repeat == "week":
if self.monday:
self._job = schedule.every().monday.at(self.timestr).do(self.do_run)
elif self.tuesday:
self._job = schedule.every().tuesday.at(self.timestr).do(self.do_run)
elif self.wednesday:
self._job = schedule.every().wednesday.at(self.timestr).do(self.do_run)
elif self.thursday:
self._job = schedule.every().thursday.at(self.timestr).do(self.do_run)
elif self.friday:
self._job = schedule.every().friday.at(self.timestr).do(self.do_run)
elif self.saturday:
self._job = schedule.every().saturday.at(self.timestr).do(self.do_run)
elif self.sunday:
self._job = schedule.every().sunday.at(self.timestr).do(self.do_run)

if not self._run_now and self._repeat == "day":
self._job = schedule.every().day.at(self.timestr).do(self.do_run)

if not self._run_now and self._repeat in ["null", None]:
self._job = (
schedule.every().day.at(self.timestr).do(self.waiting_until_to_run)
)

# tag
self._job.tag(self._scheduler_id, *self._tags)

Check notice on line 218 in coordinator/gscoordinator/scheduler.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/scheduler.py#L182-L218

Complex Method

def cancel(self):
"""
Set the running job thread stoppable and wait for the
thread to exit properly by using join() method.
"""
if self._running_thread is not None and self._running_thread.is_alive():
self._running_thread.stop()
self._running_thread.join()

@abstractmethod
def run(self):
"""
Methods that all subclasses need to implement, note that
subclass needs to handle exception by itself.
"""
raise NotImplementedError


def cancel_job(job, delete_scheduler=True):
"""
Cancel the job which going to scheduled or cancel the whole scheduler.
Args:
job: Periodic job as used by :class:`Scheduler`.
delete_scheduler: True will can the whole scheduler, otherwise,
delay the next-run time by on period.
"""
if delete_scheduler:
schedule.cancel_job(job)
else:
job.next_run += job.period
1 change: 1 addition & 0 deletions coordinator/gscoordinator/servicer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
# limitations under the License.
#

from gscoordinator.servicer.interactive.service import *
from gscoordinator.servicer.graphscope_one.service import *
45 changes: 45 additions & 0 deletions coordinator/gscoordinator/servicer/base_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import atexit
import logging

from graphscope.config import Config
from graphscope.proto import coordinator_service_pb2_grpc
from graphscope.proto import message_pb2


class BaseServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer):
"""Base class of coordinator service"""

def __init__(self, config: Config):
self._config = config
atexit.register(self.cleanup)

def __del__(self):
self.cleanup()

def Connect(self, request, context):
return message_pb2.ConnectResponse(solution=self._config.solution)

@property
def launcher_type(self):
return self._config.launcher_type

def cleanup(self):
pass
26 changes: 26 additions & 0 deletions coordinator/gscoordinator/servicer/interactive/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys

try:
sys.path.insert(0, os.path.dirname(__file__))
import interactive_client
except ImportError:
raise
Loading

0 comments on commit 57e03b9

Please sign in to comment.