diff --git a/pairtools/cli/parse.py b/pairtools/cli/parse.py index e1eb674..e954195 100644 --- a/pairtools/cli/parse.py +++ b/pairtools/cli/parse.py @@ -123,7 +123,7 @@ @click.option( "--walks-policy", type=click.Choice(["mask", "5any", "5unique", "3any", "3unique", "all"]), - default="mask", + default="5unique", help="the policy for reporting unrescuable walks (reads containing more" " than one alignment on one or both sides, that can not be explained by a" " single ligation between two mappable DNA fragments)." diff --git a/pairtools/lib/pairsam_format.py b/pairtools/lib/pairsam_format.py index a472e9f..7383bab 100644 --- a/pairtools/lib/pairsam_format.py +++ b/pairtools/lib/pairsam_format.py @@ -100,6 +100,10 @@ "dist_to_3", "seq", "mismatches", # Format: "{ref_letter}:{mut_letter}:{phred}:{ref_position}:{read_position}" + "read_side", + "algn_idx", + "same_side_algn_count" + ] DTYPES_EXTRA_COLUMNS = { @@ -115,4 +119,7 @@ "dist_to_3": int, "seq": str, "mismatches": str, + "read_side": int, + "algn_idx": int, + "same_side_algn_count": int, } diff --git a/pairtools/lib/parse.py b/pairtools/lib/parse.py index a144fa4..33fb8a1 100644 --- a/pairtools/lib/parse.py +++ b/pairtools/lib/parse.py @@ -38,6 +38,35 @@ from .parse_pysam import get_mismatches_c +def group_alignments_by_side(sams): + return [sam for sam in sams if sam.is_read1], [sam for sam in sams if sam.is_read2] + + +def read_alignment_block(instream, sort=True, group_by_side=True, return_readID=True): + sams = [] + + prev_readID = None + while True: + sam_entry = next(instream, None) + readID = sam_entry.query_name if sam_entry else None + + # Read is fully populated, then parse and write: + if not (sam_entry) or ((readID != prev_readID) and prev_readID): + if sort: + sams = sorted(sams, key=lambda a: (a.is_read2, a.query_alignment_start)) + out = sams if not group_by_side else group_alignments_by_side(sams) + out = out if not return_readID else (prev_readID, out) + yield out + + sams.clear() + + if 
sam_entry is None: + break + else: + sams.append(sam_entry) + prev_readID = readID + + def streaming_classify( instream, outstream, chromosomes, out_alignments_stream, out_stat, **kwargs ): @@ -93,118 +122,94 @@ def streaming_classify( if readID_transform is not None: readID_transform = compile(readID_transform, "", "eval") - ### Prepare for iterative parsing of the input stream - # Each read is represented by readID, sams1 (left alignments) and sams2 (right alignments) - readID = "" # Read id of the current read - sams1 = [] # Placeholder for the left alignments - sams2 = [] # Placeholder for the right alignments - # Each read is comprised of multiple alignments, or sam entries: - sam_entry = "" # Placeholder for each aligned segment - # Keep the id of the previous sam entry to detect when the read is completely populated: - prev_readID = "" # Placeholder for the read id - ### Iterate over input pysam: instream = iter(instream) - while sam_entry is not None: - sam_entry = next(instream, None) - - readID = sam_entry.query_name if sam_entry else None - if readID_transform is not None and readID is not None: + for (readID, (sams1, sams2)) in read_alignment_block(instream, sort=True, group_by_side=True, return_readID=True): + if readID_transform is not None: readID = eval(readID_transform) - # Read is fully populated, then parse and write: - if not (sam_entry) or ((readID != prev_readID) and prev_readID): - - ### Parse - if not parse2: # regular parser: - pairstream, all_algns1, all_algns2 = parse_read( - sams1, - sams2, - min_mapq=kwargs["min_mapq"], - max_molecule_size=kwargs["max_molecule_size"], - max_inter_align_gap=kwargs["max_inter_align_gap"], - walks_policy=kwargs["walks_policy"], - sam_tags=sam_tags, - store_seq=store_seq, - report_mismatches=True if "mismatches" in add_columns else False, - ) - else: # parse2 parser: - pairstream, all_algns1, all_algns2 = parse2_read( - sams1, - sams2, - min_mapq=kwargs["min_mapq"], - 
max_inter_align_gap=kwargs["max_inter_align_gap"], - max_insert_size=kwargs.get("max_insert_size", 500), - single_end=kwargs["single_end"], - report_position=kwargs["report_position"], - report_orientation=kwargs["report_orientation"], - sam_tags=sam_tags, - dedup_max_mismatch=kwargs["dedup_max_mismatch"], - store_seq=store_seq, - expand=kwargs["expand"], - max_expansion_depth=kwargs["max_expansion_depth"], - report_mismatches=True if "mismatches" in add_columns else False, - ) - - ### Write: - read_has_alignments = False - for (algn1, algn2, pair_index) in pairstream: - read_has_alignments = True + ### Parse + if not parse2: # regular parser: + pairstream, all_algns1, all_algns2 = parse_read( + sams1, + sams2, + min_mapq=kwargs["min_mapq"], + max_molecule_size=kwargs["max_molecule_size"], + max_inter_align_gap=kwargs["max_inter_align_gap"], + walks_policy=kwargs["walks_policy"], + sam_tags=sam_tags, + store_seq=store_seq, + report_mismatches=True if "mismatches" in add_columns else False, + ) + else: # parse2 parser: + pairstream, all_algns1, all_algns2 = parse2_read( + sams1, + sams2, + min_mapq=kwargs["min_mapq"], + max_inter_align_gap=kwargs["max_inter_align_gap"], + max_insert_size=kwargs.get("max_insert_size", 500), + single_end=kwargs["single_end"], + report_position=kwargs["report_position"], + report_orientation=kwargs["report_orientation"], + sam_tags=sam_tags, + dedup_max_mismatch=kwargs["dedup_max_mismatch"], + store_seq=store_seq, + expand=kwargs["expand"], + max_expansion_depth=kwargs["max_expansion_depth"], + report_mismatches=True if "mismatches" in add_columns else False, + ) - # Alignment end defaults to 5' if report_alignment_end is unspecified: - if kwargs.get("report_alignment_end", "5") == "5": - algn1["pos"] = algn1["pos5"] - algn2["pos"] = algn2["pos5"] - else: - algn1["pos"] = algn1["pos3"] - algn2["pos"] = algn2["pos3"] - - if kwargs["flip"]: - flip_pair = not check_pair_order(algn1, algn2, chrom_enum) - if flip_pair: - algn1, algn2 = 
algn2, algn1 - sams1, sams2 = sams2, sams1 - - write_pairsam( - algn1, - algn2, - readID=prev_readID, - pair_index=pair_index, - sams1=sams1, - sams2=sams2, - out_file=outstream, - drop_readid=kwargs["drop_readid"], - drop_seq=kwargs["drop_seq"], - drop_sam=kwargs["drop_sam"], - add_pair_index=kwargs["add_pair_index"], - add_columns=add_columns, - ) + ### Write: + read_has_alignments = False + for (algn1, algn2, pair_index) in pairstream: + read_has_alignments = True - # add a pair to PairCounter for stats output: - if out_stat: - out_stat.add_pair( - algn1["chrom"], - int(algn1["pos"]), - algn1["strand"], - algn2["chrom"], - int(algn2["pos"]), - algn2["strand"], - algn1["type"] + algn2["type"], - ) + # Alignment end defaults to 5' if report_alignment_end is unspecified: + if kwargs.get("report_alignment_end", "5") == "5": + algn1["pos"] = algn1["pos5"] + algn2["pos"] = algn2["pos5"] + else: + algn1["pos"] = algn1["pos3"] + algn2["pos"] = algn2["pos3"] + + if kwargs["flip"]: + flip_pair = not check_pair_order(algn1, algn2, chrom_enum) + if flip_pair: + algn1, algn2 = algn2, algn1 + sams1, sams2 = sams2, sams1 + + write_pairsam( + algn1, + algn2, + readID=readID, + pair_index=pair_index, + sams1=sams1, + sams2=sams2, + out_file=outstream, + drop_readid=kwargs["drop_readid"], + drop_seq=kwargs["drop_seq"], + drop_sam=kwargs["drop_sam"], + add_pair_index=kwargs["add_pair_index"], + add_columns=add_columns, + ) - # write all alignments: - if out_alignments_stream and read_has_alignments: - write_all_algnments( - prev_readID, all_algns1, all_algns2, out_alignments_stream + # add a pair to PairCounter for stats output: + if out_stat: + out_stat.add_pair( + algn1["chrom"], + int(algn1["pos"]), + algn1["strand"], + algn2["chrom"], + int(algn2["pos"]), + algn2["strand"], + algn1["type"] + algn2["type"], ) - # Empty read after writing: - sams1.clear() - sams2.clear() - - if sam_entry is not None: - push_pysam(sam_entry, sams1, sams2) - prev_readID = readID + # write all 
alignments: + if out_alignments_stream and read_has_alignments: + write_all_algnments( + readID, all_algns1, all_algns2, out_alignments_stream + ) ############################ @@ -409,6 +414,66 @@ def flip_position(hic_algn): return hic_algn + + +def _convert_gaps_into_alignments(sorted_algns, max_inter_align_gap): + """ + In-place conversion of gaps longer than max_inter_align_gap into alignments + """ + if (len(sorted_algns) == 1) and (not sorted_algns[0]["is_mapped"]): + return + + last_5_pos = 0 + for i in range(len(sorted_algns)): + algn = sorted_algns[i] + if algn["dist_to_5"] - last_5_pos > max_inter_align_gap: + new_algn = empty_alignment() + new_algn["dist_to_5"] = last_5_pos + new_algn["algn_read_span"] = algn["dist_to_5"] - last_5_pos + new_algn["read_len"] = algn["read_len"] + new_algn["dist_to_3"] = new_algn["read_len"] - algn["dist_to_5"] + + last_5_pos = algn["dist_to_5"] + algn["algn_read_span"] + + sorted_algns.insert(i, new_algn) + i += 2 + else: + last_5_pos = max(last_5_pos, algn["dist_to_5"] + algn["algn_read_span"]) + i += 1 + + +def normalize_alignment_list(algns, side, sort_by="dist_to_5", max_inter_align_gap=None): + """ + Normalize the alignment list: insert empty alignments in gaps between alignments, + sort by distance to the 5' end, add read side, alignment index. + + Args: + algns (list): The list of alignments. + side (int): The read side of the alignment (1 or 2). + sort_by (str, optional): The key to sort the alignments by. Defaults to "dist_to_5". + max_inter_align_gap (int, optional): The maximum allowed gap between alignments. Defaults to None. + + Returns: + list: The normalized alignment list. 
+ + """ + if len(algns) == 0: + algns = [empty_alignment()] + + if sort_by: + algns = sorted(algns, key=lambda algn: algn[sort_by]) + + if max_inter_align_gap is not None: + _convert_gaps_into_alignments(algns, max_inter_align_gap) + + for i, algn in enumerate(algns): + algn["read_side"] = side + algn["algn_idx"] = i + algn["same_side_algn_count"] = len(algns) + + return algns + + + #################### ### Parsing utilities: #################### @@ -470,18 +535,9 @@ def parse_read( for sam in sams2 ] - if len(algns1) > 0: - algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"]) - else: - algns1 = [empty_alignment()] # Empty alignment dummy - if len(algns2) > 0: - algns2 = sorted(algns2, key=lambda algn: algn["dist_to_5"]) - else: - algns2 = [empty_alignment()] # Empty alignment dummy + algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) + algns2 = normalize_alignment_list(algns2, 2, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) - if max_inter_align_gap is not None: - _convert_gaps_into_alignments(algns1, max_inter_align_gap) - _convert_gaps_into_alignments(algns2, max_inter_align_gap) # By default, assume each molecule is a single pair with single unconfirmed pair: hic_algn1 = algns1[0] @@ -559,14 +615,14 @@ def parse_read( hic_algn2 = algn break - # lower-case reported walks on the chimeric side + # DEPRECATED: lower-case reported walks on the chimeric side if walks_policy != "mask": if is_chimeric_1: hic_algn1 = dict(hic_algn1) - hic_algn1["type"] = hic_algn1["type"].lower() + # hic_algn1["type"] = hic_algn1["type"].lower() if is_chimeric_2: hic_algn2 = dict(hic_algn2) - hic_algn2["type"] = hic_algn2["type"].lower() + # hic_algn2["type"] = hic_algn2["type"].lower() else: raise ValueError(f"Walks policy {walks_policy} is not supported.") @@ -614,10 +670,8 @@ def parse2_read( ) for sam in sams2 # note sams2, that's how these reads are typically parsed ] - algns1 = sorted(algns1, key=lambda 
algn: algn["dist_to_5"]) - if max_inter_align_gap is not None: - _convert_gaps_into_alignments(algns1, max_inter_align_gap) - + + algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) algns2 = [empty_alignment()] # Empty alignment dummy if len(algns1) > 1: @@ -673,20 +727,8 @@ def parse2_read( for sam in sams2 ] - # Sort alignments by the distance to the 5'-end: - if len(algns1) > 0: - algns1 = sorted(algns1, key=lambda algn: algn["dist_to_5"]) - else: - algns1 = [empty_alignment()] # Empty alignment dummy - if len(algns2) > 0: - algns2 = sorted(algns2, key=lambda algn: algn["dist_to_5"]) - else: - algns2 = [empty_alignment()] # Empty alignment dummy - - # Convert alignment gaps to alignments: - if max_inter_align_gap is not None: - _convert_gaps_into_alignments(algns1, max_inter_align_gap) - _convert_gaps_into_alignments(algns2, max_inter_align_gap) + algns1 = normalize_alignment_list(algns1, 1, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) + algns2 = normalize_alignment_list(algns2, 2, sort_by="dist_to_5", max_inter_align_gap=max_inter_align_gap) is_chimeric_1 = len(algns1) > 1 is_chimeric_2 = len(algns2) > 1 @@ -824,32 +866,6 @@ def rescue_walk(algns1, algns2, max_molecule_size): return None -def _convert_gaps_into_alignments(sorted_algns, max_inter_align_gap): - """ - Inplace conversion of gaps longer than max_inter_align_gap into alignments - """ - if (len(sorted_algns) == 1) and (not sorted_algns[0]["is_mapped"]): - return - - last_5_pos = 0 - for i in range(len(sorted_algns)): - algn = sorted_algns[i] - if algn["dist_to_5"] - last_5_pos > max_inter_align_gap: - new_algn = empty_alignment() - new_algn["dist_to_5"] = last_5_pos - new_algn["algn_read_span"] = algn["dist_to_5"] - last_5_pos - new_algn["read_len"] = algn["read_len"] - new_algn["dist_to_3"] = new_algn["read_len"] - algn["dist_to_5"] - - last_5_pos = algn["dist_to_5"] + algn["algn_read_span"] - - sorted_algns.insert(i, new_algn) - 
i += 2 - else: - last_5_pos = max(last_5_pos, algn["dist_to_5"] + algn["algn_read_span"]) - i += 1 - - def parse_complex_walk( algns1, algns2,