From 665902396ae440282706cff7084e8151ac897537 Mon Sep 17 00:00:00 2001 From: ashione Date: Tue, 2 Apr 2024 17:00:05 +0800 Subject: [PATCH 01/10] manylinux build fix --- .github/workflows/debian_manylinux_build.yaml | 5 +- training/python/setup.py | 14 +- .../streaming/operator/checkpoint_listener.py | 3 +- .../operator/constant/operator_constants.py | 5 +- .../streaming/operator/impl/eval_function.py | 16 +- .../streaming/operator/impl/eval_operator.py | 4 +- .../operator/impl/optimizer_operator.py | 6 +- .../streaming/operator/impl/ps_operator.py | 4 +- .../streaming/operator/impl/tf_function.py | 62 ++--- .../streaming/operator/impl/tf_operator.py | 95 ++++---- .../impl/training_independent_actor.py | 8 +- .../operator/reader/circular_buffer.py | 46 ++-- .../streaming/operator/reader/eval_reader.py | 23 +- .../streaming/operator/reader/ring_buffer.py | 181 ++++++++------ .../operator/reader/stream_reader.py | 8 +- .../operator/reader/tf_operator_reader.py | 93 ++++--- .../streaming/operator/test/test_eval.py | 12 +- .../operator/test/test_pysink_operator.py | 81 ++++--- .../operator/test/test_ring_buffer.py | 226 +++++++++++------- .../operator/test/test_tf_operator.py | 60 +++-- training/python/streaming/operator/util.py | 42 ++-- 21 files changed, 582 insertions(+), 412 deletions(-) diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index f184caba..8b60372e 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -15,7 +15,7 @@ env: jobs: streaming-debian-manylinux-pipeline: timeout-minutes: 120 - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 container: quay.io/pypa/manylinux_2_24_x86_64 steps: @@ -23,11 +23,8 @@ jobs: - name: Apt get update and Install bazel run: | - cp /etc/apt/sources.list /etc/apt/sources.list.bak - cp .github/sources.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git - apt-get update apt-get install -yq gcc g++ sh scripts/install-bazel.sh diff --git a/training/python/setup.py b/training/python/setup.py index 894c53b3..55fd83d6 100644 --- a/training/python/setup.py +++ b/training/python/setup.py @@ -12,15 +12,13 @@ setup( name=pkg_dir, version="0.0.1", - description='streaming', + description="streaming", keywords=("ray", "streaming", "runtime", "operator"), - author='The Authors of Antgroup', - packages=find_packages( - exclude=["*.tests", "*.tests.*", "tests.*", "tests"]), + author="The Authors of Antgroup", + packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"]), platforms="any", scripts=[], include_package_data=True, - install_requires=[ - 'protobuf', 'schedule', 'psutil' - ], - zip_safe=False) + install_requires=["protobuf", "schedule", "psutil"], + zip_safe=False, +) diff --git a/training/python/streaming/operator/checkpoint_listener.py b/training/python/streaming/operator/checkpoint_listener.py index c4655b10..b0ea8d27 100644 --- a/training/python/streaming/operator/checkpoint_listener.py +++ b/training/python/streaming/operator/checkpoint_listener.py @@ -2,11 +2,10 @@ class CheckpointListener(metaclass=ABCMeta): - @abstractmethod def on_checkpoint_complete(self, checkpoint_id): """ :param checkpoint_id: checkpoint id :return: """ - raise NotImplementedError('function not implemented!') + raise NotImplementedError("function not implemented!") diff --git a/training/python/streaming/operator/constant/operator_constants.py 
b/training/python/streaming/operator/constant/operator_constants.py index e3e07b00..09038162 100644 --- a/training/python/streaming/operator/constant/operator_constants.py +++ b/training/python/streaming/operator/constant/operator_constants.py @@ -7,6 +7,7 @@ class OperatorConstants: """ Training constants """ + NONE_BYTES = 8 # Stream reader @@ -14,7 +15,9 @@ class OperatorConstants: READER_MAX_SLOT_SIZE = "reader_max_slot_size" READER_MAX_BYTES = "reader_max_bytes" READER_LOG_INTERVAL = "reader_log_interval_in_secs" - READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED = "reader_force_clear_when_full_and_all_consumed" + READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED = ( + "reader_force_clear_when_full_and_all_consumed" + ) DEFAULT_QUEUE_START_OFFSET = 0 DEFAULT_QUEUE_MAX_SLOT_SIZE = 100000 diff --git a/training/python/streaming/operator/impl/eval_function.py b/training/python/streaming/operator/impl/eval_function.py index a8996331..50a8aa69 100644 --- a/training/python/streaming/operator/impl/eval_function.py +++ b/training/python/streaming/operator/impl/eval_function.py @@ -22,10 +22,11 @@ def open(self, runtime_context): self.job_id = ray.get_runtime_context().actor_id.job_id.hex() logging.info( "Open eval function. op config : {}, job config : {}.".format( - self._op_config, runtime_context.get_job_config())) + self._op_config, runtime_context.get_job_config() + ) + ) self.optimize_config() - logger.info("Initializing operator with config: {}".format( - self._op_config)) + logger.info("Initializing operator with config: {}".format(self._op_config)) self._reader = EvalReader(self._op_config) self._evaluate_thread_started = False logger.info("Operator begin finish.") @@ -33,8 +34,7 @@ def open(self, runtime_context): def optimize_config(self): for k, v in self._op_config.items(): try: - if isinstance(v, - str) and v.find("{") != -1 and v.find("}") != -1: + if isinstance(v, str) and v.find("{") != -1 and v.find("}") != -1: self._op_config[k] = json.loads(v) except Exception: pass @@ -43,8 +43,8 @@ def get_all_actor_names(self): task_index = 0 global_index = self.task_id - self.task_index actor_names = [ - f"{self.job_id}-{self.job_config['StreamingOpName']}-" + - f"{task_index + i}|{global_index+i}" + f"{self.job_id}-{self.job_config['StreamingOpName']}-" + + f"{task_index + i}|{global_index+i}" for i in range(0, self.parallelism) ] return actor_names @@ -65,7 +65,7 @@ def _evaluate(self): evaluator should override this method :return: """ - raise NotImplementedError('function _evaluate not implemented!') + raise NotImplementedError("function _evaluate not implemented!") def save_checkpoint(self, checkpoint_id): pass diff --git a/training/python/streaming/operator/impl/eval_operator.py b/training/python/streaming/operator/impl/eval_operator.py index eb825a03..e86cad7b 100644 --- a/training/python/streaming/operator/impl/eval_operator.py +++ b/training/python/streaming/operator/impl/eval_operator.py @@ -7,7 +7,9 @@ import ray import json -from streaming.operator.impl.training_independent_actor import TrainingIndependentActorInterface +from streaming.operator.impl.training_independent_actor import ( + TrainingIndependentActorInterface, +) logger = logging.getLogger(__name__) diff --git a/training/python/streaming/operator/impl/optimizer_operator.py b/training/python/streaming/operator/impl/optimizer_operator.py index 38fcbe3e..068e0ac2 100644 --- a/training/python/streaming/operator/impl/optimizer_operator.py +++ b/training/python/streaming/operator/impl/optimizer_operator.py @@ -7,7 +7,9 @@ 
import ray import json -from streaming.operator.impl.training_independent_actor import TrainingIndependentActorInterface +from streaming.operator.impl.training_independent_actor import ( + TrainingIndependentActorInterface, +) logger = logging.getLogger(__name__) @@ -22,4 +24,4 @@ def __init__(self, conf): class OptimizerInterface(TrainingIndependentActorInterface): def __init__(self, config=None): - super().__init__(config) \ No newline at end of file + super().__init__(config) diff --git a/training/python/streaming/operator/impl/ps_operator.py b/training/python/streaming/operator/impl/ps_operator.py index 2d330bad..f442f049 100644 --- a/training/python/streaming/operator/impl/ps_operator.py +++ b/training/python/streaming/operator/impl/ps_operator.py @@ -7,7 +7,9 @@ import ray import json -from streaming.operator.impl.training_independent_actor import TrainingIndependentActorInterface +from streaming.operator.impl.training_independent_actor import ( + TrainingIndependentActorInterface, +) logger = logging.getLogger(__name__) diff --git a/training/python/streaming/operator/impl/tf_function.py b/training/python/streaming/operator/impl/tf_function.py index 7fb3a8ef..5015439f 100644 --- a/training/python/streaming/operator/impl/tf_function.py +++ b/training/python/streaming/operator/impl/tf_function.py @@ -28,58 +28,59 @@ def open(self, runtime_context): self._op_config = runtime_context.get_config() logging.info( "Open tf function. op config : {}, job config : {}.".format( - self._op_config, runtime_context.get_job_config())) + self._op_config, runtime_context.get_job_config() + ) + ) self._operator_module_manager = self.__init_operator_module_manager() module_name = self._op_config[OPERATOR_MODULE_NAME] class_name = self._op_config[OPERATOR_CLASS_NAME] try: self._operator = self._operator_module_manager.load_single_module( - module_name, class_name) + module_name, class_name + ) self._operator.set_master_actor( - runtime_context.get_controller_actor_handler()) + runtime_context.get_controller_actor_handler() + ) except Exception as e: ray.report_event( - ray.EventSeverity.ERROR, "TF_OPERATOR_ERROR", - f"Loading operator exception {e}") + ray.EventSeverity.ERROR, + "TF_OPERATOR_ERROR", + f"Loading operator exception {e}", + ) raise RuntimeError(f"Loading operator exception : {e}.") self.optimize_config() - logger.info("Initializing operator with config: {}".format( - self._op_config)) + logger.info("Initializing operator with config: {}".format(self._op_config)) - self._op_config[ - WORKER_PARALLELISM_INDEX] = runtime_context.get_task_index() + self._op_config[WORKER_PARALLELISM_INDEX] = runtime_context.get_task_index() was_reconstructed = False try: - was_reconstructed = \ + was_reconstructed = ( ray.get_runtime_context().was_current_actor_reconstructed - logger.info( - "Get ray actor reconstructed {}".format(was_reconstructed)) + ) + logger.info("Get ray actor reconstructed {}".format(was_reconstructed)) except Exception as e: was_reconstructed = False logger.info("Get ray actor reconstructed exception, {}".format(e)) - logger.info("Operator actor list : {}".format( - self.get_all_actor_names())) + logger.info("Operator actor list : {}".format(self.get_all_actor_names())) logger.info("Operator begin init.") self._operator.set_named_handler(self.get_all_actor_names) - self._operator.set_key_value_state( - runtime_context.get_key_value_state()) - self._operator.init_and_run({ - **self._op_config, - **{ - ACTOR_WAS_RECONSTRUCTED: was_reconstructed - }, - **self.job_config - }) + 
self._operator.set_key_value_state(runtime_context.get_key_value_state()) + self._operator.init_and_run( + { + **self._op_config, + **{ACTOR_WAS_RECONSTRUCTED: was_reconstructed}, + **self.job_config, + } + ) logger.info("Operator init finished.") def optimize_config(self): for k, v in self._op_config.items(): try: - if isinstance(v, - str) and v.find("{") != -1 and v.find("}") != -1: + if isinstance(v, str) and v.find("{") != -1 and v.find("}") != -1: self._op_config[k] = json.loads(v) except Exception: pass @@ -110,8 +111,8 @@ def get_all_actor_names(self): task_index = 0 global_index = self.task_id - self.task_index actor_names = [ - f"{self.job_id}-{self.job_config['StreamingOpName']}-" + - f"{task_index + i}|{global_index+i}" + f"{self.job_id}-{self.job_config['StreamingOpName']}-" + + f"{task_index + i}|{global_index+i}" for i in range(0, self.parallelism) ] return actor_names @@ -127,6 +128,9 @@ def delete_checkpoint(self, checkpoint_id): def __init_operator_module_manager(self): self._operator_module_path = self._op_config[OPERATOR_MODULE_PATH] - logger.info("Initializing operator module from path {}".format( - self._operator_module_path)) + logger.info( + "Initializing operator module from path {}".format( + self._operator_module_path + ) + ) return OperatorModuleManager(self._operator_module_path) diff --git a/training/python/streaming/operator/impl/tf_operator.py b/training/python/streaming/operator/impl/tf_operator.py index 99216bb3..bcdfb0c3 100644 --- a/training/python/streaming/operator/impl/tf_operator.py +++ b/training/python/streaming/operator/impl/tf_operator.py @@ -21,7 +21,8 @@ def __init__(self): # can update executor state by itself. self._last_checkpoint_state = {} self._async_cp_threadpool = ThreadPoolExecutor( - max_workers=1, thread_name_prefix="TF-Operator-Checkpoint-Thread") + max_workers=1, thread_name_prefix="TF-Operator-Checkpoint-Thread" + ) self._worker_parallelism_index = None self._trainer_thread_started = False @@ -30,8 +31,9 @@ def init(self, config): logger.info("TF operator Initialized with config: {}".format(config)) self._reader = TFOperatorReader(config) index_key = "worker_parallelism_index" - self._worker_parallelism_index = int( - config[index_key]) if index_key in config else 0 + self._worker_parallelism_index = ( + int(config[index_key]) if index_key in config else 0 + ) # should override by penrose def init_and_run(self, config): @@ -55,7 +57,7 @@ def _train(self): operator should override this method :return: """ - raise NotImplementedError('function _train not implemented!') + raise NotImplementedError("function _train not implemented!") def _get_consumed_offset(self): """ @@ -70,81 +72,96 @@ def on_checkpoint_complete(self, checkpoint_id): :param checkpoint_id: :return: """ - logger.info("Tf operator on checkpoint complete, checkpoint id: {}." 
- .format(checkpoint_id)) - logger.debug("Tf operator last checkpoint state: {}" - .format(self._last_checkpoint_state)) - if self._last_checkpoint_state is None or len( - self._last_checkpoint_state) == 0: + logger.info( + "Tf operator on checkpoint complete, checkpoint id: {}.".format( + checkpoint_id + ) + ) + logger.debug( + "Tf operator last checkpoint state: {}".format(self._last_checkpoint_state) + ) + if self._last_checkpoint_state is None or len(self._last_checkpoint_state) == 0: logger.warning("Tf operator last checkpoint state is None.") return self._reader.clear_expired_data( - self._last_checkpoint_state["reader_consumed_offset"]) + self._last_checkpoint_state["reader_consumed_offset"] + ) def __gen_tf_operator_cp_key(self, checkpoint_id): - return "checkpoint-{}-state-{}".format(checkpoint_id, - self._worker_parallelism_index) + return "checkpoint-{}-state-{}".format( + checkpoint_id, self._worker_parallelism_index + ) def save_checkpoint(self, checkpoint_id): # 1. save current consume offset before cp sink! - logger.info("Tf operator({}) saving checkpoint, checkpoint id: {}, " - "getting last checkpoint state.".format( - self._worker_parallelism_index, checkpoint_id)) + logger.info( + "Tf operator({}) saving checkpoint, checkpoint id: {}, " + "getting last checkpoint state.".format( + self._worker_parallelism_index, checkpoint_id + ) + ) self._last_checkpoint_state.update(self._reader.get_state()) self._reader.set_can_be_notified_to_clear() - logger.info("Tf operator saving checkpoint, checkpoint id: {}.".format( - checkpoint_id)) - logger.debug("Tf operator last checkpoint state: {}".format( - self._last_checkpoint_state)) + logger.info( + "Tf operator saving checkpoint, checkpoint id: {}.".format(checkpoint_id) + ) + logger.debug( + "Tf operator last checkpoint state: {}".format(self._last_checkpoint_state) + ) state_data = pickle.dumps(self._last_checkpoint_state) try: self.key_value_state.put( - self.__gen_tf_operator_cp_key(checkpoint_id), state_data) + self.__gen_tf_operator_cp_key(checkpoint_id), state_data + ) except BaseException as e: logger.warning( "Put state checkpoint id {} failed, exception {}.".format( - checkpoint_id, e)) + checkpoint_id, e + ) + ) return state_data def save_checkpoint_async(self, checkpoint_id): # return future logger.info( "Tf operator({}) submitting async checkpoint task, checkpoint is: " - "{}.".format(self._worker_parallelism_index, checkpoint_id)) - f = self._async_cp_threadpool.submit(self.save_checkpoint, - checkpoint_id) + "{}.".format(self._worker_parallelism_index, checkpoint_id) + ) + f = self._async_cp_threadpool.submit(self.save_checkpoint, checkpoint_id) logger.info( "Tf operator({}) save_checkpoint_async return future: {}.".format( - self._worker_parallelism_index, f)) + self._worker_parallelism_index, f + ) + ) return f def load_checkpoint(self, checkpoint_id): - logger.info( - "Tf operator load checkpoint, id: {}.".format(checkpoint_id)) + logger.info("Tf operator load checkpoint, id: {}.".format(checkpoint_id)) # user operator should use state to keep reader's offset from now on if checkpoint_id <= 0: return try: state_data = self.key_value_state.get( - self.__gen_tf_operator_cp_key(checkpoint_id)) + self.__gen_tf_operator_cp_key(checkpoint_id) + ) self._last_checkpoint_state = pickle.loads(state_data) - self._last_checkpoint_state["operator_consumed_offset"] = \ - self._get_consumed_offset() + self._last_checkpoint_state[ + "operator_consumed_offset" + ] = self._get_consumed_offset() 
self._reader.load_checkpoint(self._last_checkpoint_state) except BaseException as e: - logger.warning("Get state checkpoint id {} failed, {}.".format( - checkpoint_id, e)) + logger.warning( + "Get state checkpoint id {} failed, {}.".format(checkpoint_id, e) + ) def delete_checkpoint(self, checkpoint_id): logger.info("Delete tf operator state : {}.".format(checkpoint_id)) try: - self.key_value_state.remove( - self.__gen_tf_operator_cp_key(checkpoint_id)) + self.key_value_state.remove(self.__gen_tf_operator_cp_key(checkpoint_id)) except BaseException as e: - logger.warning( - "Remove tf operator state failed, exception {}.".format(e)) + logger.warning("Remove tf operator state failed, exception {}.".format(e)) def forward_command(self, message): pass @@ -233,9 +250,7 @@ def forward_command(self, message): command_func = message.message.get("func_name", None) if command_func is None: raise ValueError(f"Can't find function {command_func}.") - return methodcaller( - command_func, - *message.message.get("func_args", ()))(self) + return methodcaller(command_func, *message.message.get("func_args", ()))(self) def is_ready_rescaling(self): return True diff --git a/training/python/streaming/operator/impl/training_independent_actor.py b/training/python/streaming/operator/impl/training_independent_actor.py index c0f7c710..38477607 100644 --- a/training/python/streaming/operator/impl/training_independent_actor.py +++ b/training/python/streaming/operator/impl/training_independent_actor.py @@ -7,8 +7,9 @@ from ray.streaming.generated import remote_call_pb2 from ray.streaming.runtime.worker import WorkerRuntimeInfo -from raystreaming.runtime.udc_adapter_callee import \ - UnitedDistributedControllerAdapterCallee as UDCAdapterCallee +from raystreaming.runtime.udc_adapter_callee import ( + UnitedDistributedControllerAdapterCallee as UDCAdapterCallee, +) import ray logger = logging.getLogger(__name__) @@ -40,8 +41,7 @@ def health_check(self): return info_result_pb.SerializeToString() except Exception as e: - logger.warning( - "Get worker runtime info occurs an exception: {}".format(e)) + logger.warning("Get worker runtime info occurs an exception: {}".format(e)) return remote_call_pb2.WorkerRuntimeInfo().SerializeToString() def inject_exception(self, exception_injector_bytes): diff --git a/training/python/streaming/operator/reader/circular_buffer.py b/training/python/streaming/operator/reader/circular_buffer.py index fb75cd30..3d03f0b7 100644 --- a/training/python/streaming/operator/reader/circular_buffer.py +++ b/training/python/streaming/operator/reader/circular_buffer.py @@ -8,20 +8,21 @@ logger = logging.getLogger(__name__) -class CircularBuffer: - +class CircularBuffer: def __init__( - self, - max_size=OperatorConstants.DEFAULT_CIRCULAR_BUFFER_MAX_SIZE, - max_bytes=OperatorConstants.DEFAULT_CIRCULAR_BUFFER_MAX_SIZE, - log_interval=OperatorConstants.DEFAULT_LOG_DETAIL_INTERVAL): + self, + max_size=OperatorConstants.DEFAULT_CIRCULAR_BUFFER_MAX_SIZE, + max_bytes=OperatorConstants.DEFAULT_CIRCULAR_BUFFER_MAX_SIZE, + log_interval=OperatorConstants.DEFAULT_LOG_DETAIL_INTERVAL, + ): self.max_size = max_size self.max_bytes = max_bytes self.queue = [None] * max_size self.remain_bytes = max_bytes - utils.getsize(self.queue) if self.remain_bytes < 0: - raise ValueError("Used bytes of {} items > max bytes {}".format( - max_size, max_bytes)) + raise ValueError( + "Used bytes of {} items > max bytes {}".format(max_size, max_bytes) + ) self.head = 0 self.tail = 0 self.is_empty = True @@ -55,8 +56,7 @@ def _get(self): 
self._lock.wait() data = self.queue[self.head] self.queue[self.head] = None - self.remain_bytes += utils.getsize(data) - \ - OperatorConstants.NONE_BYTES + self.remain_bytes += utils.getsize(data) - OperatorConstants.NONE_BYTES self.head = self._add_cursor(self.head) if self.head == self.tail: self.is_empty = True @@ -66,8 +66,10 @@ def put(self, data): self._lock.acquire() used_bytes = utils.getsize(data) if used_bytes > self.max_bytes: - raise RuntimeError("The bytes of this \ - item is out of queue bytes limit.") + raise RuntimeError( + "The bytes of this \ + item is out of queue bytes limit." + ) while not self._can_put(used_bytes): self._pop_front() self.lost_data_count += 1 @@ -86,9 +88,8 @@ def get_size(self): self._lock.acquire() if self.empty(): size = 0 - else: - size = (self.tail + self.max_size - self.head - 1) \ - % self.max_size + 1 + else: + size = (self.tail + self.max_size - self.head - 1) % self.max_size + 1 self._lock.release() return size @@ -101,8 +102,7 @@ def get_bytes_size(self): def clear(self): self._lock.acquire() self.queue = [None] * self.max_size - self.remain_bytes = \ - self.max_bytes - utils.getsize(self.queue) + self.remain_bytes = self.max_bytes - utils.getsize(self.queue) self.head = 0 self.tail = 0 self.is_empty = True @@ -112,11 +112,9 @@ def _pop_front(self): self._get() def _can_put(self, used_bytes): - if self.head == self.tail and \ - not self.empty(): + if self.head == self.tail and not self.empty(): return False - if used_bytes - OperatorConstants.NONE_BYTES \ - > self.remain_bytes: + if used_bytes - OperatorConstants.NONE_BYTES > self.remain_bytes: return False return True @@ -125,7 +123,9 @@ def _add_cursor(self, cursor): def need_log(self): if self.log_detail_interval_in_secs > 0: - return time.time( - ) - self.last_log_detail_time > self.log_detail_interval_in_secs + return ( + time.time() - self.last_log_detail_time + > self.log_detail_interval_in_secs + ) else: return True diff --git a/training/python/streaming/operator/reader/eval_reader.py b/training/python/streaming/operator/reader/eval_reader.py index f5e832a3..4245db62 100644 --- a/training/python/streaming/operator/reader/eval_reader.py +++ b/training/python/streaming/operator/reader/eval_reader.py @@ -15,18 +15,21 @@ class EvalReader(StreamReader): def __init__(self, config): - max_size = int(config[OperatorConstants.READER_MAX_SLOT_SIZE]) \ - if OperatorConstants.READER_MAX_SLOT_SIZE in config \ + max_size = ( + int(config[OperatorConstants.READER_MAX_SLOT_SIZE]) + if OperatorConstants.READER_MAX_SLOT_SIZE in config else OperatorConstants.DEFAULT_QUEUE_MAX_SLOT_SIZE + ) - max_bytes = int(config[OperatorConstants.READER_MAX_BYTES]) \ - if OperatorConstants.READER_MAX_BYTES in config \ + max_bytes = ( + int(config[OperatorConstants.READER_MAX_BYTES]) + if OperatorConstants.READER_MAX_BYTES in config else OperatorConstants.DEFAULT_QUEUE_MAX_BYTES + ) super().__init__() - self.__data_buffer = CircularBuffer( - max_size=max_size, max_bytes=max_bytes) + self.__data_buffer = CircularBuffer(max_size=max_size, max_bytes=max_bytes) def put_data(self, msg): self.__data_buffer.put(msg) @@ -38,8 +41,12 @@ def fetch_data(self, batch_size, **options): :param options: Other options :return: List of training input data """ - batch_data = list(map(lambda x: str(x, encoding="utf-8"), \ - self.__data_buffer.get_list(batch_size))) + batch_data = list( + map( + lambda x: str(x, encoding="utf-8"), + self.__data_buffer.get_list(batch_size), + ) + ) return batch_data def get_offset(self): diff --git 
a/training/python/streaming/operator/reader/ring_buffer.py b/training/python/streaming/operator/reader/ring_buffer.py index 34120011..3babc895 100644 --- a/training/python/streaming/operator/reader/ring_buffer.py +++ b/training/python/streaming/operator/reader/ring_buffer.py @@ -19,18 +19,23 @@ class RingBuffer(Queue): Note that item in ringbuffer must to bytes """ + def __init__( - self, - max_slot_size=OperatorConstants.DEFAULT_QUEUE_MAX_SLOT_SIZE, - max_bytes=OperatorConstants.DEFAULT_QUEUE_MAX_BYTES, - start_offset=OperatorConstants.DEFAULT_QUEUE_START_OFFSET, - log_interval=OperatorConstants.DEFAULT_RINGBUFFER_LOG_DETAIL_INTERVAL, - force_clear=OperatorConstants.DEFAULT_FORCE_CLEAR, - report_interval=OperatorConstants.DEFAULT_LOG_DETAIL_INTERVAL): - - logger.info("Ringbuffer initialized with max_slot_size:{}, " - "max_byte:{}, start_offset:{}, log_interval:{}".format( - max_slot_size, max_bytes, start_offset, log_interval)) + self, + max_slot_size=OperatorConstants.DEFAULT_QUEUE_MAX_SLOT_SIZE, + max_bytes=OperatorConstants.DEFAULT_QUEUE_MAX_BYTES, + start_offset=OperatorConstants.DEFAULT_QUEUE_START_OFFSET, + log_interval=OperatorConstants.DEFAULT_RINGBUFFER_LOG_DETAIL_INTERVAL, + force_clear=OperatorConstants.DEFAULT_FORCE_CLEAR, + report_interval=OperatorConstants.DEFAULT_LOG_DETAIL_INTERVAL, + ): + + logger.info( + "Ringbuffer initialized with max_slot_size:{}, " + "max_byte:{}, start_offset:{}, log_interval:{}".format( + max_slot_size, max_bytes, start_offset, log_interval + ) + ) super(RingBuffer, self).__init__(maxsize=max_slot_size) @@ -39,8 +44,11 @@ def __init__( self.queue = [None] * max_slot_size self.used_bytes = utils.getsize(self.queue) if self.used_bytes > max_bytes: - raise ValueError("Used bytes {} of {} items > max bytes {}".format( - self.used_bytes, max_slot_size, max_bytes)) + raise ValueError( + "Used bytes {} of {} items > max bytes {}".format( + self.used_bytes, max_slot_size, max_bytes + ) + ) self.start_offset = start_offset self.cursor = start_offset % max_slot_size # the reader cursor @@ -120,21 +128,26 @@ def _put(self, data): if self.need_log(): msg = "Increase used bytes from {} to {}".format( - self.used_bytes, self.used_bytes + commit_data_size) + self.used_bytes, self.used_bytes + commit_data_size + ) self.log_ring_buffer_detail("put", msg) if self.tail == self.head: logger.warning( - "Queue is full, tail: {}, head: {}, unfinished tasks: {}". 
- format(self.head, self.tail, self.unfinished_tasks)) + "Queue is full, tail: {}, head: {}, unfinished tasks: {}".format( + self.head, self.tail, self.unfinished_tasks + ) + ) self.is_full = True self.log_ring_buffer_detail("put", "queue is full") if self.used_bytes > self.max_bytes: - logger.warning("Queue is full, max bytes: {}, used bytes: {}, " - "commit data bytes: {}.".format( - self.max_bytes, self.used_bytes, - commit_data_size)) + logger.warning( + "Queue is full, max bytes: {}, used bytes: {}, " + "commit data bytes: {}.".format( + self.max_bytes, self.used_bytes, commit_data_size + ) + ) self.is_full = True self.log_ring_buffer_detail("put", "queue is full") @@ -201,20 +214,20 @@ def _get(self): # Queue is full and all data are consumed if self.is_full and self.force_clear: self.log_ring_buffer_detail( - "get", "queue is full and all data are consumed, " - "force clear buffer starting") + "get", + "queue is full and all data are consumed, " + "force clear buffer starting", + ) self.queue = [None] * self.max_slot_size self.used_bytes = utils.getsize(self.queue) self.is_full = False self.not_full.notify() self.can_be_notified_to_clear = False self.real_head = self.real_cursor - self.log_ring_buffer_detail("get", - "force clear buffer finished") + self.log_ring_buffer_detail("get", "force clear buffer finished") return data except Exception as e: - logger.error("Data is not ready, cursor: {}".format(self.cursor), - e) + logger.error("Data is not ready, cursor: {}".format(self.cursor), e) return None def _qsize(self): @@ -239,21 +252,19 @@ def seek_by_offset(self, start_offset): :return: """ with self.mutex: - logger.info( - "Start to reset start offset to {}".format(start_offset)) + logger.info("Start to reset start offset to {}".format(start_offset)) if not (self.real_head <= start_offset <= self.real_tail): - raise ValueError("Seek error, specified cursor {} is not in " - "[{}, {}]".format(start_offset, - self.real_head, - self.real_tail)) + raise ValueError( + "Seek error, specified cursor {} is not in " + "[{}, {}]".format(start_offset, self.real_head, self.real_tail) + ) self.start_offset = start_offset self.real_cursor = start_offset self.cursor = self.real_cursor % self.max_slot_size - logger.info( - "Finished resetting start offset to {}".format(start_offset)) + logger.info("Finished resetting start offset to {}".format(start_offset)) def clear_expired_data(self, expired_offset): """ @@ -265,23 +276,32 @@ def clear_expired_data(self, expired_offset): self.log_ring_buffer_detail("clear") if not self.can_be_notified_to_clear: - msg = "can_be_notified_to_clear=False, do not clear for this" \ - " time, waiting for next checkpoint" + msg = ( + "can_be_notified_to_clear=False, do not clear for this" + " time, waiting for next checkpoint" + ) self.log_ring_buffer_detail("clear", msg) return self.log_ring_buffer_detail( - "clear", "Start to clear expired data, " - "expired offset: {}".format(expired_offset)) + "clear", + "Start to clear expired data, " + "expired offset: {}".format(expired_offset), + ) if expired_offset < self.real_head: - msg = "expired_offset {} < real_head {}, reader consumed offset has not bean updated after " \ - "force clearing, do not clear this time".format(expired_offset, self.real_head) + msg = ( + "expired_offset {} < real_head {}, reader consumed offset has not bean updated after " + "force clearing, do not clear this time".format( + expired_offset, self.real_head + ) + ) self.log_ring_buffer_detail("clear", msg) return if expired_offset > 
self.real_cursor: msg = "expired_offset {} > real cursor {}, illegal value, do not clear this time.".format( - expired_offset, self.real_cursor) + expired_offset, self.real_cursor + ) self.log_ring_buffer_detail("clear", msg) return @@ -290,8 +310,9 @@ def clear_expired_data(self, expired_offset): self.real_head = expired_offset self.head = self.real_head % self.max_slot_size self.log_ring_buffer_detail( - "clear", "Setting head to {}, old head is:" - " {}".format(self.head, old_head)) + "clear", + "Setting head to {}, old head is:" " {}".format(self.head, old_head), + ) if old_head == self.head: if self.is_full and self.real_cursor == self.real_tail: # queue is full and all data are consumed @@ -319,8 +340,10 @@ def clear_expired_data(self, expired_offset): self.is_full = False self.log_ring_buffer_detail( - "clear", "Finished clearing expired data," - " expired offset: {}".format(expired_offset)) + "clear", + "Finished clearing expired data," + " expired offset: {}".format(expired_offset), + ) if not self.is_full: self.not_full.notify() @@ -335,14 +358,25 @@ def get_tail(self): return self.tail def log_ring_buffer_detail(self, operation, extra=None): - pattern = "[{}] ringbuffer max size:{}, used size:{}, real head:{}, " \ - "real tail:{}, real cursor:{}, max bytes:{} used bytes:{}, " \ - "is full:{}, is empty:{}, can_be_notified_to_clear:{}, extra msg:{}" - detail = pattern.format(operation, self.max_slot_size, self._qsize(), - self.real_head, self.real_tail, - self.real_cursor, self.max_bytes, - self.used_bytes, self.is_full, self.is_empty, - self.can_be_notified_to_clear, extra) + pattern = ( + "[{}] ringbuffer max size:{}, used size:{}, real head:{}, " + "real tail:{}, real cursor:{}, max bytes:{} used bytes:{}, " + "is full:{}, is empty:{}, can_be_notified_to_clear:{}, extra msg:{}" + ) + detail = pattern.format( + operation, + self.max_slot_size, + self._qsize(), + self.real_head, + self.real_tail, + self.real_cursor, + self.max_bytes, + self.used_bytes, + self.is_full, + self.is_empty, + self.can_be_notified_to_clear, + extra, + ) logger.debug(detail) self.last_log_detail_time = time.time() # Report ringbuffer metrics @@ -353,15 +387,19 @@ def report_metrics(self): def need_log(self): if self.log_detail_interval_in_secs > 0: - return time.time( - ) - self.last_log_detail_time > self.log_detail_interval_in_secs + return ( + time.time() - self.last_log_detail_time + > self.log_detail_interval_in_secs + ) else: return True def need_report_metrics(self): if self.report_metrics_interval_in_secs > 0: - return time.time( - ) - self.last_report_time > self.report_metrics_interval_in_secs + return ( + time.time() - self.last_report_time + > self.report_metrics_interval_in_secs + ) return True def set_can_be_notified_to_clear(self): @@ -372,7 +410,8 @@ def set_can_be_notified_to_clear(self): with self.mutex: self.can_be_notified_to_clear = True self.log_ring_buffer_detail( - "checkpoint", "set can_be_notified_to_clear to True") + "checkpoint", "set can_be_notified_to_clear to True" + ) def get_state(self, reader_consumed_offset): """ @@ -385,11 +424,10 @@ def get_state(self, reader_consumed_offset): "real_tail": self.real_tail, "real_cursor": self.real_cursor, "reader_consumed_offset": reader_consumed_offset, - "unconsumed_data": [] + "unconsumed_data": [], } # save unconsumed data to checkpoint state - for i in range(state["reader_consumed_offset"], - state["real_tail"]): + for i in range(state["reader_consumed_offset"], state["real_tail"]): state["unconsumed_data"].append(self.queue[i % 
self.maxsize]) return state @@ -411,23 +449,25 @@ def __reset_cursor(self, real_cursor): def load_checkpoint(self, state): with self.mutex: - self.log_ring_buffer_detail("load_checkpoint", - "load checkpoint start") + self.log_ring_buffer_detail("load_checkpoint", "load checkpoint start") # reset pointer - logger.info("Reset start offset to: {}".format( - state["reader_consumed_offset"])) + logger.info( + "Reset start offset to: {}".format(state["reader_consumed_offset"]) + ) self.__reset_start_offset(state["reader_consumed_offset"]) - logger.info("Reset cursor to: {}".format( - state["operator_consumed_offset"])) + logger.info("Reset cursor to: {}".format(state["operator_consumed_offset"])) self.__reset_cursor(state["operator_consumed_offset"]) # load data from state recover_start = state["reader_consumed_offset"] recover_end = state["real_tail"] - logger.info("Load checkpoint and reput data from {} to {}.".format( - recover_start, recover_end)) + logger.info( + "Load checkpoint and reput data from {} to {}.".format( + recover_start, recover_end + ) + ) logger.info( f"Before load checkpoint, state info : real_head {state['real_head']},\ real_tail {state['real_tail']}, real_cursor {state['real_cursor']}, \ @@ -449,8 +489,7 @@ def load_checkpoint(self, state): if recover_end > recover_start: self.is_empty = False self.not_empty.notify() - self.log_ring_buffer_detail("load_checkpoint", - "load checkpoint finish") + self.log_ring_buffer_detail("load_checkpoint", "load checkpoint finish") logger.info( f"After load checkpoint, none value num : {none_value_num_in_state} {self}" ) diff --git a/training/python/streaming/operator/reader/stream_reader.py b/training/python/streaming/operator/reader/stream_reader.py index 54c3275a..962b57b6 100644 --- a/training/python/streaming/operator/reader/stream_reader.py +++ b/training/python/streaming/operator/reader/stream_reader.py @@ -15,13 +15,13 @@ def __init__(self, start_offset=0): self._start_offset = start_offset def get_offset(self): - raise NotImplementedError('function not implemented') + raise NotImplementedError("function not implemented") def fetch_data(self, batch_size, **options): - raise NotImplementedError('function not implemented') + raise NotImplementedError("function not implemented") def seek_by_offset(self, start_offset): - raise NotImplementedError('function not implemented') + raise NotImplementedError("function not implemented") def clear_expired_data(self, expired_offset): - raise NotImplementedError('function not implemented') + raise NotImplementedError("function not implemented") diff --git a/training/python/streaming/operator/reader/tf_operator_reader.py b/training/python/streaming/operator/reader/tf_operator_reader.py index dff37cd2..dcb316d3 100644 --- a/training/python/streaming/operator/reader/tf_operator_reader.py +++ b/training/python/streaming/operator/reader/tf_operator_reader.py @@ -4,6 +4,7 @@ import os from streaming.operator.constant.operator_constants import OperatorConstants + # from ray.streaming.constants import StreamingConstants from .ring_buffer import RingBuffer @@ -16,34 +17,47 @@ class TFOperatorReader(StreamReader): """ TF Operator Data Reader """ + def __init__(self, config): - start_offset = int(config[OperatorConstants.READER_START_OFFSET]) \ - if OperatorConstants.READER_START_OFFSET in config \ + start_offset = ( + int(config[OperatorConstants.READER_START_OFFSET]) + if OperatorConstants.READER_START_OFFSET in config else OperatorConstants.DEFAULT_QUEUE_START_OFFSET + ) - max_slot_size = 
int(config[OperatorConstants.READER_MAX_SLOT_SIZE]) \ - if OperatorConstants.READER_MAX_SLOT_SIZE in config \ + max_slot_size = ( + int(config[OperatorConstants.READER_MAX_SLOT_SIZE]) + if OperatorConstants.READER_MAX_SLOT_SIZE in config else OperatorConstants.DEFAULT_QUEUE_MAX_SLOT_SIZE + ) - max_bytes = int(config[OperatorConstants.READER_MAX_BYTES]) \ - if OperatorConstants.READER_MAX_BYTES in config \ + max_bytes = ( + int(config[OperatorConstants.READER_MAX_BYTES]) + if OperatorConstants.READER_MAX_BYTES in config else OperatorConstants.DEFAULT_QUEUE_MAX_BYTES + ) - log_interval = int(config[OperatorConstants.READER_LOG_INTERVAL]) \ - if OperatorConstants.READER_LOG_INTERVAL in config \ + log_interval = ( + int(config[OperatorConstants.READER_LOG_INTERVAL]) + if OperatorConstants.READER_LOG_INTERVAL in config else OperatorConstants.DEFAULT_LOG_DETAIL_INTERVAL + ) - force_clear = config[OperatorConstants.READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED] \ - if OperatorConstants.READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED in config \ + force_clear = ( + config[OperatorConstants.READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED] + if OperatorConstants.READER_FORCE_CLEAR_WHEN_FULL_AND_ALL_CONSUMED in config else OperatorConstants.DEFAULT_FORCE_CLEAR + ) super().__init__(start_offset) - self.__data_buffer = RingBuffer(start_offset=start_offset, - max_slot_size=max_slot_size, - max_bytes=max_bytes, - log_interval=log_interval, - force_clear=force_clear) + self.__data_buffer = RingBuffer( + start_offset=start_offset, + max_slot_size=max_slot_size, + max_bytes=max_bytes, + log_interval=log_interval, + force_clear=force_clear, + ) self._consumed_offset = start_offset self._stop_event = threading.Event() @@ -55,7 +69,9 @@ def put_data(self, msg): self._stop_event.clear() raise Exception( "Stop event is set, raise exception to rollback now." - if not self._stop_reason else self._stop_reason) + if not self._stop_reason + else self._stop_reason + ) def fetch_data(self, batch_size, **options): """ @@ -73,8 +89,7 @@ def fetch_data(self, batch_size, **options): time.sleep(0.01) continue # Just for tensorflow data where buffer item type is byte - batch_data.append( - (self._consumed_offset, str(item, encoding="utf-8"))) + batch_data.append((self._consumed_offset, str(item, encoding="utf-8"))) # Update consume offset(The checkpoint state info) self._consumed_offset += batch_size @@ -97,8 +112,7 @@ def iterator(self, batch_size, **options): data_num += 1 batch_data.append(str(item, encoding="utf-8")) if len(batch_data) % 100 == 0: - logger.debug("in iterator batch data size is: %d" % - len(batch_data)) + logger.debug("in iterator batch data size is: %d" % len(batch_data)) if batch_size == len(batch_data): logger.debug("in iterator batch data size is: %d" % batch_size) @@ -116,8 +130,10 @@ def seek_by_offset(self, start_offset): try: self.__data_buffer.seek_by_offset(start_offset) except ValueError as v: - logger.warning("Seek by offset caught value error: {}, " - "use current cursor as start offset.".format(v)) + logger.warning( + "Seek by offset caught value error: {}, " + "use current cursor as start offset.".format(v) + ) def stop_reader(self, reason=None): """ @@ -130,8 +146,7 @@ def stop_reader(self, reason=None): # Close it to awake ring buffer. 
self.__data_buffer.close() except Exception as e: - logger.warning( - "Stop reader and make exception event failed {}.".format(e)) + logger.warning("Stop reader and make exception event failed {}.".format(e)) os._exit(-1) logger.info("Finish to stop reader, emit exception.") @@ -159,29 +174,37 @@ def get_state(self): def load_checkpoint(self, checkpoint_state): if not self.__is_operator_consumed_offset_legal(checkpoint_state): logger.info( - "operator_consumed_offset is illegal, use reader_consumed_offset: {}" - .format(checkpoint_state["reader_consumed_offset"])) + "operator_consumed_offset is illegal, use reader_consumed_offset: {}".format( + checkpoint_state["reader_consumed_offset"] + ) + ) checkpoint_state["operator_consumed_offset"] = checkpoint_state[ - "reader_consumed_offset"] + "reader_consumed_offset" + ] self._consumed_offset = checkpoint_state["operator_consumed_offset"] - logger.info("Tf operator consumed_offset is: {}".format( - self._consumed_offset)) + logger.info("Tf operator consumed_offset is: {}".format(self._consumed_offset)) self.__data_buffer.load_checkpoint(checkpoint_state) logger.debug( - "Tf operator finished load checkpoint, current state is: {}". - format(self.get_state())) + "Tf operator finished load checkpoint, current state is: {}".format( + self.get_state() + ) + ) def __is_operator_consumed_offset_legal(self, checkpoint_state): reader_consumed_offset = checkpoint_state["reader_consumed_offset"] operator_consumed_offset = checkpoint_state["operator_consumed_offset"] real_head = checkpoint_state["real_head"] real_tail = checkpoint_state["real_tail"] - logger.info( - "operator_consumed_offset is: {}".format(operator_consumed_offset)) + logger.info("operator_consumed_offset is: {}".format(operator_consumed_offset)) # if operator_consumed_offset is valid, use it as reader offset to keep # exactly once - if operator_consumed_offset is not None and \ - real_head <= reader_consumed_offset <= operator_consumed_offset <= real_tail: + if ( + operator_consumed_offset is not None + and real_head + <= reader_consumed_offset + <= operator_consumed_offset + <= real_tail + ): logger.info("operator_consumed_offset is legal.") return True else: diff --git a/training/python/streaming/operator/test/test_eval.py b/training/python/streaming/operator/test/test_eval.py index 2c6442b4..962bf102 100644 --- a/training/python/streaming/operator/test/test_eval.py +++ b/training/python/streaming/operator/test/test_eval.py @@ -16,7 +16,6 @@ @test_utils.skip_if_no_streaming_jar() def test_eval_word_count(): class MockSourceFunction(SourceFunction): - def init(self, parallel, index): self.tot = 10 @@ -29,7 +28,6 @@ def fetch(self, ctx, checkpoint_id): ctx.collect(bytes("b", encoding="utf8")) class MockEvalSinkFunction(EvalSinkFunction): - def _evaluate(self): sleep(5) data_list = self._reader.fetch_data(10) @@ -42,15 +40,13 @@ def _evaluate(self): if os.path.exists(sink_file): os.remove(sink_file) - ctx = StreamingContext.Builder() \ - .build() + ctx = StreamingContext.Builder().build() a = MockSourceFunction() b = MockEvalSinkFunction() - ctx.source(a) \ - .disable_chain() \ - .sink(b) \ - .with_config(conf={"reader_max_slot_size": "10"}) + ctx.source(a).disable_chain().sink(b).with_config( + conf={"reader_max_slot_size": "10"} + ) ctx.submit("test_eval_word_count") diff --git a/training/python/streaming/operator/test/test_pysink_operator.py b/training/python/streaming/operator/test/test_pysink_operator.py index 6c3a6a3e..700d479b 100644 --- 
a/training/python/streaming/operator/test/test_pysink_operator.py +++ b/training/python/streaming/operator/test/test_pysink_operator.py @@ -9,24 +9,28 @@ @test_utils.skip_if_no_streaming_jar() def test_pysink_word_count(): test_utils.start_ray() - ctx = StreamingContext.Builder() \ - .option("streaming.metrics.reporters", "") \ - .build() + ctx = StreamingContext.Builder().option("streaming.metrics.reporters", "").build() tf_sink_function = TFSinkFunction() - ctx.read_text_file(__file__) \ - .set_parallelism(1) \ - .flat_map(lambda x: x.split()) \ - .map(lambda x: (x, 1)) \ - .key_by(lambda x: x[0]) \ - .reduce(lambda old_value, new_value: - (old_value[0], old_value[1] + new_value[1])) \ - .filter(lambda x: "ray" not in x) \ - .map(lambda x : bytes(x[0], encoding='utf-8')) \ - .disable_chain() \ - .sink(tf_sink_function) \ - .with_config(conf={"operator_module_path" : ".", "operator_module_name" : "streaming.operator.impl.tf_operator", "operator_class_name" : "Mock4RescaleOperator"}) + ctx.read_text_file(__file__).set_parallelism(1).flat_map(lambda x: x.split()).map( + lambda x: (x, 1) + ).key_by(lambda x: x[0]).reduce( + lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1]) + ).filter( + lambda x: "ray" not in x + ).map( + lambda x: bytes(x[0], encoding="utf-8") + ).disable_chain().sink( + tf_sink_function + ).with_config( + conf={ + "operator_module_path": ".", + "operator_module_name": "streaming.operator.impl.tf_operator", + "operator_class_name": "Mock4RescaleOperator", + } + ) ctx.submit("tf_function") import time + time.sleep(10) ray.shutdown() @@ -34,16 +38,18 @@ def test_pysink_word_count(): @test_utils.skip_if_no_streaming_jar() def test_is_ready_rescaling(): test_utils.start_ray() - ctx = StreamingContext.Builder() \ - .option("streaming.metrics.reporters", "") \ - .build() + ctx = StreamingContext.Builder().option("streaming.metrics.reporters", "").build() tf_sink_function = TFSinkFunction() - ctx.read_text_file(__file__) \ - .disable_chain() \ - .sink(tf_sink_function) \ - .with_config(conf={"operator_module_path" : ".", "operator_module_name" : "streaming.operator.impl.tf_operator", "operator_class_name" : "Mock4RescaleOperator"}) + ctx.read_text_file(__file__).disable_chain().sink(tf_sink_function).with_config( + conf={ + "operator_module_path": ".", + "operator_module_name": "streaming.operator.impl.tf_operator", + "operator_class_name": "Mock4RescaleOperator", + } + ) ctx.submit("test_is_ready_rescaling") import time + time.sleep(10) actor_name_1 = "1-PythonOperator-0|0" actor_name_2 = "2-PythonOperator-0|1" @@ -64,25 +70,32 @@ def init(self, parallel, index): def fetch(self, ctx, checkpoint_id): import time + time.sleep(0.1) ctx.collect("hello ray {}".format(random.randint(0, 100))) test_utils.start_ray() ctx = StreamingContext.Builder().build() tf_sink_function = TFSinkFunction() - ctx.source(TestSource()) \ - .set_parallelism(1) \ - .flat_map(lambda x: x.split()) \ - .map(lambda x: (x, 1)) \ - .key_by(lambda x: x[0]) \ - .reduce(lambda old_value, new_value: - (old_value[0], old_value[1] + new_value[1])) \ - .filter(lambda x: "ray" not in x) \ - .map(lambda x : bytes(x[0], encoding='utf-8')) \ - .disable_chain() \ - .sink(tf_sink_function) \ - .with_config(conf={"operator_module_path" : ".", "operator_module_name" : "streaming.operator.impl.tf_operator", "operator_class_name" : "Mock4RescaleOperator"}) + ctx.source(TestSource()).set_parallelism(1).flat_map(lambda x: x.split()).map( + lambda x: (x, 1) + ).key_by(lambda x: x[0]).reduce( + lambda 
old_value, new_value: (old_value[0], old_value[1] + new_value[1]) + ).filter( + lambda x: "ray" not in x + ).map( + lambda x: bytes(x[0], encoding="utf-8") + ).disable_chain().sink( + tf_sink_function + ).with_config( + conf={ + "operator_module_path": ".", + "operator_module_name": "streaming.operator.impl.tf_operator", + "operator_class_name": "Mock4RescaleOperator", + } + ) ctx.submit("tf_function_state_{}".format(random.random())) import time + time.sleep(30) ray.shutdown() diff --git a/training/python/streaming/operator/test/test_ring_buffer.py b/training/python/streaming/operator/test/test_ring_buffer.py index c86d6a3a..24cd880c 100644 --- a/training/python/streaming/operator/test/test_ring_buffer.py +++ b/training/python/streaming/operator/test/test_ring_buffer.py @@ -15,8 +15,11 @@ def debug(q, return_msg): print(return_msg) print("Buffer is: {}".format(q.queue)) - print("head: {}, tail:{}, cursor: {}, size:{}\n".format( - q.head, q.tail, q.cursor, q._qsize())) + print( + "head: {}, tail:{}, cursor: {}, size:{}\n".format( + q.head, q.tail, q.cursor, q._qsize() + ) + ) def pull(q: RingBuffer): @@ -47,10 +50,10 @@ def push(q): def test_buffer_operation_block(): q = RingBuffer(30, force_clear=False) - producer = threading.Thread(target=push, args=(q, )) + producer = threading.Thread(target=push, args=(q,)) producer.start() - consumer = threading.Thread(target=pull, args=(q, )) + consumer = threading.Thread(target=pull, args=(q,)) consumer.start() producer.join() @@ -62,24 +65,45 @@ def test_buffer_operation_non_block(): q = RingBuffer(5, force_clear=False) debug(q, q.put(1)) - assert q._qsize( - ) == 1 and not q.is_full and q.head == 0 and q.tail == 1 and q.cursor == 0 + assert ( + q._qsize() == 1 + and not q.is_full + and q.head == 0 + and q.tail == 1 + and q.cursor == 0 + ) debug(q, q.put(2)) - assert q._qsize( - ) == 2 and not q.is_full and q.head == 0 and q.tail == 2 and q.cursor == 0 + assert ( + q._qsize() == 2 + and not q.is_full + and q.head == 0 + and q.tail == 2 + and q.cursor == 0 + ) debug(q, q.put(3)) - assert q._qsize( - ) == 3 and not q.is_full and q.head == 0 and q.tail == 3 and q.cursor == 0 + assert ( + q._qsize() == 3 + and not q.is_full + and q.head == 0 + and q.tail == 3 + and q.cursor == 0 + ) debug(q, q.put(4)) - assert q._qsize( - ) == 4 and not q.is_full and q.head == 0 and q.tail == 4 and q.cursor == 0 + assert ( + q._qsize() == 4 + and not q.is_full + and q.head == 0 + and q.tail == 4 + and q.cursor == 0 + ) debug(q, q.put(5)) - assert q._qsize( - ) == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 0 + assert ( + q._qsize() == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 0 + ) # Buffer is full, blocking until timeout with pytest.raises(Full): @@ -87,24 +111,29 @@ def test_buffer_operation_non_block(): # Get operation does not pop data, so the buffer is still full debug(q, q.get()) - assert q._qsize( - ) == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 1 + assert ( + q._qsize() == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 1 + ) debug(q, q.get()) - assert q._qsize( - ) == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 2 + assert ( + q._qsize() == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 2 + ) debug(q, q.get()) - assert q._qsize( - ) == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 3 + assert ( + q._qsize() == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 3 + ) debug(q, q.get()) - assert q._qsize( - ) == 5 and 
q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 4 + assert ( + q._qsize() == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 4 + ) debug(q, q.get()) - assert q._qsize( - ) == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 0 + assert ( + q._qsize() == 5 and q.is_full and q.head == 0 and q.tail == 0 and q.cursor == 0 + ) # All data are consumed so the queue is both full and empty for now assert q.is_empty @@ -120,29 +149,42 @@ def test_buffer_operation_non_block(): # Clear expired data to free buffer memory, set head from 0 to 2 (index 0 and 1 are expired) q.clear_expired_data(2) - assert q._qsize( - ) == 3 and not q.is_full and q.head == 2 and q.tail == 0 and q.cursor == 0 + assert ( + q._qsize() == 3 + and not q.is_full + and q.head == 2 + and q.tail == 0 + and q.cursor == 0 + ) debug(q, q.put(6)) - assert q._qsize( - ) == 4 and not q.is_full and q.head == 2 and q.tail == 1 and q.cursor == 0 + assert ( + q._qsize() == 4 + and not q.is_full + and q.head == 2 + and q.tail == 1 + and q.cursor == 0 + ) # Buffer is full again debug(q, q.put(7)) - assert q._qsize( - ) == 5 and q.is_full and q.head == 2 and q.tail == 2 and q.cursor == 0 + assert ( + q._qsize() == 5 and q.is_full and q.head == 2 and q.tail == 2 and q.cursor == 0 + ) # Can not put when buffer is full with pytest.raises(Full): q.put_nowait(8) debug(q, q.get()) - assert q._qsize( - ) == 5 and q.is_full and q.head == 2 and q.tail == 2 and q.cursor == 1 + assert ( + q._qsize() == 5 and q.is_full and q.head == 2 and q.tail == 2 and q.cursor == 1 + ) debug(q, q.get()) - assert q._qsize( - ) == 5 and q.is_full and q.head == 2 and q.tail == 2 and q.cursor == 2 + assert ( + q._qsize() == 5 and q.is_full and q.head == 2 and q.tail == 2 and q.cursor == 2 + ) # Empty again with pytest.raises(Empty): @@ -165,29 +207,27 @@ def test_python_list_memory(): assert 64 + 8 * 5 + 16 == utils.getsize([None] * 5) # []->64 + item pointer-> 8*5 + None->16 + bytes->33 + 'a'->1 - assert 64 + 8 * 5 + 16 + 33 + 1 == utils.getsize( - [None, b'a', None, None, None]) + assert 64 + 8 * 5 + 16 + 33 + 1 == utils.getsize([None, b"a", None, None, None]) # []->64 + item pointer-> 8*5 + bytes->33 + 'a'->1 - assert 64 + 8 * 5 + 33 + 1 == utils.getsize([b'a', b'a', b'a', b'a', b'a']) + assert 64 + 8 * 5 + 33 + 1 == utils.getsize([b"a", b"a", b"a", b"a", b"a"]) - assert 64 + 8 == sys.getsizeof([b'a']) + assert 64 + 8 == sys.getsizeof([b"a"]) # []->64 + item pointer->8 + bytes->33 - assert 64 + 8 + 33 == utils.getsize([b'']) + assert 64 + 8 + 33 == utils.getsize([b""]) # []->64 + item pointer->8 + bytes->33 + 'a'->1 - assert 64 + 8 + 34 == utils.getsize([b'a']) + assert 64 + 8 + 34 == utils.getsize([b"a"]) # []->64 + item pointer->8 + bytes->33 + 'ab'->2 - assert 64 + 8 + 35 == utils.getsize([b'ab']) + assert 64 + 8 + 35 == utils.getsize([b"ab"]) last_memory_in_bytes = get_process_memory() # Ring buffer with 10000000 same b'a' takes 76MB - q = [b'a'] * 10000000 + q = [b"a"] * 10000000 assert 64 + 8 * 10000000 + 33 + 1 == utils.getsize(q) assert int(utils.getsize(q) / 1024 / 1024) == 76 # 76MB # Check increased memory of current process - assert int( - (get_process_memory() - last_memory_in_bytes) / 1024 / 1024) == 76 + assert int((get_process_memory() - last_memory_in_bytes) / 1024 / 1024) == 76 def test_ring_buffer_memory(): @@ -198,16 +238,21 @@ def test_ring_buffer_memory(): print("\n") for i in range(1, 10): - q.put(str(i).encode(encoding='utf-8')) + q.put(str(i).encode(encoding="utf-8")) # Count used bytes directly - 
print("#{} Used byte: {}, diff byte: {}".format( - i, q.used_bytes, q.used_bytes - last_used_bytes)) + print( + "#{} Used byte: {}, diff byte: {}".format( + i, q.used_bytes, q.used_bytes - last_used_bytes + ) + ) # Use memory utils to count memory size - print("#{} Used mem: {}, diff mem: {}\n".format( - i, utils.getsize(q.queue), - utils.getsize(q.queue) - last_mem_size)) + print( + "#{} Used mem: {}, diff mem: {}\n".format( + i, utils.getsize(q.queue), utils.getsize(q.queue) - last_mem_size + ) + ) last_used_bytes = q.used_bytes last_mem_size = utils.getsize(q.queue) @@ -306,26 +351,35 @@ def test_put_reach_memory_limit(): assert 64 + 8 * 40 + 16 == utils.getsize(buffer.queue) # 400Bytes # add 35Bytes - buffer.put(b'10', timeout=1) - assert buffer.used_bytes == 435 and utils.getsize( - buffer.queue) == 435 and not buffer.is_full + buffer.put(b"10", timeout=1) + assert ( + buffer.used_bytes == 435 + and utils.getsize(buffer.queue) == 435 + and not buffer.is_full + ) # add 35Bytes - buffer.put(b'11', timeout=1) - assert buffer.used_bytes == 470 and utils.getsize( - buffer.queue) == 470 and not buffer.is_full + buffer.put(b"11", timeout=1) + assert ( + buffer.used_bytes == 470 + and utils.getsize(buffer.queue) == 470 + and not buffer.is_full + ) # add 35Bytes, can exceed little bit - buffer.put(b'12', timeout=1) - assert buffer.used_bytes == 505 and utils.getsize( - buffer.queue) == 505 and buffer.is_full + buffer.put(b"12", timeout=1) + assert ( + buffer.used_bytes == 505 + and utils.getsize(buffer.queue) == 505 + and buffer.is_full + ) with pytest.raises(Full): - buffer.put(b'13', timeout=1) + buffer.put(b"13", timeout=1) assert buffer.is_full - assert b'10' == buffer.get() - assert b'11' == buffer.get() - assert b'12' == buffer.get() + assert b"10" == buffer.get() + assert b"11" == buffer.get() + assert b"12" == buffer.get() assert buffer.is_full @@ -372,9 +426,9 @@ def test_clear(): buffer.clear_expired_data(expired_offset) assert buffer.real_head == 10 - buffer.put(b'1') - buffer.put(b'2') - buffer.put(b'3') + buffer.put(b"1") + buffer.put(b"2") + buffer.put(b"3") buffer.get() buffer.get() @@ -412,12 +466,12 @@ def test_seek_by_offset(): def test_load_checkpoint(): state = { - 'real_head': 8, - 'real_tail': 10, - 'real_cursor': 8, - 'reader_consumed_offset': 8, - 'operator_consumed_offset': 8, - 'unconsumed_data': [1, 2] + "real_head": 8, + "real_tail": 10, + "real_cursor": 8, + "reader_consumed_offset": 8, + "operator_consumed_offset": 8, + "unconsumed_data": [1, 2], } buffer = RingBuffer(max_slot_size=10) buffer.load_checkpoint(state) @@ -442,7 +496,7 @@ def __init__(self, buffer, num): self.num = num def run(self): - print('starting ' + self.name) + print("starting " + self.name) for i in range(0, self.num): sleep(0.01) self.buffer.put(i) @@ -456,7 +510,7 @@ def __init__(self, buffer, num): self.sum = 0 def run(self): - print('starting ' + self.name) + print("starting " + self.name) for i in range(0, self.num): self.sum += self.buffer.get() @@ -466,12 +520,12 @@ def get_sum(self): def test_producer_consumer(): state = { - 'real_head': 8, - 'real_tail': 10, - 'real_cursor': 8, - 'reader_consumed_offset': 8, - 'operator_consumed_offset': 8, - 'unconsumed_data': [1, 2] + "real_head": 8, + "real_tail": 10, + "real_cursor": 8, + "reader_consumed_offset": 8, + "operator_consumed_offset": 8, + "unconsumed_data": [1, 2], } buffer = RingBuffer(max_slot_size=10) buffer.load_checkpoint(state) @@ -488,12 +542,12 @@ def test_producer_consumer(): def test_load_none_value(): state = { - 
'real_head': 8, - 'real_tail': 10, - 'real_cursor': 8, - 'reader_consumed_offset': 8, - 'operator_consumed_offset': 8, - 'unconsumed_data': [None, 2] + "real_head": 8, + "real_tail": 10, + "real_cursor": 8, + "reader_consumed_offset": 8, + "operator_consumed_offset": 8, + "unconsumed_data": [None, 2], } buffer = RingBuffer(max_slot_size=10) buffer.load_checkpoint(state) diff --git a/training/python/streaming/operator/test/test_tf_operator.py b/training/python/streaming/operator/test/test_tf_operator.py index 59f2a5a5..32bab58a 100644 --- a/training/python/streaming/operator/test/test_tf_operator.py +++ b/training/python/streaming/operator/test/test_tf_operator.py @@ -63,16 +63,19 @@ def test_generate(): def test_save_checkpoint(): key_value_state = StateBackendFactory.get_state_backend( - StreamingConstants.CP_STATE_BACKEND_PANGU) - key_value_state.init({ - "cp_state_backend_type": "cp_state_backend_pangu", - "cp_pangu_root_dir": "file:///tmp/state_backend/" - }) + StreamingConstants.CP_STATE_BACKEND_PANGU + ) + key_value_state.init( + { + "cp_state_backend_type": "cp_state_backend_pangu", + "cp_pangu_root_dir": "file:///tmp/state_backend/", + } + ) tf_operator = MockOperator() tf_operator.init({"batch_size": 5, "epoch": 2}) tf_operator.set_key_value_state(key_value_state) tf_operator.run() - [tf_operator.process(c.encode(encoding='utf-8')) for c in "abcdefgh"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "abcdefgh"] # Leave a little bit time for consumer sleep(0.5) assert tf_operator._reader.get_head() == 0 @@ -89,7 +92,7 @@ def test_save_checkpoint(): tf_operator.on_checkpoint_complete(1) assert tf_operator._reader.get_head() == 5 - [tf_operator.process(c.encode(encoding='utf-8')) for c in "ijk"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "ijk"] sleep(0.5) assert tf_operator._reader.get_offset() == 10 assert tf_operator._reader.get_cursor() == 10 @@ -97,18 +100,20 @@ def test_save_checkpoint(): def test_save_and_load_checkpoint(): key_value_state = StateBackendFactory.get_state_backend( - StreamingConstants.CP_STATE_BACKEND_DFS) - key_value_state.init({ - StreamingConstants.CP_STATE_BACKEND_TYPE: - StreamingConstants.CP_STATE_BACKEND_DFS, - StreamingConstants.CP_DFS_ROOT_DIR: - StreamingConstants.CP_DFS_ROOT_DIR_DEFAULT + "/{}".format(random()) - }) + StreamingConstants.CP_STATE_BACKEND_DFS + ) + key_value_state.init( + { + StreamingConstants.CP_STATE_BACKEND_TYPE: StreamingConstants.CP_STATE_BACKEND_DFS, + StreamingConstants.CP_DFS_ROOT_DIR: StreamingConstants.CP_DFS_ROOT_DIR_DEFAULT + + "/{}".format(random()), + } + ) tf_operator = MockOperator() tf_operator.init({"batch_size": 5, "epoch": 2}) tf_operator.set_key_value_state(key_value_state) tf_operator.run() - [tf_operator.process(c.encode(encoding='utf-8')) for c in "abcdefgh"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "abcdefgh"] # Leave a little bit time for consumer sleep(0.5) assert tf_operator._reader.get_head() == 0 @@ -125,7 +130,7 @@ def test_save_and_load_checkpoint(): tf_operator.on_checkpoint_complete(1) assert tf_operator._reader.get_head() == 5 - [tf_operator.process(c.encode(encoding='utf-8')) for c in "ijk"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "ijk"] sleep(0.5) assert tf_operator._reader.get_offset() == 10 assert tf_operator._reader.get_cursor() == 10 @@ -147,11 +152,14 @@ def test_save_and_load_checkpoint(): def test_save_checkpoint_async(): key_value_state = StateBackendFactory.get_state_backend( - StreamingConstants.CP_STATE_BACKEND_PANGU) - 
key_value_state.init({ - "cp_state_backend_type": "cp_state_backend_pangu", - "cp_pangu_root_dir": "file:///tmp/state_backend/" - }) + StreamingConstants.CP_STATE_BACKEND_PANGU + ) + key_value_state.init( + { + "cp_state_backend_type": "cp_state_backend_pangu", + "cp_pangu_root_dir": "file:///tmp/state_backend/", + } + ) tf_operator = MockOperator() tf_operator.init({}) tf_operator.set_key_value_state(key_value_state) @@ -159,7 +167,7 @@ def test_save_checkpoint_async(): assert len(tf_operator._last_checkpoint_state) == 0 - [tf_operator.process(c.encode(encoding='utf-8')) for c in "abcde"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "abcde"] # Leave a little bit time for consumer sleep(0.5) @@ -175,12 +183,12 @@ def test_seek_by_offset_normal(): assert len(tf_operator._last_checkpoint_state) == 0 - [tf_operator.process(c.encode(encoding='utf-8')) for c in "abcde"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "abcde"] # Leave a little bit time for consumer sleep(0.5) - [tf_operator.process(c.encode(encoding='utf-8')) for c in "xyz"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "xyz"] def test_seek_by_offset_abnormal(): @@ -189,12 +197,12 @@ def test_seek_by_offset_abnormal(): assert len(tf_operator._last_checkpoint_state) == 0 - [tf_operator.process(c.encode(encoding='utf-8')) for c in "abcdefghijk"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "abcdefghijk"] # Leave a little bit time for consumer sleep(0.5) - [tf_operator.process(c.encode(encoding='utf-8')) for c in "xyz"] + [tf_operator.process(c.encode(encoding="utf-8")) for c in "xyz"] class MockAbortOperator(TFOperator): diff --git a/training/python/streaming/operator/util.py b/training/python/streaming/operator/util.py index fe058710..88a8650b 100644 --- a/training/python/streaming/operator/util.py +++ b/training/python/streaming/operator/util.py @@ -3,6 +3,7 @@ import os import sys from ray.streaming.constants import StreamingConstants + logger = logging.getLogger(StreamingConstants.LOGGER_NAME_DEFAULT) @@ -15,14 +16,18 @@ class OperatorModuleManager: def __init__(self, modules_dir_path, reload_all=False): abs_modules_dir_path = os.path.abspath(modules_dir_path) if not os.path.exists(modules_dir_path) or not os.path.exists( - abs_modules_dir_path): - logger.warning("Cannot find module path '{}', ignore it.".format( - abs_modules_dir_path)) + abs_modules_dir_path + ): + logger.warning( + "Cannot find module path '{}', ignore it.".format(abs_modules_dir_path) + ) return None if not os.path.isdir(abs_modules_dir_path): logger.warning( "Provided path '{}' is not a directory, ignore it.".format( - abs_modules_dir_path)) + abs_modules_dir_path + ) + ) return None logger.info("Adding {} to sys.path".format(abs_modules_dir_path)) if abs_modules_dir_path not in sys.path: @@ -30,8 +35,11 @@ def __init__(self, modules_dir_path, reload_all=False): self.module_dir_path = abs_modules_dir_path self.module_dict = {} if reload_all: - logger.info("Initializing, Loading modules from path {}".format( - self.module_dir_path)) + logger.info( + "Initializing, Loading modules from path {}".format( + self.module_dir_path + ) + ) self.__load_modules_from_path(self.module_dir_path) # Invalidate Python's caches so the new modules can be found logger.info("Initializing, Invalidating cache") @@ -48,13 +56,15 @@ def __load_modules_from_path(self, module_dir_path): # Create a python module import path relative to the absModulePath import_path = os.path.join( dir_path.replace(module_dir_path, "")[1:], - 
os.path.splitext(file)[0]).replace("/", ".") + os.path.splitext(file)[0], + ).replace("/", ".") cur_module = self.module_dict.get(import_path) if not cur_module: self.add_module(import_path) # If found module but the modified time changed then reload it elif cur_module and cur_module["mtime"] != os.path.getmtime( - self.get_os_path(import_path)): + self.get_os_path(import_path) + ): self.reload_module(import_path) def load_single_module(self, module_name, class_name): @@ -105,14 +115,11 @@ def add_module(self, module_path, keep_cache=False): module_instance = module_class() if keep_cache: self.module_dict[module_path] = { - "ref": - module, - "instance": - module_instance, - "mtime": - os.path.getmtime( - os.path.join( - self.get_os_path(self.module_dir_path, module_path))) + "ref": module, + "instance": module_instance, + "mtime": os.path.getmtime( + os.path.join(self.get_os_path(self.module_dir_path, module_path)) + ), } return module_instance @@ -148,7 +155,8 @@ def reload_module(self, module_path): importlib.reload(module_path) # Update new module time self.module_dict[module_path]["mtime"] = os.path.getmtime( - self.get_os_path(self.module_dir_path, module_path)) + self.get_os_path(self.module_dir_path, module_path) + ) # Invalidate Cache importlib.invalidate_caches() From 8881c4ee7f2fe06637ae120e948b548a03704518 Mon Sep 17 00:00:00 2001 From: ashione Date: Tue, 2 Apr 2024 17:06:38 +0800 Subject: [PATCH 02/10] replace source location --- .github/sources.list | 8 -------- .github/workflows/debian_manylinux_build.yaml | 2 ++ 2 files changed, 2 insertions(+), 8 deletions(-) delete mode 100644 .github/sources.list diff --git a/.github/sources.list b/.github/sources.list deleted file mode 100644 index d168489b..00000000 --- a/.github/sources.list +++ /dev/null @@ -1,8 +0,0 @@ -deb http://mirrors.aliyun.com/debian/ buster main non-free contrib -deb-src http://mirrors.aliyun.com/debian/ buster main non-free contrib -deb http://mirrors.aliyun.com/debian-security buster/updates main -deb-src http://mirrors.aliyun.com/debian-security buster/updates main -deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib -deb-src http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib -deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib -deb-src http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index 8b60372e..9724bc99 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -23,6 +23,8 @@ jobs: - name: Apt get update and Install bazel run: | + sed -i s/deb.debian.org/archive.debian.org/g /etc/apt/sources.list + sed -i s/security.debian.org/archive.debian.org/g /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From f4a4a2804ee281f61c4dffa8b2eb9147e36a2377 Mon Sep 17 00:00:00 2001 From: ashione Date: Tue, 2 Apr 2024 17:13:05 +0800 Subject: [PATCH 03/10] aliyun source list instead --- .github/aliyun-source.list | 20 +++++++++++++++++++ .github/workflows/debian_manylinux_build.yaml | 3 +-- 2 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 .github/aliyun-source.list diff --git a/.github/aliyun-source.list b/.github/aliyun-source.list new file mode 100644 index 00000000..42f993ea --- /dev/null +++ b/.github/aliyun-source.list @@ -0,0 +1,20 @@ +deb 
http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse +deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted +deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe +deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index 9724bc99..fd903a7f 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -23,8 +23,7 @@ jobs: - name: Apt get update and Install bazel run: | - sed -i s/deb.debian.org/archive.debian.org/g /etc/apt/sources.list - sed -i s/security.debian.org/archive.debian.org/g /etc/apt/sources.list + cp .github/aliyun-sources.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From 642815b890c9eac1e44c7619ee5f9103459f7787 Mon Sep 17 00:00:00 2001 From: ashione Date: Tue, 2 Apr 2024 17:21:30 +0800 Subject: [PATCH 04/10] fix typo of source list file name --- .github/workflows/debian_manylinux_build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index fd903a7f..cf766511 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -23,7 +23,7 @@ jobs: - name: Apt get update and Install bazel run: | - cp .github/aliyun-sources.list /etc/apt/sources.list + cp .github/aliyun-source.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From b250bcd2da02ca5615d5486bc2c8d1420d8a916c Mon Sep 17 00:00:00 2001 From: ashione Date: Thu, 4 Apr 2024 19:32:20 +0800 Subject: [PATCH 05/10] remove source list replace --- .github/workflows/debian_manylinux_build.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index cf766511..8b60372e 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -23,7 +23,6 @@ jobs: - name: Apt get update and Install bazel run: | - cp .github/aliyun-source.list /etc/apt/sources.list apt-get 
update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From 2b882e07598f4ebc62a712fa8743d9f68696b855 Mon Sep 17 00:00:00 2001 From: ashione Date: Thu, 4 Apr 2024 19:42:22 +0800 Subject: [PATCH 06/10] ubuntu offical mirror --- .github/ubuntu-source.list | 20 +++++++++++++++++++ .github/workflows/debian_manylinux_build.yaml | 1 + 2 files changed, 21 insertions(+) create mode 100644 .github/ubuntu-source.list diff --git a/.github/ubuntu-source.list b/.github/ubuntu-source.list new file mode 100644 index 00000000..42f993ea --- /dev/null +++ b/.github/ubuntu-source.list @@ -0,0 +1,20 @@ +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse +deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse +deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted +deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe +deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse +deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index 8b60372e..b1aeb51c 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -23,6 +23,7 @@ jobs: - name: Apt get update and Install bazel run: | + #cp .github/ubuntu-source.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From d58b4225fc4329b8ec1e2fa92141601231ce1711 Mon Sep 17 00:00:00 2001 From: ashione Date: Thu, 4 Apr 2024 19:51:59 +0800 Subject: [PATCH 07/10] remove apt source list --- .github/workflows/debian_manylinux_build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml index b1aeb51c..f3b5ebc0 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/workflows/debian_manylinux_build.yaml @@ -23,7 +23,7 @@ jobs: - name: Apt get update and Install bazel run: | - #cp .github/ubuntu-source.list /etc/apt/sources.list + cp .github/ubuntu-source.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From 8a60713c3a6860b02b91a1847372582bcaa0f823 Mon Sep 17 00:00:00 2001 From: ashione Date: Thu, 4 Apr 2024 19:54:31 +0800 Subject: [PATCH 08/10] update 
ubuntu official deb source list

---
 .github/ubuntu-source.list | 34 ++++++++++++++--------------------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/.github/ubuntu-source.list b/.github/ubuntu-source.list
index 42f993ea..182cdd87 100644
--- a/.github/ubuntu-source.list
+++ b/.github/ubuntu-source.list
@@ -1,20 +1,14 @@
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy main restricted
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates main restricted
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy universe
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates universe
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy multiverse
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-updates multiverse
-deb http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu/ jammy-backports main restricted universe multiverse
-deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security main restricted
-deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security universe
-deb http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse
-deb-src http://mirrors.cloud.aliyuncs.com/ubuntu jammy-security multiverse
+deb http://archive.ubuntu.com/ubuntu/ jammy main restricted universe multiverse
+# deb-src http://archive.ubuntu.com/ubuntu/ jammy main restricted universe multiverse
+
+deb http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted universe multiverse
+# deb-src http://archive.ubuntu.com/ubuntu/ jammy-updates main restricted universe multiverse
+
+deb http://archive.ubuntu.com/ubuntu/ jammy-security main restricted universe multiverse
+# deb-src http://archive.ubuntu.com/ubuntu/ jammy-security main restricted universe multiverse
+
+deb http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse
+# deb-src http://archive.ubuntu.com/ubuntu/ jammy-backports main restricted universe multiverse
+
+deb http://archive.canonical.com/ubuntu/ jammy partner
+# deb-src http://archive.canonical.com/ubuntu/ jammy partner
\ No newline at end of file

From 3fa6f2144c17e5bc42b62cd45602cd646b281d3b Mon Sep 17 00:00:00 2001
From: ashione
Date: Thu, 4 Apr 2024 20:03:14 +0800
Subject: [PATCH 09/10] 24.04 for manylinux builder

---
 .github/workflows/debian_manylinux_build.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/workflows/debian_manylinux_build.yaml
index f3b5ebc0..1b18a7b0 100644
--- a/.github/workflows/debian_manylinux_build.yaml
+++ b/.github/workflows/debian_manylinux_build.yaml
@@ -15,7 +15,7 @@ env:
 jobs:
   streaming-debian-manylinux-pipeline:
     timeout-minutes: 120
-    runs-on: ubuntu-22.04
+    runs-on: ubuntu-24.04
     container: quay.io/pypa/manylinux_2_24_x86_64
 
     steps:
@@ -23,7 +23,7 @@ jobs:
 
       - name: Apt get update and Install bazel
        run: |
-          cp .github/ubuntu-source.list 
/etc/apt/sources.list + # cp .github/ubuntu-source.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ From 95ce681a15677240d5a8bf4959da8a03fa9f08a2 Mon Sep 17 00:00:00 2001 From: ashione Date: Thu, 4 Apr 2024 20:52:16 +0800 Subject: [PATCH 10/10] lint --- .../debian_manylinux_build.yaml | 4 +- scripts/suppress_output | 1 + .../python/raystreaming/runtime/transfer.py | 96 +++++----- .../python/raystreaming/runtime/worker.py | 180 ++++++++++-------- 4 files changed, 156 insertions(+), 125 deletions(-) rename .github/{workflows => release_workflows}/debian_manylinux_build.yaml (90%) diff --git a/.github/workflows/debian_manylinux_build.yaml b/.github/release_workflows/debian_manylinux_build.yaml similarity index 90% rename from .github/workflows/debian_manylinux_build.yaml rename to .github/release_workflows/debian_manylinux_build.yaml index 1b18a7b0..f3b5ebc0 100644 --- a/.github/workflows/debian_manylinux_build.yaml +++ b/.github/release_workflows/debian_manylinux_build.yaml @@ -15,7 +15,7 @@ env: jobs: streaming-debian-manylinux-pipeline: timeout-minutes: 120 - runs-on: ubuntu-24.04 + runs-on: ubuntu-22.04 container: quay.io/pypa/manylinux_2_24_x86_64 steps: @@ -23,7 +23,7 @@ jobs: - name: Apt get update and Install bazel run: | - # cp .github/ubuntu-source.list /etc/apt/sources.list + cp .github/ubuntu-source.list /etc/apt/sources.list apt-get update apt-get install -yq wget openjdk-8-jdk zlib1g-dev zip git apt-get install -yq gcc g++ diff --git a/scripts/suppress_output b/scripts/suppress_output index 19deba61..faec51a2 100755 --- a/scripts/suppress_output +++ b/scripts/suppress_output @@ -1,5 +1,6 @@ #!/bin/bash # Run a command, suppressing output unless it hangs or crashes. +CURRENT_DIR=$(dirname "${BASH_SOURCE:-$0}") TMPFILE="$(mktemp)" PID=$$ diff --git a/streaming/python/raystreaming/runtime/transfer.py b/streaming/python/raystreaming/runtime/transfer.py index f81d4686..70b6bdd7 100644 --- a/streaming/python/raystreaming/runtime/transfer.py +++ b/streaming/python/raystreaming/runtime/transfer.py @@ -30,8 +30,7 @@ def __init__(self, channel_id_str: str): channel_id_str: string representation of channel id """ self.channel_id_str = channel_id_str - self.object_qid = ray.ObjectRef( - channel_id_str_to_bytes(channel_id_str)) + self.object_qid = ray.ObjectRef(channel_id_str_to_bytes(channel_id_str)) def __eq__(self, other): if other is None: @@ -100,7 +99,6 @@ def channel_bytes_to_str(id_bytes): class Message(ABC): - @property @abstractmethod def body(self): @@ -131,12 +129,7 @@ class DataMessage(Message): DataMessage represents data between upstream and downstream operator. 
""" - def __init__(self, - body, - timestamp, - message_id, - channel_id, - is_empty_message=False): + def __init__(self, body, timestamp, message_id, channel_id, is_empty_message=False): self.__body = body self.__timestamp = timestamp self.__channel_id = channel_id @@ -226,25 +219,29 @@ class ChannelCreationParametersBuilder: """ _java_reader_async_function_descriptor = JavaFunctionDescriptor( - "io.ray.streaming.runtime.worker.JobWorker", "onReaderMessage", - "([B)V") + "io.ray.streaming.runtime.worker.JobWorker", "onReaderMessage", "([B)V" + ) _java_reader_sync_function_descriptor = JavaFunctionDescriptor( - "io.ray.streaming.runtime.worker.JobWorker", "onReaderMessageSync", - "([B)[B") + "io.ray.streaming.runtime.worker.JobWorker", "onReaderMessageSync", "([B)[B" + ) _java_writer_async_function_descriptor = JavaFunctionDescriptor( - "io.ray.streaming.runtime.worker.JobWorker", "onWriterMessage", - "([B)V") + "io.ray.streaming.runtime.worker.JobWorker", "onWriterMessage", "([B)V" + ) _java_writer_sync_function_descriptor = JavaFunctionDescriptor( - "io.ray.streaming.runtime.worker.JobWorker", "onWriterMessageSync", - "([B)[B") + "io.ray.streaming.runtime.worker.JobWorker", "onWriterMessageSync", "([B)[B" + ) _python_reader_async_function_descriptor = PythonFunctionDescriptor( - "raystreaming.runtime.worker", "on_reader_message", "JobWorker") + "raystreaming.runtime.worker", "on_reader_message", "JobWorker" + ) _python_reader_sync_function_descriptor = PythonFunctionDescriptor( - "raystreaming.runtime.worker", "on_reader_message_sync", "JobWorker") + "raystreaming.runtime.worker", "on_reader_message_sync", "JobWorker" + ) _python_writer_async_function_descriptor = PythonFunctionDescriptor( - "raystreaming.runtime.worker", "on_writer_message", "JobWorker") + "raystreaming.runtime.worker", "on_writer_message", "JobWorker" + ) _python_writer_sync_function_descriptor = PythonFunctionDescriptor( - "raystreaming.runtime.worker", "on_writer_message_sync", "JobWorker") + "raystreaming.runtime.worker", "on_writer_message_sync", "JobWorker" + ) def get_parameters(self): return self._parameters @@ -272,32 +269,39 @@ def build_output_queue_parameters(self, to_actors): ) return self - def build_parameters(self, actors, java_async_func, java_sync_func, - py_async_func, py_sync_func): + def build_parameters( + self, actors, java_async_func, java_sync_func, py_async_func, py_sync_func + ): for handle in actors: parameter = None if handle._ray_actor_language == Language.PYTHON: parameter = _streaming.ChannelCreationParameter( - handle._ray_actor_id, py_async_func, py_sync_func) + handle._ray_actor_id, py_async_func, py_sync_func + ) else: parameter = _streaming.ChannelCreationParameter( - handle._ray_actor_id, java_async_func, java_sync_func) + handle._ray_actor_id, java_async_func, java_sync_func + ) self._parameters.append(parameter) return self @staticmethod def set_python_writer_function_descriptor(async_function, sync_function): ChannelCreationParametersBuilder._python_writer_async_function_descriptor = ( - async_function) + async_function + ) ChannelCreationParametersBuilder._python_writer_sync_function_descriptor = ( - sync_function) + sync_function + ) @staticmethod def set_python_reader_function_descriptor(async_function, sync_function): ChannelCreationParametersBuilder._python_reader_async_function_descriptor = ( - async_function) + async_function + ) ChannelCreationParametersBuilder._python_reader_sync_function_descriptor = ( - sync_function) + sync_function + ) class DataWriter: @@ -305,8 
+309,7 @@ class DataWriter: to downstream workers """ - def __init__(self, output_channels, to_actors: List[ActorHandle], - conf: dict): + def __init__(self, output_channels, to_actors: List[ActorHandle], conf: dict): """Get DataWriter of output channels Args: output_channels: output channels ids @@ -320,8 +323,7 @@ def __init__(self, output_channels, to_actors: List[ActorHandle], ] creation_parameters = ChannelCreationParametersBuilder() creation_parameters.build_output_queue_parameters(to_actors) - channel_size = conf.get(Config.CHANNEL_SIZE, - Config.CHANNEL_SIZE_DEFAULT) + channel_size = conf.get(Config.CHANNEL_SIZE, Config.CHANNEL_SIZE_DEFAULT) py_msg_ids = [0 for _ in range(len(output_channels))] config_bytes = _to_native_conf(conf) is_mock = conf[Config.CHANNEL_TYPE] == Config.MEMORY_CHANNEL @@ -365,8 +367,8 @@ def get_output_checkpoints(self) -> List[int]: def clear_checkpoint(self, checkpoint_id): logger.info( - "producer start to clear checkpoint, checkpoint_id={}".format( - checkpoint_id)) + "producer start to clear checkpoint, checkpoint_id={}".format(checkpoint_id) + ) self.writer.clear_checkpoint(checkpoint_id) def stop(self): @@ -384,8 +386,9 @@ class DataReader: from channels of upstream workers """ - def __init__(self, input_channels: List, from_actors: List[ActorHandle], - conf: dict): + def __init__( + self, input_channels: List, from_actors: List[ActorHandle], conf: dict + ): """Get DataReader of input channels Args: input_channels: input channels @@ -416,8 +419,11 @@ def __init__(self, input_channels: List, from_actors: List[ActorHandle], self.__creation_status = {} for q, status in queues_creation_status.items(): self.__creation_status[q] = ChannelCreationStatus(status) - logger.info("create DataReader succeed, creation_status={}".format( - self.__creation_status)) + logger.info( + "create DataReader succeed, creation_status={}".format( + self.__creation_status + ) + ) def read(self, timeout_millis): """Read data from channel @@ -459,11 +465,11 @@ def _to_native_conf(conf): config.op_name = conf[Config.STREAMING_OP_NAME] # TODO set operator type if Config.STREAMING_RING_BUFFER_CAPACITY in conf: - config.ring_buffer_capacity = int( - conf[Config.STREAMING_RING_BUFFER_CAPACITY]) + config.ring_buffer_capacity = int(conf[Config.STREAMING_RING_BUFFER_CAPACITY]) if Config.STREAMING_EMPTY_MESSAGE_INTERVAL in conf: config.empty_message_interval = int( - conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL]) + conf[Config.STREAMING_EMPTY_MESSAGE_INTERVAL] + ) if Config.FLOW_CONTROL_TYPE in conf: config.flow_control_type = int(conf[Config.FLOW_CONTROL_TYPE]) if Config.WRITER_CONSUMED_STEP in conf: @@ -475,20 +481,17 @@ def _to_native_conf(conf): class ChannelInitException(Exception): - def __init__(self, msg, abnormal_channels): self.abnormal_channels = abnormal_channels self.msg = msg class ChannelInterruptException(Exception): - def __init__(self, msg=None): self.msg = msg class ChannelRecoverInfo: - def __init__(self, queue_creation_status_map=None): if queue_creation_status_map is None: queue_creation_status_map = {} @@ -505,8 +508,7 @@ def get_data_lost_queues(self): return data_lost_queues def __str__(self): - return "QueueRecoverInfo [dataLostQueues=%s]" % ( - self.get_data_lost_queues()) + return "QueueRecoverInfo [dataLostQueues=%s]" % (self.get_data_lost_queues()) class ChannelCreationStatus(Enum): diff --git a/streaming/python/raystreaming/runtime/worker.py b/streaming/python/raystreaming/runtime/worker.py index ddf5702b..6aa68b63 100644 --- 
a/streaming/python/raystreaming/runtime/worker.py +++ b/streaming/python/raystreaming/runtime/worker.py @@ -37,8 +37,7 @@ class JobWorker(object): def __init__(self, execution_vertex_pb_bytes): logger.info("Creating job worker, pid={}".format(os.getpid())) - execution_vertex_pb = remote_call_pb2.ExecutionVertexContext.ExecutionVertex( - ) + execution_vertex_pb = remote_call_pb2.ExecutionVertexContext.ExecutionVertex() execution_vertex_pb.ParseFromString(execution_vertex_pb_bytes) self.execution_vertex = ExecutionVertex(execution_vertex_pb) self.config = self.execution_vertex.config @@ -48,8 +47,7 @@ def __init__(self, execution_vertex_pb_bytes): self.task = None self.stream_processor = None self.master_actor = None - self.context_backend = ContextBackendFactory.get_context_backend( - self.config) + self.context_backend = ContextBackendFactory.get_context_backend(self.config) self.initial_state_lock = threading.Lock() self.__rollback_cnt: int = 0 self.__is_recreate: bool = False @@ -60,34 +58,38 @@ def __init__(self, execution_vertex_pb_bytes): try: # load checkpoint was_reconstructed = ( - ray.get_runtime_context().was_current_actor_reconstructed) + ray.get_runtime_context().was_current_actor_reconstructed + ) - logger.info( - "Worker was reconstructed: {}".format(was_reconstructed)) + logger.info("Worker was reconstructed: {}".format(was_reconstructed)) if was_reconstructed: job_worker_context_key = self.__get_job_worker_context_key() - logger.info("Worker get checkpoint state by key: {}.".format( - job_worker_context_key)) - context_bytes = self.context_backend.get( - job_worker_context_key) + logger.info( + "Worker get checkpoint state by key: {}.".format( + job_worker_context_key + ) + ) + context_bytes = self.context_backend.get(job_worker_context_key) if context_bytes is not None and context_bytes.__len__() > 0: self.init(context_bytes) - self.request_rollback( - "Python worker recover from checkpoint.") + self.request_rollback("Python worker recover from checkpoint.") else: logger.error( "Error! Worker get checkpoint state by key {}" " returns None, please check your state backend" - ", only reliable state backend supports fail-over.". - format(job_worker_context_key)) + ", only reliable state backend supports fail-over.".format( + job_worker_context_key + ) + ) except Exception: logger.exception("Error in __init__ of JobWorker") - logger.info("Creating job worker succeeded. worker config {}".format( - self.config)) + logger.info( + "Creating job worker succeeded. 
worker config {}".format(self.config) + ) def __reduce__(self): deserializer = JobWorker - return deserializer, (b'', ) + return deserializer, (b"",) def init(self, worker_context_bytes): logger.info("Start to init job worker") @@ -97,30 +99,33 @@ def init(self, worker_context_bytes): worker_context.ParseFromString(worker_context_bytes) self.worker_context = worker_context self.master_actor = ActorHandle._deserialization_helper( - worker_context.master_actor) + worker_context.master_actor + ) # build vertex context from pb self.execution_vertex_context = ExecutionVertexContext( - worker_context.execution_vertex_context) + worker_context.execution_vertex_context + ) self.execution_vertex = self.execution_vertex_context.execution_vertex # save context job_worker_context_key = self.__get_job_worker_context_key() - self.context_backend.put(job_worker_context_key, - worker_context_bytes) + self.context_backend.put(job_worker_context_key, worker_context_bytes) # use vertex id as task id self.task_id = self.execution_vertex_context.get_task_id() # build and get processor from operator operator = self.execution_vertex_context.stream_operator self.stream_processor = processor.build_processor(operator) - logger.info("Initializing job worker, exe_vertex_name={}," - "task_id: {}, operator: {}, pid={}".format( - self.execution_vertex_context.exe_vertex_name, - self.task_id, - self.stream_processor, - os.getpid(), - )) + logger.info( + "Initializing job worker, exe_vertex_name={}," + "task_id: {}, operator: {}, pid={}".format( + self.execution_vertex_context.exe_vertex_name, + self.task_id, + self.stream_processor, + os.getpid(), + ) + ) # get config from vertex self.config = self.execution_vertex_context.config @@ -137,14 +142,17 @@ def init(self, worker_context_bytes): def create_stream_task(self, checkpoint_id): if isinstance(self.stream_processor, processor.SourceProcessor): - return SourceStreamTask(self.task_id, self.stream_processor, self, - checkpoint_id) + return SourceStreamTask( + self.task_id, self.stream_processor, self, checkpoint_id + ) elif isinstance(self.stream_processor, processor.OneInputProcessor): - return OneInputStreamTask(self.task_id, self.stream_processor, - self, checkpoint_id) + return OneInputStreamTask( + self.task_id, self.stream_processor, self, checkpoint_id + ) else: - raise Exception("Unsupported processor type: " + - str(type(self.stream_processor))) + raise Exception( + "Unsupported processor type: " + str(type(self.stream_processor)) + ) def rollback(self, checkpoint_id_bytes): checkpoint_id_pb = remote_call_pb2.CheckpointId() @@ -159,15 +167,18 @@ def rollback(self, checkpoint_id_bytes): # skip useless rollback self.initial_state_lock.acquire() try: - if (self.task is not None and self.task.thread.is_alive() - and checkpoint_id == self.task.last_checkpoint_id - and self.task.is_initial_state): - logger.info( - "Task is already in initial state, skip this rollback.") + if ( + self.task is not None + and self.task.thread.is_alive() + and checkpoint_id == self.task.last_checkpoint_id + and self.task.is_initial_state + ): + logger.info("Task is already in initial state, skip this rollback.") return self.__gen_call_result( CallResult.skipped( "Task is already in initial state, skip this rollback." - )) + ) + ) finally: self.initial_state_lock.release() @@ -186,8 +197,10 @@ def rollback(self, checkpoint_id_bytes): self.__need_rollback = False logger.info( - "Rollback success, checkpoint is {}, qRecoverInfo is {}.". 
- format(checkpoint_id, q_recover_info)) + "Rollback success, checkpoint is {}, qRecoverInfo is {}.".format( + checkpoint_id, q_recover_info + ) + ) return self.__gen_call_result(CallResult.success(q_recover_info)) except Exception: @@ -250,22 +263,24 @@ def commit(self, barrier_bytes): ret.boolRes = True return ret.SerializeToString() - def clear_expired_cp(self, state_checkpoint_id_bytes, - queue_checkpoint_id_bytes): - state_checkpoint_id = self.__parse_to_checkpoint_id( - state_checkpoint_id_bytes) - queue_checkpoint_id = self.__parse_to_checkpoint_id( - queue_checkpoint_id_bytes) - logger.info("Start to clear expired checkpoint, checkpoint_id={}," - "queue_checkpoint_id={}, exe_vertex_name={}.".format( - state_checkpoint_id, - queue_checkpoint_id, - self.execution_vertex_context.exe_vertex_name, - )) + def clear_expired_cp(self, state_checkpoint_id_bytes, queue_checkpoint_id_bytes): + state_checkpoint_id = self.__parse_to_checkpoint_id(state_checkpoint_id_bytes) + queue_checkpoint_id = self.__parse_to_checkpoint_id(queue_checkpoint_id_bytes) + logger.info( + "Start to clear expired checkpoint, checkpoint_id={}," + "queue_checkpoint_id={}, exe_vertex_name={}.".format( + state_checkpoint_id, + queue_checkpoint_id, + self.execution_vertex_context.exe_vertex_name, + ) + ) ret = remote_call_pb2.BoolResult() - ret.boolRes = (self.__clear_expired_cp_state(state_checkpoint_id) - if state_checkpoint_id > 0 else True) + ret.boolRes = ( + self.__clear_expired_cp_state(state_checkpoint_id) + if state_checkpoint_id > 0 + else True + ) ret.boolRes &= self.__clear_expired_queue_msg(queue_checkpoint_id) logger.info( "Clear expired checkpoint done, result={}, checkpoint_id={}," @@ -274,17 +289,21 @@ def clear_expired_cp(self, state_checkpoint_id_bytes, state_checkpoint_id, queue_checkpoint_id, self.execution_vertex_context.exe_vertex_name, - )) + ) + ) return ret.SerializeToString() def __clear_expired_cp_state(self, checkpoint_id: int): if self.__need_rollback: - logger.warning("Need rollback, skip clear_expired_cp_state" - ", checkpoint id: {}".format(checkpoint_id)) + logger.warning( + "Need rollback, skip clear_expired_cp_state" + ", checkpoint id: {}".format(checkpoint_id) + ) return False - logger.info("Clear expired checkpoint state, cp id is {}.".format( - checkpoint_id)) + logger.info( + "Clear expired checkpoint state, cp id is {}.".format(checkpoint_id) + ) if self.task is not None: self.task.clear_expired_cp_state(checkpoint_id) @@ -292,12 +311,15 @@ def __clear_expired_cp_state(self, checkpoint_id: int): def __clear_expired_queue_msg(self, checkpoint_id): if self.__need_rollback: - logger.warning("Need rollback, skip clear_expired_queue_msg" - ", checkpoint id: {}".format(checkpoint_id)) + logger.warning( + "Need rollback, skip clear_expired_queue_msg" + ", checkpoint id: {}".format(checkpoint_id) + ) return False - logger.info("Clear expired queue msg, checkpoint_id is {}.".format( - checkpoint_id)) + logger.info( + "Clear expired queue msg, checkpoint_id is {}.".format(checkpoint_id) + ) if self.task is not None: self.task.clear_expired_queue_msg(checkpoint_id) @@ -327,18 +349,17 @@ def request_rollback(self, exception_msg="Python exception."): self.master_actor, WorkerRollbackRequest( self.execution_vertex_context.actor_id.binary(), - "Exception msg=%s, retry time=%d." % - (exception_msg, i), + "Exception msg=%s, retry time=%d." 
% (exception_msg, i), ), ) except Exception: logger.exception("Unexpected error when rollback") - logger.info("request rollback {} time, ret={}".format( - i, request_ret)) + logger.info("request rollback {} time, ret={}".format(i, request_ret)) if not request_ret: logger.warning( "Request rollback return false" - ", maybe it's invalid request, try to sleep 1s.") + ", maybe it's invalid request, try to sleep 1s." + ) time.sleep(1) else: break @@ -346,7 +367,9 @@ def request_rollback(self, exception_msg="Python exception."): logger.warning( "Request failed after retry {} times," "now worker shutdown without reconstruction.".format( - Config.REQUEST_ROLLBACK_RETRY_TIMES)) + Config.REQUEST_ROLLBACK_RETRY_TIMES + ) + ) self.shutdown_without_reconstruction() self.__state.set_type(StateType.WAIT_ROLLBACK) @@ -362,14 +385,19 @@ def __gen_call_result(self, call_result): if call_result.result_obj is not None: q_recover_info = call_result.result_obj for q, status in q_recover_info.get_creation_status().items(): - call_result_pb.result_obj.creation_status[channel_bytes_to_str( - q)] = status.value + call_result_pb.result_obj.creation_status[ + channel_bytes_to_str(q) + ] = status.value return call_result_pb.SerializeToString() def _gen_unique_key(self, key_prefix): - return (key_prefix + str(self.config.get(Config.STREAMING_JOB_NAME)) + - "_" + str(self.execution_vertex.execution_vertex_id)) + return ( + key_prefix + + str(self.config.get(Config.STREAMING_JOB_NAME)) + + "_" + + str(self.execution_vertex.execution_vertex_id) + ) def __get_job_worker_context_key(self) -> str: return self._gen_unique_key(Config.JOB_WORKER_CONTEXT_KEY)