Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Benchmark iterator, avoid redundant queue, remove managers. #3119

Merged
merged 77 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from 76 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
6cb9f2e
scripts
brendan-ai2 Jun 6, 2019
20064ee
pass loop
brendan-ai2 Jun 6, 2019
d337056
Merge branch 'master' into speedups-timing
brendan-ai2 Jul 9, 2019
7ca8e8d
fix
brendan-ai2 Jul 9, 2019
4e876fc
timing
brendan-ai2 Jul 9, 2019
77a4d13
div by zero fix
brendan-ai2 Jul 9, 2019
5e735f9
fix
brendan-ai2 Jul 9, 2019
8a31485
fix
brendan-ai2 Jul 9, 2019
492a6cf
fix
brendan-ai2 Jul 9, 2019
fb5bc92
fix
brendan-ai2 Jul 9, 2019
9da4d81
fix
brendan-ai2 Jul 9, 2019
f46eb3f
fix
brendan-ai2 Jul 9, 2019
15c0759
fix
brendan-ai2 Jul 9, 2019
00dad6f
fix
brendan-ai2 Jul 9, 2019
eb37282
fix
brendan-ai2 Jul 10, 2019
83b0f2b
fix
brendan-ai2 Jul 10, 2019
2dd3b51
fix
brendan-ai2 Jul 10, 2019
2710899
fix
brendan-ai2 Jul 10, 2019
2eca72e
fix
brendan-ai2 Jul 10, 2019
bf5d73f
Better debugging
brendan-ai2 Jul 11, 2019
5b91c84
Refactoring to avoid need for iterating over reader
brendan-ai2 Jul 11, 2019
fa4d57b
dif threads
brendan-ai2 Jul 11, 2019
96e8742
Merge branch 'speedups-timing' into speedups-single-queue
brendan-ai2 Jul 11, 2019
8937e14
type fix
brendan-ai2 Jul 11, 2019
5417c09
pure timing
brendan-ai2 Jul 12, 2019
d0931ed
Merge branch 'speedups-timing' into speedups-single-queue
brendan-ai2 Jul 12, 2019
c4bfd38
time to first
brendan-ai2 Jul 12, 2019
5a664c9
Merge branch 'speedups-timing' into speedups-single-queue
brendan-ai2 Jul 12, 2019
281c2f2
Put queue logging on a timer
brendan-ai2 Jul 13, 2019
c9f0076
Merge branch 'speedups-timing' into speedups-single-queue
brendan-ai2 Jul 26, 2019
3e88e19
Break out QIterable
brendan-ai2 Jul 27, 2019
b2645b8
Add test
brendan-ai2 Jul 29, 2019
c93bf8b
Access queue directly
brendan-ai2 Jul 30, 2019
c02cbd1
cleanup, add arguments
brendan-ai2 Jul 30, 2019
b389bfa
more cleanup
brendan-ai2 Jul 31, 2019
37c6fd6
cleanups
brendan-ai2 Aug 6, 2019
fe7e2df
Delete benchmark iter
brendan-ai2 Aug 6, 2019
d1668e0
Add example usages
brendan-ai2 Aug 6, 2019
635c0bc
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 7, 2019
83d5ed9
Remove manager, use daemon processes, ball of hacks
brendan-ai2 Aug 8, 2019
cd61b39
adsjklafdskljgflkjfd
brendan-ai2 Aug 9, 2019
ac13bbb
infligh items
brendan-ai2 Aug 9, 2019
49688a6
Fix
brendan-ai2 Aug 9, 2019
d6a3b9a
bring back tests
brendan-ai2 Aug 9, 2019
acb4747
remove debugging
brendan-ai2 Aug 9, 2019
f7d9253
hcks
brendan-ai2 Aug 9, 2019
dc970c7
Reuse buffers
brendan-ai2 Aug 9, 2019
d6fd1e2
Revert "Reuse buffers"
brendan-ai2 Aug 9, 2019
8962e18
Cleanups
brendan-ai2 Aug 9, 2019
058ee7a
config fix
brendan-ai2 Aug 9, 2019
93d921a
deadlock stab
brendan-ai2 Aug 9, 2019
078a563
Merge branch 'speedups-single-queue' of github.com:brendan-ai2/allenn…
brendan-ai2 Aug 9, 2019
a1b269b
fewer workers
brendan-ai2 Aug 9, 2019
ba21113
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 9, 2019
33ba898
lint
brendan-ai2 Aug 9, 2019
ff9432c
Respond to feedback
brendan-ai2 Aug 17, 2019
49fd2d9
stray comma
brendan-ai2 Aug 17, 2019
290e816
Update another reference
brendan-ai2 Aug 20, 2019
f5fbc2d
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 20, 2019
afcd9ca
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 20, 2019
cf0e22a
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 22, 2019
f2c546b
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 23, 2019
45bb1de
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 23, 2019
5e51a43
Merge branch 'speedups-single-queue' of github.com:brendan-ai2/allenn…
brendan-ai2 Aug 23, 2019
68e9168
Fix crashes on linux
brendan-ai2 Aug 27, 2019
21553fe
Remove redundant closes and joins
brendan-ai2 Aug 27, 2019
5b57987
Better comment
brendan-ai2 Aug 27, 2019
9313908
Better comment
brendan-ai2 Aug 27, 2019
ac720d5
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 27, 2019
df4cc2d
fix lint
brendan-ai2 Aug 27, 2019
31ab0db
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 27, 2019
7497ed6
Isolate weird test
brendan-ai2 Aug 28, 2019
f9a4178
combine
brendan-ai2 Aug 28, 2019
c5a9cde
RAII
brendan-ai2 Aug 29, 2019
e5e885e
Replace code
brendan-ai2 Aug 29, 2019
71425e1
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 29, 2019
dc23b04
Merge branch 'master' into speedups-single-queue
brendan-ai2 Aug 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 125 additions & 75 deletions allennlp/data/dataset_readers/multiprocess_dataset_reader.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from typing import List, Iterable, Iterator
import glob
import logging
import os
from queue import Empty
from typing import List, Iterable, Iterator, Optional

import numpy as np
from torch.multiprocessing import Manager, Process, Queue, log_to_stderr
from torch.multiprocessing import Process, Queue, Value, log_to_stderr

from allennlp.data.dataset_readers.dataset_reader import DatasetReader
from allennlp.data.instance import Instance


class logger:
"""
multiprocessing.log_to_stderr causes some output in the logs
Expand All @@ -30,26 +33,133 @@ def info(cls, message: str) -> None:
def _worker(reader: DatasetReader,
            input_queue: Queue,
            output_queue: Queue,
            num_active_workers: Value,
            num_inflight_items: Value,
            worker_id: int) -> None:
    """
    A worker that pulls filenames off the input queue, uses the dataset reader
    to read them, and places the generated instances on the output queue. When
    there are no filenames left on the input queue, it decrements
    num_active_workers to signal completion.

    Parameters
    ----------
    reader : ``DatasetReader``
        Used to parse each shard file into ``Instance``s.
    input_queue : ``Queue``
        Holds shard file paths; a ``None`` sentinel signals no more files.
    output_queue : ``Queue``
        Receives every ``Instance`` this worker produces.
    num_active_workers : ``Value``
        Shared counter of workers still running; decremented on exit.
    num_inflight_items : ``Value``
        Shared counter of instances placed on ``output_queue`` but not yet
        consumed; incremented here, decremented by the consumer.
    worker_id : ``int``
        Identifier used only for logging.
    """
    logger.info(f"Reader worker: {worker_id} PID: {os.getpid()}")
    # Keep going until you get a file_path that's None.
    while True:
        file_path = input_queue.get()
        if file_path is None:
            # It's important that we close and join the queue here before
            # decrementing num_active_workers. Otherwise our parent may join us
            # before the queue's feeder thread has passed all buffered items to
            # the underlying pipe resulting in a deadlock.
            #
            # See:
            # https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#pipes-and-queues
            # https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#programming-guidelines
            output_queue.close()
            output_queue.join_thread()
            # Decrementing is not atomic.
            # See https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Value.
            with num_active_workers.get_lock():
                num_active_workers.value -= 1
            logger.info(f"Reader worker {worker_id} finished")
            break

        logger.info(f"reading instances from {file_path}")
        for instance in reader.read(file_path):
            # Count the item as in flight *before* enqueueing it, so the
            # consumer never sees an item whose existence isn't reflected
            # in the counter.
            with num_inflight_items.get_lock():
                num_inflight_items.value += 1
            output_queue.put(instance)


class QIterable(Iterable[Instance]):
    """
    You can't set attributes on Iterators, so this is just a dumb wrapper
    that exposes the output_queue. Each call to ``__iter__`` starts a fresh
    set of worker processes that read shards matching ``file_path`` and
    stream their instances through ``output_queue``.
    """
    def __init__(self, output_queue_size, epochs_per_read, num_workers, reader, file_path) -> None:
        self.output_queue = Queue(output_queue_size)
        self.epochs_per_read = epochs_per_read
        self.num_workers = num_workers
        self.reader = reader
        self.file_path = file_path

        # Initialized in start.
        self.input_queue: Optional[Queue] = None
        self.processes: List[Process] = []
        # The num_active_workers and num_inflight_items counts in conjunction
        # determine whether there could be any outstanding instances.
        self.num_active_workers: Optional[Value] = None
        self.num_inflight_items: Optional[Value] = None

    def __iter__(self) -> Iterator[Instance]:
        """Spawn the workers, yield instances as they arrive, then join."""
        self.start()

        # Keep going as long as not all the workers have finished or there are items in flight.
        while self.num_active_workers.value > 0 or self.num_inflight_items.value > 0:
            # Inner loop to minimize locking on self.num_active_workers.
            while True:
                try:
                    # Non-blocking to handle the empty-queue case. (With
                    # block=False a timeout would be ignored, so none is
                    # passed.)
                    yield self.output_queue.get(block=False)
                    with self.num_inflight_items.get_lock():
                        self.num_inflight_items.value -= 1
                except Empty:
                    # The queue could be empty because the workers are
                    # all finished or because they're busy processing.
                    # The outer loop distinguishes between these two
                    # cases.
                    break

        self.join()

    def start(self) -> None:
        """Fill the input queue with shard paths and launch the workers."""
        shards = glob.glob(self.file_path)
        # Ensure a consistent order before shuffling for testing.
        shards.sort()
        num_shards = len(shards)

        # If we want multiple epochs per read, put shards in the queue multiple times.
        self.input_queue = Queue(num_shards * self.epochs_per_read + self.num_workers)
        for _ in range(self.epochs_per_read):
            np.random.shuffle(shards)
            for shard in shards:
                self.input_queue.put(shard)

        # Then put a None per worker to signify no more files.
        for _ in range(self.num_workers):
            self.input_queue.put(None)

        assert not self.processes, "Process list non-empty! You must call QIterable.join() before restarting."
        self.num_active_workers = Value('i', self.num_workers)
        self.num_inflight_items = Value('i', 0)
        for worker_id in range(self.num_workers):
            process = Process(target=_worker,
                              args=(self.reader, self.input_queue, self.output_queue,
                                    self.num_active_workers, self.num_inflight_items, worker_id))
            logger.info(f"starting worker {worker_id}")
            process.start()
            self.processes.append(process)

    def join(self) -> None:
        """Wait for all workers to exit and clear the process list."""
        for process in self.processes:
            process.join()
        self.processes.clear()

    def __del__(self) -> None:
        """
        Terminate processes if the user hasn't joined. This is necessary as
        leaving stray processes running can corrupt shared state. In brief,
        we've observed shared memory counters being reused (when the memory was
        free from the perspective of the parent process) while the stray
        workers still held a reference to them.

        For a discussion of using destructors in Python in this manner, see
        https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/.
        """
        for process in self.processes:
            process.terminate()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joelgrus, last bug should be fixed now. It was a nasty race in the tests due to the stray child processes. I ended up using __del__ here for that. I know that using with is preferred in Python, but I'm not sure we have that as an option given that we're attempting to fit the Iterable interface. We aren't holding any circular refs here, so this should be safe, IIUC. Certainly it fixes the problem empirically. Any concerns?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it works, I have no concerns 😇

(I am somewhat far from this code at this point, and I'm not really a multiprocessing expert)



@DatasetReader.register('multiprocess')
class MultiprocessDatasetReader(DatasetReader):
Expand Down Expand Up @@ -103,70 +213,10 @@ def _read(self, file_path: str) -> Iterable[Instance]:
raise RuntimeError("Multiprocess reader implements read() directly.")

def read(self, file_path: str) -> Iterable[Instance]:
    """
    Return an iterable over the instances in the shards matching
    ``file_path`` (a glob pattern). The actual multiprocess reading is
    delegated to ``QIterable``, which spawns ``num_workers`` worker
    processes each time it is iterated.
    """
    return QIterable(
        output_queue_size=self.output_queue_size,
        epochs_per_read=self.epochs_per_read,
        num_workers=self.num_workers,
        reader=self.reader,
        file_path=file_path
    )
Loading