diff --git a/fgpyo/sam/__init__.py b/fgpyo/sam/__init__.py index 8fa2c19..aa24dbb 100644 --- a/fgpyo/sam/__init__.py +++ b/fgpyo/sam/__init__.py @@ -139,31 +139,45 @@ ## Examples of parsing the SA tag and individual supplementary alignments ```python - >>> from fgpyo.sam import SupplementaryAlignment - >>> sup = SupplementaryAlignment.parse("chr1,123,+,50S100M,60,0") - >>> sup.reference_name - 'chr1 - >>> sup.nm - 0 - >>> from typing import List - >>> sa_tag = "chr1,123,+,50S100M,60,0;chr2,456,-,75S75M,60,1" - >>> sups: List[SupplementaryAlignment] = SupplementaryAlignment.parse_sa_tag(tag=sa_tag) - >>> len(sups) - 2 - >>> [str(sup.cigar) for sup in sups] - ['50S100M', '75S75M'] +>>> from fgpyo.sam import AuxAlignment +>>> supp = AuxAlignment.from_tag_value("SA", "chr1,123,+,50S100M,60,0") +>>> supp.reference_name +'chr1' +>>> supp.edit_distance +0 ``` + +## Examples of parsing bwa's `XA` and `XB` tags into individual secondary alignments + +```python +>>> from fgpyo.sam import AuxAlignment +>>> xa = AuxAlignment.from_tag_value("XA", "chr9,-104599381,49M,4") +>>> xa.reference_name +'chr9' +>>> xb = AuxAlignment.from_tag_value("XB", "chr9,-104599381,49M,4,0,30") +>>> xb.reference_name +'chr9' +>>> xb.mapping_quality +30 +>>> xa.cigar == xb.cigar +True +``` + """ import enum import io import sys +from array import array from collections.abc import Collection +from dataclasses import dataclass +from functools import cached_property from itertools import chain from pathlib import Path from typing import IO from typing import Any from typing import Callable +from typing import ClassVar from typing import Dict from typing import Iterable from typing import Iterator @@ -178,10 +192,13 @@ from pysam import AlignedSegment from pysam import AlignmentFile as SamFile from pysam import AlignmentHeader as SamHeader +from pysam import qualitystring_to_array +from typing_extensions import Self from typing_extensions import deprecated import fgpyo.io from fgpyo.collections import PeekableIterator +from fgpyo.sequence import reverse_complement SamPath = Union[IO[Any], Path, str] """The valid base classes for opening a SAM/BAM/CRAM file.""" @@ -195,6 +212,12 @@ NO_REF_POS: int = -1 """The reference position to use to indicate no position in SAM/BAM.""" +NO_QUERY_BASES: str = "*" +"""The string to use for a SAM record with missing query bases.""" + +NO_QUERY_QUALS: array = qualitystring_to_array("*") +"""The array to use for a SAM record with missing query qualities.""" + _IOClasses = (io.TextIOBase, io.BufferedIOBase, io.RawIOBase, io.IOBase) """The classes that should be treated as file-like classes""" @@ -765,6 +788,7 @@ def is_proper_pair( ) +@deprecated("SupplementaryAlignment is deprecated after fgpyo 0.8.0. Use AuxAlignment instead!") @attr.s(frozen=True, auto_attribs=True) class SupplementaryAlignment: """Stores a supplementary alignment record produced by BWA and stored in the SA SAM tag. @@ -1203,6 +1227,12 @@ def set_tag( for rec in self.all_recs(): rec.set_tag(tag, value) + def with_aux_alignments(self) -> "Template": + """Rebuild this template by adding auxiliary alignments from the primary alignment tags.""" + r1_aux = iter([]) if self.r1 is None else AuxAlignment.many_pysam_from_rec(self.r1) + r2_aux = iter([]) if self.r2 is None else AuxAlignment.many_pysam_from_rec(self.r2) + return self.build(recs=chain(self.all_recs(), r1_aux, r2_aux)) + class TemplateIterator(Iterator[Template]): """ @@ -1231,3 +1261,267 @@ class SamOrder(enum.Enum): Coordinate = "coordinate" #: coordinate sorted QueryName = "queryname" #: queryname sorted Unknown = "unknown" # Unknown SAM / BAM / CRAM sort order + + +@dataclass(frozen=True) +class AuxAlignment: + """An auxiliary alignment as parsed from the data stored in the SAM tags of a SAM record. + + Format of a single supplementary alignment in the `SA` tag (`,`-delimited): + + ```text + chr,<1-based position>,strand,cigar,MapQ,NM + ``` + + Full example of an `SA` tag value (`;`-delimited): + + ```text + SA:Z:chr9,104599381,-,49M,60,4;chr3,170653467,+,49M,40,4;chr12,46991828,+,49M,40,5; + ``` + + Format of a single secondary alignment in the `XA` tag (`,`-delimited): + + ```text + chr,<1-based position>,cigar,NM + ``` + + Full example of an `XA` tag value (`;`-delimited): + + ```text + XA:Z:chr9,-104599381,49M,4;chr3,+170653467,49M,4;chr12,+46991828,49M,5; + ``` + + Format of a single secondary alignment in the `XB` tag (`,`-delimited): + + ```text + chr,<1-based position>,cigar,NM,AS,MapQ + ``` + + Full example of an `XB` tag value (`;`-delimited): + + ```text + XB:Z:chr9,-104599381,49M,4,0,30;chr3,+170653467,49M,4,0,20;chr12,+46991828,49M,5,0,10; + ``` + + Attributes: + SAM_TAGS: The SAM tags this class is capable of parsing. + + Args: + reference_name: The reference sequence name. + reference_start: The 0-based start position of the alignment. + is_forward: If the alignment is in the forward orientation or not. + cigar: The Cigar sequence representing the alignment. + mapping_quality: The aligner-reported probability of an incorrect mapping, if available. + edit_distance: The number of mismatches between the query and the target, if available. + alignment_score: The aligner-reported alignment score, if available. + is_secondary: If this auxiliary alignment is a secondary alignment or not. + is_supplementary: If this auxiliary alignment is a supplementary alignment or not. + + Raises: + ValueError: If `reference_start` is set to a value less than zero. + ValueError: If `mapping_quality` is set to a value less than zero. + ValueError: If `edit_distance` is set to a value less than zero. + + See: + - [BWA User Manual](https://bio-bwa.sourceforge.net/bwa.shtml) + - [https://github.com/lh3/bwa/pull/292](https://github.com/lh3/bwa/pull/292) + - [https://github.com/lh3/bwa/pull/293](https://github.com/lh3/bwa/pull/293) + - [https://github.com/lh3/bwa/pull/355](https://github.com/lh3/bwa/pull/355) + """ + + SAM_TAGS: ClassVar[Collection[str]] = ("SA", "XA", "XB") + + reference_name: str + reference_start: int + is_forward: bool + cigar: Cigar + mapping_quality: Optional[int] = None + edit_distance: Optional[int] = None + alignment_score: Optional[int] = None + is_secondary: bool = False + is_supplementary: bool = False + + def __post_init__(self) -> None: + """Perform post-initialization validation on this dataclass.""" + errors: list[str] = [] + if self.reference_start < 0: + errors.append(f"Reference start cannot be less than 0! Found: {self.reference_start}") + if self.mapping_quality is not None and self.mapping_quality < 0: + errors.append(f"Mapping quality cannot be less than 0! Found: {self.mapping_quality}") + if self.edit_distance is not None and self.edit_distance < 0: + errors.append(f"Edit distance cannot be less than 0! Found: {self.edit_distance}") + if len(errors) > 0: + raise ValueError("\n".join(errors)) + + @cached_property + def reference_end(self) -> int: + """Returns the 0-based half-open end coordinate of this auxiliary alignment.""" + return self.reference_start + self.cigar.length_on_target() + + @classmethod + def from_tag_value(cls, tag: str, value: str) -> Self: + """Parse a single auxiliary alignment from a single value in a given SAM tag. + + Args: + tag: The SAM tag used to store the value. + value: The SAM tag value encoding a single auxiliary alignment. + + Raises: + ValueError: If `tag` is not one of `SA`, `XA`, or `XB`. + ValueError: If `tag` is `SA` and `value` does not have 6 comma-separated fields. + ValueError: If `tag` is `XA` and `value` does not have 4 comma-separated fields. + ValueError: If `tag` is `XA` and `value` does not have 6 comma-separated fields. + ValueError: If `tag` is `SA` and `value` does not have '+' or '-' as a strand. + ValueError: If `tag` is `XA` or `XB` and position is not a stranded integer. + """ + if ";" in value: + raise ValueError(f"Cannot parse a multi-value string! Found: {value} for tag {tag}") + + fields: list[str] = value.rstrip(",").split(",") + + if tag not in cls.SAM_TAGS: + raise ValueError(f"Tag {tag} is not one of {', '.join(cls.SAM_TAGS)}!") + + elif tag == "SA" and len(fields) == 6: + reference_name, reference_start, strand, cigar, mapq, edit_distance = fields + + if strand not in ("+", "-"): + raise ValueError(f"The strand field is not either '+' or '-': {strand}") + + return cls( + reference_name=reference_name, + reference_start=int(reference_start) - 1, + is_forward=strand == "+", + cigar=Cigar.from_cigarstring(cigar), + mapping_quality=None if mapq is None else int(mapq), + edit_distance=int(edit_distance), + is_secondary=False, + is_supplementary=True, + ) + + elif tag in ("XA", "XB") and (num_fields := len(fields)) in (4, 6): + if num_fields == 4: + mapq = None + alignment_score = None + reference_name, stranded_start, cigar, edit_distance = fields + else: + reference_name, stranded_start, cigar, edit_distance, alignment_score, mapq = fields + + if len(stranded_start) <= 1 or stranded_start[0] not in ("+", "-"): + raise ValueError(f"The stranded start field is malformed: {stranded_start}") + + return cls( + reference_name=reference_name, + reference_start=int(stranded_start[1:]) - 1, + is_forward=stranded_start[0] == "+", + cigar=Cigar.from_cigarstring(cigar), + mapping_quality=None if mapq is None else int(mapq), + edit_distance=int(edit_distance), + alignment_score=None if alignment_score is None else int(alignment_score), + is_secondary=True, + is_supplementary=False, + ) + + else: + raise ValueError(f"{tag} tag value has the incorrect number of fields: {value}") + + @classmethod + def many_from_rec(cls, rec: AlignedSegment) -> list[Self]: + """Build many auxiliary alignments from a single pysam alignment. + + Args: + rec: The SAM record to generate auxiliary alignments from. + """ + aux_alignments: list[Self] = [] + + for tag in filter(lambda tag: rec.has_tag(tag), cls.SAM_TAGS): + values: list[str] = cast(str, rec.get_tag(tag)).rstrip(";").split(";") + for value in filter(lambda value: value != "", values): + aux_alignments.append(cls.from_tag_value(tag, value)) + + return aux_alignments + + @classmethod + def many_pysam_from_rec(cls, rec: AlignedSegment) -> Iterator[AlignedSegment]: + """Build many SAM auxiliary alignments from a single pysam alignment. + + All reconstituted auxiliary alignments will have the `rh` SAM tag set upon them. + + By default, the query bases and qualities of the auxiliary alignment will be set to the + query bases and qualities of the record that created the auxiliary alignments. However, if + there are hard-clips in the record used to create the auxiliary alignments, then this + function will set the query bases and qualities to the space-saving and/or unknown marker + `*`. A future development for this function should correctly pad-out (with No-calls) or clip + the query sequence and qualities depending on the hard-clipping found in both ends of the + source (often a primary) record and both ends of the destination (auxiliary) record. + + Args: + rec: The SAM record to generate auxiliary alignments from. + """ + if ( + rec.is_unmapped + or rec.cigarstring is None + or rec.query_sequence is None + or rec.query_qualities is None + or rec.query_sequence == NO_QUERY_BASES + or rec.query_qualities == NO_QUERY_QUALS + ): + return + + for hit in cls.many_from_rec(rec): + # TODO: When the original record has hard clips we must set the bases and quals to "*". + # It would be smarter to pad/clip the sequence to be compatible with new cigar... + if "H" in rec.cigarstring: + query_sequence = None + query_qualities = None + elif rec.is_forward and not hit.is_forward: + query_sequence = reverse_complement(rec.query_sequence) + query_qualities = rec.query_qualities[::-1] + else: + query_sequence = rec.query_sequence + query_qualities = rec.query_qualities + + aux = AlignedSegment(header=rec.header) + aux.query_name = rec.query_name + aux.query_sequence = query_sequence + aux.query_qualities = query_qualities + + # Set all alignment and mapping information for this auxiliary alignment. + aux.cigarstring = str(hit.cigar) + aux.mapping_quality = 0 if hit.mapping_quality is None else hit.mapping_quality + aux.reference_id = rec.header.get_tid(hit.reference_name) + aux.reference_name = hit.reference_name + aux.reference_start = hit.reference_start + aux.is_secondary = hit.is_secondary + aux.is_supplementary = hit.is_supplementary + aux.is_proper_pair = rec.is_proper_pair if hit.is_supplementary else False + + # Set all fields that relate to the template. + aux.is_duplicate = rec.is_duplicate + aux.is_forward = hit.is_forward + aux.is_mapped = True + aux.is_paired = rec.is_paired + aux.is_qcfail = rec.is_qcfail + aux.is_read1 = rec.is_read1 + aux.is_read2 = rec.is_read2 + + # Set some optional, but highly recommended, SAM tags on the auxiliary alignment. + aux.set_tag("AS", hit.alignment_score) + aux.set_tag("NM", hit.edit_distance) + aux.set_tag("RG", rec.get_tag("RG") if rec.has_tag("RG") else None) + aux.set_tag("RX", rec.get_tag("RX") if rec.has_tag("RX") else None) + + # Auxiliary alignment mate information points to the mate/next primary alignment. + aux.next_reference_id = rec.next_reference_id + aux.next_reference_name = rec.next_reference_name + aux.next_reference_start = rec.next_reference_start + aux.mate_is_mapped = rec.mate_is_mapped + aux.mate_is_reverse = rec.mate_is_reverse + aux.set_tag("MC", rec.get_tag("MC") if rec.has_tag("MC") else None) + aux.set_tag("MQ", rec.get_tag("MQ") if rec.has_tag("MQ") else None) + aux.set_tag("ms", rec.get_tag("ms") if rec.has_tag("ms") else None) + + # Finally, set a tag that marks this alignment as a reconstituted auxiliary alignment. + aux.set_tag("rh", True) + + yield aux diff --git a/tests/fgpyo/sam/test_aux_alignment.py b/tests/fgpyo/sam/test_aux_alignment.py new file mode 100644 index 0000000..246f44f --- /dev/null +++ b/tests/fgpyo/sam/test_aux_alignment.py @@ -0,0 +1,603 @@ +from typing import Any + +import pytest + +from fgpyo.sam import AuxAlignment +from fgpyo.sam import Cigar +from fgpyo.sam import sum_of_base_qualities +from fgpyo.sam.builder import SamBuilder +from fgpyo.sequence import reverse_complement + + +@pytest.mark.parametrize( + ["kwargs", "error"], + [ + ( + { + "reference_name": "chr9", + "reference_start": -1, + "is_forward": False, + "cigar": Cigar.from_cigarstring("49M"), + "edit_distance": 4, + "alignment_score": 0, + "mapping_quality": 30, + }, + "Reference start cannot be less than 0! Found: -1", + ), + ( + { + "reference_name": "chr9", + "reference_start": 123232, + "is_forward": False, + "cigar": Cigar.from_cigarstring("49M"), + "edit_distance": -1, + "alignment_score": 0, + "mapping_quality": 30, + }, + "Edit distance cannot be less than 0! Found: -1", + ), + ( + { + "reference_name": "chr9", + "reference_start": 123232, + "is_forward": False, + "cigar": Cigar.from_cigarstring("49M"), + "edit_distance": 4, + "alignment_score": 4, + "mapping_quality": -1, + }, + "Mapping quality cannot be less than 0! Found: -1", + ), + ], +) +def test_auxiliary_alignment_validation(kwargs: dict[str, Any], error: str) -> None: + """Test that we raise exceptions for invalid arguments to AuxAlignment.""" + with pytest.raises(ValueError, match=error): + AuxAlignment(**kwargs) + + +@pytest.mark.parametrize( + ["tag", "value", "expected"], + [ + [ + # Test a well-formed negatively stranded SA + "SA", + "chr9,104599381,-,49M,60,4,,,", + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=60, + is_secondary=False, + is_supplementary=True, + ), + ], + [ + # Test a positive stranded SA and extra trailing commas + "SA", + "chr9,104599381,+,49M,20,4,,,,,", + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=20, + is_secondary=False, + is_supplementary=True, + ), + ], + [ + # Test a well-formed negatively stranded XB + "XB", + "chr9,-104599381,49M,4,0,30", + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=0, + mapping_quality=30, + is_secondary=True, + is_supplementary=False, + ), + ], + [ + # Test a positive stranded XB and extra trailing commas + "XB", + "chr3,+170653467,49M,4,0,20,,,,", + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=0, + mapping_quality=20, + is_secondary=True, + is_supplementary=False, + ), + ], + [ + # Test a well-formed negatively stranded XA + "XA", + "chr9,-104599381,49M,4", + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=None, + is_secondary=True, + is_supplementary=False, + ), + ], + [ + # Test a positive stranded XA and extra trailing commas + "XA", + "chr3,+170653467,49M,4,,,,", + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=None, + is_secondary=True, + is_supplementary=False, + ), + ], + ], +) +def test_auxiliary_alignment_from_tag_value(tag: str, value: str, expected: AuxAlignment) -> None: + """Test that we can build an SA, XA, or XB from a item of the tag value.""" + assert AuxAlignment.from_tag_value(tag, value) == expected + + +@pytest.mark.parametrize("tag", ["SA", "XA", "XB"]) +def test_from_tag_value_invalid_number_of_commas(tag: str) -> None: + """Test that we raise an exception if we don't have the right number of fields.""" + with pytest.raises( + ValueError, match=rf"{tag} tag value has the incorrect number of fields: chr9,104599381" + ): + AuxAlignment.from_tag_value(tag, "chr9,104599381") + + +def test_from_tag_value_raises_invalid_multi_value() -> None: + """Test that we raise an exception if we receive a multi-value.""" + with pytest.raises( + ValueError, + match=( + r"Cannot parse a multi-value string! Found: " + + r"chr3,\+170653467,49M,4;chr3,\+170653467,49M,4 for tag XA" + ), + ): + AuxAlignment.from_tag_value("XA", "chr3,+170653467,49M,4;chr3,+170653467,49M,4") + + +def test_from_tag_value_raises_invalid_tag() -> None: + """Test that we raise an exception if we receive an unsupported SAM tag.""" + with pytest.raises(ValueError, match="Tag XF is not one of SA, XA, XB!"): + AuxAlignment.from_tag_value("XF", "chr3,+170653467,49M,4") + + +def test_from_tag_value_raises_for_invalid_sa_strand() -> None: + """Test that we raise an exception when strand is malformed for an SA value.""" + with pytest.raises(ValueError, match=r"The strand field is not either '\+' or '-': !"): + AuxAlignment.from_tag_value("SA", "chr3,2000,!,49M,60,4") + + +@pytest.mark.parametrize("stranded_start", ["!1", "1"]) +def test_from_tag_value_raises_for_invalid_xa_stranded_start(stranded_start: str) -> None: + """Test that we raise an exception when stranded start is malformed for an XA value.""" + with pytest.raises( + ValueError, match=f"The stranded start field is malformed: {stranded_start}" + ): + AuxAlignment.from_tag_value("XA", f"chr3,{stranded_start},49M,4") + + +@pytest.mark.parametrize("stranded_start", ["!1", "1"]) +def test_from_tag_value_raises_for_invalid_xb_stranded_start(stranded_start: str) -> None: + """Test that we raise an exception when stranded start is malformed for an XA value.""" + with pytest.raises( + ValueError, match=f"The stranded start field is malformed: {stranded_start}" + ): + AuxAlignment.from_tag_value("XB", f"chr3,{stranded_start},49M,4,0,20") + + +@pytest.mark.parametrize( + ["auxiliary", "expected"], + [ + ( + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("1H49M"), + edit_distance=4, + ), + 170653466 + 49, + ), + ( + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("10M10I10D10M"), + edit_distance=4, + ), + 170653466 + 30, + ), + ], +) +def test_auxiliary_alignment_reference_end_property(auxiliary: AuxAlignment, expected: int) -> None: + """Test that we correctly calculate reference end based on start and cigar.""" + assert auxiliary.reference_end == expected + + +def test_many_from_rec() -> None: + """Test that we can build many auxiliary alignments from multiple parts in a tag value.""" + builder = SamBuilder() + rec = builder.add_single() + rec.set_tag("XA", "chr9,-104599381,49M,4;chr3,+170653467,49M,4;;;") + + assert AuxAlignment.many_from_rec(rec) == [ + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=None, + is_secondary=True, + is_supplementary=False, + ), + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=None, + is_secondary=True, + is_supplementary=False, + ), + ] + + +def test_many_from_rec_returns_no_auxiliaries_when_unmapped() -> None: + """Test that many_from_rec returns no auxiliary alignments when unmapped.""" + builder = SamBuilder() + rec = builder.add_single() + assert rec.is_unmapped + assert len(list(AuxAlignment.many_from_rec(rec))) == 0 + + +def test_sa_many_from_rec() -> None: + """Test that we return supplementary alignments from a SAM record with multiple SAs.""" + value: str = "chr9,104599381,-,49M,20,4;chr3,170653467,+,49M,30,4;;;" # with trailing ';' + builder = SamBuilder() + rec = builder.add_single(chrom="chr1", start=32) + + assert list(AuxAlignment.many_from_rec(rec)) == [] + + rec.set_tag("SA", value) + + assert list(AuxAlignment.many_from_rec(rec)) == [ + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=20, + is_secondary=False, + is_supplementary=True, + ), + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=30, + is_secondary=False, + is_supplementary=True, + ), + ] + + +def test_xa_many_from_rec() -> None: + """Test that we return secondary alignments from a SAM record with multiple XAs.""" + value: str = "chr9,-104599381,49M,4;chr3,+170653467,49M,4;;;" # with trailing ';' + builder = SamBuilder() + rec = builder.add_single(chrom="chr1", start=32) + + assert list(AuxAlignment.many_from_rec(rec)) == [] + + rec.set_tag("XA", value) + + assert list(AuxAlignment.many_from_rec(rec)) == [ + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=None, + is_secondary=True, + is_supplementary=False, + ), + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=None, + mapping_quality=None, + is_secondary=True, + is_supplementary=False, + ), + ] + + +def test_xb_many_from_rec() -> None: + """Test that we return secondary alignments from a SAM record with multiple XBs.""" + value: str = "chr9,-104599381,49M,4,0,30;chr3,+170653467,49M,4,0,20;;;" # with trailing ';' + builder = SamBuilder() + rec = builder.add_single(chrom="chr1", start=32) + + assert list(AuxAlignment.many_from_rec(rec)) == [] + + rec.set_tag("XB", value) + + assert list(AuxAlignment.many_from_rec(rec)) == [ + AuxAlignment( + reference_name="chr9", + reference_start=104599380, + is_forward=False, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=0, + mapping_quality=30, + is_secondary=True, + is_supplementary=False, + ), + AuxAlignment( + reference_name="chr3", + reference_start=170653466, + is_forward=True, + cigar=Cigar.from_cigarstring("49M"), + edit_distance=4, + alignment_score=0, + mapping_quality=20, + is_secondary=True, + is_supplementary=False, + ), + ] + + +def test_many_pysam_from_rec_returns_no_auxiliaries_when_unmapped() -> None: + """Test that many_pysam_from_rec returns no auxiliaries when unmapped.""" + builder = SamBuilder() + rec = builder.add_single() + assert rec.is_unmapped + assert len(list(AuxAlignment.many_pysam_from_rec(rec))) == 0 + + +def test_sa_many_pysam_from_rec() -> None: + """Test that we return supp. alignments as SAM records from a record with multiple SAs.""" + value: str = "chr9,104599381,-,49M,20,4;chr3,170653467,+,49M,20,4;;;" # with trailing ';' + builder = SamBuilder() + rec, mate = builder.add_pair(chrom="chr1", start1=32, start2=49) + rec.set_tag("RX", "ACGT") + + assert rec.query_sequence is not None # for type narrowing + assert rec.query_qualities is not None # for type narrowing + assert list(AuxAlignment.many_from_rec(rec)) == [] + + rec.set_tag("SA", value) + first, second = list(AuxAlignment.many_pysam_from_rec(rec)) + + assert first.reference_name == "chr9" + assert first.reference_id == rec.header.get_tid("chr9") + assert first.reference_start == 104599380 + assert not first.is_forward + assert first.query_sequence == reverse_complement(rec.query_sequence) + assert first.query_qualities == rec.query_qualities[::-1] + assert first.cigarstring == "49M" + assert not first.has_tag("AS") + assert first.get_tag("NM") == 4 + assert first.get_tag("rh") == 1 + assert first.mapping_quality == 20 + + assert second.reference_name == "chr3" + assert second.reference_id == rec.header.get_tid("chr3") + assert second.reference_start == 170653466 + assert second.is_forward + assert second.query_sequence == rec.query_sequence + assert second.query_qualities == rec.query_qualities + assert second.cigarstring == "49M" + assert not second.has_tag("AS") + assert second.get_tag("NM") == 4 + assert second.get_tag("rh") == 1 + assert second.mapping_quality == 20 + + for result in (first, second): + assert result.query_name == rec.query_name + assert result.is_read1 is rec.is_read1 + assert result.is_read2 is rec.is_read2 + assert result.is_duplicate is rec.is_duplicate + assert result.is_paired is rec.is_paired + assert result.is_proper_pair + assert result.is_qcfail is rec.is_qcfail + assert not result.is_secondary + assert result.is_supplementary + assert result.is_mapped + + assert result.next_reference_id == mate.reference_id + assert result.next_reference_name == mate.reference_name + assert result.next_reference_start == mate.reference_start + assert result.mate_is_mapped is mate.is_mapped + assert result.mate_is_reverse is mate.is_reverse + assert result.get_tag("MC") == mate.cigarstring + assert result.get_tag("ms") == sum_of_base_qualities(mate) + assert result.get_tag("MQ") == mate.mapping_quality + assert result.get_tag("RG") == rec.get_tag("RG") + assert result.get_tag("RX") == rec.get_tag("RX") + + +def test_xa_many_pysam_from_rec() -> None: + """Test that we return secondary alignments as SAM records from a record with multiple XAs.""" + value: str = "chr9,-104599381,49M,4;chr3,+170653467,49M,4;;;" # with trailing ';' + builder = SamBuilder() + rec, mate = builder.add_pair(chrom="chr1", start1=32, start2=49) + rec.set_tag("RX", "ACGT") + + assert rec.query_sequence is not None # for type narrowing + assert rec.query_qualities is not None # for type narrowing + assert list(AuxAlignment.many_from_rec(rec)) == [] + + rec.set_tag("XB", value) + first, second = list(AuxAlignment.many_pysam_from_rec(rec)) + + assert first.reference_name == "chr9" + assert first.reference_id == rec.header.get_tid("chr9") + assert first.reference_start == 104599380 + assert not first.is_forward + assert first.query_sequence == reverse_complement(rec.query_sequence) + assert first.query_qualities == rec.query_qualities[::-1] + assert first.cigarstring == "49M" + assert not first.has_tag("AS") + assert first.get_tag("NM") == 4 + assert first.get_tag("rh") == 1 + assert first.mapping_quality == 0 + + assert second.reference_name == "chr3" + assert second.reference_id == rec.header.get_tid("chr3") + assert second.reference_start == 170653466 + assert second.is_forward + assert second.query_sequence == rec.query_sequence + assert second.query_qualities == rec.query_qualities + assert second.cigarstring == "49M" + assert not second.has_tag("AS") + assert second.get_tag("NM") == 4 + assert second.get_tag("rh") == 1 + assert second.mapping_quality == 0 + + for result in (first, second): + assert result.query_name == rec.query_name + assert result.is_read1 is rec.is_read1 + assert result.is_read2 is rec.is_read2 + assert result.is_duplicate is rec.is_duplicate + assert result.is_paired is rec.is_paired + assert not result.is_proper_pair + assert result.is_qcfail is rec.is_qcfail + assert result.is_secondary + assert not result.is_supplementary + assert result.is_mapped + + assert result.next_reference_id == mate.reference_id + assert result.next_reference_name == mate.reference_name + assert result.next_reference_start == mate.reference_start + assert result.mate_is_mapped is mate.is_mapped + assert result.mate_is_reverse is mate.is_reverse + assert result.get_tag("MC") == mate.cigarstring + assert result.get_tag("ms") == sum_of_base_qualities(mate) + assert result.get_tag("MQ") == mate.mapping_quality + assert result.get_tag("RG") == rec.get_tag("RG") + assert result.get_tag("RX") == rec.get_tag("RX") + + +def test_xb_many_pysam_from_rec() -> None: + """Test that we return secondary alignments as SAM records from a record with multiple XBs.""" + value: str = "chr9,-104599381,49M,4,0,30;chr3,+170653467,49M,4,0,20;;;" # with trailing ';' + builder = SamBuilder() + rec, mate = builder.add_pair(chrom="chr1", start1=32, start2=49) + rec.set_tag("RX", "ACGT") + + assert rec.query_sequence is not None # for type narrowing + assert rec.query_qualities is not None # for type narrowing + assert list(AuxAlignment.many_from_rec(rec)) == [] + + rec.set_tag("XB", value) + first, second = list(AuxAlignment.many_pysam_from_rec(rec)) + + assert first.reference_name == "chr9" + assert first.reference_id == rec.header.get_tid("chr9") + assert first.reference_start == 104599380 + assert not first.is_forward + assert first.query_sequence == reverse_complement(rec.query_sequence) + assert first.query_qualities == rec.query_qualities[::-1] + assert first.cigarstring == "49M" + assert first.get_tag("AS") == 0 + assert first.get_tag("NM") == 4 + assert first.get_tag("rh") == 1 + assert first.mapping_quality == 30 + + assert second.reference_name == "chr3" + assert second.reference_id == rec.header.get_tid("chr3") + assert second.reference_start == 170653466 + assert second.is_forward + assert second.query_sequence == rec.query_sequence + assert second.query_qualities == rec.query_qualities + assert second.cigarstring == "49M" + assert second.get_tag("AS") == 0 + assert second.get_tag("NM") == 4 + assert second.get_tag("rh") == 1 + assert second.mapping_quality == 20 + + for result in (first, second): + assert result.query_name == rec.query_name + assert result.is_read1 is rec.is_read1 + assert result.is_read2 is rec.is_read2 + assert result.is_duplicate is rec.is_duplicate + assert result.is_paired is rec.is_paired + assert not result.is_proper_pair + assert result.is_qcfail is rec.is_qcfail + assert result.is_secondary + assert not result.is_supplementary + assert result.is_mapped + + assert result.next_reference_id == mate.reference_id + assert result.next_reference_name == mate.reference_name + assert result.next_reference_start == mate.reference_start + assert result.mate_is_mapped is mate.is_mapped + assert result.mate_is_reverse is mate.is_reverse + assert result.get_tag("MC") == mate.cigarstring + assert result.get_tag("ms") == sum_of_base_qualities(mate) + assert result.get_tag("MQ") == mate.mapping_quality + assert result.get_tag("RG") == rec.get_tag("RG") + assert result.get_tag("RX") == rec.get_tag("RX") + + +def test_many_pysam_from_rec_with_hard_clips() -> None: + """Test that we can't reconstruct the bases and qualities if there are hard clips.""" + value: str = "chr9,-104599381,31M,4,0,30" + builder = SamBuilder() + rec, _ = builder.add_pair(chrom="chr1", start1=32, start2=49, cigar1="1H30M") + + assert rec.query_sequence is not None # for type narrowing + assert rec.query_qualities is not None # for type narrowing + assert list(AuxAlignment.many_pysam_from_rec(rec)) == [] + + rec.set_tag("XB", value) + + (actual,) = AuxAlignment.many_pysam_from_rec(rec) + + assert actual.query_sequence is None + assert actual.query_qualities is None diff --git a/tests/fgpyo/sam/test_supplementary_alignments.py b/tests/fgpyo/sam/test_supplementary_alignments.py index 388a037..453e562 100644 --- a/tests/fgpyo/sam/test_supplementary_alignments.py +++ b/tests/fgpyo/sam/test_supplementary_alignments.py @@ -5,6 +5,7 @@ from fgpyo.sam.builder import SamBuilder +@pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_supplementary_alignment() -> None: # reverse assert SupplementaryAlignment.parse("chr1,123,-,100M50S,60,4") == SupplementaryAlignment( @@ -31,6 +32,7 @@ def test_supplementary_alignment() -> None: SupplementaryAlignment.parse("chr1,123,+,50S100M,60") +@pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_parse_sa_tag() -> None: assert SupplementaryAlignment.parse_sa_tag("") == [] assert SupplementaryAlignment.parse_sa_tag(";") == [] @@ -45,12 +47,14 @@ def test_parse_sa_tag() -> None: assert SupplementaryAlignment.parse_sa_tag(f"{s1};{s2};") == [sa1, sa2] +@pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_format_supplementary_alignment() -> None: for sa_string in ["chr1,123,-,100M50S,60,4", "chr1,123,+,100M50S,60,3"]: sa = SupplementaryAlignment.parse(sa_string) assert str(sa) == sa_string +@pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_from_read() -> None: """Test that we can construct a SupplementaryAlignment from an AlignedSegment.""" @@ -71,6 +75,7 @@ def test_from_read() -> None: assert SupplementaryAlignment.from_read(read) == [sa1, sa2] +@pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_end() -> None: """Test that we can get the end of a SupplementaryAlignment.""" diff --git a/tests/fgpyo/sam/test_template_iterator.py b/tests/fgpyo/sam/test_template_iterator.py index d2f44f7..39ea640 100644 --- a/tests/fgpyo/sam/test_template_iterator.py +++ b/tests/fgpyo/sam/test_template_iterator.py @@ -2,6 +2,7 @@ import pytest +from fgpyo.sam import AuxAlignment from fgpyo.sam import Template from fgpyo.sam import TemplateIterator from fgpyo.sam import reader @@ -219,3 +220,22 @@ def test_set_tag() -> None: for bad_tag in ["", "A", "ABC", "ABCD"]: with pytest.raises(AssertionError, match="Tags must be 2 characters"): template.set_tag(bad_tag, VALUE) + + +def test_with_aux_alignments() -> None: + """Test that we add auxiliary alignments as SAM records to a template.""" + secondary: str = "chr9,-104599381,49M,4,0,30;chr3,+170653467,49M,4,0,20;;;" # with trailing ';' + supplementary: str = "chr9,104599381,-,39M,50,2" + builder = SamBuilder() + rec = builder.add_single(chrom="chr1", start=32) + rec.set_tag("RX", "ACGT") + + assert list(AuxAlignment.many_pysam_from_rec(rec)) == [] + + rec.set_tag("SA", supplementary) + rec.set_tag("XB", secondary) + + actual = Template.build([rec]).with_aux_alignments() + expected = Template.build([rec] + list(AuxAlignment.many_pysam_from_rec(rec))) + + assert actual == expected