-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfragment_seq.py
48 lines (35 loc) · 1.23 KB
/
fragment_seq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python
# Author: Maurits Evers ([email protected])
import gzip
import numpy as np
from Bio import SeqIO, bgzf
from Bio.SeqRecord import SeqRecord
# Snakemake parameters
infile = snakemake.input[0]
outfile = snakemake.output[0]
method = snakemake.params.method
if method == "uniform_sample":
n_reads = int(snakemake.params.n_reads)
len_frag = int(snakemake.params.len_frag)
step = int(snakemake.params.step)
# Create FASTA record from substring
def get_frag(seq, pos, len_frag, i):
sseq = seq[pos:(pos + len_frag)]
label = "read_%i_pos%i-%i" % (i, pos, pos + len_frag)
return (SeqRecord(sseq, id = label, description = ""))
# Main
with gzip.open(infile, "rt") as fh:
record = SeqIO.read(fh, "fasta")
seq = record.seq
len = len(seq)
if method == "uniform_sample":
#np.random.seed(2020)
idx = np.random.choice(len - len_frag, n_reads)
elif method == "sliding_window":
n_reads = int(((len - len_frag) / step) + 1)
idx = np.arange(0, n_reads * step, step)
subseq = [get_frag(seq, pos, len_frag, i)
for (i, pos) in enumerate(idx, start = 1)]
# Write to bgzip'ed FASTA
with bgzf.BgzfWriter(outfile, "wb") as fh:
SeqIO.write(subseq, fh, "fasta")