Skip to content

Commit

Permalink
Updating MarkDuplicatesSpark tie-breaking to reflect changes in picard (
Browse files Browse the repository at this point in the history
#5011)

* updated tiebreaking
* checked that values were being set properly in every case
* added test from Picard for the tie-breaking code
* updated documentation
  • Loading branch information
jamesemery authored and lbergelson committed Aug 13, 2018
1 parent 3b4e273 commit 0f5b253
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MarkDuplicatesSparkUtils {
public static final String OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME = "OD";
// This comparator represents the tiebreaking for PairedEnds duplicate marking.
// We compare first on score, followed by unclipped start position (which is reversed here because of the expected ordering)
private static final Comparator<PairedEnds> PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(PairedEnds::getScore)
private static final Comparator<TransientFieldPhysicalLocation> PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(TransientFieldPhysicalLocation::getScore)
.thenComparing(PairedEndsCoordinateComparator.INSTANCE.reversed());

/**
Expand Down Expand Up @@ -321,7 +321,7 @@ private static JavaPairRDD<IndexPair<String>, Integer> markDuplicateRecords(fina
//empty MarkDuplicatesSparkRecord signify that a pair has a mate somewhere else
// If there are any non-fragment placeholders at this site, mark everything as duplicates, otherwise compute the best score
if (Utils.isNonEmpty(fragments) && !Utils.isNonEmpty(emptyFragments)) {
final Tuple2<IndexPair<String>, Integer> bestFragment = handleFragments(fragments);
final Tuple2<IndexPair<String>, Integer> bestFragment = handleFragments(fragments, finder);
nonDuplicates.add(bestFragment);
}

Expand Down Expand Up @@ -365,13 +365,18 @@ private static List<Tuple2<IndexPair<String>,Integer>> handlePassthroughs(List<M
}

private static Tuple2<IndexPair<String>, Integer> handlePairs(List<Pair> pairs, OpticalDuplicateFinder finder) {
// save ourselves the trouble when there are no optical duplicates to worry about
if (pairs.size() == 1) {
return (new Tuple2<>(new IndexPair<>(pairs.get(0).getName(), pairs.get(0).getPartitionIndex()), 0));
}

final Pair bestPair = pairs.stream()
.peek(pair -> finder.addLocationInformation(pair.getName(), pair))
.max(PAIRED_ENDS_SCORE_COMPARATOR)
.orElseThrow(() -> new GATKException.ShouldNeverReachHereException("There was no best pair because the stream was empty, but it shouldn't have been empty."));

// Split by orientation and count duplicates in each group separately.
final Map<Byte, List<Pair>> groupByOrientation = pairs.stream()
.peek(pair -> finder.addLocationInformation(pair.getName(), pair))//TODO this needs me to handle the name better
.collect(Collectors.groupingBy(Pair::getOrientationForOpticalDuplicates));
final int numOpticalDuplicates;
//todo do we not have to split the reporting of these by orientation?
Expand Down Expand Up @@ -399,9 +404,10 @@ private static int countOpticalDuplicates(OpticalDuplicateFinder finder, List<Pa
/**
* If there are fragments with no non-fragments overlapping at a site, select the best one according to PAIRED_ENDS_SCORE_COMPARATOR
*/
private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDuplicatesSparkRecord> duplicateFragmentGroup) {
private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDuplicatesSparkRecord> duplicateFragmentGroup, OpticalDuplicateFinder finder) {
return duplicateFragmentGroup.stream()
.map(f -> (Fragment)f)
.peek(f -> finder.addLocationInformation(f.getName(), f))
.max(PAIRED_ENDS_SCORE_COMPARATOR)
.map(best -> new Tuple2<>(new IndexPair<>(best.getName(), best.getPartitionIndex()), -1))
.orElse(null);
Expand Down Expand Up @@ -488,14 +494,14 @@ public static void saveMetricsRDD(final MetricsFile<GATKDuplicationMetrics, Doub
* NOTE: Because the original records were grouped by readname, we know that they must be unique once they hit this
* comparator and thus we don't need to worry about further tiebreaking for this method.
*/
public static final class PairedEndsCoordinateComparator implements Comparator<PairedEnds>, Serializable {
public static final class PairedEndsCoordinateComparator implements Comparator<TransientFieldPhysicalLocation>, Serializable {
private static final long serialVersionUID = 1L;

public static final PairedEndsCoordinateComparator INSTANCE = new PairedEndsCoordinateComparator();
private PairedEndsCoordinateComparator() { }

@Override
public int compare( PairedEnds first, PairedEnds second ) {
public int compare( TransientFieldPhysicalLocation first, TransientFieldPhysicalLocation second ) {
int result = Integer.compare(first.getFirstStartPosition(), second.getFirstStartPosition());
if ( result != 0 ) {
return result;
Expand All @@ -506,6 +512,16 @@ public int compare( PairedEnds first, PairedEnds second ) {
return first.isRead1ReverseStrand() ? -1: 1;
}

if (first.getTile() != second.getTile()) {
return first.getTile() - second.getTile();
}
if (first.getX() != second.getX()) {
return first.getX() - second.getX();
}
if (first.getY() != second.getY()) {
return first.getY() - second.getY();
}

if ( first.getName() != null && second.getName() != null ) {
result = first.getName().compareTo(second.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
import org.broadinstitute.hellbender.utils.read.markduplicates.ReadEnds;
import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey;
import picard.sam.util.PhysicalLocation;

import java.util.Map;

Expand All @@ -17,7 +18,7 @@
* This class holds onto as little information as possible in an attempt to prevent excessive serialization of
* during the processing step of MarkDuplicatesSpark
*/
public class Fragment extends PairedEnds {
public class Fragment extends TransientFieldPhysicalLocation {
protected transient ReadsKey key;

private final int firstStartPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* during the processing step of MarkDuplicatesSpark
*/
@DefaultSerializer(Pair.Serializer.class)
public final class Pair extends PairedEnds implements PhysicalLocation {
public final class Pair extends TransientFieldPhysicalLocation {
protected transient ReadsKey key;

private final int firstStartPosition;
Expand All @@ -32,14 +32,6 @@ public final class Pair extends PairedEnds implements PhysicalLocation {
private final int score;
private final boolean wasFlipped;

// Information used to detect optical dupes
private short readGroupIndex = -1;

private transient short tile = -1;
private transient int x = -1;
private transient int y = -1;
private transient short libraryId = -1;

public Pair(final GATKRead read1, final GATKRead read2, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy, Map<String, Byte> headerLibraryMap) {
super(partitionIndex, read1.getName());

Expand Down Expand Up @@ -181,28 +173,6 @@ private byte getOrientation(final boolean optical) {
return ReadEnds.FF; //at this point we know for sure isRead1ReverseStrand is false and isRead2ReverseStrand is false
}

// Methods for OpticalDuplicateFinder.PhysicalLocation
@Override
public short getReadGroup() { return this.readGroupIndex; }
@Override
public void setReadGroup(final short readGroup) { this.readGroupIndex = readGroup; }
@Override
public short getTile() { return this.tile; }
@Override
public void setTile(final short tile) { this.tile = tile; }
@Override
public int getX() { return this.x; }
@Override
public void setX(final int x) { this.x = x; }
@Override
public int getY() { return this.y; }
@Override
public void setY(final int y) { this.y = y; }
@Override
public short getLibraryId() { return this.libraryId; }
@Override
public void setLibraryId(final short libraryId) { this.libraryId = libraryId; }

/**
* Serializers for each subclass of PairedEnds which rely on implementations of serializations within each class itself
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords;

import picard.sam.util.PhysicalLocation;

/**
* A common class for holding the fields in PhysicalLocation that we don't want to be serialized by kryo.
*
* NOTE: readGroupIndex is not transient as the readgroup is needed in several stages of MarkDuplicatesSpark, but is still
* contained in this class to mirror {@link PhysicalLocation}
*/
public abstract class TransientFieldPhysicalLocation extends PairedEnds implements PhysicalLocation {
// Information used to detect optical dupes
protected short readGroupIndex = -1;
protected transient short tile = -1;
protected transient int x = -1;
protected transient int y = -1;
protected transient short libraryId = -1;

public TransientFieldPhysicalLocation(int partitionIndex, String name) {
super(partitionIndex, name);
}

// Methods for OpticalDuplicateFinder.PhysicalLocation
@Override
public short getReadGroup() { return this.readGroupIndex; }

@Override
public void setReadGroup(final short readGroup) { this.readGroupIndex = readGroup; }

@Override
public short getTile() { return this.tile; }

@Override
public void setTile(final short tile) { this.tile = tile; }

@Override
public int getX() { return this.x; }

@Override
public void setX(final int x) { this.x = x; }

@Override
public int getY() { return this.y; }

@Override
public void setY(final int y) { this.y = y; }

@Override
public short getLibraryId() { return this.libraryId; }

@Override
public void setLibraryId(final short libraryId) { this.libraryId = libraryId; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import picard.sam.util.PhysicalLocationInt;
import picard.sam.util.ReadNameParser;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -157,13 +159,61 @@ public void testOpticalDuplicateFinding() {
tester.runTest();
}

@Test
public void testOpticalDuplicateClusterSamePositionNoOpticalDuplicates() {
@DataProvider
public Object[][] readNameData(){
return new Object[][]{
{"RUNID:7:1203:2886:82292", "RUNID:7:1205:3886:16834"},

{"RUNID:7:1203:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1204:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1205:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1206:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1207:2886:16756", "RUNID:7:1205:3886:16756"},

{"RUNID:7:1203:2886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:3886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:4886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:5886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:6886:16756", "RUNID:7:1203:4886:26756"},

{"RUNID:7:1203:2886:34756", "RUNID:7:1203:2886:36756"},
{"RUNID:7:1203:2886:35756", "RUNID:7:1203:2886:36756"},
{"RUNID:7:1203:2886:37756", "RUNID:7:1203:2886:36756"},
{"RUNID:7:1203:2886:38756", "RUNID:7:1203:2886:36756"},
};
}

@Test(dataProvider = "readNameData")
public void testOpticalDuplicateClusterSamePositionNoOpticalDuplicates(final String readName1, final String readName2) {
// This tests the readname based tiebreaking code in mark duplicates. Since it's ambiguous which read should be marked
// as duplicate or not if scores match we break ties by evaluating the readname for consistencies sake.

final ReadNameParser parser = new ReadNameParser();

final PhysicalLocationInt position1 = new PhysicalLocationInt();
final PhysicalLocationInt position2 = new PhysicalLocationInt();

parser.addLocationInformation(readName1, position1);
parser.addLocationInformation(readName2, position2);

final AbstractMarkDuplicatesTester tester = getTester();
tester.getSamRecordSetBuilder().setReadLength(101);
tester.setExpectedOpticalDuplicate(0);
tester.addMatePair("RUNID:7:1203:2886:82292", 1, 485253, 485253, false, false, true, true, "42M59S", "59S42M", false, true, false, false, false, DEFAULT_BASE_QUALITY); // duplicate
tester.addMatePair("RUNID:7:1203:2884:16834", 1, 485253, 485253, false, false, false, false, "59S42M", "42M59S", true, false, false, false, false, ELIGIBLE_BASE_QUALITY);

int compare = position1.tile - position2.tile;
if (compare == 0) {
compare = position1.x - position2.x;
}

if (compare == 0) {
compare = position1.y - position2.y;
}

final boolean isDuplicate = compare < 0;

tester.addMatePair(readName1, 1,485253, 485253, false, false, !isDuplicate, !isDuplicate, "42M59S", "59S42M", false, true, false, false, false, DEFAULT_BASE_QUALITY);
tester.addMatePair(readName2, 1,485253, 485253, false, false, isDuplicate, isDuplicate, "59S42M", "42M59S", true, false, false, false, false, DEFAULT_BASE_QUALITY);

tester.runTest();
}

Expand Down

0 comments on commit 0f5b253

Please sign in to comment.