Skip to content

Commit

Permalink
Put docs into in_queue incrementally, capping the number of docs in in_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Hiromu Hota committed Jun 17, 2020
1 parent 7e71944 commit a267b88
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions src/fonduer/utils/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,14 @@ def _apply(

# Create an input queue to feed documents to UDF workers
manager = Manager()
in_queue = manager.Queue()
# Set maxsize (#435). The number is heuristically determined.
in_queue = manager.Queue(maxsize=parallelism * 2)
# Use an output queue to track multiprocess progress
out_queue = manager.Queue()

# Clear the last documents parsed by the last run
self.last_docs = set()
# Fill input queue with documents
for doc in doc_loader:
if inspect(doc).transient: # This only happens during parser.apply
self.session.add(doc)
self.session.commit()
in_queue.put(doc.id)
self.last_docs.add(doc.name)
total_count = in_queue.qsize()
total_count = len(doc_loader)

# Create UDF Processes
for i in range(parallelism):
Expand All @@ -134,12 +128,25 @@ def _apply(
for udf in self.udfs:
udf.start()

iter = doc_loader.__iter__()
count_parsed = 0
while (
any([udf.is_alive() for udf in self.udfs]) or not out_queue.empty()
) and count_parsed < total_count:
while count_parsed < total_count:
# Fill the input queue with documents while capping the number of queued docs (#435).
if not in_queue.full():
try:
doc = next(iter)
# This only happens during parser.apply
if inspect(doc).transient:
self.session.add(doc)
self.session.commit()
in_queue.put(doc.id)
self.last_docs.add(doc.name)
except StopIteration:
pass

# Get doc from the out_queue and persist the result into postgres
try:
y = out_queue.get(timeout=1)
y = out_queue.get_nowait()
self._add(y)
# Update progress bar whenever an item has been processed
count_parsed += 1
Expand All @@ -153,6 +160,8 @@ def _apply(
raise (e)

# Join the UDF processes
for _ in self.udfs:
in_queue.put(UDF.TASK_DONE)
for udf in self.udfs:
udf.join()

Expand Down Expand Up @@ -203,11 +212,13 @@ def run(self) -> None:
while True:
try:
doc_id = self.in_queue.get_nowait()
if doc_id == UDF.TASK_DONE:
break
doc = session.query(Document).filter(Document.id == doc_id).one()
y = self.apply(doc, **self.apply_kwargs)
self.out_queue.put(y)
except Empty:
break
pass
session.commit()
session.close()

Expand Down

0 comments on commit a267b88

Please sign in to comment.