Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating MarkDuplicatesSpark tiebreaking to reflect changes in picard #5011

Merged
merged 4 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MarkDuplicatesSparkUtils {
public static final String OPTICAL_DUPLICATE_TOTAL_ATTRIBUTE_NAME = "OD";
// This comparator represents the tiebreaking for PairedEnds duplicate marking.
// We compare first on score, followed by unclipped start position (which is reversed here because of the expected ordering)
private static final Comparator<PairedEnds> PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(PairedEnds::getScore)
private static final Comparator<TransientFieldPhysicalLocation> PAIRED_ENDS_SCORE_COMPARATOR = Comparator.comparing(TransientFieldPhysicalLocation::getScore)
.thenComparing(PairedEndsCoordinateComparator.INSTANCE.reversed());

/**
Expand Down Expand Up @@ -321,7 +321,7 @@ private static JavaPairRDD<IndexPair<String>, Integer> markDuplicateRecords(fina
//empty MarkDuplicatesSparkRecord signify that a pair has a mate somewhere else
// If there are any non-fragment placeholders at this site, mark everything as duplicates, otherwise compute the best score
if (Utils.isNonEmpty(fragments) && !Utils.isNonEmpty(emptyFragments)) {
final Tuple2<IndexPair<String>, Integer> bestFragment = handleFragments(fragments);
final Tuple2<IndexPair<String>, Integer> bestFragment = handleFragments(fragments, finder);
nonDuplicates.add(bestFragment);
}

Expand Down Expand Up @@ -365,13 +365,18 @@ private static List<Tuple2<IndexPair<String>,Integer>> handlePassthroughs(List<M
}

private static Tuple2<IndexPair<String>, Integer> handlePairs(List<Pair> pairs, OpticalDuplicateFinder finder) {
// save ourselves the trouble when there are no optical duplicates to worry about
if (pairs.size() == 1) {
return (new Tuple2<>(new IndexPair<>(pairs.get(0).getName(), pairs.get(0).getPartitionIndex()), 0));
}

final Pair bestPair = pairs.stream()
.peek(pair -> finder.addLocationInformation(pair.getName(), pair))
.max(PAIRED_ENDS_SCORE_COMPARATOR)
.orElseThrow(() -> new GATKException.ShouldNeverReachHereException("There was no best pair because the stream was empty, but it shouldn't have been empty."));

// Split by orientation and count duplicates in each group separately.
final Map<Byte, List<Pair>> groupByOrientation = pairs.stream()
.peek(pair -> finder.addLocationInformation(pair.getName(), pair))//TODO this needs me to handle the name better
.collect(Collectors.groupingBy(Pair::getOrientationForOpticalDuplicates));
final int numOpticalDuplicates;
//todo do we not have to split the reporting of these by orientation?
Expand Down Expand Up @@ -399,9 +404,10 @@ private static int countOpticalDuplicates(OpticalDuplicateFinder finder, List<Pa
/**
* If there are fragments with no non-fragments overlapping at a site, select the best one according to PAIRED_ENDS_SCORE_COMPARATOR
*/
private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDuplicatesSparkRecord> duplicateFragmentGroup) {
private static Tuple2<IndexPair<String>, Integer> handleFragments(List<MarkDuplicatesSparkRecord> duplicateFragmentGroup, OpticalDuplicateFinder finder) {
return duplicateFragmentGroup.stream()
.map(f -> (Fragment)f)
.peek(f -> finder.addLocationInformation(f.getName(), f))
.max(PAIRED_ENDS_SCORE_COMPARATOR)
.map(best -> new Tuple2<>(new IndexPair<>(best.getName(), best.getPartitionIndex()), -1))
.orElse(null);
Expand Down Expand Up @@ -488,14 +494,14 @@ public static void saveMetricsRDD(final MetricsFile<GATKDuplicationMetrics, Doub
* NOTE: Because the original records were grouped by readname, we know that they must be unique once they hit this
* comparator and thus we don't need to worry about further tiebreaking for this method.
*/
public static final class PairedEndsCoordinateComparator implements Comparator<PairedEnds>, Serializable {
public static final class PairedEndsCoordinateComparator implements Comparator<TransientFieldPhysicalLocation>, Serializable {
private static final long serialVersionUID = 1L;

public static final PairedEndsCoordinateComparator INSTANCE = new PairedEndsCoordinateComparator();
private PairedEndsCoordinateComparator() { }

@Override
public int compare( PairedEnds first, PairedEnds second ) {
public int compare( TransientFieldPhysicalLocation first, TransientFieldPhysicalLocation second ) {
int result = Integer.compare(first.getFirstStartPosition(), second.getFirstStartPosition());
if ( result != 0 ) {
return result;
Expand All @@ -506,6 +512,16 @@ public int compare( PairedEnds first, PairedEnds second ) {
return first.isRead1ReverseStrand() ? -1: 1;
}

if (first.getTile() != second.getTile()) {
return first.getTile() - second.getTile();
}
if (first.getX() != second.getX()) {
return first.getX() - second.getX();
}
if (first.getY() != second.getY()) {
return first.getY() - second.getY();
}

if ( first.getName() != null && second.getName() != null ) {
result = first.getName().compareTo(second.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.broadinstitute.hellbender.utils.read.markduplicates.MarkDuplicatesScoringStrategy;
import org.broadinstitute.hellbender.utils.read.markduplicates.ReadEnds;
import org.broadinstitute.hellbender.utils.read.markduplicates.ReadsKey;
import picard.sam.util.PhysicalLocation;

import java.util.Map;

Expand All @@ -17,7 +18,7 @@
* This class holds onto as little information as possible in an attempt to prevent excessive serialization of
* during the processing step of MarkDuplicatesSpark
*/
public class Fragment extends PairedEnds {
public class Fragment extends TransientFieldPhysicalLocation {
protected transient ReadsKey key;

private final int firstStartPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* during the processing step of MarkDuplicatesSpark
*/
@DefaultSerializer(Pair.Serializer.class)
public final class Pair extends PairedEnds implements PhysicalLocation {
public final class Pair extends TransientFieldPhysicalLocation {
protected transient ReadsKey key;

private final int firstStartPosition;
Expand All @@ -32,14 +32,6 @@ public final class Pair extends PairedEnds implements PhysicalLocation {
private final int score;
private final boolean wasFlipped;

// Information used to detect optical dupes
private short readGroupIndex = -1;

private transient short tile = -1;
private transient int x = -1;
private transient int y = -1;
private transient short libraryId = -1;

public Pair(final GATKRead read1, final GATKRead read2, final SAMFileHeader header, int partitionIndex, MarkDuplicatesScoringStrategy scoringStrategy, Map<String, Byte> headerLibraryMap) {
super(partitionIndex, read1.getName());

Expand Down Expand Up @@ -181,28 +173,6 @@ private byte getOrientation(final boolean optical) {
return ReadEnds.FF; //at this point we know for sure isRead1ReverseStrand is false and isRead2ReverseStrand is false
}

// Methods for OpticalDuplicateFinder.PhysicalLocation
@Override
public short getReadGroup() { return this.readGroupIndex; }
@Override
public void setReadGroup(final short readGroup) { this.readGroupIndex = readGroup; }
@Override
public short getTile() { return this.tile; }
@Override
public void setTile(final short tile) { this.tile = tile; }
@Override
public int getX() { return this.x; }
@Override
public void setX(final int x) { this.x = x; }
@Override
public int getY() { return this.y; }
@Override
public void setY(final int y) { this.y = y; }
@Override
public short getLibraryId() { return this.libraryId; }
@Override
public void setLibraryId(final short libraryId) { this.libraryId = libraryId; }

/**
* Serializers for each subclass of PairedEnds which rely on implementations of serializations within each class itself
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.broadinstitute.hellbender.utils.read.markduplicates.sparkrecords;

import picard.sam.util.PhysicalLocation;

/**
* A common interface for holding the fields in PhysicalLocation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't an interface

*/
public abstract class TransientFieldPhysicalLocation extends PairedEnds implements PhysicalLocation {
// Information used to detect optical dupes
protected short readGroupIndex = -1;
protected transient short tile = -1;
protected transient int x = -1;
protected transient int y = -1;
protected transient short libraryId = -1;

public TransientFieldPhysicalLocation(int partitionIndex, String name) {
super(partitionIndex, name);
}

// Methods for OpticalDuplicateFinder.PhysicalLocation
@Override
public short getReadGroup() { return this.readGroupIndex; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's weird that these read group isn't transient but the others are, should you javadoc that?


@Override
public void setReadGroup(final short readGroup) { this.readGroupIndex = readGroup; }

@Override
public short getTile() { return this.tile; }

@Override
public void setTile(final short tile) { this.tile = tile; }

@Override
public int getX() { return this.x; }

@Override
public void setX(final int x) { this.x = x; }

@Override
public int getY() { return this.y; }

@Override
public void setY(final int y) { this.y = y; }

@Override
public short getLibraryId() { return this.libraryId; }

@Override
public void setLibraryId(final short libraryId) { this.libraryId = libraryId; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import picard.sam.util.PhysicalLocationInt;
import picard.sam.util.ReadNameParser;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -157,13 +159,61 @@ public void testOpticalDuplicateFinding() {
tester.runTest();
}

@Test
public void testOpticalDuplicateClusterSamePositionNoOpticalDuplicates() {
@DataProvider
public Object[][] readNameData(){
return new Object[][]{
{"RUNID:7:1203:2886:82292", "RUNID:7:1205:3886:16834"},

{"RUNID:7:1203:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1204:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1205:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1206:2886:16756", "RUNID:7:1205:3886:16756"},
{"RUNID:7:1207:2886:16756", "RUNID:7:1205:3886:16756"},

{"RUNID:7:1203:2886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:3886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:4886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:5886:16756", "RUNID:7:1203:4886:26756"},
{"RUNID:7:1203:6886:16756", "RUNID:7:1203:4886:26756"},

{"RUNID:7:1203:2886:34756", "RUNID:7:1203:2886:36756"},
{"RUNID:7:1203:2886:35756", "RUNID:7:1203:2886:36756"},
{"RUNID:7:1203:2886:37756", "RUNID:7:1203:2886:36756"},
{"RUNID:7:1203:2886:38756", "RUNID:7:1203:2886:36756"},
};
}

@Test(dataProvider = "readNameData")
public void testOpticalDuplicateClusterSamePositionNoOpticalDuplicates(final String readName1, final String readName2) {
// This tests the readname based tiebreaking code in mark duplicates. Since it's ambiguous which read should be marked
// as duplicate or not if scores match we break ties by evaluating the readname for consistencies sake.

final ReadNameParser parser = new ReadNameParser();

final PhysicalLocationInt position1 = new PhysicalLocationInt();
final PhysicalLocationInt position2 = new PhysicalLocationInt();

parser.addLocationInformation(readName1, position1);
parser.addLocationInformation(readName2, position2);

final AbstractMarkDuplicatesTester tester = getTester();
tester.getSamRecordSetBuilder().setReadLength(101);
tester.setExpectedOpticalDuplicate(0);
tester.addMatePair("RUNID:7:1203:2886:82292", 1, 485253, 485253, false, false, true, true, "42M59S", "59S42M", false, true, false, false, false, DEFAULT_BASE_QUALITY); // duplicate
tester.addMatePair("RUNID:7:1203:2884:16834", 1, 485253, 485253, false, false, false, false, "59S42M", "42M59S", true, false, false, false, false, ELIGIBLE_BASE_QUALITY);

int compare = position1.tile - position2.tile;
if (compare == 0) {
compare = position1.x - position2.x;
}

if (compare == 0) {
compare = position1.y - position2.y;
}

final boolean isDuplicate = compare < 0;

tester.addMatePair(readName1, 1,485253, 485253, false, false, !isDuplicate, !isDuplicate, "42M59S", "59S42M", false, true, false, false, false, DEFAULT_BASE_QUALITY);
tester.addMatePair(readName2, 1,485253, 485253, false, false, isDuplicate, isDuplicate, "59S42M", "42M59S", true, false, false, false, false, DEFAULT_BASE_QUALITY);

tester.runTest();
}

Expand Down