Merge pull request #34 from iliat/dev-broad
Sharded BAM reader and a sample read counting pipeline
iliat committed Mar 12, 2015
2 parents af2db8d + 653ae81 commit 70bc9f7
Showing 16 changed files with 1,669 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pom.xml
@@ -196,6 +196,17 @@
<version>2.5</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.github.samtools</groupId>
<artifactId>htsjdk</artifactId>
<version>1.128</version>
<exclusions>
<exclusion>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<profiles>
182 changes: 182 additions & 0 deletions src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java
@@ -0,0 +1,182 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.genomics.dataflow.pipelines;

import static com.google.common.collect.Lists.newArrayList;

import com.google.api.services.genomics.model.Read;
import com.google.api.services.genomics.model.SearchReadsRequest;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.genomics.dataflow.readers.ReadReader;
import com.google.cloud.genomics.dataflow.readers.bam.ReadBAMTransform;
import com.google.cloud.genomics.dataflow.readers.bam.Reader;
import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds;
import com.google.cloud.genomics.dataflow.utils.GCSOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions;
import com.google.cloud.genomics.dataflow.utils.GenomicsOptions;
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.Paginator;
import com.google.cloud.genomics.utils.Paginator.ShardBoundary;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import org.codehaus.jackson.annotate.JsonIgnore;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;

/**
* Simple read-counting pipeline, intended as an example of reading data from
* either the Genomics API or BAM files and invoking GATK tools.
*/
public class CountReads {
private static final Logger LOG = Logger.getLogger(CountReads.class.getName());
private static CountReadsOptions options;
private static Pipeline p;
private static GenomicsFactory.OfflineAuth auth;
public interface CountReadsOptions extends GenomicsDatasetOptions, GCSOptions {
@Description("The ID of the Google Genomics ReadGroupSet this pipeline is working with. "
+ "Default (empty) indicates all ReadGroupSets.")
@Default.String("")
String getReadGroupSetId();

void setReadGroupSetId(String readGroupSetId);

@Description("The path to the BAM file to get reads data from.")
@Default.String("")
String getBAMFilePath();

void setBAMFilePath(String filePath);

@Description("Whether to shard BAM reading")
@Default.Boolean(true)
boolean getShardBAMReading();

void setShardBAMReading(boolean newValue);

class ContigsFactory implements DefaultValueFactory<Iterable<Contig>> {
@Override
public Iterable<Contig> create(PipelineOptions options) {
return Iterables.transform(Splitter.on(",").split(options.as(CountReadsOptions.class).getReferences()),
new Function<String, Contig>() {
@Override
public Contig apply(String contigString) {
ArrayList<String> contigInfo = newArrayList(Splitter.on(":").split(contigString));
return new Contig(contigInfo.get(0),
contigInfo.size() > 1 ?
Long.valueOf(contigInfo.get(1)) : 0,
contigInfo.size() > 2 ?
Long.valueOf(contigInfo.get(2)) : -1);
}
});
}
}

@Default.InstanceFactory(ContigsFactory.class)
@JsonIgnore
Iterable<Contig> getContigs();

void setContigs(Iterable<Contig> contigs);
}

public static void main(String[] args) throws GeneralSecurityException, IOException {
options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CountReadsOptions.class);
GenomicsOptions.Methods.validateOptions(options);
auth = GCSOptions.Methods.createGCSAuth(options);
p = Pipeline.create(options);
DataflowWorkarounds.registerGenomicsCoders(p);

PCollection<Read> reads = getReads();
PCollection<Long> readCount = reads.apply(Count.<Read>globally());
PCollection<String> readCountText = readCount.apply(ParDo.of(new DoFn<Long, String>() {
@Override
public void processElement(DoFn<Long, String>.ProcessContext c) throws Exception {
c.output(String.valueOf(c.element()));
}
}).named("toString"));
readCountText.apply(TextIO.Write.to(options.getOutput()).named("WriteOutput"));
p.run();
}

private static PCollection<Read> getReads() throws IOException {
if (!options.getBAMFilePath().isEmpty()) {
return getReadsFromBAMFile();
}
if (!options.getReadGroupSetId().isEmpty()) {
return getReadsFromAPI();
}
throw new IOException("Either a BAM file or a ReadGroupSet must be specified");
}

private static PCollection<Read> getReadsFromAPI() {
List<SearchReadsRequest> requests = getReadRequests(options);
PCollection<SearchReadsRequest> readRequests =
DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers());
PCollection<Read> reads =
readRequests.apply(
ParDo.of(
new ReadReader(auth, Paginator.ShardBoundary.OVERLAPS))
.named(ReadReader.class.getSimpleName()));
return reads;
}

private static List<SearchReadsRequest> getReadRequests(CountReadsOptions options) {
List<SearchReadsRequest> requests = Lists.newArrayList();
requests.add(new SearchReadsRequest()
.setReadGroupSetIds(
Collections.singletonList(options.getReadGroupSetId()))
.setReferenceName(options.getReferences())
.setPageSize(2048));

return requests;
}

private static PCollection<Read> getReadsFromBAMFile() throws IOException {
LOG.info("getReadsFromBAMFile");

final Iterable<Contig> contigs = options.getContigs();

if (options.getShardBAMReading()) {
return ReadBAMTransform.getReadsFromBAMFilesSharded(p,
auth,
contigs,
Collections.singletonList(options.getBAMFilePath()));
} else { // For testing and comparing sharded vs. not sharded only
return p.apply(
Create.of(
Reader.readSequentiallyForTesting(
GCSOptions.Methods.createStorageClient(options, auth),
options.getBAMFilePath(),
contigs.iterator().next())));
}
}
}
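
The ContigsFactory above turns the --references flag into Contig objects by splitting on commas and colons. A minimal standalone sketch of that parsing, assuming Guava on the classpath; the flag value, reference names, and coordinates are invented for illustration:

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

import java.util.List;

public class ContigParseDemo {
  public static void main(String[] args) {
    // Hypothetical --references value: comma-separated name[:start[:end]] entries.
    String references = "chr17:41196311:41277499,chr20";
    for (String contigString : Splitter.on(",").split(references)) {
      List<String> contigInfo = Lists.newArrayList(Splitter.on(":").split(contigString));
      String name = contigInfo.get(0);
      long start = contigInfo.size() > 1 ? Long.valueOf(contigInfo.get(1)) : 0;  // default start
      long end = contigInfo.size() > 2 ? Long.valueOf(contigInfo.get(2)) : -1;   // -1 = open-ended
      System.out.println(name + " [" + start + ", " + end + "]");
    }
  }
}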
82 changes: 82 additions & 0 deletions src/main/java/com/google/cloud/genomics/dataflow/readers/bam/BAMIO.java
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.genomics.dataflow.readers.bam;

import com.google.api.services.storage.Storage;

import htsjdk.samtools.SamInputResource;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;
import htsjdk.samtools.seekablestream.SeekableStream;

import java.io.IOException;
import java.util.logging.Logger;

/**
* Utility methods for opening BAM files stored in GCS using HTSJDK.
* For sharding in pipelines we need access to the guts of the BAM index,
* so openBAMAndExposeIndex provides a convenient way to get both the SamReader
* and a stream for the index file.
*/
public class BAMIO {
public static class ReaderAndIndex {
public SamReader reader;
public SeekableStream index;
}
private static final Logger LOG = Logger.getLogger(BAMIO.class.getName());

public static ReaderAndIndex openBAMAndExposeIndex(Storage.Objects storageClient, String gcsStoragePath) throws IOException {
ReaderAndIndex result = new ReaderAndIndex();
result.index = openIndexForPath(storageClient, gcsStoragePath);
result.reader = openBAMReader(
openBAMFile(storageClient, gcsStoragePath, result.index));
return result;
}

public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath) throws IOException {
return openBAMReader(openBAMFile(storageClient, gcsStoragePath,
openIndexForPath(storageClient, gcsStoragePath)));
}

private static SeekableStream openIndexForPath(Storage.Objects storageClient, String gcsStoragePath) {
final String indexPath = gcsStoragePath + ".bai";
try {
return new SeekableGCSStream(storageClient, indexPath);
} catch (IOException ex) {
LOG.info("No index for " + indexPath);
// Ignore if there is no bai file
}
return null;
}

private static SamInputResource openBAMFile(Storage.Objects storageClient, String gcsStoragePath, SeekableStream index) throws IOException {
SamInputResource samInputResource =
SamInputResource.of(new SeekableGCSStream(storageClient, gcsStoragePath));
if (index != null) {
samInputResource.index(index);
}

LOG.info("getReadsFromBAMFile - got input resources");
return samInputResource;
}

private static SamReader openBAMReader(SamInputResource resource) {
SamReaderFactory samReaderFactory = SamReaderFactory.makeDefault()
.enable(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES);
final SamReader samReader = samReaderFactory.open(resource);
return samReader;
}
}
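
A minimal sketch of how BAMIO might be used to read a GCS-hosted BAM, assuming an already-authenticated Storage.Objects client; the bucket path is hypothetical:

import com.google.api.services.storage.Storage;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;

import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SamReader;

import java.io.IOException;

public class BAMIODemo {
  // Counts the records in a (hypothetical) GCS-hosted BAM. A .bai index next
  // to the BAM is picked up automatically, or skipped (with a log message) if absent.
  static long countRecords(Storage.Objects storageClient) throws IOException {
    try (SamReader reader = BAMIO.openBAM(storageClient, "gs://my-bucket/sample.bam")) {
      long count = 0;
      for (SAMRecord record : reader) {
        count++;
      }
      return count;
    }
  }
}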
114 changes: 114 additions & 0 deletions src/main/java/com/google/cloud/genomics/dataflow/readers/bam/BAMShard.java
@@ -0,0 +1,114 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.genomics.dataflow.readers.bam;

import com.google.cloud.genomics.utils.Contig;
import com.google.common.collect.Lists;

import htsjdk.samtools.BAMFileIndexImpl;
import htsjdk.samtools.Chunk;
import htsjdk.samtools.SAMFileSpanImpl;

import java.io.Serializable;
import java.util.List;

/**
* A shard of BAM data that we create during sharding and then use to drive the reading.
* We use this class during shard generation, iteratively building a shard and
* extending it bin by bin ({@link #addBin}). At the end of the process the shard
* is finalized ({@link #finalize}), producing a SAMFileSpan that covers all the
* chunks we want to read.
*/
public class BAMShard implements Serializable {
public String file;
public SAMFileSpanImpl span;
public Contig contig;
public List<Chunk> chunks;
public long cachedSizeInBytes = -1;

/**
* Begins a new shard with an empty chunk list and a starting locus.
*/
public BAMShard(String file, String referenceName, int firstLocus) {
this.file = file;
this.contig = new Contig(referenceName, firstLocus, -1);
this.chunks = Lists.newLinkedList();
this.span = null;
}

/**
* Creates a shard with a known file span.
* Such a shard is not expected to be extended; calling addBin or finalize on it
* will fail. This constructor is used for "degenerate" shards such as unmapped
* reads, or all reads in cases where we don't have an index.
*/
public BAMShard(String file, SAMFileSpanImpl span, Contig contig) {
this.file = file;
this.span = span;
this.contig = contig;
this.chunks = null;
}

/**
* Appends chunks from another bin to the list and moves the end position.
*/
public void addBin(List<Chunk> chunksToAdd, int lastLocus) {
assert chunks != null;
contig = new Contig(contig.referenceName, contig.start, lastLocus);
chunks.addAll(chunksToAdd);
updateSpan();
}

/**
* Generates the final list of chunks, now that we know the exact bounding loci
* for this shard: we update the shard's contig to the final bounds and ask the
* index for all the chunks overlapping it.
*/
public BAMShard finalize(BAMFileIndexImpl index, int lastLocus) {
if (lastLocus >= 0) {
contig = new Contig(contig.referenceName, contig.start, lastLocus);
}
this.chunks = index.getChunksOverlapping(contig.referenceName,
(int)contig.start, (int)contig.end);
updateSpan();
return this;
}

/**
* Updates the underlying file span by optimizing and coalescing the current chunk list.
*/
private void updateSpan() {
span = new SAMFileSpanImpl(Chunk.optimizeChunkList(chunks, this.contig.start));
cachedSizeInBytes = -1;
}

public long sizeInLoci() {
return contig.end > 0 ? contig.end - contig.start : 0;
}

public long approximateSizeInBytes() {
if (cachedSizeInBytes < 0) {
cachedSizeInBytes = span.approximateSizeInBytes();
}
return cachedSizeInBytes;
}

@Override
public String toString() {
return file + ": " + contig.toString() + ", locus size = " + sizeInLoci() +
", span size = " + approximateSizeInBytes();
}
}
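
The shard-building lifecycle described in the class Javadoc might be driven roughly as below; the bin iteration, loci, and target size are invented, and BAMFileIndexImpl comes from the forked htsjdk classes elsewhere in this commit, so treat this as a sketch rather than the committed sharding logic:

import com.google.cloud.genomics.dataflow.readers.bam.BAMShard;

import htsjdk.samtools.BAMFileIndexImpl;
import htsjdk.samtools.Chunk;

import java.util.List;

public class ShardingDemo {
  // Hypothetical driver: extends a shard one bin at a time until it is big
  // enough, then finalizes it so its SAMFileSpan covers the chosen loci.
  static BAMShard buildShard(String bamFile, String reference, BAMFileIndexImpl index,
      List<List<Chunk>> binChunks, int lociPerBin, long targetBytes) {
    BAMShard shard = new BAMShard(bamFile, reference, 0);
    int lastLocus = 0;
    for (List<Chunk> chunks : binChunks) {
      lastLocus += lociPerBin;
      shard.addBin(chunks, lastLocus);           // extend the shard bin by bin
      if (shard.approximateSizeInBytes() >= targetBytes) {
        break;                                   // big enough to hand to a worker
      }
    }
    return shard.finalize(index, lastLocus);     // coalesce chunks into one span
  }
}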