Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow an integer parameter for 'randomize_start' in executor/base.py #199

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Some options common to all executors:
- `pipeline` a list consisting of the pipeline steps that should be run
- `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start` (_bool_, `False` by default) randomizes the start time of each task within a job by approximately 3 minutes to prevent all tasks from starting simultaneously and potentially overloading the system.
- `randomize_start_duration` (_int_, `0` by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system.

Call an executor's `run` method to execute its pipeline.

Expand Down
10 changes: 5 additions & 5 deletions src/datatrove/executor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PipelineExecutor(ABC):
logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
skip_completed: whether to skip tasks that were completed in
previous runs. default: True
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
"""

@abstractmethod
Expand All @@ -38,12 +38,12 @@ def __init__(
pipeline: list[PipelineStep | Callable],
logging_dir: DataFolderLike = None,
skip_completed: bool = True,
randomize_start: bool = False,
randomize_start_duration: int = 0,
):
self.pipeline: list[PipelineStep | Callable] = pipeline
self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
self.skip_completed = skip_completed
self.randomize_start = randomize_start
self.randomize_start_duration = randomize_start_duration

@abstractmethod
def run(self):
Expand Down Expand Up @@ -80,8 +80,8 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
logfile = add_task_logger(self.logging_dir, rank, local_rank)
log_pipeline(self.pipeline)

if self.randomize_start:
time.sleep(random.randint(0, 60 * 3))
if self.randomize_start_duration > 0:
time.sleep(random.randint(0, self.randomize_start_duration))
try:
# pipe data from one step to the next
pipelined_data = None
Expand Down
6 changes: 3 additions & 3 deletions src/datatrove/executor/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class LocalPipelineExecutor(PipelineExecutor):
Tasks [local_rank_offset, local_rank_offset + local_tasks] will be run.
depends: another LocalPipelineExecutor that should run
before this one
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
"""

def __init__(
Expand All @@ -44,9 +44,9 @@ def __init__(
start_method: str = "forkserver",
local_tasks: int = -1,
local_rank_offset: int = 0,
randomize_start: bool = False,
randomize_start_duration: int = 0,
):
super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration)
self.tasks = tasks
self.workers = workers if workers != -1 else tasks
self.start_method = start_method
Expand Down
8 changes: 4 additions & 4 deletions src/datatrove/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SlurmPipelineExecutor(PipelineExecutor):
stagger_max_array_jobs: when max_array_launch_parallel is True, this determines how many seconds to wait
between launching each of the parallel jobs
run_on_dependency_fail: start executing when a job we depend on finishes even if it has failed
randomize_start: randomize the start of each task in a job in a ~3 min window
randomize_start_duration: the maximum number of seconds to delay the start of each task.
requeue_signals: requeue the job and exit when one of these signals is received. Useful for when an instance
is being reclaimed and jobs must be stopped for example. Set to None to disable
mail_type: see https://slurm.schedmd.com/sbatch.html. Common values are (NONE, BEGIN, END, FAIL, REQUEUE, ALL)
Expand Down Expand Up @@ -105,15 +105,15 @@ def __init__(
max_array_launch_parallel: bool = False,
stagger_max_array_jobs: int = 0,
run_on_dependency_fail: bool = False,
randomize_start: bool = False,
randomize_start_duration: int = 0,
requeue_signals: tuple[str] | None = ("SIGUSR1",),
mail_type: str = "ALL",
mail_user: str = None,
requeue: bool = True,
srun_args: dict = None,
tasks_per_job: int = 1,
):
super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start_duration)
self.tasks = tasks
self.workers = workers
self.partition = partition
Expand All @@ -133,7 +133,7 @@ def __init__(
self.max_array_launch_parallel = max_array_launch_parallel
self.stagger_max_array_jobs = stagger_max_array_jobs
self.run_on_dependency_fail = run_on_dependency_fail
self.randomize_start = randomize_start
self.randomize_start_duration = randomize_start_duration
self.job_id = None
self.requeue_signals = requeue_signals
self.mail_type = mail_type
Expand Down
Loading