-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpeak_finder.py
52 lines (44 loc) · 1.81 KB
/
peak_finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Peak:
def __init__(self, startidx):
self.born = self.left = self.right = startidx
self.died = None
def get_persistence(self, seq):
return seq[self.born] if self.died is None else seq[self.born] - seq[self.died]
def get_persistent_homology(seq):
peaks = []
# Maps indices to peaks
idxtopeak = [None for s in seq]
# Sequence indices sorted by values
indices = range(len(seq))
indices = sorted(indices, key = lambda i: seq[i], reverse=True)
# Process each sample in descending order
for idx in indices:
lftdone = (idx > 0 and idxtopeak[idx-1] is not None)
rgtdone = (idx < len(seq)-1 and idxtopeak[idx+1] is not None)
il = idxtopeak[idx-1] if lftdone else None
ir = idxtopeak[idx+1] if rgtdone else None
# New peak born
if not lftdone and not rgtdone:
peaks.append(Peak(idx))
idxtopeak[idx] = len(peaks)-1
# Directly merge to next peak left
if lftdone and not rgtdone:
peaks[il].right += 1
idxtopeak[idx] = il
# Directly merge to next peak right
if not lftdone and rgtdone:
peaks[ir].left -= 1
idxtopeak[idx] = ir
# Merge left and right peaks
if lftdone and rgtdone:
# Left was born earlier: merge right to left
if seq[peaks[il].born] > seq[peaks[ir].born]:
peaks[ir].died = idx
peaks[il].right = peaks[ir].right
idxtopeak[peaks[il].right] = idxtopeak[idx] = il
else:
peaks[il].died = idx
peaks[ir].left = peaks[il].left
idxtopeak[peaks[ir].left] = idxtopeak[idx] = ir
# This is optional convenience
return sorted(peaks, key=lambda p: p.get_persistence(seq), reverse=True)