Skip to content

Commit

Permalink
I/O improvement
Browse files Browse the repository at this point in the history
- auto_open defaults to stdin/stdout when path evaluates to False.
resolved #48

- auto_open defaults to stdin/stdout when the path is "-"

- if an output stream is optional, the calling module itself decides whether to open it

Warning: this might be unstable because not all the use cases were
tested!
  • Loading branch information
agalitsyna committed Apr 8, 2022
1 parent 4feda3a commit b002dbe
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 121 deletions.
11 changes: 9 additions & 2 deletions pairtools/_fileio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import shutil
import pipes
import subprocess

import sys

class ParseError(Exception):
pass
Expand All @@ -23,6 +23,14 @@ def auto_open(path, mode, nproc=1, command=None):
.gz - pbgzip if available, otherwise bgzip
.lz4 - lz4c (does not support parallel execution)
'''

# Empty/False path or "-" provided: use stdin for mode "r", stdout for mode "w"
if not path or path=="-":
if mode=="r":
return sys.stdin
if mode=="w":
return sys.stdout

if command:
if mode =='w':
t = pipes.Template()
Expand Down Expand Up @@ -135,7 +143,6 @@ def auto_open(path, mode, nproc=1, command=None):
return open(path, mode)



class PipedIO:
def __init__(self, file_or_path, command, mode='r'):
"""
Expand Down
8 changes: 3 additions & 5 deletions pairtools/pairtools_markasdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ def markasdup(pairsam_path, output, **kwargs):
markasdup_py(pairsam_path, output, **kwargs)

def markasdup_py(pairsam_path, output, **kwargs):
instream = (_fileio.auto_open(pairsam_path, mode='r',
instream = _fileio.auto_open(pairsam_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairsam_path else sys.stdin)
outstream = (_fileio.auto_open(output, mode='w',
command=kwargs.get('cmd_in', None))
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)

header, body_stream = _headerops.get_header(instream)
header = _headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
Expand Down
5 changes: 2 additions & 3 deletions pairtools/pairtools_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,9 @@ def merge_py(pairs_path, output, max_nmerge, tmpdir, memory, compress_program, n
if len(paths)==0:
raise ValueError(f"No input paths: {pairs_path}")

outstream = (_fileio.auto_open(output, mode='w',
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)
command=kwargs.get('cmd_out', None))

# if there is only one input, bypass merging and do not modify the header
if len(paths) == 1:
Expand Down
47 changes: 15 additions & 32 deletions pairtools/pairtools_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,36 +231,19 @@ def parse_py(
input_sam = AlignmentFilePairtoolized("-", "r", threads=kwargs.get('nproc_in'))

### Set up output streams
outstream = (
_fileio.auto_open(
output,
mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None),
)
if output
else sys.stdout
)
out_alignments_stream = (
_fileio.auto_open(
output_parsed_alignments,
mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None),
)
if output_parsed_alignments
else None
)
out_stats_stream = (
_fileio.auto_open(
output_stats,
mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None),
)
if output_stats
else None
)
outstream = _fileio.auto_open(output, mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None))

out_alignments_stream, out_stats_stream = None, None
if output_parsed_alignments:
out_alignments_stream = _fileio.auto_open(output_parsed_alignments, mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None))
if output_stats:
out_stats_stream = _fileio.auto_open(output_stats, mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None))

if out_alignments_stream:
out_alignments_stream.write(
Expand Down Expand Up @@ -335,9 +318,9 @@ def parse_py(
if outstream != sys.stdout:
outstream.close()
# close optional output streams if needed:
if out_alignments_stream:
if out_alignments_stream and out_alignments_stream != sys.stdout:
out_alignments_stream.close()
if out_stats_stream:
if out_stats_stream and out_stats_stream != sys.stdout:
out_stats_stream.close()


Expand Down
10 changes: 4 additions & 6 deletions pairtools/pairtools_phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,12 @@ def phase_py(
**kwargs
):

instream = (_fileio.auto_open(pairs_path, mode='r',
instream = _fileio.auto_open(pairs_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairs_path else sys.stdin)
outstream = (_fileio.auto_open(output, mode='w',
command=kwargs.get('cmd_in', None))
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)
command=kwargs.get('cmd_out', None))

header, body_stream = _headerops.get_header(instream)
header = _headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
Expand Down
6 changes: 2 additions & 4 deletions pairtools/pairtools_restrict.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ def restrict(pairs_path, frags, output, **kwargs):
restrict_py(pairs_path, frags, output, **kwargs)

def restrict_py(pairs_path, frags, output, **kwargs):
instream = (_fileio.auto_open(pairs_path, mode='r',
instream = _fileio.auto_open(pairs_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairs_path else sys.stdin)

outstream = (_fileio.auto_open(output, mode='w',
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)


header, body_stream = _headerops.get_header(instream)
Expand Down
10 changes: 4 additions & 6 deletions pairtools/pairtools_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,12 @@ def sample_py(
**kwargs
):

instream = (_fileio.auto_open(pairs_path, mode='r',
instream = _fileio.auto_open(pairs_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairs_path else sys.stdin)
outstream = (_fileio.auto_open(output, mode='w',
command=kwargs.get('cmd_in', None))
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)
command=kwargs.get('cmd_out', None))

header, body_stream = _headerops.get_header(instream)
header = _headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
Expand Down
43 changes: 23 additions & 20 deletions pairtools/pairtools_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
' If the path ends with .gz or .lz4, the output is bgzip-/lz4c-compressed.'
' By default, such pairs are dropped.')

@click.option(
"--send-comments-to",
type=click.Choice(['selected', 'rest', 'both', 'none']),
default="both",
help="Which of the outputs should receive header and comment lines",
show_default=True)
# Deprecated option to be removed in the future:
# @click.option(
# "--send-comments-to",
# type=click.Choice(['selected', 'rest', 'both', 'none']),
# default="both",
# help="Which of the outputs should receive header and comment lines",
# show_default=True)

@click.option(
"--chrom-subset",
Expand Down Expand Up @@ -73,7 +74,7 @@
@common_io_options

def select(
condition, pairs_path, output, output_rest, send_comments_to,
condition, pairs_path, output, output_rest, #send_comments_to,
chrom_subset, startup_code, type_cast,
**kwargs
):
Expand Down Expand Up @@ -115,29 +116,31 @@ def select(
'''
select_py(
condition, pairs_path, output, output_rest, send_comments_to,
condition, pairs_path, output, output_rest, #send_comments_to,
chrom_subset, startup_code, type_cast,
**kwargs
)

def select_py(
condition, pairs_path, output, output_rest, send_comments_to, chrom_subset,
condition, pairs_path, output, output_rest, #send_comments_to,
chrom_subset,
startup_code, type_cast,
**kwargs
):

instream = (_fileio.auto_open(pairs_path, mode='r',
instream = _fileio.auto_open(pairs_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairs_path else sys.stdin)
outstream = (_fileio.auto_open(output, mode='w',
command=kwargs.get('cmd_in', None))
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)
outstream_rest = (_fileio.auto_open(output_rest, mode='w',
command=kwargs.get('cmd_out', None))

# Optional output created only if requested:
outstream_rest = None
if output_rest:
outstream_rest = _fileio.auto_open(output_rest, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output_rest else None)
command=kwargs.get('cmd_out', None))

wildcard_library = {}
def wildcard_match(x, wildcard):
Expand Down Expand Up @@ -176,7 +179,7 @@ def regex_match(x, regex):
if new_chroms is not None:
header = _headerops.subset_chroms_in_pairsheader(header, new_chroms)
outstream.writelines((l+'\n' for l in header))
if outstream_rest:
if output_rest:
outstream_rest.writelines((l+'\n' for l in header))

column_names = _headerops.extract_column_names(header)
Expand Down Expand Up @@ -213,7 +216,7 @@ def regex_match(x, regex):
if outstream != sys.stdout:
outstream.close()

if outstream_rest:
if output_rest and outstream_rest != sys.stdout:
outstream_rest.close()

if __name__ == '__main__':
Expand Down
10 changes: 4 additions & 6 deletions pairtools/pairtools_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,12 @@ def sort(pairs_path, output, nproc, tmpdir, memory, compress_program, **kwargs):

def sort_py(pairs_path, output, nproc, tmpdir, memory, compress_program, **kwargs):

instream = (_fileio.auto_open(pairs_path, mode='r',
instream = _fileio.auto_open(pairs_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairs_path else sys.stdin)
outstream = (_fileio.auto_open(output, mode='w',
command=kwargs.get('cmd_in', None))
outstream = _fileio.auto_open(output, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output else sys.stdout)
command=kwargs.get('cmd_out', None))

header, body_stream = _headerops.get_header(instream)
header = _headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
Expand Down
32 changes: 16 additions & 16 deletions pairtools/pairtools_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,27 @@ def split(pairsam_path, output_pairs, output_sam, **kwargs):


def split_py(pairsam_path, output_pairs, output_sam, **kwargs):
instream = (_fileio.auto_open(pairsam_path, mode='r',
instream = _fileio.auto_open(pairsam_path, mode='r',
nproc=kwargs.get('nproc_in'),
command=kwargs.get('cmd_in', None))
if pairsam_path else sys.stdin)
command=kwargs.get('cmd_in', None))

# Output streams
if (not output_pairs) and (not output_sam):
raise ValueError('At least one output (pairs and/or sam) must be specified!')
if (output_pairs == '-') and (output_sam == '-'):
raise ValueError('Only one output (pairs or sam) can be printed in stdout!')

outstream_pairs = (sys.stdout if (output_pairs=='-')
else (_fileio.auto_open(output_pairs, mode='w',
outstream_pairs = None
outstream_sam = None

if output_pairs:
outstream_pairs = _fileio.auto_open(output_pairs, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output_pairs else None))
outstream_sam = (sys.stdout if (output_sam=='-')
else (_fileio.auto_open(output_sam, mode='w',
command=kwargs.get('cmd_out', None))
if output_sam:
outstream_sam = _fileio.auto_open(output_sam, mode='w',
nproc=kwargs.get('nproc_out'),
command=kwargs.get('cmd_out', None))
if output_sam else None))

header, body_stream = _headerops.get_header(instream)
header = _headerops.append_new_pg(header, ID=UTIL_NAME, PN=UTIL_NAME)
Expand All @@ -93,9 +93,9 @@ def split_py(pairsam_path, output_pairs, output_sam, **kwargs):
sam2col = _pairsam_format.COL_SAM2
has_sams = True

if outstream_pairs:
if output_pairs:
outstream_pairs.writelines((l+'\n' for l in header))
if outstream_sam:
if output_sam:
outstream_sam.writelines(
(l[11:].strip()+'\n' for l in header if l.startswith('#samheader:')))

Expand All @@ -112,22 +112,22 @@ def split_py(pairsam_path, output_pairs, output_sam, **kwargs):
sam1 = cols.pop(sam1col)
sam2 = cols.pop(sam2col)

if outstream_pairs:
if output_pairs:
# hard-coded tab separator to follow the DCIC pairs standard
outstream_pairs.write('\t'.join(cols))
outstream_pairs.write('\n')

if (outstream_sam and has_sams):
if (output_sam and has_sams):
for col in (sam1, sam2):
if col != '.':
for sam_entry in col.split(_pairsam_format.INTER_SAM_SEP):
outstream_sam.write(sam_entry.replace(_pairsam_format.SAM_SEP,'\t'))
outstream_sam.write('\n')

if outstream_pairs and outstream_pairs != sys.stdout:
if output_pairs and outstream_pairs != sys.stdout:
outstream_pairs.close()

if outstream_sam and outstream_sam != sys.stdout:
if output_sam and outstream_sam != sys.stdout:
outstream_sam.close()


Expand Down
26 changes: 6 additions & 20 deletions pairtools/pairtools_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,12 @@ def stats_py(input_path, output, merge, **kwargs):
do_merge(output, input_path, **kwargs)
return

instream = (
_fileio.auto_open(
input_path[0],
mode="r",
nproc=kwargs.get("nproc_in"),
command=kwargs.get("cmd_in", None),
)
if input_path
else sys.stdin
)
outstream = (
_fileio.auto_open(
output,
mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None),
)
if output
else sys.stdout
)
instream = _fileio.auto_open(input_path[0], mode="r",
nproc=kwargs.get("nproc_in"),
command=kwargs.get("cmd_in", None))
outstream = _fileio.auto_open(output, mode="w",
nproc=kwargs.get("nproc_out"),
command=kwargs.get("cmd_out", None))

header, body_stream = _headerops.get_header(instream)
cols = _headerops.extract_column_names(header)
Expand Down
Loading

0 comments on commit b002dbe

Please sign in to comment.