Skip to content

Commit

Permalink
Put doc into in_queue but # of docs in in_queue is capped
Browse files Browse the repository at this point in the history
Currently, the preprocessor and parser are executed in a completely sequential order.
i.e., preprocess N docs (and load them into a queue), then parse N docs.
This has two drawbacks:
  1. the progress bar shows nothing during preprocessing.
  2. the machine RAM may not be large enough to hold N preprocessed docs.
They become more serious when N is large and/or each HTML file is large.
  • Loading branch information
Hiromu Hota committed Jun 17, 2020
1 parent 11f30e4 commit 26ac90f
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions src/fonduer/utils/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,14 @@ def _apply(

# Create an input queue to feed documents to UDF workers
manager = Manager()
in_queue = manager.Queue()
# Set maxsize (#435). The number is heuristically determined.
in_queue = manager.Queue(maxsize=parallelism * 2)
# Use an output queue to track multiprocess progress
out_queue = manager.Queue()

# Clear the last documents parsed by the last run
self.last_docs = set()
# Fill input queue with documents
for doc in doc_loader:
in_queue.put(doc)
self.last_docs.add(doc.name)
total_count = in_queue.qsize()
total_count = len(doc_loader)

# Create UDF Processes
for i in range(parallelism):
Expand All @@ -131,12 +128,23 @@ def _apply(
for udf in self.udfs:
udf.start()

iter = doc_loader.__iter__()
count_parsed = 0
while (
any([udf.is_alive() for udf in self.udfs]) or not out_queue.empty()
) and count_parsed < total_count:
# Fill input queue with documents but # of docs in queue is capped (#435).
if not in_queue.full():
try:
doc = next(iter)
in_queue.put(doc)
self.last_docs.add(doc.name)
except StopIteration:
pass

# Get doc from the out_queue and persist the result into postgres
try:
y = out_queue.get(timeout=1)
y = out_queue.get_nowait()
self._add(y)
# Update progress bar whenever an item has been processed
count_parsed += 1
Expand All @@ -150,6 +158,8 @@ def _apply(
raise (e)

# Join the UDF processes
for _ in self.udfs:
in_queue.put(UDF.TASK_DONE)
for udf in self.udfs:
udf.join()

Expand Down Expand Up @@ -200,6 +210,8 @@ def run(self) -> None:
while True:
try:
doc = self.in_queue.get_nowait()
if doc == UDF.TASK_DONE:
break
# Merge the object with the session owned by the current child process.
# If transient (ie not saved), save the object to the database.
# If not, load it from the database w/o the overhead of reconciliation.
Expand All @@ -210,7 +222,7 @@ def run(self) -> None:
y = self.apply(doc, **self.apply_kwargs)
self.out_queue.put(y)
except Empty:
break
pass
session.commit()
session.close()

Expand Down

0 comments on commit 26ac90f

Please sign in to comment.