-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added an argument to specify the parts directory for ReadsSparkSink #4666
Conversation
Codecov Report
@@ Coverage Diff @@
## master #4666 +/- ##
==============================================
+ Coverage 79.823% 80.37% +0.547%
- Complexity 17328 18189 +861
==============================================
Files 1075 1083 +8
Lines 62917 66551 +3634
Branches 10181 11208 +1027
==============================================
+ Hits 50222 53487 +3265
- Misses 8714 8924 +210
- Partials 3981 4140 +159
|
|
||
final JavaRDD<SAMRecord> sortedReads = SparkUtils.sortReads(reads, header, numReducers); | ||
final String outputPartsDirectory = outputFile + ".parts/"; | ||
final String outputPartsDirectory = outputPartsDir==null?outputFile + ".parts/":outputPartsDir; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spaces please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
BufferedWriter testfile = new BufferedWriter(new FileWriter(defaultDir.getAbsolutePath() + "/test.txt")); | ||
testfile.write("test"); | ||
testfile.close(); | ||
Runtime.getRuntime().exec("chmod a-w -R " + defaultDir.getAbsolutePath()+"/"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you have to shell out? can't you just use defaultDir.setWriteable(false)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IntelliJ makes a big fuss about how defaultDir.setWriteable(false)
will not be respected in the test suite, and i found that it wasn't resulting in failure when we tried to write to it.
|
||
// Make a directory with unusable permissions in place of where the default file will live | ||
final File defaultDir = new File(outputFile.getAbsolutePath() + ".parts/"); | ||
defaultDir.mkdir(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should assert that this returned true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath(), outputDir.getAbsolutePath()); | ||
|
||
// Test that the file wasn't deleted when spark cleared its temp directory | ||
Assert.assertEquals(new File(defaultDir.getAbsolutePath() + "/test.txt").exists(), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertTrue is probably better here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
final File outputDir = createTempDir(outputFileName + ".someOtherPlace"); | ||
|
||
// Make a directory with unusable permissions in place of where the default file will live | ||
final File defaultDir = new File(outputFile.getAbsolutePath() + ".parts/"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract a constant for this value in the ReadSparkSink
and refer to it, or even a method that takes a file and returns the default parts folder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
fullName = OUTPUT_SHARD_DIR_LONG_NAME, | ||
optional = true) | ||
protected String shardedPartsDir = null; | ||
|
||
@Argument(doc="For tools that shuffle data or write an output, sets the number of reducers. Defaults to 0, which gives one partition per 10MB of input.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want a check that makes sure you're not setting this if shardedOutput == true
, because it will be confusingly ignored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a check in the writer which will throw.
@@ -180,7 +180,7 @@ private void writeBam(final JavaRDD<GATKRead> reads, final String inputBamPath, | |||
try { | |||
ReadsSparkSink.writeReads(ctx, outputPath, bwaArgs.referencePath, reads, header, | |||
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, | |||
PSUtils.pathseqGetRecommendedNumReducers(inputBamPath, numReducers, getTargetPartitionSize())); | |||
PSUtils.pathseqGetRecommendedNumReducers(inputBamPath, numReducers, getTargetPartitionSize()), null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these nulls should all be replaced with references to shardedPartsDir
@@ -289,7 +289,7 @@ protected void runTool(final JavaSparkContext ctx) { | |||
final int numPartitions = Math.max(1, (int) (numTotalReads / readsPerPartitionOutput)); | |||
final JavaRDD<GATKRead> readsFinalRepartitioned = readsFinal.coalesce(numPartitions, false); | |||
ReadsSparkSink.writeReads(ctx, outputPath, null, readsFinalRepartitioned, header, | |||
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, numPartitions); | |||
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, numPartitions, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
likewise
@@ -221,7 +221,7 @@ protected void runTool(final JavaSparkContext ctx) { | |||
if (outputPath != null) { | |||
try { | |||
ReadsSparkSink.writeReads(ctx, outputPath, null, readsFinal, header, | |||
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, recommendedNumReducers); | |||
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, recommendedNumReducers, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
@@ -66,7 +66,7 @@ protected void runTool(final JavaSparkContext ctx) { | |||
referenceArguments.getReferencePath().toAbsolutePath().toUri().toString(), | |||
markedReads, bwaEngine.getHeader(), | |||
shardedOutput ? ReadsWriteFormat.SHARDED : ReadsWriteFormat.SINGLE, | |||
getRecommendedNumReducers()); | |||
getRecommendedNumReducers(), null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here
Responded to a round of comments |
} | ||
|
||
@Test(dataProvider = "loadReadsBAM", groups = "spark") | ||
public void outputDirectoryTest(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rewrote this:
@Test(dataProvider = "loadReadsBAM", groups = "spark")
public void testSpecifyPartsDir(String inputBam, String outputFileName, String referenceFile, String outputFileExtension) throws IOException {
final File outputFile = createTempFile(outputFileName, outputFileExtension);
final File nonDefaultShardsDir = createTempDir(outputFileName + ".someOtherPlace");
final java.nio.file.Path defaultPartsDir = IOUtils.getPath(ReadsSparkSink.getDefaultPartsDirectory(outputFile.getAbsolutePath()));
final java.nio.file.Path subpath = defaultPartsDir.resolve("subpath");
// Make a directory with unusable permissions in place of where the default parts file will live
try {
final Set<PosixFilePermission> readOnly = EnumSet.of(PosixFilePermission.OWNER_READ);
final FileAttribute<Set<PosixFilePermission>> readOnlyPermissions = PosixFilePermissions.asFileAttribute(readOnly);
Files.createDirectory(defaultPartsDir);
// An empty directory seems to be able to be deleted even with write permissions disabled, so put a file in it
Files.createFile(subpath, readOnlyPermissions);
Files.setPosixFilePermissions(defaultPartsDir, readOnly);
//assert it fails when writing to the default path
Assert.assertThrows(() -> assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath(), null));
//show this succeeds when specifying a different path for the parts directory
assertSingleShardedWritingWorks(inputBam, referenceFile, outputFile.getAbsolutePath(), nonDefaultShardsDir.getAbsolutePath());
// Test that the file wasn't deleted when spark cleared its temp directory
Assert.assertTrue(Files.exists(defaultPartsDir));
} finally {
try {
final EnumSet<PosixFilePermission> readWriteExecute = EnumSet.of(PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE,
PosixFilePermission.OWNER_EXECUTE);
Files.setPosixFilePermissions(defaultPartsDir, readWriteExecute);
Files.setPosixFilePermissions(subpath, readWriteExecute);
Files.deleteIfExists(subpath);
Files.deleteIfExists(defaultPartsDir);
} catch (IOException e) {
System.out.print("Failed to delete test file");
e.printStackTrace();
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jamesemery I changed it to be more assertive and also to not shell out.
@jamesemery Apparently my test doesn't work on travis. Gaaah. |
@lbergelson Returned to shelling out. |
so sad |
This should allow for the .parts whiteout to live elsewhere which is needed for using LOCAL disks with spark.
Fixes #4636