Skip to content

Commit

Permalink
Updated the scoring code to no longer take into account the unclipped…
Browse files Browse the repository at this point in the history
… start position of mismatching reads. Additionally changed the score to be a double packed short value in order to better reflect picard scoring code.
  • Loading branch information
jamesemery committed Aug 13, 2018
1 parent 3b4e273 commit 8c978ad
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class MarkDuplicatesSparkUtils {
public static final String OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME = "OD";
// This comparator represents the tiebreaking for PairedEnds duplicate marking.
// We compare first on score, followed by unclipped start position (which is reversed here because of the expected ordering)
private static final Comparator<PairedEnds> PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(PairedEnds::getScore)
.thenComparing(PairedEndsCoordinateComparator.INSTANCE.reversed());
private static final Comparator<TransientFieldPhysicalLocation> PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(TransientFieldPhysicalLocation::getScore)
.thenComparing(PairedEndsLocationComparator.INSTANCE.reversed());

/**
* Wrapper object used for storing an object and some type of index information.
Expand Down Expand Up @@ -321,7 +321,7 @@ private static JavaPairRDD<IndexPair<String>, Integer> markDuplicateRecords(fina
//empty MarkDuplicatesSparkRecord signify that a pair has a mate somewhere else
// If there are any non-fragment placeholders at this site, mark everything as duplicates, otherwise compute the best score
if (Utils.isNonEmpty(fragments) && !Utils.isNonEmpty(emptyFragments)) {
final Tuple2<IndexPair<String>, Integer> bestFragment = handleFragments(fragments);
final Tuple2<IndexPair<String>, Integer> bestFragment = handleFragments(fragments, finder);
nonDuplicates.add(bestFragment);
}

Expand Down Expand Up @@ -365,13 +365,18 @@ private static List<Tuple2<IndexPair<String>,Integer>> handlePassthroughs(List<M
}

private static Tuple2<IndexPair<String>, Integer> handlePairs(List<Pair> pairs, OpticalDuplicateFinder finder) {
// save ourselves the trouble when there are no optical duplicates to worry about
if (pairs.size() == 1) {
return (new Tuple2<>(new IndexPair<>(pairs.get(0).getName(), pairs.get(0).getPartitionIndex()), 0));
}

final Pair bestPair = pairs.stream()
.peek(pair -> finder.addLocationInformation(pair.getName(), pair))
.max(PAIRED_ENDS_SCORE_COMPARATOR)
.orElseThrow(() -> new GATKException.ShouldNeverReachHereException("There was no best pair because the stream was empty, but it shouldn't have been empty."));

// Split by orientation and count duplicates in each group separately.
final Map<Byte, List<Pair>> groupByOrientation = pairs.stream()
.peek(pair -> finder.addLocationInformation(pair.getName(), pair))//TODO this needs me to handle the name better
.collect(Collectors.groupingBy(Pair::getOrientationForOpticalDuplicates));
final int numOpticalDuplicates;
//todo do we not have to split the reporting of these by orientation?
Expand Down Expand Up @@ -399,9 +404,10 @@ private static int countOpticalDuplicates(OpticalDuplicateFinder finder, List<Pa
/**
* If there are fragments with no non-fragments overlapping at a site, select the best one according to PAIRED_ENDS_SCORE_COMPARATOR
*/
private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDuplicatesSparkRecord> duplicateFragmentGroup) {
private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDuplicatesSparkRecord> duplicateFragmentGroup, OpticalDuplicateFinder finder) {
return duplicateFragmentGroup.stream()
.map(f -> (Fragment)f)
.peek(f -> finder.addLocationInformation(f.getName(), f))
.max(PAIRED_ENDS_SCORE_COMPARATOR)
.map(best -> new Tuple2<>(new IndexPair<>(best.getName(), best.getPartitionIndex()), -1))
.orElse(null);
Expand Down Expand Up @@ -474,38 +480,43 @@ public static void saveMetricsRDD(final MetricsFile<GATKDuplicationMetrics, Doub
}

/**
* Comparator for sorting Reads by coordinate. Note that a header is required in
* order to meaningfully compare contigs.
* Comparator for by their PhysicalLoccation attributes and strandedness. This comparator is intended to serve as a tiebreaker
* for the score comparator.
*
* Uses the various other fields in a read to break ties for reads that share
* the same location.
* NOTE: we don't need to worry about reads genomic location as they will necessarily be grouped by start positions if they are being compared
*
* Ordering is not the same as {@link htsjdk.samtools.SAMRecordCoordinateComparator}.
* It compares two pairedEnds objects by their first reads according to first their clipped start positions
* (they were matched together based on UnclippedStartOriginally), then the orientation of the strand, followed by
* the readname lexicographical order.
* Ordering is not the same as {@link htsjdk.samtools.SAMRecordCoordinateComparator}, except for the alignment position.
* It compares two PhysicalLocation the orientation of the strand, followed by their physical location attributes,
* and finally as a final tiebreaker the readname lexicographical order.
*
* NOTE: Because the original records were grouped by readname, we know that they must be unique once they hit this
* NOTE: Because the original records were grouped by start position, we know that they must be unique once they hit this
* comparator and thus we don't need to worry about further tiebreaking for this method.
*/
public static final class PairedEndsCoordinateComparator implements Comparator<PairedEnds>, Serializable {
public static final class PairedEndsLocationComparator implements Comparator<TransientFieldPhysicalLocation>, Serializable {
private static final long serialVersionUID = 1L;

public static final PairedEndsCoordinateComparator INSTANCE = new PairedEndsCoordinateComparator();
private PairedEndsCoordinateComparator() { }
public static final PairedEndsLocationComparator INSTANCE = new PairedEndsLocationComparator();
private PairedEndsLocationComparator() { }

@Override
public int compare( PairedEnds first, PairedEnds second ) {
int result = Integer.compare(first.getFirstStartPosition(), second.getFirstStartPosition());
if ( result != 0 ) {
return result;
}
public int compare( TransientFieldPhysicalLocation first, TransientFieldPhysicalLocation second ) {
int result = 0;

//This is done to mimic SAMRecordCoordinateComparator's behavior
if (first.isRead1ReverseStrand() != second.isRead1ReverseStrand()) {
return first.isRead1ReverseStrand() ? -1: 1;
}

if (first.getTile() != second.getTile()) {
return first.getTile() - second.getTile();
}
if (first.getX() != second.getX()) {
return first.getX() - second.getX();
}
if (first.getY() != second.getY()) {
return first.getY() - second.getY();
}

if ( first.getName() != null && second.getName() != null ) {
result = first.getName().compareTo(second.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadUtils;

import java.util.function.ToIntFunction;

Expand All @@ -25,10 +26,13 @@ public enum MarkDuplicatesScoringStrategy {

/**
* Given a read, compute the reads's score for mark duplicates.
*
* NOTE: this code is intended to match {@link htsjdk.samtools.DuplicateScoringStrategy.ScoringStrategy} behavior.
*/
public int score(final GATKRead read){
public short score(final GATKRead read){
Utils.nonNull(read);
return scoring.applyAsInt(read);
// We max out at Short.MAX_VALUE / 2 to prevent overflow if we add two very high quality/length reads into pairs
return (short) (Math.min(scoring.applyAsInt(read), Short.MAX_VALUE / 2) + (read.failsVendorQualityCheck() ? (short) (Short.MIN_VALUE / 2) : 0));
}

//Bases below this quality will not be included in picking the best read from a set of duplicates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Type getType() {
return Type.EMPTY_FRAGMENT;
}
@Override
public int getScore() {
public short getScore() {
return 0;
}
@Override
Expand All @@ -49,10 +49,6 @@ public ReadsKey key() {
return key;
}
@Override
public int getFirstStartPosition() {
throw new UnsupportedOperationException("Empty fragments do not support requests for positional information");
}
@Override
public boolean isRead1ReverseStrand() {
return R1R;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
import org.broadinstitute.hellbender.utils.read.markduplicates.ReadEnds;
import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey;
import picard.sam.util.PhysicalLocation;

import java.util.Map;

Expand All @@ -17,18 +18,16 @@
* This class holds onto as little information as possible in an attempt to prevent excessive serialization of
* during the processing step of MarkDuplicatesSpark
*/
public class Fragment extends PairedEnds {
public class Fragment extends TransientFieldPhysicalLocation {
protected transient ReadsKey key;

private final int firstStartPosition;
private final boolean R1R;

protected final int score;
protected final short score;

public Fragment(final GATKRead first, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy, Map<String, Byte> headerLibraryMap) {
super(partitionIndex, first.getName());

this.firstStartPosition = first.getAssignedStart();
this.score = scoringStrategy.score(first);
this.R1R = first.isReverseStrand();
this.key = ReadsKey.getKeyForFragment(ReadUtils.getStrandedUnclippedStart(first),
Expand All @@ -47,14 +46,10 @@ public ReadsKey key() {
return key;
}
@Override
public int getScore() {
public short getScore() {
return score;
}
@Override
public int getFirstStartPosition() {
return firstStartPosition;
}
@Override
public boolean isRead1ReverseStrand() {
return R1R;
}
Expand All @@ -64,6 +59,6 @@ public byte getOrientationForPCRDuplicates() {
}
@Override
public String toString() {
return "fragment: " + name + " " + firstStartPosition;
return "fragment: " + name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,23 @@
* during the processing step of MarkDuplicatesSpark
*/
@DefaultSerializer(Pair.Serializer.class)
public final class Pair extends PairedEnds implements PhysicalLocation {
public final class Pair extends TransientFieldPhysicalLocation {
protected transient ReadsKey key;

private final int firstStartPosition;
private final boolean isRead1ReverseStrand;

private final boolean isRead2ReverseStrand;
private final int score;
private final short score;
private final boolean wasFlipped;

// Information used to detect optical dupes
private short readGroupIndex = -1;

private transient short tile = -1;
private transient int x = -1;
private transient int y = -1;
private transient short libraryId = -1;

public Pair(final GATKRead read1, final GATKRead read2, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy, Map<String, Byte> headerLibraryMap) {
super(partitionIndex, read1.getName());

final String name1 = read1.getName();
final String name2 = read2.getName();
Utils.validate(name1.equals(name2), () -> "Paired reads have different names\n" + name1 + "\n" + name2);

this.score = scoringStrategy.score(read1) + scoringStrategy.score(read2);
this.score = (short)(scoringStrategy.score(read1) + scoringStrategy.score(read2));

GATKRead first = read1;
GATKRead second;
Expand All @@ -65,7 +56,6 @@ public Pair(final GATKRead read1, final GATKRead read2, final SAMFileHeader head
first = read2;
second = read1;
}
firstStartPosition = first.getAssignedStart();

// if the two read ends are in the same position, pointing in opposite directions,
// the orientation is undefined and the procedure above
Expand Down Expand Up @@ -101,9 +91,8 @@ private Pair(Kryo kryo, Input input){
y = -1;
libraryId = -1;

score = input.readInt();
score = input.readShort();

firstStartPosition = input.readInt();
isRead1ReverseStrand = input.readBoolean();

isRead2ReverseStrand = input.readBoolean();
Expand All @@ -116,9 +105,8 @@ protected void serialize(Kryo kryo, Output output) {
output.writeInt(partitionIndex, true);
output.writeAscii(name);

output.writeInt(score);
output.writeShort(score);

output.writeInt(firstStartPosition);
output.writeBoolean(isRead1ReverseStrand);

output.writeBoolean(isRead2ReverseStrand);
Expand All @@ -137,20 +125,17 @@ public ReadsKey key() {
return key;
}
@Override
public int getScore() {
public short getScore() {
return score;
}
@Override
public int getFirstStartPosition() {
return firstStartPosition;
}

@Override
public boolean isRead1ReverseStrand() {
return isRead1ReverseStrand;
}
@Override
public String toString() {
return name + " " + firstStartPosition + " score:" + score;
return name + " score:" + score;
}

/**
Expand Down Expand Up @@ -181,28 +166,6 @@ private byte getOrientation(final boolean optical) {
return ReadEnds.FF; //at this point we know for sure isRead1ReverseStrand is false and isRead2ReverseStrand is false
}

// Methods for OpticalDuplicateFinder.PhysicalLocation
@Override
public short getReadGroup() { return this.readGroupIndex; }
@Override
public void setReadGroup(final short readGroup) { this.readGroupIndex = readGroup; }
@Override
public short getTile() { return this.tile; }
@Override
public void setTile(final short tile) { this.tile = tile; }
@Override
public int getX() { return this.x; }
@Override
public void setX(final int x) { this.x = x; }
@Override
public int getY() { return this.y; }
@Override
public void setY(final int y) { this.y = y; }
@Override
public short getLibraryId() { return this.libraryId; }
@Override
public void setLibraryId(final short libraryId) { this.libraryId = libraryId; }

/**
* Serializers for each subclass of PairedEnds which rely on implementations of serializations within each class itself
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ public abstract class PairedEnds extends MarkDuplicatesSparkRecord {
super(partitionIndex, name);
}

public abstract int getFirstStartPosition();
public abstract int getScore();
public abstract short getScore();
public abstract boolean isRead1ReverseStrand();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords;

import picard.sam.util.PhysicalLocation;

/**
* A common class for holding the fields in PhysicalLocation that we don't want to be serialized by kryo.
*
* NOTE: readGroupIndex is not transient as the readgroup is needed in several stages of MarkDuplicatesSpark, but is still
* contained in this class to mirror {@link PhysicalLocation}
*/
public abstract class TransientFieldPhysicalLocation extends PairedEnds implements PhysicalLocation {
// Information used to detect optical dupes
protected short readGroupIndex = -1;
protected transient short tile = -1;
protected transient int x = -1;
protected transient int y = -1;
protected transient short libraryId = -1;

public TransientFieldPhysicalLocation(int partitionIndex, String name) {
super(partitionIndex, name);
}

// Methods for OpticalDuplicateFinder.PhysicalLocation
@Override
public short getReadGroup() { return this.readGroupIndex; }

@Override
public void setReadGroup(final short readGroup) { this.readGroupIndex = readGroup; }

@Override
public short getTile() { return this.tile; }

@Override
public void setTile(final short tile) { this.tile = tile; }

@Override
public int getX() { return this.x; }

@Override
public void setX(final int x) { this.x = x; }

@Override
public int getY() { return this.y; }

@Override
public void setY(final int y) { this.y = y; }

@Override
public short getLibraryId() { return this.libraryId; }

@Override
public void setLibraryId(final short libraryId) { this.libraryId = libraryId; }
}
Loading

0 comments on commit 8c978ad

Please sign in to comment.