Skip to content

Commit

Permalink
Manage mappy memory buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
cjw85 committed Jan 7, 2022
1 parent a402042 commit 737d9a2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
22 changes: 20 additions & 2 deletions bonito/aligner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,24 @@ def align_map(aligner, sequences, n_thread=4):
return ThreadMap(partial(MappyWorker, aligner), sequences, n_thread)


class ManagedThreadBuffer:
    """
    Minimap2 ThreadBuffer that is periodically reallocated.

    The wrapped buffer is handed out through the `buffer` property. Once it
    has been handed out `max_uses` times it is dropped and replaced with a
    freshly constructed one on the next access.

    Args:
        max_uses: number of times a single underlying buffer is handed out
            before it is replaced.
        factory: zero-argument callable that builds a new buffer. Defaults
            to `ThreadBuffer` when left as None.
    """
    def __init__(self, max_uses=20, factory=None):
        self.max_uses = max_uses
        self.uses = 0
        # Resolve the default lazily so ThreadBuffer is only looked up when
        # the caller did not supply a factory of their own.
        self._factory = ThreadBuffer if factory is None else factory
        self._b = self._factory()

    @property
    def buffer(self):
        """
        Return the current buffer, reallocating it first if it is used up.

        Every access counts as one use of the returned buffer.
        """
        # `>=` (not `>`): the buffer must be replaced after exactly
        # `max_uses` hand-outs, not `max_uses + 1`.
        if self.uses >= self.max_uses:
            self._b = self._factory()
            self.uses = 0
        self.uses += 1
        return self._b


class MappyWorker(Thread):
"""
Process that reads items from an input_queue, applies a func to them and puts them on an output_queue
Expand All @@ -27,12 +45,12 @@ def __init__(self, aligner, input_queue=None, output_queue=None):
self.output_queue = output_queue

def run(self):
    """
    Align items from the input queue until the StopIteration sentinel arrives.

    Each item is a `(key, value)` pair where `value['sequence']` is passed
    to `self.aligner.map`. The first mapping yielded (or None when nothing
    is yielded) is stored under the `'mapping'` key of a copy of `value`,
    and `(key, new_value)` is put on the output queue. The sentinel is
    forwarded to the output queue before the loop exits.
    """
    # One managed buffer per worker; it swaps the underlying minimap2
    # buffer out periodically instead of keeping a single one forever.
    thrbuf = ManagedThreadBuffer()
    while True:
        item = self.input_queue.get()
        if item is StopIteration:
            # Pass the sentinel downstream so the consumer can stop too.
            self.output_queue.put(item)
            break
        k, v = item
        # Hand the aligner the underlying buffer (`.buffer`), not the
        # wrapper object itself; only the first hit is kept.
        mapping = next(self.aligner.map(v['sequence'], buf=thrbuf.buffer, MD=True), None)
        self.output_queue.put((k, {**v, 'mapping': mapping}))
3 changes: 2 additions & 1 deletion bonito/cli/basecaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def main(args):
if mods_model is not None:
results = process_itemmap(partial(call_mods, mods_model), results)
if aligner:
results = align_map(aligner, results)
results = align_map(aligner, results, n_thread=args.alignment_threads)

writer = ResultsWriter(
fmt.mode, tqdm(results, desc="> calling", unit=" reads", leave=False),
Expand Down Expand Up @@ -171,5 +171,6 @@ def argparser():
parser.add_argument("--chunksize", default=None, type=int)
parser.add_argument("--batchsize", default=None, type=int)
parser.add_argument("--max-reads", default=0, type=int)
parser.add_argument("--alignment_threads", default=8, type=int)
parser.add_argument('-v', '--verbose', action='count', default=0)
return parser

0 comments on commit 737d9a2

Please sign in to comment.