Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for parallelizing processing parquet files across workers and nodes. #19400

Merged
merged 17 commits into from
Feb 5, 2024

Conversation

tchaton
Copy link
Contributor

@tchaton tchaton commented Feb 3, 2024

What does this PR do?

The main challenge with parquet files is to make sure each worker across all nodes processes the quantity of data. This PR introduces a lazy slice to make sure all workers process the exact same number of rows.

This PR introduces a ParquetReader to distribute processing parquet files across workers and nodes with ease.

import os
from multiprocessing.pool import ThreadPool
from lightning.data import optimize
from lightning.data.processing.readers import ParquetReader
from lightning.data.processing.image import download_image_with_retry
from lightning.data.processing.utilities import SuppressStdoutStderr
from PIL import Image
from time import sleep

input_dir = "the-eye.eu/public/AI/cah/laion400m-met-release/laion400m-meta"
parquet_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".parquet")]

def process(row):
    image_id, url, text, height, width, image_license, nsfw, similarity = row
    img, err = download_image_with_retry(0, url, timeout=5)
    if err:
        return None, err

    try:
        return [image_id, img, text, image_license, nsfw, similarity], None
    except Exception:
        return None, err

class Fetcher:

    def __init__(self, max_threads=32):
        self.max_threads = max_threads
        self.stored = 0
        self.skipped = 0

    def __call__(self, df):
        print(self.skipped, self.stored)
        rows = [list(row) for row in df.iter_rows() if row[0] is not None]
        with ThreadPool(self.max_threads) as thread_pool:
            for row, err in thread_pool.imap_unordered(process, rows):
                if err is not None:
                    self.skipped += 1
                    continue

                if row[1] is not None:
                    try:
                        row[1] = Image.open(row[1]).resize((224, 224))
                    except:
                        self.skipped += 1
                        continue
                yield row
                self.stored += 1

optimize(
    fn=Fetcher(max_threads=16),
    inputs=parquet_files,
    output_dir="/teamspace/datasets/laion400m",
    num_workers=os.cpu_count(),
    reader=ParquetReader(num_rows=2048, to_pandas=False),
    chunk_bytes="64MB",
)

Fixes #<issue_number>

Before submitting
  • Was this discussed/agreed via a GitHub issue? (not for typos and docs)
  • Did you read the contributor guideline, Pull Request section?
  • Did you make sure your PR does only one thing, instead of bundling different changes together?
  • Did you make sure to update the documentation with your changes? (if necessary)
  • Did you write any new necessary tests? (not for typos and docs)
  • Did you verify new and existing tests pass locally with your changes?
  • Did you list all the breaking changes introduced by this pull request?
  • Did you update the CHANGELOG? (not for typos, docs, test updates, or minor internal changes/refactors)

PR review

Anyone in the community is welcome to review the PR.
Before you start reviewing, make sure you have read the review guidelines. In short, see the following bullet-list:

Reviewer checklist
  • Is this pull request ready for review? (if not, please submit in draft mode)
  • Check that all items from Before submitting are resolved
  • Make sure the title is self-explanatory and the description concisely explains the PR
  • Add labels and milestones (and optionally projects) to the PR so it can be classified

cc @Borda

@github-actions github-actions bot added the data (external) litdata package label Feb 3, 2024
@github-actions github-actions bot added the dependencies Pull requests that update a dependency file label Feb 5, 2024
@tchaton tchaton changed the title Optimize processing parquet files Add support for parallelizing processing parquet files across workers and nodes. Feb 5, 2024
@tchaton tchaton marked this pull request as ready for review February 5, 2024 17:52
Copy link
Contributor

github-actions bot commented Feb 5, 2024

⚡ Required checks status: All passing 🟢

Groups summary

🟢 lightning_data: CPU workflow
Check ID Status
data-cpu (macOS-11, lightning, 3.10, 2.1) success
data-cpu (ubuntu-20.04, lightning, 3.10, 2.1) success
data-cpu (windows-2022, lightning, 3.10, 2.1) success

These checks are required after the changes to requirements/data/test.txt, src/lightning/data/processing/__init__.py, src/lightning/data/processing/image.py, src/lightning/data/processing/readers.py, src/lightning/data/streaming/data_processor.py, src/lightning/data/streaming/functions.py, tests/tests_data/processing/__init__.py, tests/tests_data/processing/test_readers.py.

🟢 mypy
Check ID Status
mypy success

These checks are required after the changes to requirements/data/test.txt, src/lightning/data/processing/__init__.py, src/lightning/data/processing/image.py, src/lightning/data/processing/readers.py, src/lightning/data/streaming/data_processor.py, src/lightning/data/streaming/functions.py.

🟢 install
Check ID Status
install-pkg (ubuntu-22.04, app, 3.8) success
install-pkg (ubuntu-22.04, app, 3.11) success
install-pkg (ubuntu-22.04, fabric, 3.8) success
install-pkg (ubuntu-22.04, fabric, 3.11) success
install-pkg (ubuntu-22.04, pytorch, 3.8) success
install-pkg (ubuntu-22.04, pytorch, 3.11) success
install-pkg (ubuntu-22.04, lightning, 3.8) success
install-pkg (ubuntu-22.04, lightning, 3.11) success
install-pkg (ubuntu-22.04, notset, 3.8) success
install-pkg (ubuntu-22.04, notset, 3.11) success
install-pkg (macOS-12, app, 3.8) success
install-pkg (macOS-12, app, 3.11) success
install-pkg (macOS-12, fabric, 3.8) success
install-pkg (macOS-12, fabric, 3.11) success
install-pkg (macOS-12, pytorch, 3.8) success
install-pkg (macOS-12, pytorch, 3.11) success
install-pkg (macOS-12, lightning, 3.8) success
install-pkg (macOS-12, lightning, 3.11) success
install-pkg (macOS-12, notset, 3.8) success
install-pkg (macOS-12, notset, 3.11) success
install-pkg (windows-2022, app, 3.8) success
install-pkg (windows-2022, app, 3.11) success
install-pkg (windows-2022, fabric, 3.8) success
install-pkg (windows-2022, fabric, 3.11) success
install-pkg (windows-2022, pytorch, 3.8) success
install-pkg (windows-2022, pytorch, 3.11) success
install-pkg (windows-2022, lightning, 3.8) success
install-pkg (windows-2022, lightning, 3.11) success
install-pkg (windows-2022, notset, 3.8) success
install-pkg (windows-2022, notset, 3.11) success

These checks are required after the changes to src/lightning/data/processing/__init__.py, src/lightning/data/processing/image.py, src/lightning/data/processing/readers.py, src/lightning/data/streaming/data_processor.py, src/lightning/data/streaming/functions.py, requirements/data/test.txt.


Thank you for your contribution! 💜

Note
This comment is automatically generated and updates for 60 minutes every 180 seconds. If you have any other questions, contact carmocca for help.

Copy link
Collaborator

@lantiga lantiga left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, added two comments

@mergify mergify bot added the ready PRs ready to be merged label Feb 5, 2024
@tchaton tchaton merged commit 7dfc279 into master Feb 5, 2024
64 of 65 checks passed
@tchaton tchaton deleted the add_parquet_reader branch February 5, 2024 23:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
data (external) litdata package dependencies Pull requests that update a dependency file ready PRs ready to be merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants