Skip to content

Commit

Permalink
Preprocess docs and load them into a queue in a different thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Hiromu Hota committed Jun 17, 2020
1 parent 26ac90f commit 3ba328e
Showing 1 changed file with 10 additions and 15 deletions.
25 changes: 10 additions & 15 deletions src/fonduer/utils/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from multiprocessing import Manager, Process
from queue import Empty, Queue
from threading import Thread
from typing import Any, Collection, Dict, Iterator, List, Optional, Set, Type

from sqlalchemy import inspect
Expand Down Expand Up @@ -128,31 +129,25 @@ def _apply(
for udf in self.udfs:
udf.start()

iter = doc_loader.__iter__()
count_parsed = 0
while (
any([udf.is_alive() for udf in self.udfs]) or not out_queue.empty()
) and count_parsed < total_count:

def in_thread_func() -> None:
# Fill input queue with documents but # of docs in queue is capped (#435).
if not in_queue.full():
try:
doc = next(iter)
in_queue.put(doc)
self.last_docs.add(doc.name)
except StopIteration:
pass
for doc in doc_loader:
in_queue.put(doc) # block until a free slot is available
self.last_docs.add(doc.name)

Thread(target=in_thread_func).start()

while count_parsed < total_count:
# Get doc from the out_queue and persist the result into postgres
try:
y = out_queue.get_nowait()
y = out_queue.get() # block until an item is available
self._add(y)
# Update progress bar whenever an item has been processed
count_parsed += 1
if self.pb is not None:
self.pb.update(1)
except Empty:
# This happens when any child process is alive and still processing.
pass
except Exception as e:
# Raise an error for all the other exceptions.
raise (e)
Expand Down

0 comments on commit 3ba328e

Please sign in to comment.