Implementing thread based PrefetcherIterDataPipe #770
[ghstack-poisoned]
ghstack-source-id: 30bea32365cafafd00f125e744ed9cd76d86ef6c Pull Request resolved: #770
[ghstack-poisoned]
ghstack-source-id: 25bee7325112b321f403b2805dd9f6d23bd90af1 Pull Request resolved: #770
[ghstack-poisoned]
ghstack-source-id: 635a985ea38c220345a1b7c08d5220e7a24c15c1 Pull Request resolved: #770
A few comments about threading below.
            time.sleep(PRODUCER_SLEEP_INTERVAL)

    def __iter__(self):
        self.reset()
nit: `reset` can be omitted.
    # TODO: Potential optimization is changing buffer from list to dequeue
    self.prefetch_buffer = []
Yeah, I agree with changing to `deque` because it's thread-safe.
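To illustrate the suggestion, here is a minimal sketch of a producer thread and a consumer sharing a `collections.deque`. It relies on the documented guarantee that `deque.append` and `deque.popleft` are thread-safe. The names `prefetch_into`, `drain`, and the `done` event are hypothetical and are not taken from the PR.

```python
import threading
import time
from collections import deque


def prefetch_into(buffer, source, done):
    """Producer: append every item from `source`, then signal completion."""
    for item in source:
        buffer.append(item)  # thread-safe append on the right end
    done.set()


def drain(buffer, done):
    """Consumer: pop items from the left until the producer is finished."""
    out = []
    while True:
        try:
            out.append(buffer.popleft())  # O(1), unlike list.pop(0)
        except IndexError:
            # Buffer is momentarily empty; stop only once the producer is done
            # and nothing was appended in the meantime.
            if done.is_set() and not buffer:
                break
            time.sleep(0.001)
    return out


buffer = deque()
done = threading.Event()
producer = threading.Thread(target=prefetch_into, args=(buffer, range(100), done))
producer.start()
result = drain(buffer, done)
producer.join()
```

Besides the thread-safety argument made in the comment, `popleft` avoids the linear-time shift that `list.pop(0)` incurs on every read.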

    class _PrefetchData:
        def __init__(self, source_datapipe, buffer_size):
            self.run_prefetcher = True
Also, should we add a thread lock around `run_prefetcher`?
My thread lock is the GIL =)
Lol. Kind of makes sense.
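For comparison with the plain boolean flag discussed in this exchange, the sketch below shows one possible alternative that does not lean on the GIL: a `threading.Event` used as the stop signal. This is an assumption-laden illustration, not the PR's code. The `_PrefetchData` class here is a hypothetical variant, and the `stop` method and `produced` list exist only for the example.

```python
import threading


class _PrefetchData:
    """Hypothetical variant: the stop flag is an Event instead of a bool."""

    def __init__(self):
        self._stop_event = threading.Event()

    @property
    def run_prefetcher(self):
        # True until stop() has been called from any thread.
        return not self._stop_event.is_set()

    def stop(self):
        self._stop_event.set()


def thread_worker(prefetch_data, produced):
    """Keep producing until the stop event is set."""
    while prefetch_data.run_prefetcher:
        produced.append(len(produced))
        # Waiting on the event doubles as an interruptible sleep.
        prefetch_data._stop_event.wait(0.001)


prefetch_data = _PrefetchData()
produced = []
worker = threading.Thread(
    target=thread_worker, args=(prefetch_data, produced), daemon=True
)
worker.start()
prefetch_data.stop()
worker.join(timeout=2)
```

With a single writer and a single reader of one boolean, the plain attribute is usually sufficient in CPython, which is the point of the reply above. The event mainly makes the intent explicit and lets the worker wake up immediately when asked to stop.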
    except communication.iter.InvalidStateResetRequired:
        stop_iteration = True
    except communication.iter.TerminateRequired:
        prefetch_data.run_prefetcher = False
Do we need to handle those two exceptions? `communication` is more or less a submodule for MultiprocessingReadingService. I would personally prefer to remove those exceptions from a DataPipe.
I have to handle it here, because this is a separate thread that needs to be terminated gracefully in case the source DataPipe is out of commission.
Oh, I see. This prefetcher is attached not only to the main process but also to child processes.
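The reasoning in this thread can be sketched as follows: the worker thread pulls from the source, and if the source signals that it is going away, the worker clears its own run flag so the thread exits instead of hanging. This is a simplified sketch, not the PR's implementation. `TerminateRequired` below is a locally defined stand-in for `communication.iter.TerminateRequired`, and `failing_source`, the `deque` buffer, and the 1 ms sleep are assumptions made for the example.

```python
import threading
import time
from collections import deque


class TerminateRequired(Exception):
    """Hypothetical stand-in for communication.iter.TerminateRequired."""


class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.source_datapipe = source_datapipe
        self.buffer_size = buffer_size
        self.prefetch_buffer = deque()
        self.run_prefetcher = True


def thread_worker(prefetch_data):
    """Fill the buffer until exhausted, stopped, or told to terminate."""
    source_iter = iter(prefetch_data.source_datapipe)
    while prefetch_data.run_prefetcher:
        if len(prefetch_data.prefetch_buffer) >= prefetch_data.buffer_size:
            time.sleep(0.001)  # buffer full: back off briefly
            continue
        try:
            prefetch_data.prefetch_buffer.append(next(source_iter))
        except StopIteration:
            break  # source is exhausted normally
        except TerminateRequired:
            # The source is shutting down: stop this thread cleanly.
            prefetch_data.run_prefetcher = False


def failing_source():
    """Yield two items, then simulate the source being torn down."""
    yield 1
    yield 2
    raise TerminateRequired


prefetch_data = _PrefetchData(failing_source(), buffer_size=10)
worker = threading.Thread(target=thread_worker, args=(prefetch_data,), daemon=True)
worker.start()
worker.join(timeout=2)
```

Without the `except TerminateRequired` branch, the exception would propagate out of the thread target and end the thread with a traceback rather than a clean shutdown, which is the situation the author's reply describes wanting to avoid.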
[ghstack-poisoned]
ghstack-source-id: 4fc437ef879ad557340db8b547cbdf3625bc5acf Pull Request resolved: #770
[ghstack-poisoned]
ghstack-source-id: dd95cfdcba960cea4294bc7207c3ee005cf0fca5 Pull Request resolved: #770
[ghstack-poisoned]
ghstack-source-id: 5fdc9ba2e6420371c4c4069039bd88fa6b902674 Pull Request resolved: #770
LGTM
I assume tests will be added after the corresponding changes are made to `PrototypeMPRS`.
Do we intend for this DataPipe to be user-facing? (I'm guessing not?) If it is, it would be good to add a docstring and to add it to `torchdata.datapipes.iter.rst`.
    def reset_iterator(self):
        self.reset()
Question: What is the expected behavior of `reset_iterator` in PrototypeRS? How is that different from the usual DataPipe `reset`?
    if self.buffer_size < 1:
        yield from self.source_datapipe
This case should not be possible because of the check in `__init__`.
    prefetch_data = _PrefetchData(self.source_datapipe, self.buffer_size)
    self.prefetch_data = prefetch_data
nit: Rename these to `prefetcher_thread_worker` or something?
@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Differential Revision: [D39816751](https://our.internmc.facebook.com/intern/diff/D39816751) [ghstack-poisoned]
ghstack-source-id: 0bbf6ffe1ecb74ffd182c081328d61440fa195d2 Pull Request resolved: #770
Differential Revision: [D39816751](https://our.internmc.facebook.com/intern/diff/D39816751) [ghstack-poisoned]
ghstack-source-id: 6b9c54f3ba2786e6c3fe7a47cfd0fe3387451635 Pull Request resolved: #770
Stack from ghstack (oldest at bottom):
Differential Revision: D39816751