-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathutilities.py
123 lines (101 loc) · 4.17 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def scale01(values, mini=None, maxi=None, tol=1e-6):
""" Scale the values in [0, 1]. """
from numpy import amax, amin
if not mini:
mini = amin(values)
if not maxi:
maxi = amax(values)
scaled_values = [(val - mini) / (maxi - mini + tol) for val in values]
return scaled_values, mini, maxi
def not_na(item):
""" Remove NAs and empty values. """
return not(item == "NA" or item == "")
def extended_hit_pos(hit, peak_chrom, peak_start, extension=0):
""" Extend the hit by 'extension' nt to compute DNAshape features. """
start = peak_start + hit.start - extension - 2 # BED
end = peak_start + hit.end + extension - 1 # BED
return peak_chrom, start, end
def print_extended_hits(hits, positions, extension=0):
"""
Print the extended hits to a temporary bed file.
:returns: The name of the temporary file.
:rtype: str
"""
import tempfile
import os
fdescr, tmp_file = tempfile.mkstemp()
os.close(fdescr)
with open(tmp_file, 'w') as stream:
for hit in hits:
if hit:
identifier = hit.seq_record.id
peak_chrom, peak_start, _ = positions[identifier]
chrom, start, end = extended_hit_pos(hit, peak_chrom,
peak_start, extension)
if not chrom.startswith("chr"):
chrom = "chr{0}".format(chrom)
if hit.score >= 0.0 and hit.score <= 1.0:
stream.write("{0}\t{1:d}\t{2:d}\t{3}\t{4:d}\t{5}\n".format(
chrom, start, end, identifier, int(hit.score * 100),
hit.strand))
else:
stream.write("{0}\t{1:d}\t{2:d}\t{3}\t{4:d}\t{5}\n".format(
chrom, start, end, identifier, 0,
hit.strand))
return tmp_file
def get_positions_from_bed(bed_file):
""" Get the positions of the sequences described in the bed file. """
with open(bed_file) as stream:
positions = {}
for line in stream:
spl = line.split()
positions[spl[3]] = (spl[0], eval(spl[1]) + 1, eval(spl[2]))
return positions
def contains_zero(motif):
""" Return True if the PSSM contains a 0 frequency at one position. """
for nucleotide in 'ACGT':
for count in motif.counts[nucleotide]:
if count == 0.:
return True
return False
def get_jaspar_pssm(jaspar, bool_id=True):
"""
Construct the PSSM from the JASPAR ID or JASPAR formatted file.
We assume that we are using profiles from the CORE JASPAR db when providing
a JASPAR ID. Hence the JASPAR ID should starts with 'MA'.
If a filename is provided, we assume that the TF binding profile is using
the JASPAR format as documented in the Bio.motifs.jaspar BioPython module.
"""
import Bio.motifs
if bool_id:
from Bio.motifs.jaspar.db import JASPAR5
# Please put your local JASPAR database information below
jaspar_db_host = ""
jaspar_db_name = ""
jaspar_db_user = ""
jaspar_db_pass = ""
jdb = JASPAR5(host=jaspar_db_host, name=jaspar_db_name,
user=jaspar_db_user, password=jaspar_db_pass)
motif = jdb.fetch_motif_by_id(jaspar)
motif.pseudocounts = Bio.motifs.jaspar.calculate_pseudocounts(motif)
else:
with open(jaspar) as handle:
motif = Bio.motifs.read(handle, 'jaspar')
# If the PFM contains a zero, need to use pseudocounts
if contains_zero(motif):
import sys
# The pseudocount will be minimal
motif.pseudocounts = sys.float_info.min
return motif.pssm
def encode_hits(hits):
"""
Encode the sequence at hits using a binary encoding (4bits per nucleotide).
hits corresponds to a list of HIT (TFFM module) instances.
"""
mapping = {'A': [1, 0, 0, 0], 'T': [0, 1, 0, 0],
'G': [0, 0, 1, 0], 'C': [0, 0, 0, 1]}
encoding = []
for hit in hits:
encoding.append(
[val for nucl in hit.sequence() for val in mapping[nucl]])
return encoding