piskvorky#1342: Do not swallow KeyboardInterrupt naively in `WikiCorpus.get_texts`; instead, log warning and do not set `length`.
Sweeney, Mack committed May 30, 2017
1 parent 3c0752b commit e06c7c3
Showing 1 changed file with 21 additions and 15 deletions.
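For context, a rough caller-side sketch of what the change means in practice (not part of the commit; the dump filename and the empty-`dictionary` shortcut that skips the vocabulary scan at construction are illustrative assumptions):

    from gensim.corpora.wikicorpus import WikiCorpus

    # Passing a pre-built (here: empty) dictionary avoids scanning the whole dump up front.
    wiki = WikiCorpus('enwiki-latest-pages-articles.xml.bz2', dictionary={})

    for tokens in wiki.get_texts():
        pass  # stream tokenized articles; press Ctrl-C to stop early

    # After a Ctrl-C, iteration now ends with a logged warning instead of
    # silently caching a truncated article count as the corpus length:
    print(getattr(wiki, 'length', None))  # None when interrupted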
36 changes: 21 additions & 15 deletions gensim/corpora/wikicorpus.py
@@ -250,7 +250,8 @@ def process_article(args):
     return result, title, pageid
 
 
-def init_worker():
+def init_to_ignore_interrupt():
+    """Should only be used when master is prepared to handle termination of child processes."""
     signal.signal(signal.SIGINT, signal.SIG_IGN)
 
 
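The renamed initializer makes the master/worker interrupt contract explicit: Ctrl-C delivers SIGINT to the whole foreground process group, so pool workers must ignore it and leave shutdown to the parent. A standalone sketch of the pattern (illustrative names, not gensim code):

    import multiprocessing
    import signal
    import time

    def init_to_ignore_interrupt():
        # Workers inherit the terminal's process group; ignoring SIGINT here
        # means only the master raises KeyboardInterrupt on Ctrl-C.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def work(item):
        time.sleep(0.01)  # stand-in for real per-item processing
        return item * 2

    if __name__ == '__main__':
        pool = multiprocessing.Pool(4, init_to_ignore_interrupt)
        try:
            for result in pool.imap(work, range(10000)):
                print(result)
        except KeyboardInterrupt:
            print('interrupted; master winds the pool down')
        finally:
            pool.terminate()
            pool.join()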
@@ -304,13 +305,16 @@ def get_texts(self):
         """
         articles, articles_all = 0, 0
         positions, positions_all = 0, 0
-        texts = ((text, self.lemmatize, title, pageid) for title, text, pageid in extract_pages(bz2.BZ2File(self.fname), self.filter_namespaces))
-        pool = multiprocessing.Pool(self.processes, init_worker)
-        # process the corpus in smaller chunks of docs, because multiprocessing.Pool
-        # is dumb and would load the entire input into RAM at once...
+        texts = ((text, self.lemmatize, title, pageid)
+                 for title, text, pageid
+                 in extract_pages(bz2.BZ2File(self.fname), self.filter_namespaces))
+        pool = multiprocessing.Pool(self.processes, init_to_ignore_interrupt)
+
         try:
+            # process the corpus in smaller chunks of docs, because multiprocessing.Pool
+            # is dumb and would load the entire input into RAM at once...
             for group in utils.chunkize(texts, chunksize=10 * self.processes, maxsize=1):
-                for tokens, title, pageid in pool.imap(process_article, group):  # chunksize=10):
+                for tokens, title, pageid in pool.imap(process_article, group):
                     articles_all += 1
                     positions_all += len(tokens)
                     # article redirects and short stubs are pruned here
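The relocated comment explains why input is fed to the pool in bounded groups: `pool.imap` pulls tasks from its input ahead of the consumer, so handing it the raw generator could buffer far too much, while `utils.chunkize(..., maxsize=1)` prefetches at most one group at a time. A simplified, thread-free sketch of the same idea (`grouper` is a hypothetical helper, not gensim's implementation):

    import itertools

    def grouper(iterable, chunksize):
        """Yield successive lists of up to `chunksize` items from `iterable`."""
        it = iter(iterable)
        while True:
            group = list(itertools.islice(it, chunksize))
            if not group:
                return
            yield group

    # Usage mirroring the loop above: only one modest group of articles is
    # materialized in memory at a time, however large the corpus.
    # for group in grouper(texts, 10 * processes):
    #     for tokens, title, pageid in pool.imap(process_article, group):
    #         ...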
@@ -323,13 +327,15 @@ def get_texts(self):
                     else:
                         yield tokens
         except KeyboardInterrupt:
-            pass
-
-        pool.terminate()
-
-        logger.info(
-            "finished iterating over Wikipedia corpus of %i documents with %i positions"
-            " (total %i articles, %i positions before pruning articles shorter than %i words)",
-            articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS)
-        self.length = articles  # cache corpus length
+            logger.warn("user terminated iteration over Wikipedia corpus after %i documents with %i positions"
+                        " (total %i articles, %i positions before pruning articles shorter than %i words)",
+                        articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS)
+        else:
+            logger.info(
+                "finished iterating over Wikipedia corpus of %i documents with %i positions"
+                " (total %i articles, %i positions before pruning articles shorter than %i words)",
+                articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS)
+            self.length = articles  # cache corpus length
+        finally:
+            pool.terminate()
 # endclass WikiCorpus
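The reworked control flow distinguishes three paths: on interrupt, warn with the partial counts; only on clean completion, cache `self.length`; in all cases, terminate the pool. A minimal illustration of that `try/except/else/finally` contract (illustrative names):

    def consume(stream):
        """Return the number of items in `stream`, or None if interrupted."""
        count = 0
        try:
            for _ in stream:
                count += 1
        except KeyboardInterrupt:
            print('interrupted after %d items; count is partial' % count)
            return None
        else:
            return count  # runs only when the loop finished cleanly
        finally:
            print('cleanup runs on every path')  # where pool.terminate() belongs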
