Merge pull request #74 from martinghunt/minimise_files
Minimise files
martinghunt committed May 3, 2016
2 parents 48e7c10 + 4d3ea9d commit 4588cba
Showing 25 changed files with 343 additions and 123 deletions.
1 change: 1 addition & 0 deletions ariba/__init__.py
@@ -23,6 +23,7 @@
'histogram',
'link',
'mapping',
'read_store',
'reference_data',
'ref_genes_getter',
'ref_preparer',
15 changes: 7 additions & 8 deletions ariba/assembly_compare.py
@@ -34,7 +34,6 @@ def __init__(self,

self.nucmer_coords_file = self.outprefix + '.nucmer.coords'
self.nucmer_snps_file = self.nucmer_coords_file + '.snps'
self.assembled_ref_seqs_file = self.outprefix + '.assembled_refs.fasta'


def _run_nucmer(self):
@@ -137,13 +136,13 @@ def ref_cov_per_contig(nucmer_hits):


@staticmethod
def _write_assembled_reference_sequences(nucmer_hits, ref_sequence, assembly, outfile):
def _get_assembled_reference_sequences(nucmer_hits, ref_sequence, assembly):
'''nucmer_hits = hits made by self._parse_nucmer_coords_file.
ref_sequence = reference sequence (pyfastaq.sequences.Fasta object)
assembly = dictionary of contig name -> contig.
Writes each piece of assembly that corresponds to the reference sequence
to a fasta file.'''
f = pyfastaq.utils.open_file_write(outfile)
Returns a dictionary of Fasta objects, one for each piece of the
assembly that corresponds to the reference sequence.'''
sequences = {}

for contig in sorted(nucmer_hits):
for hit in nucmer_hits[contig]:
@@ -168,9 +167,9 @@ def _write_assembled_reference_sequences(nucmer_hits, ref_sequence, assembly, outfile):
if hit.hit_length_ref == hit.ref_length:
fa.id += '.complete'

print(fa, file=f)
sequences[fa.id] = fa

pyfastaq.utils.close(f)
return sequences


@staticmethod
@@ -287,4 +286,4 @@ def run(self):
self._run_nucmer()
self.nucmer_hits = self._parse_nucmer_coords_file(self.nucmer_coords_file, self.ref_sequence.id)
self.percent_identities = self._nucmer_hits_to_percent_identity(self.nucmer_hits)
self._write_assembled_reference_sequences(self.nucmer_hits, self.ref_sequence, self.assembly_sequences, self.assembled_ref_seqs_file)
self.assembled_reference_sequences = self._get_assembled_reference_sequences(self.nucmer_hits, self.ref_sequence, self.assembly_sequences)
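
In effect, each cluster now keeps its assembled reference pieces in memory as pyfastaq Fasta objects instead of writing a per-cluster .assembled_refs.fasta file; the caller concatenates everything into one file at the end (see _write_catted_assembled_seqs_fasta in ariba/clusters.py below). A minimal sketch of the new pattern, assuming pyfastaq is installed and using made-up names and sequences:

import pyfastaq.sequences
import pyfastaq.utils

def get_pieces(hits):
    # hits: iterable of (name, sequence) pairs -- a hypothetical stand-in
    # for the nucmer-hit slicing done by _get_assembled_reference_sequences()
    return {name: pyfastaq.sequences.Fasta(name, seq) for name, seq in hits}

# Each cluster returns a dict; one pass at the end writes a single fasta:
clusters = {
    'cluster1': get_pieces([('contig1.1.10', 'ACGTACGTAC')]),
    'cluster2': get_pieces([('contig2.5.20', 'GGCCGGCCGG')]),
}
f = pyfastaq.utils.open_file_write('assembled_seqs.fa.gz')  # pyfastaq gzips .gz names
for cluster_name in sorted(clusters):
    for seq_name in sorted(clusters[cluster_name]):
        print(clusters[cluster_name][seq_name], file=f)
pyfastaq.utils.close(f)
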
59 changes: 44 additions & 15 deletions ariba/cluster.py
@@ -17,6 +17,8 @@ def __init__(self,
refdata,
total_reads,
total_reads_bases,
read_store=None,
reference_names=None,
logfile=None,
assembly_coverage=50,
assembly_kmer=21,
@@ -42,13 +44,19 @@
extern_progs=None,
random_seed=42,
):

self.root_dir = os.path.abspath(root_dir)
if not os.path.exists(self.root_dir):
raise Error('Directory ' + self.root_dir + ' not found. Cannot continue')

self.name = name
self.read_store = read_store
self.refdata = refdata
self.name = name
self.reference_fa = os.path.join(self.root_dir, 'reference.fa')
self.reference_names = reference_names
self.all_reads1 = os.path.join(self.root_dir, 'reads_1.fq')
self.all_reads2 = os.path.join(self.root_dir, 'reads_2.fq')
self.references_fa = os.path.join(self.root_dir, 'references.fa')

if os.path.exists(self.root_dir):
self._input_files_exist()

self.total_reads = total_reads
self.total_reads_bases = total_reads_bases
self.logfile = logfile
@@ -60,17 +68,8 @@
self.reads_insert = reads_insert
self.spades_other_options = spades_other_options

self.all_reads1 = os.path.join(self.root_dir, 'reads_1.fq')
self.all_reads2 = os.path.join(self.root_dir, 'reads_2.fq')
self.reads_for_assembly1 = os.path.join(self.root_dir, 'reads_for_assembly_1.fq')
self.reads_for_assembly2 = os.path.join(self.root_dir, 'reads_for_assembly_2.fq')
self.reference_fa = os.path.join(self.root_dir, 'reference.fa')
self.references_fa = os.path.join(self.root_dir, 'references.fa')

for fname in [self.all_reads1, self.all_reads2, self.references_fa]:
if not os.path.exists(fname):
raise Error('File ' + fname + ' not found. Cannot continue')


self.ref_sequence = None

@@ -124,6 +123,28 @@ def __init__(self,
self.random_seed = random_seed


def _input_files_exist(self):
assert self.read_store is None
if not (os.path.exists(self.all_reads1) and os.path.exists(self.all_reads2)):
raise Error('Error making cluster. Reads files not found')
if not os.path.exists(self.references_fa):
raise Error('Error making cluster. References fasta not found')


def _set_up_input_files(self):
if os.path.exists(self.root_dir):
self._input_files_exist()
else:
assert self.read_store is not None
assert self.reference_names is not None
try:
os.mkdir(self.root_dir)
except:
raise Error('Error making directory ' + self.root_dir)
self.read_store.get_reads(self.name, self.all_reads1, self.all_reads2)
self.refdata.write_seqs_to_fasta(self.references_fa, self.reference_names)


def _clean_file(self, filename):
if self.clean:
print('Deleting file', filename, file=self.log_fh)
@@ -211,6 +232,13 @@ def _make_reads_for_assembly(number_of_wanted_reads, total_reads, reads_in1, rea
def run(self):
if self.logfile is None:
self.logfile = os.path.join(self.root_dir, 'log.txt')

self._set_up_input_files()

for fname in [self.all_reads1, self.all_reads2, self.references_fa]:
if not os.path.exists(fname):
raise Error('File ' + fname + ' not found. Cannot continue')

self.log_fh = pyfastaq.utils.open_file_write(self.logfile)
print('{:_^79}'.format(' LOG FILE START ' + self.name + ' '), file=self.log_fh, flush=True)

@@ -254,7 +282,8 @@ def run(self):
sspace_k=self.sspace_k,
sspace_sd=self.sspace_sd,
reads_insert=self.reads_insert,
extern_progs=self.extern_progs
extern_progs=self.extern_progs,
clean=self.clean
)

self.assembly.run()
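The upshot for Cluster is lazy set-up: constructed with a read_store and reference_names, it creates its directory and extracts its own reads and references only when run() starts, rather than requiring them on disk up front. A condensed sketch of that decision logic, with simplified names rather than the full class:

import os

def set_up_input_files(root_dir, name, read_store, reference_names, refdata):
    reads1 = os.path.join(root_dir, 'reads_1.fq')
    reads2 = os.path.join(root_dir, 'reads_2.fq')
    refs_fa = os.path.join(root_dir, 'references.fa')
    if os.path.exists(root_dir):
        # pre-made directory (old behaviour): the input files must already exist
        for fname in (reads1, reads2, refs_fa):
            if not os.path.exists(fname):
                raise FileNotFoundError('File ' + fname + ' not found')
    else:
        # new behaviour: materialise inputs on demand from the shared stores
        os.mkdir(root_dir)
        read_store.get_reads(name, reads1, reads2)             # paired fastq files
        refdata.write_seqs_to_fasta(refs_fa, reference_names)  # this cluster's refs

Only one copy of the reads then lives on disk (the indexed store) until a cluster actually runs, which fits the pull request's aim of minimising files.
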
73 changes: 43 additions & 30 deletions ariba/clusters.py
@@ -8,17 +8,23 @@
import multiprocessing
import pysam
import pyfastaq
from ariba import cluster, common, mapping, histogram, report, report_filter, reference_data
from ariba import cluster, common, mapping, histogram, read_store, report, report_filter, reference_data

class Error (Exception): pass


def _run_cluster(obj, verbose):
def _run_cluster(obj, verbose, clean):
if verbose:
print('Start running cluster', obj.name, 'in directory', obj.root_dir, flush=True)
obj.run()
if verbose:
print('Finished running cluster', obj.name, 'in directory', obj.root_dir, flush=True)

if clean:
if verbose:
print('Deleting cluster dir', obj.root_dir, flush=True)
shutil.rmtree(obj.root_dir)

return obj


@@ -76,7 +82,7 @@ def __init__(self,
self.report_file_all_tsv = os.path.join(self.outdir, 'report.all.tsv')
self.report_file_all_xls = os.path.join(self.outdir, 'report.all.xls')
self.report_file_filtered_prefix = os.path.join(self.outdir, 'report')
self.catted_assembled_seqs_fasta = os.path.join(self.outdir, 'assembled_seqs.fa')
self.catted_assembled_seqs_fasta = os.path.join(self.outdir, 'assembled_seqs.fa.gz')
self.threads = threads
self.verbose = verbose

@@ -171,9 +177,10 @@ def _map_reads_to_clustered_genes(self):


def _bam_to_clusters_reads(self):
'''Sets up Cluster directories (one for each gene that has reads that mapped to it), writes reads fwd and rev files. Also gathers histogram data of insert size'''
filehandles_1 = {} # gene name -> filehandle of fwd_reads
filehandles_2 = {} # gene name -> filehandle of rev_reads
'''Sets up ReadStore of reads for all the clusters. Also gathers histogram data of insert size'''
reads_file_for_read_store = os.path.join(self.outdir, 'reads')
f_out = pyfastaq.utils.open_file_write(reads_file_for_read_store)

sam_reader = pysam.Samfile(self.bam, "rb")
sam1 = None
self.proper_pairs = 0
@@ -201,36 +208,39 @@ def _bam_to_clusters_reads(self):

for ref in ref_seqs:
if ref not in self.cluster_to_dir:
assert ref not in filehandles_1
assert ref not in filehandles_2

new_dir = os.path.join(self.clusters_outdir, ref)
try:
os.mkdir(new_dir)
except:
raise Error('Error mkdir ' + new_dir)

self.cluster_to_dir[ref] = new_dir
filehandles_1[ref] = pyfastaq.utils.open_file_write(os.path.join(new_dir, 'reads_1.fq'))
filehandles_2[ref] = pyfastaq.utils.open_file_write(os.path.join(new_dir, 'reads_2.fq'))
if self.verbose:
print('New cluster with reads that hit:', ref, flush=True)

print(read1, file=filehandles_1[ref])
print(read2, file=filehandles_2[ref])
self.cluster_read_counts[ref] = self.cluster_read_counts.get(ref, 0) + 2
self.cluster_base_counts[ref] = self.cluster_base_counts.get(ref, 0) + len(read1) + len(read2)
print(ref, self.cluster_read_counts[ref] - 1, read1.seq, read1.qual, sep='\t', file=f_out)
print(ref, self.cluster_read_counts[ref], read2.seq, read2.qual, sep='\t', file=f_out)

sam1 = None

for ref in filehandles_1:
pyfastaq.utils.close(filehandles_1[ref])
pyfastaq.utils.close(filehandles_2[ref])
pyfastaq.utils.close(f_out)

if len(self.cluster_read_counts):
if self.verbose:
filehandle = sys.stdout
else:
filehandle = None

self.read_store = read_store.ReadStore(
reads_file_for_read_store,
os.path.join(self.outdir, 'read_store'),
log_fh=filehandle
)

os.unlink(reads_file_for_read_store)

if self.verbose:
print('Found', self.proper_pairs, 'proper read pairs')
print('Total clusters to perform local assemblies:', len(self.cluster_to_dir), flush=True)


def _set_insert_size_data(self):
if len(self.insert_hist) == 0:
return False
Expand Down Expand Up @@ -266,7 +276,7 @@ def _init_and_run_clusters(self):
if self.verbose:
print('Constructing cluster', seq_name + '.', counter, 'of', str(len(self.cluster_to_dir)))
new_dir = self.cluster_to_dir[seq_name]
self.refdata.write_seqs_to_fasta(os.path.join(new_dir, 'references.fa'), self.cluster_ids[seq_type][seq_name])
#self.refdata.write_seqs_to_fasta(os.path.join(new_dir, 'references.fa'), self.cluster_ids[seq_type][seq_name])
self.log_files.append(os.path.join(self.logs_dir, seq_name + '.log'))

cluster_list.append(cluster.Cluster(
@@ -275,6 +285,8 @@
self.refdata,
self.cluster_read_counts[seq_name],
self.cluster_base_counts[seq_name],
read_store=self.read_store,
reference_names=self.cluster_ids[seq_type][seq_name],
logfile=self.log_files[-1],
assembly_coverage=self.assembly_coverage,
assembly_kmer=self.assembly_kmer,
@@ -303,10 +315,10 @@ def _init_and_run_clusters(self):

if self.threads > 1:
pool = multiprocessing.Pool(self.threads)
cluster_list = pool.starmap(_run_cluster, zip(cluster_list, itertools.repeat(self.verbose)))
cluster_list = pool.starmap(_run_cluster, zip(cluster_list, itertools.repeat(self.verbose), itertools.repeat(self.clean)))
else:
for c in cluster_list:
_run_cluster(c, self.verbose)
_run_cluster(c, self.verbose, self.clean)

self.clusters = {c.name: c for c in cluster_list}

Expand Down Expand Up @@ -343,24 +355,25 @@ def _write_catted_assembled_seqs_fasta(self, outfile):

for gene in sorted(self.clusters):
try:
cluster_fasta = self.clusters[gene].assembly_compare.assembled_ref_seqs_file
seq_dict = self.clusters[gene].assembly_compare.assembled_reference_sequences
except:
continue
if os.path.exists(cluster_fasta):
file_reader = pyfastaq.sequences.file_reader(cluster_fasta)
for seq in file_reader:
print(seq, file=f)

for seq_name in sorted(seq_dict):
print(seq_dict[seq_name], file=f)

pyfastaq.utils.close(f)


def _clean(self):
if self.clean:
if self.verbose:
print('Deleting clusters direcory', self.clusters_outdir)
print('Deleting clusters directory', self.clusters_outdir)
shutil.rmtree(self.clusters_outdir)
print('Deleting Logs directory', self.logs_dir)
shutil.rmtree(self.logs_dir)
print('Deleting reads store files', self.read_store.outfile + '[.tbi]')
self.read_store.clean()
else:
if self.verbose:
print('Not deleting anything because --noclean used')
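The central change in _bam_to_clusters_reads is that, instead of opening a pair of FASTQ file handles per cluster, every read goes into a single tab-separated file that seeds the ReadStore. Reads are numbered 1, 2, 3, ... within each cluster, so odd numbers are forward reads and even numbers their mates. A small illustration of the line format, with made-up data:

# One line per read: cluster name, per-cluster read number, sequence, quality.
pairs = [('ACGTACGT', 'IIIIIIII', 'TGCATGCA', 'IIIIIIII')]
count = 0
for seq1, qual1, seq2, qual2 in pairs:
    count += 2
    print('cluster1', count - 1, seq1, qual1, sep='\t')  # forward read (odd number)
    print('cluster1', count, seq2, qual2, sep='\t')      # reverse mate (even number)

get_reads() in ariba/read_store.py (next file) reverses the split using the same parity rule.
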
60 changes: 60 additions & 0 deletions ariba/read_store.py
@@ -0,0 +1,60 @@
import pyfastaq
import os
import pysam
from ariba import common

class Error (Exception): pass

class ReadStore:
def __init__(self, infile, outprefix, log_fh=None):
assert infile != outprefix
self.infile = os.path.abspath(infile)
self.outprefix = os.path.abspath(outprefix)
self.outfile = os.path.abspath(outprefix) + '.gz'

if not os.path.exists(self.infile):
raise Error('File not found ' + self.infile + '. Cannot continue')

self._sort_file(self.infile, self.outprefix, log_fh)
self._compress_and_index_file(self.outprefix, log_fh)
os.unlink(self.outprefix)


@staticmethod
def _sort_file(infile, outfile, log_fh=None):
cmd = 'sort -k1,1 -k 2,2n ' + infile + ' > ' + outfile
verbose = log_fh is not None
common.syscall(cmd, verbose=verbose, verbose_filehandle=log_fh)


@staticmethod
def _compress_and_index_file(infile, log_fh=None):
if log_fh is not None:
print('Compressing file', infile, file=log_fh, flush=True)
pysam.tabix_compress(infile, infile + '.gz')
pysam.tabix_index(infile + '.gz', seq_col=0, start_col=1, end_col=1)


def get_reads(self, cluster_name, out1, out2, log_fh=None):
if log_fh is not None:
print('Getting reads for', cluster_name, 'from', self.outfile, file=log_fh)
tabix_file = pysam.TabixFile(self.outfile)
f_out1 = pyfastaq.utils.open_file_write(out1)
f_out2 = pyfastaq.utils.open_file_write(out2)

for line in tabix_file.fetch(reference=cluster_name):
cluster, number, seq, qual = line.rstrip().split()
number = int(number)
if number % 2 == 0:
print('@' + str(number - 1) + '/2', seq, '+', qual, sep='\n', file=f_out2)
else:
print('@' + str(number) + '/1', seq, '+', qual, sep='\n', file=f_out1)

pyfastaq.utils.close(f_out1)
pyfastaq.utils.close(f_out2)
if log_fh is not None:
print('Finished getting reads for', cluster_name, 'from', self.outfile, file=log_fh)

def clean(self):
os.unlink(self.outfile)
os.unlink(self.outfile + '.tbi')
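
ReadStore piggybacks on tabix: after sorting, the cluster name is indexed as if it were a chromosome (seq_col=0) and the read number as a position (start_col=1, end_col=1), so fetch(reference=cluster_name) pulls back exactly one cluster's reads from a single compressed file. A minimal usage sketch, with made-up file names and reads:

from ariba import read_store

# Input must be tab-separated: cluster name, read number, sequence, quality --
# the format written by clusters._bam_to_clusters_reads().
with open('reads.tsv', 'w') as f:
    print('cluster1', 1, 'ACGTACGT', 'IIIIIIII', sep='\t', file=f)  # becomes read /1
    print('cluster1', 2, 'TGCATGCA', 'IIIIIIII', sep='\t', file=f)  # becomes read /2

rs = read_store.ReadStore('reads.tsv', 'store')   # sorts, writes store.gz and store.gz.tbi
rs.get_reads('cluster1', 'out_1.fq', 'out_2.fq')  # paired FASTQ for one cluster
rs.clean()                                        # deletes store.gz and store.gz.tbi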