Skip to content

Commit

Permalink
Merge pull request #11 from althonos/main
Browse files Browse the repository at this point in the history
Update GECCO parsing code to avoid reading files more than once
  • Loading branch information
raufs authored Jul 11, 2022
2 parents e9e9fcd + f3aac6a commit d95e713
Showing 1 changed file with 78 additions and 86 deletions.
164 changes: 78 additions & 86 deletions lsaBGC/classes/BGC.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,32 @@ def parseGECCO(self, comprehensive_parsing=True, flank_size=2000):
domains = []
full_sequence = ""
domain_evalues = {}
with open(self.bgc_genbank) as ogbk:
for rec in SeqIO.parse(ogbk, 'genbank'):
full_sequence = str(rec.seq)
for feature in rec.features:
if feature.type == 'misc_feature':
start = min([int(x) for x in str(feature.location)[1:].split(']')[0].split(':')]) + 1
end = max([int(x) for x in str(feature.location)[1:].split(']')[0].split(':')])
aSDomain = "NA"
description = "NA"
evalue = 1e10
try:
aSDomain = feature.qualifiers.get('standard_name')[0]
except:
pass
try:
description = feature.qualifiers.get('function')[0]
except:
pass
try:
evalue = [float(x.split(': ')[1].strip()) for x in feature.qualifiers.get('note') if x.startswith('e-value')][0]
except:
pass
domain_evalues[aSDomain + '|' + str(start+1) + '|' + str(end)] = evalue
domains.append({'start': start + 1, 'end': end, 'type': feature.type, 'aSDomain': aSDomain, 'description': description})

product = "NA"
with open(self.bgc_genbank) as ogbk:
for line in ogbk:
line = line.strip()
if line.startswith('biosyn_class'):
ls = line.split()
product = ' '.join(ls[2:])
rec = SeqIO.read(self.bgc_genbank, 'genbank')
full_sequence = str(rec.seq)
for feature in rec.features:
if feature.type == 'misc_feature':
start = feature.location.start + 1
end = feature.location.end
aSDomain = "NA"
description = "NA"
evalue = 1e10
try:
aSDomain = feature.qualifiers['standard_name'][0]
except:
pass
try:
description = feature.qualifiers['function'][0]
except:
pass
try:
evalue = next(float(x.split(': ')[1]) for x in feature.qualifiers['note'] if x.startswith('e-value'))
except:
pass
domain_evalues[aSDomain + '|' + str(start+1) + '|' + str(end)] = evalue
domains.append({'start': start + 1, 'end': end, 'type': feature.type, 'aSDomain': aSDomain, 'description': description})

product = rec.annotations['structured_comment']['GECCO-Data']['biosyn_class']
bgc_info = [{'prediction_method': self.prediction_method, 'detection_rule': 'NA', 'product': product, 'contig_edge': 'NA', 'full_sequence': full_sequence}]

# determine top 10% of domains with lowest e-values
Expand All @@ -65,73 +59,72 @@ def parseGECCO(self, comprehensive_parsing=True, flank_size=2000):
genes = {}
core_genes = set([])
gene_order = {}
with open(self.bgc_genbank) as ogbk:
for rec in SeqIO.parse(ogbk, 'genbank'):
for feature in rec.features:
if feature.type == "CDS":
lt = feature.qualifiers.get('locus_tag')[0]
start = min([int(x) for x in str(feature.location)[1:].split(']')[0].split(':')]) + 1
end = max([int(x) for x in str(feature.location)[1:].split(']')[0].split(':')])
direction = str(feature.location).split('(')[1].split(')')[0]

try:
product = feature.qualifiers.get('product')[0]
except:
product = "hypothetical protein"

grange = set(range(start, end + 1))
for feature in rec.features:
if feature.type == "CDS":
lt = feature.qualifiers.get('locus_tag')[0]
start = feature.location.start + 1
end = feature.location.end
direction = "-" if feature.location.strand == -1 else "+"

gene_domains = []
core_overlap = False
for d in domains:
drange = set(range(d['start'], d['end'] + 1))
if len(drange.intersection(grange)) > 0:
gene_domains.append(d)
if (d['aSDomain'] + '|' + str(d['start']) + '|' + str(d['end'])) in core_domains:
core_overlap = True
core_genes.add(lt)
try:
product = feature.qualifiers.get('product')[0]
except:
product = "hypothetical protein"

grange = set(range(start, end + 1))

gene_domains = []
core_overlap = False
for d in domains:
drange = set(range(d['start'], d['end'] + 1))
if len(drange.intersection(grange)) > 0:
gene_domains.append(d)
if (d['aSDomain'] + '|' + str(d['start']) + '|' + str(d['end'])) in core_domains:
core_overlap = True
core_genes.add(lt)

gene_order[lt] = start
gene_order[lt] = start

prot_seq, nucl_seq, nucl_seq_with_flanks, relative_start, relative_end = [None] * 5
if comprehensive_parsing:
prot_seq = feature.qualifiers.get('translation')[0]
prot_seq, nucl_seq, nucl_seq_with_flanks, relative_start, relative_end = [None] * 5
if comprehensive_parsing:
prot_seq = feature.qualifiers.get('translation')[0]

flank_start = start - flank_size
flank_end = end + flank_size
flank_start = start - flank_size
flank_end = end + flank_size

if flank_start < 1: flank_start = 1
if flank_start < 1: flank_start = 1

if flank_end >= len(full_sequence): flank_end = None
if end >= len(full_sequence): end = None
if flank_end >= len(full_sequence): flank_end = None
if end >= len(full_sequence): end = None

if end:
nucl_seq = full_sequence[start - 1:end]
else:
nucl_seq = full_sequence[start - 1:]
end = len(full_sequence)
if end:
nucl_seq = full_sequence[start - 1:end]
else:
nucl_seq = full_sequence[start - 1:]
end = len(full_sequence)

if flank_end:
nucl_seq_with_flanks = full_sequence[flank_start - 1:flank_end]
else:
nucl_seq_with_flanks = full_sequence[flank_start - 1:]
if flank_end:
nucl_seq_with_flanks = full_sequence[flank_start - 1:flank_end]
else:
nucl_seq_with_flanks = full_sequence[flank_start - 1:]

gene_length = end - start
gene_length = end - start

relative_start = nucl_seq_with_flanks.find(nucl_seq)
relative_end = relative_start + gene_length
relative_start = nucl_seq_with_flanks.find(nucl_seq)
relative_end = relative_start + gene_length

if direction == '-':
nucl_seq = str(Seq(nucl_seq).reverse_complement())
nucl_seq_with_flanks = str(Seq(nucl_seq_with_flanks).reverse_complement())
relative_start = nucl_seq_with_flanks.find(nucl_seq)
relative_end = relative_start + gene_length
if direction == '-':
nucl_seq = str(Seq(nucl_seq).reverse_complement())
nucl_seq_with_flanks = str(Seq(nucl_seq_with_flanks).reverse_complement())
relative_start = nucl_seq_with_flanks.find(nucl_seq)
relative_end = relative_start + gene_length

genes[lt] = {'bgc_name': self.bgc_id, 'start': start, 'end': end, 'direction': direction,
'product': product, 'prot_seq': prot_seq, 'nucl_seq': nucl_seq,
'nucl_seq_with_flanks': nucl_seq_with_flanks, 'gene_domains': gene_domains,
'core_overlap': core_overlap, 'relative_start': relative_start,
'relative_end': relative_end}
genes[lt] = {'bgc_name': self.bgc_id, 'start': start, 'end': end, 'direction': direction,
'product': product, 'prot_seq': prot_seq, 'nucl_seq': nucl_seq,
'nucl_seq_with_flanks': nucl_seq_with_flanks, 'gene_domains': gene_domains,
'core_overlap': core_overlap, 'relative_start': relative_start,
'relative_end': relative_end}

number_of_core_gene_groups = 0
tmp = []
Expand Down Expand Up @@ -515,4 +508,3 @@ def refineGenbank(self, refined_genbank_file, first_bg, second_bg):
rgf_handle.close()
except Exception as e:
raise RuntimeError(traceback.format_exc())

0 comments on commit d95e713

Please sign in to comment.