From e06c7c3c53dcaebf727da89c9f24b0af790a9fce Mon Sep 17 00:00:00 2001 From: "Sweeney, Mack" Date: Tue, 30 May 2017 05:41:50 -0400 Subject: [PATCH] #1342: Do not swallow `KeyboardInterrupt` naively in `WikiCorpus.get_texts`; instead, log warning and do not set `length`. --- gensim/corpora/wikicorpus.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/gensim/corpora/wikicorpus.py b/gensim/corpora/wikicorpus.py index 209946fb41..13b111db4f 100755 --- a/gensim/corpora/wikicorpus.py +++ b/gensim/corpora/wikicorpus.py @@ -250,7 +250,8 @@ def process_article(args): return result, title, pageid -def init_worker(): +def init_to_ignore_interrupt(): + """Should only be used when master is prepared to handle termination of child processes.""" signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -304,13 +305,16 @@ def get_texts(self): """ articles, articles_all = 0, 0 positions, positions_all = 0, 0 - texts = ((text, self.lemmatize, title, pageid) for title, text, pageid in extract_pages(bz2.BZ2File(self.fname), self.filter_namespaces)) - pool = multiprocessing.Pool(self.processes, init_worker) - # process the corpus in smaller chunks of docs, because multiprocessing.Pool - # is dumb and would load the entire input into RAM at once... + texts = ((text, self.lemmatize, title, pageid) + for title, text, pageid + in extract_pages(bz2.BZ2File(self.fname), self.filter_namespaces)) + pool = multiprocessing.Pool(self.processes, init_to_ignore_interrupt) + try: + # process the corpus in smaller chunks of docs, because multiprocessing.Pool + # is dumb and would load the entire input into RAM at once... for group in utils.chunkize(texts, chunksize=10 * self.processes, maxsize=1): - for tokens, title, pageid in pool.imap(process_article, group): # chunksize=10): + for tokens, title, pageid in pool.imap(process_article, group): articles_all += 1 positions_all += len(tokens) # article redirects and short stubs are pruned here @@ -323,13 +327,15 @@ def get_texts(self): else: yield tokens except KeyboardInterrupt: - pass - - pool.terminate() - - logger.info( - "finished iterating over Wikipedia corpus of %i documents with %i positions" - " (total %i articles, %i positions before pruning articles shorter than %i words)", - articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS) - self.length = articles # cache corpus length + logger.warn("user terminated iteration over Wikipedia corpus after %i documents with %i positions" + " (total %i articles, %i positions before pruning articles shorter than %i words)", + articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS) + else: + logger.info( + "finished iterating over Wikipedia corpus of %i documents with %i positions" + " (total %i articles, %i positions before pruning articles shorter than %i words)", + articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS) + self.length = articles # cache corpus length + finally: + pool.terminate() # endclass WikiCorpus