Add description for randomize_start #194

Merged
merged 3 commits on May 27, 2024
1 change: 1 addition & 0 deletions README.md
@@ -96,6 +96,7 @@ Some options common to all executors:
- `pipeline` a list consisting of the pipeline steps that should be run
- `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start` (_bool_, `False` by default) randomizes the start time of each task in a job within an approximately 3-minute window, to prevent all tasks from starting simultaneously and potentially overloading the system.

Call an executor's `run` method to execute its pipeline.
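For context, a minimal usage sketch of the new option (the pass-through step and its signature are illustrative assumptions; the executor arguments follow the options listed above):

```python
from datatrove.executor import LocalPipelineExecutor

def noop_step(data, rank: int, world_size: int):
    # Hypothetical pass-through step; real pipelines would use datatrove PipelineSteps.
    yield from data or []

executor = LocalPipelineExecutor(
    pipeline=[noop_step],
    tasks=4,
    logging_dir="logs/randomize_start_demo",
    skip_completed=True,
    randomize_start=True,  # each task sleeps up to ~3 minutes before it starts working
)
executor.run()
```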

8 changes: 8 additions & 0 deletions src/datatrove/executor/base.py
@@ -1,5 +1,7 @@
import dataclasses
import json
import random
import time
from abc import ABC, abstractmethod
from collections import deque
from collections.abc import Sequence
@@ -27,6 +29,7 @@ class PipelineExecutor(ABC):
logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
skip_completed: whether to skip tasks that were completed in
previous runs. default: True
randomize_start: randomize the start time of each task in a job within a ~3 min window
"""

@abstractmethod
@@ -35,10 +38,12 @@ def __init__(
pipeline: list[PipelineStep | Callable],
logging_dir: DataFolderLike = None,
skip_completed: bool = True,
randomize_start: bool = False,
):
self.pipeline: list[PipelineStep | Callable] = pipeline
self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
self.skip_completed = skip_completed
self.randomize_start = randomize_start

@abstractmethod
def run(self):
@@ -74,6 +79,9 @@ def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
return PipelineStats()
logfile = add_task_logger(self.logging_dir, rank, local_rank)
log_pipeline(self.pipeline)

if self.randomize_start:
time.sleep(random.randint(0, 60 * 3))
try:
# pipe data from one step to the next
pipelined_data = None
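The change itself is small: before running its pipeline, each task now optionally sleeps a random amount of time. A standalone sketch of the same jitter idea using only the standard library (the helper name and default window are illustrative):

```python
import random
import time

def jitter_start(randomize_start: bool, window_seconds: int = 3 * 60) -> None:
    # Sleep a random number of whole seconds in [0, window_seconds] so that tasks
    # launched at the same moment do not all hit storage/logging at once.
    if randomize_start:
        time.sleep(random.randint(0, window_seconds))
```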
4 changes: 3 additions & 1 deletion src/datatrove/executor/local.py
@@ -30,6 +30,7 @@ class LocalPipelineExecutor(PipelineExecutor):
Tasks [local_rank_offset, local_rank_offset + local_tasks] will be run.
depends: another LocalPipelineExecutor that should run
before this one
randomize_start: randomize the start time of each task in a job within a ~3 min window
"""

def __init__(
@@ -43,8 +44,9 @@ def __init__(
start_method: str = "forkserver",
local_tasks: int = -1,
local_rank_offset: int = 0,
randomize_start: bool = False,
):
super().__init__(pipeline, logging_dir, skip_completed)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
self.tasks = tasks
self.workers = workers if workers != -1 else tasks
self.start_method = start_method
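Concrete executors only need to accept the new keyword and forward it to the base class, as `LocalPipelineExecutor` does above. A partial sketch of that forwarding pattern (the subclass name is illustrative, and the abstract members a real executor must still implement are omitted):

```python
from datatrove.executor.base import PipelineExecutor

class MyPipelineExecutor(PipelineExecutor):
    def __init__(self, pipeline, logging_dir=None, skip_completed=True, randomize_start=False):
        # Forward the shared options, including the new flag, to PipelineExecutor.
        super().__init__(pipeline, logging_dir, skip_completed, randomize_start)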
5 changes: 1 addition & 4 deletions src/datatrove/executor/slurm.py
@@ -3,7 +3,6 @@
import json
import math
import os
import random
import signal
import subprocess
import sys
@@ -114,7 +113,7 @@ def __init__(
srun_args: dict = None,
tasks_per_job: int = 1,
):
super().__init__(pipeline, logging_dir, skip_completed)
super().__init__(pipeline, logging_dir, skip_completed, randomize_start)
self.tasks = tasks
self.workers = workers
self.partition = partition
@@ -178,8 +177,6 @@ def run(self):
break
rank = all_ranks[rank_to_run]

if self.randomize_start:
time.sleep(random.randint(0, 60 * 3))
self._run_for_rank(rank)
else:
# we still have to launch the job