From 6cb8d3638e7f9347fc6dd743d9e0b2c0f04f865c Mon Sep 17 00:00:00 2001
From: justhungryman
Date: Fri, 24 May 2024 16:38:07 +0900
Subject: [PATCH 1/3] Add randomize_start feature in local executor

---
 src/datatrove/executor/local.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/datatrove/executor/local.py b/src/datatrove/executor/local.py
index 9fb756d1..797aa15b 100644
--- a/src/datatrove/executor/local.py
+++ b/src/datatrove/executor/local.py
@@ -1,3 +1,4 @@
+import random
 import time
 from copy import deepcopy
 from functools import partial
@@ -30,6 +31,7 @@ class LocalPipelineExecutor(PipelineExecutor):
             Tasks [local_rank_offset, local_rank_offset + local_tasks] will be run.
         depends: another LocalPipelineExecutor that should run before this one
+        randomize_start: randomize the start of each task in a job within a ~3 min window
     """
 
     def __init__(
@@ -43,6 +45,7 @@ def __init__(
         start_method: str = "forkserver",
         local_tasks: int = -1,
         local_rank_offset: int = 0,
+        randomize_start: bool = False,
     ):
         super().__init__(pipeline, logging_dir, skip_completed)
         self.tasks = tasks
@@ -51,6 +54,7 @@ def __init__(
         self.local_tasks = local_tasks if local_tasks != -1 else tasks
         self.local_rank_offset = local_rank_offset
         self.depends = depends
+        self.randomize_start = randomize_start
         if self.local_rank_offset + self.local_tasks > self.tasks:
             raise ValueError(
                 f"Local tasks go beyond the total tasks (local_rank_offset + local_tasks = {self.local_rank_offset + self.local_tasks} > {self.tasks} = tasks)"
@@ -71,6 +75,8 @@ def _launch_run_for_rank(self, rank: int, ranks_q, completed=None, completed_loc
         """
         local_rank = ranks_q.get()
         try:
+            if self.randomize_start:
+                time.sleep(random.randint(0, 60 * 3))
             return self._run_for_rank(rank, local_rank)
         finally:
             if completed and completed_lock:

From 95ed0c538fa995ac41ee0884826bca3c51c1559e Mon Sep 17 00:00:00 2001
From: justhungryman
Date: Fri, 24 May 2024 18:27:40 +0900
Subject: [PATCH 2/3] Move randomize_start to the base PipelineExecutor class

---
 src/datatrove/executor/base.py  | 7 +++++++
 src/datatrove/executor/local.py | 6 +-----
 src/datatrove/executor/slurm.py | 5 +----
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/datatrove/executor/base.py b/src/datatrove/executor/base.py
index d9b08dbf..1ca79d9b 100644
--- a/src/datatrove/executor/base.py
+++ b/src/datatrove/executor/base.py
@@ -1,5 +1,7 @@
 import dataclasses
 import json
+import random
+import time
 from abc import ABC, abstractmethod
 from collections import deque
 from collections.abc import Sequence
@@ -35,10 +37,12 @@ def __init__(
         pipeline: list[PipelineStep | Callable],
         logging_dir: DataFolderLike = None,
         skip_completed: bool = True,
+        randomize_start: bool = False,
     ):
         self.pipeline: list[PipelineStep | Callable] = pipeline
         self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
         self.skip_completed = skip_completed
+        self.randomize_start = randomize_start
 
     @abstractmethod
     def run(self):
@@ -74,6 +78,9 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
             return PipelineStats()
         logfile = add_task_logger(self.logging_dir, rank, local_rank)
         log_pipeline(self.pipeline)
+
+        if self.randomize_start:
+            time.sleep(random.randint(0, 60 * 3))
         try:
             # pipe data from one step to the next
             pipelined_data = None
diff --git a/src/datatrove/executor/local.py b/src/datatrove/executor/local.py
index 797aa15b..4b0e1380 100644
--- a/src/datatrove/executor/local.py
+++ b/src/datatrove/executor/local.py
@@ -1,4 +1,3 @@
-import random
 import time
 from copy import deepcopy
 from functools import partial
@@ -47,14 +46,13 @@ def __init__(
         local_rank_offset: int = 0,
         randomize_start: bool = False,
     ):
-        super().__init__(pipeline, logging_dir, skip_completed)
+        super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
         self.tasks = tasks
         self.workers = workers if workers != -1 else tasks
         self.start_method = start_method
         self.local_tasks = local_tasks if local_tasks != -1 else tasks
         self.local_rank_offset = local_rank_offset
         self.depends = depends
-        self.randomize_start = randomize_start
         if self.local_rank_offset + self.local_tasks > self.tasks:
             raise ValueError(
                 f"Local tasks go beyond the total tasks (local_rank_offset + local_tasks = {self.local_rank_offset + self.local_tasks} > {self.tasks} = tasks)"
@@ -75,8 +73,6 @@ def _launch_run_for_rank(self, rank: int, ranks_q, completed=None, completed_loc
         """
         local_rank = ranks_q.get()
         try:
-            if self.randomize_start:
-                time.sleep(random.randint(0, 60 * 3))
             return self._run_for_rank(rank, local_rank)
         finally:
             if completed and completed_lock:
diff --git a/src/datatrove/executor/slurm.py b/src/datatrove/executor/slurm.py
index 8e7efd14..2f21aeb5 100644
--- a/src/datatrove/executor/slurm.py
+++ b/src/datatrove/executor/slurm.py
@@ -3,7 +3,6 @@
 import json
 import math
 import os
-import random
 import signal
 import subprocess
 import sys
@@ -114,7 +113,7 @@ def __init__(
         srun_args: dict = None,
         tasks_per_job: int = 1,
     ):
-        super().__init__(pipeline, logging_dir, skip_completed)
+        super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
         self.tasks = tasks
         self.workers = workers
         self.partition = partition
@@ -178,8 +177,6 @@ def run(self):
                         break
                     rank = all_ranks[rank_to_run]
-                    if self.randomize_start:
-                        time.sleep(random.randint(0, 60 * 3))
                     self._run_for_rank(rank)
         else:
             # we still have to launch the job

From d78d4acf51da52f8bded9df2a326af0dc6ea9805 Mon Sep 17 00:00:00 2001
From: justhungryman
Date: Fri, 24 May 2024 18:58:24 +0900
Subject: [PATCH 3/3] Update randomize_start description in README.md and
 executor/base.py

---
 README.md                      | 1 +
 src/datatrove/executor/base.py | 1 +
 2 files changed, 2 insertions(+)

diff --git a/README.md b/README.md
index f2626e7c..52dce5fe 100644
--- a/README.md
+++ b/README.md
@@ -96,6 +96,7 @@ Some options common to all executors:
 - `pipeline` a list consisting of the pipeline steps that should be run
 - `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
 - `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
+- `randomize_start` (_bool_, `False` by default) delays the start of each task in a job by a random amount of time within a ~3 minute window, to prevent all tasks from starting simultaneously and potentially overloading the system
 
 Call an executor's `run` method to execute its pipeline.
 
diff --git a/src/datatrove/executor/base.py b/src/datatrove/executor/base.py
index 1ca79d9b..7acc4bb5 100644
--- a/src/datatrove/executor/base.py
+++ b/src/datatrove/executor/base.py
@@ -29,6 +29,7 @@ class PipelineExecutor(ABC):
         logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
         skip_completed: whether to skip tasks that were completed in previous runs.
             default: True
+        randomize_start: randomize the start of each task in a job within a ~3 min window
     """
 
     @abstractmethod
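
For context, a minimal sketch of how the new option might be used once these patches are applied. Only the `randomize_start` argument comes from this series; the reader/writer steps, folders, and task counts below are illustrative assumptions, not part of the patches:

# Hypothetical usage of the randomize_start option introduced above.
# JsonlReader/JsonlWriter and all paths/counts are placeholder assumptions.
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers import JsonlWriter

executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("data/input/"),   # placeholder input folder
        JsonlWriter("data/output/"),  # placeholder output folder
    ],
    tasks=16,
    workers=8,
    logging_dir="logs/randomize_demo",  # placeholder logging folder
    # With randomize_start=True, each task sleeps for a random 0-180 s
    # before running, so all 16 tasks don't hit storage at the same instant.
    randomize_start=True,
)

if __name__ == "__main__":
    executor.run()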