Allow use of truncated Dictionary for coherence measures #1349
Merged

Commits (34):
9d06a1f  #1342: Allow use of truncated `Dictionary` for coherence calculation …
f69a2ff  #1342: Do not produce sliding windows for texts with no relevant word…
26de547  #1342: Remove unused multiprocessing import in `coherencemodel` module.
dfe159b  add utility functions for strided windowing of texts (lists of string…
2e3852e  handle edge cases with window_size equal to or exceeding document siz…
ec7af1b  move code for building inverted index into a new text_analysis module…
3f8fb7f  complete migration to using the accumulators in the new text_analysis…
b12edef  fix bug in WordOccurrenceAccumulator so that co-occurrences of same w…
91b8a05  make wikicorpus parsing handle KeyboardInterrupt gracefully
c6224b7  add ParallelWordOccurrenceAccumulator and make default method for p_b…
f00d389  clean up, clarify, and optimize the indirect_confirmation_measure.cos…
327b739  #1342: Cleanup, documentation improvements, proper caching of accumul…
3c0752b  Merge branch 'develop' into develop
e06c7c3  #1342: Do not swallow `KeyboardInterrupt` naively in `WikiCorpus.get_…
2ca43f7  #1342: Formatting fixes (hanging indent in `coherencemodel` and non-e…
825b0e9  #1342: Improve `CoherenceModel` documentation and minor refactor for …
314a400  #1342: Optimize word occurrence accumulation and fix a bug with repea…
e785773  #1342: Minor bug fixes and improved logging in text_analysis module; …
5f78cdb  #1342: Optimize data structures being used for window set tracking an…
bbd2748  #1342: Fix accidental typo.
5fb0b95  #1342: Further optimize word co-occurrence accumulation by using a `c…
880b8d0  #1342: Clean up logging in `text_analysis` module and remove empty li…
1d32b8e  #1342: Remove unused traceback module.
8e04b41  #1342: Fixes for python3 compatibility.
e3ce402  #1342: Hopefully `six.viewitems` works for python3 compatibility?
7f7f55d  #1342: Realized the python3 compatibility issue was due to the Dictio…
343da69  #1342: Fixed a few bugs and added test coverage for the coherencemode…
1ce8a72  #1342: Further tests for persistence of accumulator.
96fd343  #1342: Add test case for `CorpusAccumulator`.
a631ab6  #1342: Formatting fixes for hanging indents and overly long lines.
5f58bda  #1342: Fix `indirect_confirmation_measure.cosine_similarity` to retur…
b941f3c  #1342: Fix `direct_confirmation_measure` functions to return individu…
75fcac8  #1342: Hanging indents and switch out `union` with `update` for uniqu…
96d1349  #1342: Clarify documentation in the `probability_estimation` module.
The diff below shows changes from one commit to the `text_analysis` module:
@@ -51,6 +51,7 @@ class BaseAnalyzer(object):

     def __init__(self, relevant_ids):
         self.relevant_ids = relevant_ids
+        self._vocab_size = len(self.relevant_ids)
         self.id2contiguous = {word_id: n for n, word_id in enumerate(self.relevant_ids)}
         self.log_every = 1000
         self._num_docs = 0
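The `id2contiguous` mapping is what makes a truncated `Dictionary` workable here: the token ids that survive truncation can be sparse, while the accumulators want dense arrays sized to `_vocab_size`. A minimal sketch of the idea (hypothetical ids, not gensim code):

```python
# Map sparse surviving token ids onto contiguous indices so that counts
# can live in a dense array sized to the truncated vocabulary.
import numpy as np

relevant_ids = [3, 17, 204, 9981]  # hypothetical ids left after truncation
id2contiguous = {word_id: n for n, word_id in enumerate(relevant_ids)}

# 4 slots instead of the 9982 a raw-id-indexed array would need.
occurrences = np.zeros(len(relevant_ids), dtype='uint32')
occurrences[id2contiguous[204]] += 1  # count an occurrence of word 204
```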
@@ -92,7 +93,8 @@ def _get_co_occurrences(self, word_id1, word_id2):

 class UsesDictionary(BaseAnalyzer):
     """A BaseAnalyzer that uses a Dictionary, hence can translate tokens to counts.
-    The standard BaseAnalyzer can only deal with token ids since it does not have access to the token2id mapping.
+    The standard BaseAnalyzer can only deal with token ids since it doesn't have the token2id
+    mapping.
     """

     def __init__(self, relevant_ids, dictionary):
@@ -128,8 +130,7 @@ class InvertedIndexBased(BaseAnalyzer):

     def __init__(self, *args):
         super(InvertedIndexBased, self).__init__(*args)
-        vocab_size = len(self.relevant_ids)
-        self._inverted_index = np.array([set() for _ in range(vocab_size)])
+        self._inverted_index = np.array([set() for _ in range(self._vocab_size)])

     def _get_occurrences(self, word_id):
         return len(self._inverted_index[word_id])
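For context, `InvertedIndexBased` keeps one set of document ids per contiguous vocabulary slot, so an occurrence count is just a set size (`_get_occurrences` above). Counting co-occurrences as set intersections is the natural reading, though `_get_co_occurrences` is not shown in this diff. A toy sketch:

```python
# Toy inverted index: one set of doc ids per vocab slot (not gensim code).
import numpy as np

vocab_size = 3
inverted_index = np.array([set() for _ in range(vocab_size)])

inverted_index[0].update({0, 1, 2})  # word 0 seen in docs 0, 1, 2
inverted_index[1].update({1, 2})     # word 1 seen in docs 1, 2

occurrences = len(inverted_index[0])                         # 3
co_occurrences = len(inverted_index[0] & inverted_index[1])  # 2: docs 1 and 2
print(occurrences, co_occurrences)
```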
@@ -169,15 +170,10 @@ def __init__(self, relevant_ids, dictionary):
         Args:
         ----
         relevant_ids: the set of words that occurrences should be accumulated for.
-        dictionary: gensim.corpora.dictionary.Dictionary instance with mappings for the relevant_ids.
+        dictionary: Dictionary instance with mappings for the relevant_ids.
         """
         super(WindowedTextsAnalyzer, self).__init__(relevant_ids, dictionary)
-
-    def filter_to_relevant_words(self, text):
-        """Lazily filter the text to only those words which are relevant."""
-        relevant_words = (word for word in text if word in self.relevant_words)
-        relevant_ids = (self.token2id[word] for word in relevant_words)
-        return (self.id2contiguous[word_id] for word_id in relevant_ids)
+        self._none_token = self._vocab_size  # see _iter_texts for use of none token

     def accumulate(self, texts, window_size):
         relevant_texts = self._iter_texts(texts)
@@ -189,11 +185,13 @@ def accumulate(self, texts, window_size):
         return self

     def _iter_texts(self, texts):
+        dtype = np.uint16 if np.iinfo(np.uint16).max >= self._vocab_size else np.uint32
         for text in texts:
             if self.text_is_relevant(text):
-                token_ids = (self.token2id[word] if word in self.relevant_words else None
-                             for word in text)
-                yield [self.id2contiguous[_id] if _id is not None else None for _id in token_ids]
+                yield np.array([
+                    self.id2contiguous[self.token2id[w]] if w in self.relevant_words
+                    else self._none_token
+                    for w in text], dtype=dtype)

     def text_is_relevant(self, text):
         """Return True if the text has any relevant words, else False."""
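Two things happen in the rewritten `_iter_texts`: irrelevant words now map to a sentinel index one past the vocabulary (`_none_token`) instead of `None`, and each text becomes a compact numpy integer array, using `uint16` whenever the vocabulary (and hence the sentinel, which equals `_vocab_size`) fits. A toy sketch of the encoding, with a made-up vocabulary:

```python
# Encode a text as contiguous ids, with vocab_size itself serving as the
# "none" sentinel for irrelevant words (toy data, not gensim code).
import numpy as np

token2id = {'cat': 0, 'dog': 1, 'fish': 2}  # hypothetical truncated vocab
vocab_size = len(token2id)
none_token = vocab_size  # one past the last real id

# uint16 suffices as long as the sentinel (== vocab_size) is representable.
dtype = np.uint16 if np.iinfo(np.uint16).max >= vocab_size else np.uint32

text = ['cat', 'sat', 'on', 'dog']
encoded = np.array([token2id.get(w, none_token) for w in text], dtype=dtype)
print(encoded)  # [0 3 3 1] -- 'sat' and 'on' become the sentinel
```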
@@ -208,7 +206,7 @@ class InvertedIndexAccumulator(WindowedTextsAnalyzer, InvertedIndexBased):

     def analyze_text(self, window, doc_num=None):
         for word_id in window:
-            if word_id is not None:
+            if word_id is not self._none_token:
                 self._inverted_index[word_id].add(self._num_docs)
@@ -217,9 +215,11 @@ class WordOccurrenceAccumulator(WindowedTextsAnalyzer):

     def __init__(self, *args):
         super(WordOccurrenceAccumulator, self).__init__(*args)
-        vocab_size = len(self.relevant_words)
-        self._occurrences = np.zeros(vocab_size, dtype='uint32')
-        self._co_occurrences = sps.lil_matrix((vocab_size, vocab_size), dtype='uint32')
+        self._occurrences = np.zeros(self._vocab_size, dtype='uint32')
+        self._co_occurrences = sps.lil_matrix((self._vocab_size, self._vocab_size), dtype='uint32')
+
+        self._uniq_words = np.zeros((self._vocab_size + 1,), dtype=bool)  # add 1 for none token
+        self._mask = self._uniq_words[:-1]  # to exclude none token

     def __str__(self):
         return self.__class__.__name__
@@ -242,25 +242,23 @@ def partial_accumulate(self, texts, window_size):
         return self

     def analyze_text(self, window, doc_num=None):
         self.slide_window(window, doc_num)
+        if self._mask.any():
+            self._occurrences[self._mask] += 1
+
+            for combo in itertools.combinations(np.nonzero(mask)[0], 2):
+                self._co_occurrences[combo] += 1

     def slide_window(self, window, doc_num):
         if doc_num != self._current_doc_num:
-            self._uniq_words = set(window)
-            self._uniq_words.discard(None)
-            self._token_at_edge = window[0]
+            self._uniq_words[:] = False
+            self._uniq_words[np.unique(window)] = True
             self._current_doc_num = doc_num
         else:
-            if self._token_at_edge is not None:
-                self._uniq_words.discard(self._token_at_edge)  # may be irrelevant token
-            self._token_at_edge = window[0]
-            if window[-1] is not None:
-                self._uniq_words.add(window[-1])
-
-        if self._uniq_words:
-            words_idx = np.array(list(self._uniq_words))
-            self._occurrences[words_idx] += 1
-            for combo in itertools.combinations(words_idx, 2):
-                self._co_occurrences[combo] += 1
+            self._uniq_words[self._token_at_edge] = False
+            self._uniq_words[window[-1]] = True
+        self._token_at_edge = window[0]

     def _symmetrize(self):
         """Word pairs may have been encountered in (i, j) and (j, i) order.
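The `set`-based window bookkeeping becomes a fixed boolean array: one slot per contiguous id plus a trailing slot that silently absorbs the none token, with `_mask` a view that excludes that slot, so irrelevant words never need special-casing. A toy illustration of the trick (not the accumulator itself):

```python
# Boolean-mask window tracking with a sentinel-absorbing last slot (toy data).
import numpy as np

vocab_size = 5
none_token = vocab_size
uniq_words = np.zeros(vocab_size + 1, dtype=bool)  # +1 slot for the sentinel
mask = uniq_words[:-1]  # view onto the real vocab slots only

window = np.array([0, none_token, 3, 0], dtype=np.uint16)
uniq_words[:] = False
uniq_words[np.unique(window)] = True  # the sentinel lands in the extra slot

occurrences = np.zeros(vocab_size, dtype='uint32')
if mask.any():
    occurrences[mask] += 1  # ids 0 and 3 counted; the sentinel is ignored
print(occurrences)  # [1 0 0 1 0]
```

Sliding by one token within the same document then only has to flip two flags (clear the departing edge token, set the incoming one) rather than rebuild a set.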
@@ -283,15 +281,31 @@ def merge(self, other):
         self._num_docs += other._num_docs


+class PatchedWordOccurrenceAccumulator(WordOccurrenceAccumulator):
+    """Monkey patched for multiprocessing worker usage,
+    to move some of the logic to the master process.
+    """
+    def _iter_texts(self, texts):
+        return texts  # master process will handle this
+
+
 class ParallelWordOccurrenceAccumulator(WindowedTextsAnalyzer):
     """Accumulate word occurrences in parallel."""

     def __init__(self, processes, *args, **kwargs):
+        """
+        Args:
+        ----
+        processes : number of processes to use; must be at least two.
+        args : should include `relevant_ids` and `dictionary` (see `UsesDictionary.__init__`).
+        kwargs : can include `batch_size`, which is the number of docs to send to a worker at a
+            time. If not included, it defaults to 32.
+        """
         super(ParallelWordOccurrenceAccumulator, self).__init__(*args)
         if processes < 2:
-            raise ValueError("Must have at least 2 processes to run in parallel; got %d" % processes)
+            raise ValueError("Must have at least 2 processes to run in parallel; got %d", processes)
         self.processes = processes
-        self.batch_size = kwargs.get('batch_size', 16)
+        self.batch_size = kwargs.get('batch_size', 32)

     def __str__(self):
         return "%s(processes=%s, batch_size=%s)" % (
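For orientation, constructing and driving the parallel accumulator would look roughly like the sketch below; the corpus, the `Dictionary` setup, and the parameter values are placeholders rather than anything taken from the PR:

```python
# Hypothetical usage sketch based on the signatures above, not PR code.
from gensim.corpora.dictionary import Dictionary

texts = [['human', 'computer', 'interface'],
         ['graph', 'trees', 'computer', 'minors']]
dictionary = Dictionary(texts)
relevant_ids = set(dictionary.token2id.values())  # or a truncated subset

accumulator = ParallelWordOccurrenceAccumulator(
    2, relevant_ids, dictionary, batch_size=64)  # needs >= 2 processes
accumulator.accumulate(texts, window_size=2)
```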
@@ -303,7 +317,8 @@ def accumulate(self, texts, window_size):
             self.queue_all_texts(input_q, texts, window_size)
             interrupted = False
         except KeyboardInterrupt:
-            logger.warn("stats accumulation interrupted; <= %d documents processed" % self._num_docs)
+            logger.warn("stats accumulation interrupted; <= %d documents processed",
+                        self._num_docs)
             interrupted = True

         accumulators = self.terminate_workers(input_q, output_q, workers, interrupted)
@@ -320,7 +335,7 @@ def start_workers(self, window_size):
         output_q = mp.Queue()
         workers = []
         for _ in range(self.processes):
-            accumulator = WordOccurrenceAccumulator(self.relevant_ids, self.dictionary)
+            accumulator = PatchedWordOccurrenceAccumulator(self.relevant_ids, self.dictionary)
             worker = AccumulatingWorker(input_q, output_q, accumulator, window_size)
             worker.start()
             workers.append(worker)
@@ -332,7 +347,7 @@ def yield_batches(self, texts):
         `batch_size` texts at a time.
         """
         batch = []
-        for text in texts:
+        for text in self._iter_texts(texts):
             batch.append(text)
             if len(batch) == self.batch_size:
                 yield batch
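`yield_batches` is the standard chunking generator; since the diff cuts off before the tail handling, the final-partial-batch logic in this standalone toy version is an assumption:

```python
# Standalone sketch of the batching pattern (tail handling assumed, since the
# diff truncates before the end of yield_batches).
def yield_batches(items, batch_size):
    """Group an iterable into lists of at most batch_size items."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # final, possibly smaller batch
        yield batch

print(list(yield_batches(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```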
@@ -345,14 +360,14 @@ def queue_all_texts(self, q, texts, window_size):
         """Sequentially place batches of texts on the given queue until `texts` is consumed.
         The texts are filtered so that only those with at least one relevant token are queued.
         """
-        relevant_texts = (text for text in texts if self.text_is_relevant(text))
-        for batch_num, batch in enumerate(self.yield_batches(relevant_texts)):
+        for batch_num, batch in enumerate(self.yield_batches(texts)):
             q.put(batch, block=True)
             before = self._num_docs / self.log_every
             self._num_docs += sum(len(doc) - window_size + 1 for doc in batch)
             if before < (self._num_docs / self.log_every):
-                logger.info("submitted %d batches to accumulate stats from %d documents (%d virtual)" % (
-                    batch_num, (batch_num + 1) * self.batch_size, self._num_docs))
+                logger.info("%d batches submitted to accumulate stats from %d documents (%d "
+                            "virtual)",
+                            (batch_num + 1), (batch_num + 1) * self.batch_size, self._num_docs)

     def terminate_workers(self, input_q, output_q, workers, interrupted=False):
         """Wait until all workers have transmitted their WordOccurrenceAccumulator instances,
@@ -392,10 +407,10 @@ def merge_accumulators(self, accumulators):
         accumulator = accumulators[0]
         for other_accumulator in accumulators[1:]:
             accumulator.merge(other_accumulator)
-        # Workers perform partial accumulation, so none of the co-occurrence matrices are symmetrized.
-        # This is by design, to avoid unnecessary matrix additions during accumulation.
+        # Workers do partial accumulation, so none of the co-occurrence matrices are symmetrized.
+        # This is by design, to avoid unnecessary matrix additions/conversions during accumulation.
         accumulator._symmetrize()
-        logger.info("accumulated word occurrence stats for %d virtual documents" %
+        logger.info("accumulated word occurrence stats for %d virtual documents",
                     accumulator.num_docs)
         return accumulator
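`_symmetrize`'s body is not shown in this diff. Given its docstring earlier ("Word pairs may have been encountered in (i, j) and (j, i) order"), folding the two triangles together by adding the matrix to its transpose is the standard move, so a sketch along those lines (an assumption, not the PR's code):

```python
# Symmetrize a co-occurrence matrix whose counts may sit in either triangle
# (assumed approach; _symmetrize's actual body is not in this diff).
import scipy.sparse as sps

co = sps.lil_matrix((3, 3), dtype='uint32')
co[0, 1] = 2  # pair (0, 1) counted here...
co[1, 0] = 1  # ...and also in (1, 0) order
co[0, 2] = 3

sym = co + co.T  # each unordered pair's total, mirrored to both triangles
print(sym.toarray())
# [[0 3 3]
#  [3 0 0]
#  [3 0 0]]
```

Note the diagonal is zero here; same-word co-occurrences would be doubled by this and would need separate handling.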
@@ -414,30 +429,32 @@ def __init__(self, input_q, output_q, accumulator, window_size):
     def run(self):
         try:
             self._run()
-            print("finished normally")
         except KeyboardInterrupt:
-            logger.info("%s interrupted after processing %d documents" % (
-                self.__class__.__name__, self.accumulator.num_docs))
+            logger.info("%s interrupted after processing %d documents",
+                        self.__class__.__name__, self.accumulator.num_docs)
         except Exception as e:
-            logger.error("worker encountered unexpected exception: %s" % e)
-            logger.error(traceback.format_exc())
+            logger.error("worker encountered unexpected exception: %s\n%s",
+                         e, traceback.format_exc())
         finally:
             self.reply_to_master()

     def _run(self):
-        batch_num = 0
+        batch_num = -1
         n_docs = 0
         while True:
+            batch_num += 1
             docs = self.input_q.get(block=True)
             if docs is None:  # sentinel value
                 logger.debug("observed sentinel value; terminating")
                 break

             self.accumulator.partial_accumulate(docs, self.window_size)
             n_docs += len(docs)
-            logger.debug("completed batch %d; %d documents processed (%d virtual)" % (
-                batch_num, n_docs, self.accumulator.num_docs))
-            batch_num += 1
+            logger.debug("completed batch %d; %d documents processed (%d virtual)",
+                         batch_num, n_docs, self.accumulator.num_docs)
+
+        logger.debug("finished all batches; %d documents processed (%d virtual)",
+                     n_docs, self.accumulator.num_docs)

     def reply_to_master(self):
         logger.info("serializing accumulator to return to master...")
Review comment:

Hanging indent please (here and elsewhere).

Reply:

I'm not sure I understand the desired format here. The hanging indent makes sense when using the % operator to format a string. However, my desire here is to use the lazy formatting of the logging module instead. In this case, the string and all following format arguments are equivalent arguments of the call, so PEP 8 dictates alignment with the opening delimiter.
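For readers following along, the two styles under discussion differ in when the string is built:

```python
# Eager %-formatting vs. the logging module's lazy formatting.
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
num_docs = 42

# Eager: the string is interpolated before the call, even if the log level
# would discard the record.
logger.info("accumulated stats for %d virtual documents" % num_docs)

# Lazy: the format arguments are passed through, and logging interpolates
# only if the record is actually emitted.
logger.info("accumulated stats for %d virtual documents", num_docs)

# Caveat: this only works for logging calls. ValueError("... %d", n) will
# not interpolate; exceptions need explicit formatting ("... %d" % n).
```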