
Native pytorch Multiprocessing for data loading #3079

Closed
DeNeutoy opened this issue Jul 18, 2019 · 12 comments

@DeNeutoy
Contributor

Summary

We've struggled for a while to make our multiprocessing code performant (and in its current state, I'm not actually sure it works). Instead of doing this ourselves (or even trying to fix what we currently have), we should use the optimized torch.utils.data.DataLoader when pytorch releases v1.1.X.

I dug into this a bit and I think I came up with a solution that will work with the next pytorch release. The key piece is the introduction of torch.utils.data.IterableDataset, which allows datasets to be viewed as streams, rather than as indexable collections (which they are currently required to be).
https://pytorch.org/docs/master/data.html#torch.utils.data.IterableDataset

Current Blockers

  1. The pytorch Dataset class assumes all datasets have a __len__ method, which may not be true of allennlp datasets if they are lazy or are infinite streams.

  2. The way pytorch currently exposes iteration is via __getitem__ and, correspondingly, by specifying which indices a batch is composed of. This is not suitable for allennlp, where we must be able to control the full generator in order to do things like bucketing by length.

Implementation idea

I think that this can be implemented in a backward compatible way via:

  1. A torch.utils.data.IterableDataset subclass which performs the functions of our iterators, e.g. loading max_instances_in_memory at a time, sorting by length, etc., but still yielding single instances.

  2. Modifying MultiProcessDataIterator to create a DataLoader inside its __call__ method (as it has the same Iterable[TensorDict] API as the DataLoader), where, internally, the Iterable[Instance] it is passed is expected to be the IterableDataset subclass from above.

Basically, the current Iterator functionality would be moved into subclassed IterableDatasets, and we would iterate over these datasets using only the DataLoader, whose only jobs would be to (1) do multiprocessing and (2) convert instances to tensor dicts.
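
To make this concrete, here is a minimal sketch of such an IterableDataset subclass (illustrative only, not a proposed final API: the InstanceStream name, the sort_key argument and the max_instances_in_memory default are assumptions for the example). It takes over the grouping/sorting job of our current iterators while still yielding single instances.

from typing import Callable, Iterable, Iterator, List, Optional

from torch.utils.data import IterableDataset  # available from pytorch 1.2 onwards

from allennlp.data.instance import Instance


class InstanceStream(IterableDataset):
    """
    Streams single instances, optionally sorting each in-memory chunk by a
    user-supplied key so that nearby instances pad to similar lengths.
    """
    def __init__(self,
                 instances: Iterable[Instance],
                 sort_key: Optional[Callable[[Instance], int]] = None,
                 max_instances_in_memory: int = 1000) -> None:
        self.instances = instances
        self.sort_key = sort_key
        self.max_instances_in_memory = max_instances_in_memory

    def __iter__(self) -> Iterator[Instance]:
        chunk: List[Instance] = []
        for instance in self.instances:
            chunk.append(instance)
            if len(chunk) >= self.max_instances_in_memory:
                yield from self._maybe_sort(chunk)
                chunk = []
        if chunk:
            yield from self._maybe_sort(chunk)

    def _maybe_sort(self, chunk: List[Instance]) -> List[Instance]:
        # Stand-in for BucketIterator-style sorting, e.g.
        # sort_key=lambda instance: len(instance.fields["tokens"].tokens)
        return sorted(chunk, key=self.sort_key) if self.sort_key else chunk

The DataLoader's only remaining jobs are then exactly (1) and (2): fanning this stream out across workers and applying a collate function like the allennlp_collocate one further down.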

Controlling multiple workers

Rather than having wrapper classes which control sharding in dataset readers, like the MultiprocessDatasetIterator/Reader, dataset readers should control their own sharding using torch.utils.data.get_worker_info() if they expect to be used for multiprocess reading. get_worker_info() returns None when called outside a data loading worker process, and returns worker information inside one. This could be used to, e.g., read different data shards from a directory.
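
For example, sharding inside a reader could look something like the sketch below, which assumes the shards live as separate files in a directory (iter_shard_paths is an illustrative helper, not an existing allennlp API):

import glob
import os
from typing import Iterator

import torch.utils.data


def iter_shard_paths(shard_directory: str) -> Iterator[str]:
    """Yield only the shard files that belong to the current worker."""
    shard_paths = sorted(glob.glob(os.path.join(shard_directory, "*")))
    worker_info = torch.utils.data.get_worker_info()
    if worker_info is None:
        # Not inside a DataLoader worker: this process reads every shard.
        yield from shard_paths
    else:
        # Worker i reads every num_workers-th shard, so no shard is duplicated.
        yield from shard_paths[worker_info.id::worker_info.num_workers]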

Possible difficult things

  1. Variable sized batches - some of the allennlp data functionality allows you to return variable sized batches, such as batches with a fixed number of tokens. I'm not sure how to do this currently with the torch dataloader. I think it involves implementing a BatchSampler which returns index lists of different lengths, but it's a little complicated because we want to view our datasets as streams, and not indexable objects, so I'm not quite sure how that would work (one possible stream-based workaround is sketched after this list).

  2. Caching instances - I'm not sure how this works with multiprocessing - but part of the reason we need it in the first place is that our current multiprocessing doesn't work properly anyway.
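
On the variable sized batches point, one possible workaround (an assumption on my part, not a settled design) is to have the IterableDataset itself yield lists of instances and to disable the DataLoader's automatic batching with batch_size=None, so the collate function receives one pre-formed batch at a time. The class name, the max_tokens parameter and the assumption of a "tokens" TextField below are all illustrative.

from typing import Iterable, Iterator, List

from torch.utils.data import DataLoader, IterableDataset

from allennlp.data.instance import Instance


class MaxTokensBatchStream(IterableDataset):
    """Groups a stream of instances into batches of roughly max_tokens tokens."""
    def __init__(self, instances: Iterable[Instance], max_tokens: int = 1000) -> None:
        self.instances = instances
        self.max_tokens = max_tokens

    def __iter__(self) -> Iterator[List[Instance]]:
        batch: List[Instance] = []
        batch_tokens = 0
        for instance in self.instances:
            num_tokens = len(instance.fields["tokens"].tokens)  # assumes a "tokens" TextField
            if batch and batch_tokens + num_tokens > self.max_tokens:
                yield batch
                batch, batch_tokens = [], 0
            batch.append(instance)
            batch_tokens += num_tokens
        if batch:
            yield batch

# With batch_size=None the DataLoader disables automatic batching and passes
# each yielded list straight to collate_fn, e.g.:
# loader = DataLoader(MaxTokensBatchStream(instances), num_workers=2,
#                     batch_size=None, collate_fn=allennlp_collocate)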

Below is an example of a very simple wrapper of an allennlp dataset reader to allow use with the DataLoader from pytorch. Currently it is difficult to bucket this reader, because of the way we are forced to access only one element at a time.

from torch.utils.data import Dataset, DataLoader


from allennlp.data.dataset_readers import LanguageModelingReader, DatasetReader
from allennlp.data.fields.text_field import TextField
from allennlp.data.dataset import Batch
from allennlp.data.instance import Instance
from allennlp.data.vocabulary import Vocabulary

class AllennlpDataset(Dataset):

    def __init__(self,
                 reader: DatasetReader,
                 dataset_path: str):

        self.reader = reader
        self.dataset_path = dataset_path

        self.iterable = self.reader.read(dataset_path)
        self.iterator = (x for x in self.iterable)

        self._length = None

    def __len__(self):
        """
        This is gross but will go away in the next pytorch release,
        as they are introducing an `IterableDataset` class which doesn't
        need to have a length:
        https://pytorch.org/docs/master/data.html#torch.utils.data.IterableDataset
        """
        if self._length is None:
            self._length = 0
            for i in self.iterator:
                self._length += 1
            self.iterator = (x for x in self.iterable)
        return self._length

    def __getitem__(self, idx) -> Instance:
        # `idx` is ignored: we just pull the next instance from the stream,
        # restarting the underlying iterable once it has been exhausted.
        get_next = next(self.iterator, None)
        if get_next is None:
            self.iterator = (x for x in self.iterable)
            get_next = next(self.iterator)
        return get_next


dataset = AllennlpDataset(LanguageModelingReader(lazy=True), "./baby_data.txt")

vocab = Vocabulary.from_instances(dataset[i] for i in range(len(dataset)))

# Function to tell the torch DataLoader how to batch up our custom data, i.e. Instances
def allennlp_collocate(batch):
    batch = Batch(batch)
    batch.index_instances(vocab)
    return batch.as_tensor_dict(batch.get_padding_lengths())

loader = DataLoader(dataset, batch_size=2, num_workers=2, collate_fn=allennlp_collocate)

for batch in loader:
    print(batch)

Let me know what you think or if you can foresee any other big problems!
@matt-gardner, @joelgrus and @brendan-ai2, would be good to get your opinion on this.

@joelgrus
Contributor

I haven't thought through the details yet, but I like the basic idea a lot.

@matt-gardner
Contributor

Seems reasonable to me. Integrating with native pytorch data stuff has been on our wish list for a long time, it just never seemed feasible. If you can make it work, great.

@sai-prasanna
Contributor

@DeNeutoy Any progress on this front? Progress along this line to integrate pytorch dataloaders would be ideal, combined with a move to DistributedDataParallel.

@DeNeutoy
Contributor Author

@sai-prasanna this is something we've got bookmarked as a possible change for allennlp v1.0, as it might be a bit difficult to make it backward compatible. Actually, I'm not so sure that this would impact moving to DistributedDataParallel: from what I understand (I may be wrong about this), in the DistributedDataParallel setup each GPU has its own process, so the pressure on generating the data is removed. Maybe you have more information though?

@scarecrow1123
Contributor

@DeNeutoy I've posted about my attempt to bring in DistributedDataParallel in #2536 comment. Can you see if that helps?

@sai-prasanna
Contributor

sai-prasanna commented Sep 28, 2019

@brendan-ai2

A move to DDP would require changes in dataset loading for splitting the dataset across workers. Pytorch provides two ways for doing this depending on the style of dataset being used.

For map-style datasets, the main process generates the indices using the sampler and sends them to the workers, so any shuffling or randomization is done in the main process, which guides loading by assigning each worker the indices to load.

For iterable-style datasets, since each worker process gets a replica of the dataset object, naive multi-process loading will often result in duplicated data. Using torch.utils.data.get_worker_info() and/or worker_init_fn, users may configure each replica independently. (See the IterableDataset documentation for how to achieve this.) For similar reasons, in multi-process loading, the drop_last argument drops the last non-full batch of each worker's iterable-style dataset replica.
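
For the map-style case, a minimal sketch of that wiring might look like the following (it assumes the default process group has already been initialised, and make_distributed_loader is just an illustrative helper):

from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler


def make_distributed_loader(dataset: Dataset, batch_size: int, num_workers: int) -> DataLoader:
    # DistributedSampler reads the world size and rank from the default
    # process group, so each DDP process sees a disjoint slice of the indices.
    sampler = DistributedSampler(dataset)
    return DataLoader(dataset,
                      batch_size=batch_size,
                      sampler=sampler,
                      num_workers=num_workers)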

@DeNeutoy
Contributor Author

DeNeutoy commented Sep 30, 2019

@scarecrow1123 / @sai-prasanna Thanks. I don't think it's actually required - it's just a nice to have. I think we should not modify existing dataset readers to allow them to do this, as people who are working with DDP are likely to be advanced users, and it is relatively easy to make a dataset reader conform to either of these specs. After we have DDP and pytorch dataloaders actually working, if loads of people are using it, we can think about making existing dataset readers easy to use out of the box with DDP. Does that sound good?

@DeNeutoy DeNeutoy added this to the 1.0.0 milestone Sep 30, 2019
@schmmd schmmd changed the title Idea: Native pytorch Multiprocessing for data loading Native pytorch Multiprocessing for data loading Sep 30, 2019
@sai-prasanna
Contributor

@DeNeutoy I am coming from the perspective that making DDP the default way of doing multi-GPU training would be better (if it proves faster for most tasks), and also that, since 1.0 will have breaking changes, we can solve this problem in that milestone.

I am currently trying out @scarecrow1123 's DDP code and inside the dataset reader, using pytorch's Dataset and Dataloader to speed things up.

Here are a few points which I think might make the design user friendly.

Dataset vs IterableDataset

Though Dataset requires implementing __len__, it is generally less cognitive overhead for the end user, and I believe most datasets can implement it; the case for true streams (e.g. reading from a database) is not the norm.

  1. Dataset + DataLoader solves splitting the load across multiple processes out of the box, which is not the case with IterableDataset, where one has to get the worker id to do the sharding.
  2. Dataset + DistributedSampler + DataLoader elegantly solves sharding data across multiple GPUs in DDP, and across multiple workers for each GPU.
  3. Dataset + DistributedSampler solves the issue that the total number of batches should be divisible by the total number of GPUs.
  4. When using Dataset we can show proper progress bars with estimated training time, as __len__ is implemented.

Wrapping Dataset Readers vs replacing Dataset Readers with torch Datasets

Instead of wrapping a DatasetReader in a pytorch Dataset, I think it would be preferable to make all DatasetReaders into pytorch Datasets. This would require removing the _read function and passing the train, test and validation paths in the constructor.

If we wrap dataset readers, it would only be possible to use IterableDatasets, but by turning dataset readers into Datasets we can support both Dataset and IterableDataset.
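
As a rough illustration of that direction (the tab-separated format, the field names and the tokenizer choice are assumptions for the example, not a proposed API), a seq2seq-style reader written directly as a map-style torch Dataset might look like:

from torch.utils.data import Dataset

from allennlp.data.fields import TextField
from allennlp.data.instance import Instance
from allennlp.data.token_indexers import SingleIdTokenIndexer
from allennlp.data.tokenizers import WordTokenizer


class TabSeparatedSeq2SeqDataset(Dataset):
    """A dataset reader that *is* a map-style torch Dataset: the file path goes
    into the constructor and __getitem__ builds one Instance on demand."""

    def __init__(self, file_path: str) -> None:
        self._tokenizer = WordTokenizer()
        self._token_indexers = {"tokens": SingleIdTokenIndexer()}
        with open(file_path) as data_file:
            self._lines = [line.strip() for line in data_file if line.strip()]

    def __len__(self) -> int:
        return len(self._lines)

    def __getitem__(self, idx: int) -> Instance:
        source, target = self._lines[idx].split("\t")
        return Instance({
            "source_tokens": TextField(self._tokenizer.tokenize(source), self._token_indexers),
            "target_tokens": TextField(self._tokenizer.tokenize(target), self._token_indexers),
        })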

Minor issues with mutation across multiprocessing

Even when only using a DataLoader inside a DatasetReader, I faced an issue where the pickling and unpickling during multiprocessing destroys object references, so stateful updates of token_indexers such as

https://github.com/allenai/allennlp/blob/master/allennlp/data/token_indexers/wordpiece_indexer.py#L149

don't happen as expected. That is, self._added_to_vocabulary = True, which happens during indexing, sets the value on the unpickled token indexer inside the worker process. We actually wanted it to set the value on the token_indexer referenced by the dataset reader (datasetreader._source_token_indexers), but it doesn't.
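
A tiny self-contained sketch of the pitfall (nothing allennlp-specific; StatefulDataset is just an illustration): with num_workers > 0 the dataset object is pickled into each worker, so attribute mutations made while loading happen on the worker's copy and never reach the object held by the main process.

from torch.utils.data import DataLoader, Dataset


class StatefulDataset(Dataset):
    def __init__(self) -> None:
        # Analogous to wordpiece_indexer._added_to_vocabulary.
        self.added_to_vocabulary = False

    def __len__(self) -> int:
        return 4

    def __getitem__(self, idx: int) -> int:
        # Mutates the worker process's copy of the dataset, not the original.
        self.added_to_vocabulary = True
        return idx


if __name__ == "__main__":
    dataset = StatefulDataset()
    loader = DataLoader(dataset, batch_size=2, num_workers=2)
    for _ in loader:
        pass
    # Prints False: the object in the main process was never mutated.
    print(dataset.added_to_vocabulary)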

@DeNeutoy
Contributor Author

@sai-prasanna thanks - all good points! I wasn't sure initially that we were going to pick this up, and the wrapper was the easiest way to make a proof of concept that something like this was possible. We'll definitely be inheriting from one of the dataset classes.

That pickling thing is very annoying. There might be a way to get around that, which is to ensure that the dataset reader is stateless when it is building the vocab, or just to require (initially) that you pre-construct a vocab. I'll be sure to check this when implementing it, though. Thanks for your input!

Do you have a link to your modifications of @scarecrow1123's DDP code which internally uses the torch datasets?

@DeNeutoy
Contributor Author

Update on this, to figure out precisely what speedup we expect:

The current implementation of multiprocessing does provide small speedups when the data reading function (in allennlp's case, the read function for a particular dataset reader) is very slow. However, for e.g. SNLI, which just reads json blobs, there is no speedup over using the master process.

Full Multiprocessing Worker Comparison (Mean 5 runs)
DatasetReader: SNLI
Measurement: Time taken to read from disk, index, and tensorise 30 batches of 32 instances each.

Run                              Mean   Std
baseline:                        0.5600 0.0486
pytorch, baseline:               0.5434 0.0387
allennlp, two workers:           0.7009 0.0475
allennlp, five workers:          0.7834 0.0793
pytorch, two workers:            0.3529 0.0509
pytorch, five workers:           0.2032 0.0101

Full Multiprocessing Worker Comparison (Mean 5 runs)
DatasetReader: SNLI, with a forced 0.01 sec sleep in text_to_instance to simulate a slow data generating process
Measurement: Time taken to read from disk, index, and tensorise 30 batches of 32 instances each.

Run                              Mean    Std
baseline:                        12.5646 0.0293
pytorch, baseline:               12.4746 0.0368
allennlp, two workers:           11.0769 0.0789
allennlp, five workers:          10.5293 0.0338
pytorch, two workers:            6.3277 0.0738
pytorch, five workers:           2.7609 0.0074

So in summary:

  • Our current multiprocessing code gives a slight slowdown (0.71x with 5 workers) if the data generation process is fast.
  • Our current multiprocessing code gives a slight speedup (1.13x with 5 workers) if the data generation process is slow.
  • Using pytorch data loaders gives an almost linear speedup (4.55x with 5 workers) when the data generation process is slow.
  • Using pytorch data loaders gives a sub-linear speedup (2.8x with 5 workers) when the data generation process is fast.

@sai-prasanna
Contributor

sai-prasanna commented Oct 17, 2019

Do you have a link to your modifications of @scarecrow1123's DDP code which internally uses the torch datasets?

Here is the DatasetReader for seq2seq which uses torch Datasets.
https://gist.github.com/sai-prasanna/4562d73146af8b7a55b4b9d96da5a9a3

@matt-gardner
Contributor

Closing as fixed by #3700.
