Sync fork #2

Merged (17 commits) on Mar 1, 2024
4 changes: 3 additions & 1 deletion .gitignore
@@ -158,4 +158,6 @@ cython_debug/
# ruff code style
.ruff_cache/

playground/*
.vscode/

playground/
25 changes: 25 additions & 0 deletions CITATION.cff
@@ -0,0 +1,25 @@
cff-version: 1.2.0
title: 'DataTrove: large scale data processing'
message: >-
If you use this software, please cite it using the metadata from this file.
type: software
authors:
- given-names: Guilherme
family-names: Penedo
- given-names: Alessandro
family-names: Cappelli
- given-names: Thomas
family-names: Wolf
- given-names: Mario
family-names: Sasko
repository-code: 'https://github.com/huggingface/datatrove'
abstract: "DataTrove is a library to process, filter and deduplicate text data at a very large scale. It provides a set of prebuilt commonly used processing blocks with a framework to easily add custom functionality. DataTrove processing pipelines are platform-agnostic, running out of the box locally or on a slurm cluster. Its (relatively) low memory usage and multiple-step design make it ideal for large workloads, such as processing an LLM's training data."
keywords:
- deep-learning
- pytorch
- transformers
- llms
- data
- scale
license: Apache-2.0
version: 0.0.1
45 changes: 35 additions & 10 deletions README.md
@@ -32,6 +32,7 @@ Local, remote and other file systems are supported through [fsspec](https://file
+ [Custom function](#custom-function)
+ [Custom block](#custom-block)
- [Contributing](#contributing)
- [Citation](#citation)

<!-- tocstop -->

@@ -99,10 +100,10 @@ Some options common to all executors:
Call an executor's `run` method to execute its pipeline.


> [!TIP]
> Datatrove keeps track of which tasks successfully completed by creating a marker (an empty file) in the `${logging_dir}/completions` folder. Once the job finishes, if some of its tasks have failed, you can **simply relaunch the exact same executor** and datatrove will check and only run the tasks that were not previously completed.

> [!CAUTION]
> If you relaunch a pipeline because some tasks failed, **do not change the total number of tasks** as this will affect the distribution of input files/sharding.


@@ -131,6 +132,17 @@ executor.run()
```
</details>

<details>
<summary>Multi-node parallelism</summary>

You can have different nodes/machines process different parts of the total tasks by using `local_tasks` and `local_rank_offset`. For each node/instance/machine, launch with the following options:
- `tasks` the total number of tasks to be executed (across all machines). **This value must be the same on each machine, or the input file distribution may overlap!** Example: 500
- `local_tasks` how many of the total tasks will be executed on this particular machine. Note that you can use a different value for each machine. Example: 100
- `local_rank_offset` the rank of the first task to be executed on this machine. If this is the 3rd machine you are launching the job on, and the two previous machines ran 250 and 150 tasks respectively, this would be `400` for the current machine (see the sketch below).

To get final merged stats you will have to invoke the `merge_stats` script manually on a path containing the stats from all machines.
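
For example, here is a minimal sketch of what the launch on that 3rd machine could look like (the reader/writer steps and their paths are placeholders):

```python
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.writers.jsonl import JsonlWriter

# placeholder pipeline; substitute your actual processing steps
executor = LocalPipelineExecutor(
    pipeline=[
        JsonlReader("my_input_data/"),
        JsonlWriter("my_output_data/"),
    ],
    tasks=500,              # total tasks across all machines (same value everywhere)
    local_tasks=100,        # tasks executed on this particular machine
    local_rank_offset=400,  # the two previous machines already cover ranks 0-399
)
executor.run()
```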
</details>

### SlurmPipelineExecutor
This executor will launch a pipeline on a slurm cluster, using slurm job arrays to group and manage tasks.
Options:
@@ -182,7 +194,7 @@ executor2 = SlurmPipelineExecutor(
tasks=1,
time="5:00:00", # 5 hours
partition="hopper-cpu",
depends=executor1 # this pipeline will only be launched after executor1 successfuly completes
depends=executor1 # this pipeline will only be launched after executor1 successfully completes
)
# executor1.run()
executor2.run() # this will actually launch executor1, as it is a dependency, so no need to launch it explicitly
@@ -312,7 +324,7 @@ pipeline = [
...
]
```
> [!TIP]
> You might have some pickling issues due to the imports. If this happens, simply move whatever imports you need inside the function body.
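
For instance, a minimal sketch of a custom function step with its import moved inside the body (the step itself is purely illustrative):

```python
def remove_extra_whitespace(data, rank: int = 0, world_size: int = 1):
    import re  # imported inside the function body to avoid pickling issues

    for doc in data:
        doc.text = re.sub(r"\s+", " ", doc.text)
        yield doc
```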

#### Custom block
@@ -339,25 +351,25 @@ class UppercaserBlock(PipelineStep):
# do something
...
yield doc

#
# OR process data from previous blocks (`data`)
#

for doc in data:
with self.track_time():
# you can wrap the main processing code in `track_time` to know how much each document took to process
nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
# you can also keep track of stats per document using stat_update
self.stat_update("og_upper_letters", value=nr_uppercase_letters)
doc.text = doc.text.upper()
# make sure you keep the yield outside the track_time block, or it will affect the time calculation
yield doc

#
# OR save data to disk
#

with self.some_folder.open("myoutput", "wt") as f:
for doc in data:
f.write(doc...)
@@ -386,5 +398,18 @@ pre-commit install

Run the tests:
```bash
pytest -sv ./tests/
```

## Citation

```bibtex
@misc{penedo2024datatrove,
author = {Penedo, Guilherme and Cappelli, Alessandro and Wolf, Thomas and Sasko, Mario},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}
```
21 changes: 11 additions & 10 deletions examples/exact_substrings.py
@@ -2,28 +2,28 @@

from datatrove.executor.base import PipelineExecutor
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.dedup import DatasetToSequence, DedupReader, MergeSequences
from datatrove.pipeline.dedup import ESDatasetToSequence, ESMergeSequences, ESRangeRemover
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import GopherQualityFilter, LanguageFilter
from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.typeshelper import Languages


"""
Example of how to run exact-substring deduplication. It also requires using
https://github.com/google-research/deduplicate-text-datasets after stages 1 and 2.
1) DatasetToSequence maps 1 file into a sequence S, with unique separators at the beginning of each document. It also
1) ESDatasetToSequence maps 1 file into a sequence S, with unique separators at the beginning of each document. It also
saves the byte offset at which each individual document begins.
2) MergeSequences merges all sequences into a single big sequence. It also saves the byte offsets per file.
2) ESMergeSequences merges all sequences into a single big sequence. It also saves the byte offsets per file.

---
After stage 2 you should use the deduplicate-text-datasets scripts to create the suffix array and find all the
duplicates. The final output of these scripts should be a .bytearange file with the duplicate ranges, in bytes, with
respect to the big sequence.
---

3) DedupReader reads from the DocumentsPipeline and the duplicate byte ranges at the same time, removing the duplicated ranges from the documents.
3) ESRangeRemover reads from the DocumentsPipeline and the duplicate byte ranges at the same time, removing the duplicated ranges from the documents.


To run stages 1 and 2 call run_step_1_and_2; after you have followed the deduplicate-text-datasets instructions in the README you
@@ -42,11 +42,11 @@ def run_step_1_and_2():
GopherQualityFilter(min_stop_words=0),
LanguageFilter(language_threshold=0.5, languages=(Languages.english,)),
JsonlWriter("intermediate/"),
DatasetToSequence(output_folder="es/"),
ESDatasetToSequence(output_folder="es/"),
]

pipeline_2 = [
MergeSequences(
ESMergeSequences(
data_folder="es",
tasks_stage_1=4,
)
@@ -62,10 +62,11 @@ def run_step_1_and_2():

def run_step_3():
pipeline_3 = [
DedupReader(
f"{os.getcwd()}/intermediate/",
JsonlReader("intermediate/"), # must be the same data that was passed to DatasetToSequence
ESRangeRemover(
sequence_folder=f"{os.getcwd()}/es/",
)
),
JsonlWriter("final-deduped-data"),
]

executor_3: PipelineExecutor = LocalPipelineExecutor(pipeline=pipeline_3, workers=4, tasks=4)
2 changes: 1 addition & 1 deletion examples/process_common_crawl_dump.py
@@ -45,7 +45,7 @@
tasks=8000,
time="10:00:00",
logging_dir=f"{MAIN_OUTPUT_PATH}/logs/base_processing/{DUMP}",
slurm_logs_folder=f"/fsx/guilherme/logs/process_dump/processing/base_processing/{DUMP}/slurm_logs",
slurm_logs_folder=f"logs/process_dump/processing/base_processing/{DUMP}/slurm_logs",
randomize_start=True,
mem_per_cpu_gb=2,
partition="hopper-cpu",
8 changes: 4 additions & 4 deletions pyproject.toml
@@ -97,11 +97,11 @@ where = ["src"]
datatrove = ["assets/*"]

[tool.ruff]
ignore = [
lint.ignore = [
"C901", # `function_name` is too complex
"E501", # line length violation
]
select = [
lint.select = [
"C",
"E",
"F",
@@ -110,12 +110,12 @@ select = [
]
line-length = 119

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = [
"F401" # module imported but unused
]

[tool.ruff.isort]
[tool.ruff.lint.isort]
lines-after-imports = 2
known-first-party = [
"datatrove"
29 changes: 21 additions & 8 deletions src/datatrove/data.py
@@ -1,17 +1,25 @@
"""Data classes for the datatrove package."""

from dataclasses import dataclass, field
from typing import Generator, NewType


class MediaType:
"""Media types

For future uses, currently not used.
"""

IMAGE = 0
VIDEO = 1
AUDIO = 2


@dataclass
class Media:
"""
For possible future versions of datatrove
"""Media metadata

For future uses, currently not used.
"""

type: int
@@ -22,12 +30,17 @@ class Media:

@dataclass
class Document:
"""
Base datatrove data format.
- `text` the actual text content for each sample
- `id` a unique id (string) for this sample
- `metadata` a dictionary where any additional info may be stored

"""Main Document dataclass going through the processing pipeline

Args:
text: str
the actual text content for each sample
id: str
a unique id (string) for this sample
media: list[Media]
The media associated with the document
metadata: dict[str, str | int | float | bool]
a dictionary where any additional info may be stored
"""

text: str
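
For reference, constructing a `Document` with the fields described in the updated docstring might look like this (the values are illustrative):

```python
from datatrove.data import Document

doc = Document(
    text="The actual text content for this sample.",
    id="sample-0",
    metadata={"source": "example", "token_count": 8},
)
```
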
32 changes: 17 additions & 15 deletions src/datatrove/executor/base.py
@@ -14,32 +14,32 @@


class PipelineExecutor(ABC):
"""Base class for pipeline executors (local, slurm, etc.)

Args:
pipeline: a list of PipelineStep and/or custom functions
with arguments (data: DocumentsPipeline, rank: int, world_size: int)
logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
skip_completed: whether to skip tasks that were completed in
previous runs. default: True
"""

@abstractmethod
def __init__(
self,
pipeline: list[PipelineStep | Callable],
logging_dir: DataFolderLike = None,
skip_completed: bool = True,
):
"""
Args:
pipeline: a list of PipelineStep and/or custom functions
with arguments (data: DocumentsPipeline, rank: int, world_size: int)
logging_dir: where to save logs, stats, etc. Should be parsable into a datatrove.io.DataFolder
skip_completed: whether to skip tasks that were completed in
previous runs. default: True
"""
self.pipeline: list[PipelineStep | Callable] = pipeline
self.logging_dir = get_datafolder(logging_dir if logging_dir else f"logs/{get_timestamp()}_{get_random_str()}")
self.skip_completed = skip_completed

@abstractmethod
def run(self):
"""
This method is responsible for correctly invoking `self._run_for_rank` for each task that is to be run.
See slurm and local executor for example usage.
Returns:

"""Run the pipeline on all tasks.
This method is responsible for correctly invoking `self._run_for_rank` for each task that is to be run.
See slurm and local executor for example usage.
"""
pass

@@ -121,7 +121,7 @@ def mark_rank_as_completed(self, rank: int):
"""
self.logging_dir.open(f"completions/{rank:05d}", "w").close()

def get_incomplete_ranks(self) -> list[int]:
def get_incomplete_ranks(self, ranks=None) -> list[int]:
"""
Gets a full list of ranks that are still incomplete.
Usually faster than calling `is_rank_completed` for each task.
@@ -132,7 +132,7 @@ def get_incomplete_ranks(self) -> list[int]:
return list(
filter(
lambda rank: not self.skip_completed or f"completions/{rank:05d}" not in completed,
range(self.world_size),
ranks if ranks is not None else range(self.world_size),
)
)

@@ -163,6 +163,8 @@ def save_executor_as_json(self, indent: int = 4):


class ExecutorJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder for the PipelineExecutor class"""

def default(self, o):
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)