Skip to content

Commit

Permalink
zUMIs2.9.2
Browse files Browse the repository at this point in the history
  • Loading branch information
cziegenhain committed Jul 14, 2020
1 parent 16072cb commit 98835fe
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 53 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ We provide a script to convert zUMIs output into loom file automatically based o
zUMIs will try to automatically do this, otherwise convert zUMIs output to loom by simply running `Rscript rds2loom.R myRun.yaml`.

## Changelog
14 July 2020: [zUMIs2.9.2](https://github.com/sdparekh/zUMIs/releases/tag/2.9.2): Several bug fixes: Prevent RAM from ballooning, issues with resuming from different stage. Speed up demultiplexing further by chrosome-wise operations. Remove need for second bam file sorting after hamming collapse by keeping sort order.

10 July 2020: [zUMIs2.9.1](https://github.com/sdparekh/zUMIs/releases/tag/2.9.1): Revert changes from pull request #194 (commit #ff7e879) that introduced a bug when using PE reads.

05 July 2020: [zUMIs2.9.0 released](https://github.com/sdparekh/zUMIs/releases/tag/2.9.0): Speed up STAR read mapping by parallel instances if enough resources are available. Change the main zUMIs script to `zUMIs.sh`. Speed up and reduce clutter by loading reads from bam files using parallelised Rsamtools calls instead of printing temporary text files. Speed up counting by parallelising exon / intron / exon+intron counting as well as downsamplings. Speed up by parallelising creation of wide format count matrices.
Expand Down
123 changes: 89 additions & 34 deletions UMIstuffFUN.R
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@

splitRG<-function(bccount,mem){
splitRG<-function(bccount,mem,hamdist){
if(is.null(mem) || mem==0){
maxR<- Inf
}else{
maxR<- floor( mem*1000 * 4500 )
}
if( (maxR > 2e+09 & opt$read_layout == "SE") | (maxR > 1e+09 & opt$read_layout == "PE") ){
maxR <- ifelse(opt$read_layout == "SE",2e+09,1e+09)
#if( (maxR > 2e+09 & opt$read_layout == "SE") | (maxR > 1e+09 & opt$read_layout == "PE") ){
# maxR <- ifelse(opt$read_layout == "SE",2e+09,1e+09)
#}
if(maxR > 2e+09){
maxR <- 2e+09
}
if(opt$counting_opts$Ham_Dist>0){ #multicore hamming distance takes a lot of memory
if(hamdist>0){ #multicore hamming distance takes a lot of memory
#ram_factor <- ifelse(opt$num_threads>10, 5, 2)
ram_factor <- 2
ram_factor <- 3
maxR <- floor( maxR/ram_factor )
}

Expand Down Expand Up @@ -283,20 +286,23 @@ collectCounts<-function(reads,bccount,subsample.splits, mapList, ...){
umiFUN<-"umiCollapseID"
parallel::mclapply(mapList,function(tt){
ll<-list( all=umiFUNs[[umiFUN]](reads=reads,
bccount=bccount,
ftype=tt),
downsampling=parallel::mclapply( 1:nrow(subsample.splits) , function(i){
bccount=bccount,
ftype=tt),
#downsampling=parallel::mclapply( 1:nrow(subsample.splits) , function(i){
downsampling=lapply( 1:nrow(subsample.splits) , function(i){
umiFUNs[[umiFUN]](reads,bccount,
nmin=subsample.splits[i,1],
nmax=subsample.splits[i,2],
ftype=tt)}, mc.cores = floor(opt$num_threads/length(mapList)) )
ftype=tt)} )
#ftype=tt)}, mc.cores = floor(opt$num_threads/length(mapList)) )
)
names(ll$downsampling)<-subNames
ll
}, mc.cores = length(mapList))

}


bindList<-function(alldt,newdt){
for( i in names(alldt)){
alldt[[i]][[1]]<-rbind(alldt[[i]][[1]], newdt[[i]][[1]] )
Expand Down Expand Up @@ -340,86 +346,129 @@ correct_UB_tags <- function(bccount, samtoolsexc){
UB_cmd <- paste(pl_path,bam,bamout,mm,samtoolsexc)
UB_cmd_list[[i]] <- UB_cmd
}
bla <- parallel::mclapply(UB_cmd_list, system, ignore.stderr = TRUE, mc.cores = opt$num_threads, mc.preschedule=F, mc.silent = TRUE)
bla <- parallel::mclapply(UB_cmd_list, system, ignore.stderr = TRUE, mc.cores = ceiling(opt$num_threads/2), mc.preschedule=F, mc.silent = TRUE)
UB_cmd_list <- unlist(UB_cmd_list)
UB_files <- as.character(data.frame(strsplit(UB_cmd_list," "),stringsAsFactors=F)[3,])
#UB_files <- paste(UB_files, collapse = " ")
UB_mergelist <- paste0(opt$out_dir,"/zUMIs_output/molecule_mapping/",opt$project,"mergelist.txt")
write(UB_files, file = UB_mergelist)
outbam <- paste0(opt$out_dir,"/",opt$project,".filtered.Aligned.GeneTagged.UBcorrected.bam")
#merge_cmd <- paste(samtoolsexc,"merge -n -t BC -@",opt$num_threads,"-b",UB_mergelist,outbam)
merge_cmd <- paste(samtoolsexc,"cat -b",UB_mergelist,"-o",outbam)
outbam <- paste0(opt$out_dir,"/",opt$project,".filtered.Aligned.GeneTagged.UBcorrected.sorted.bam")
print("Creating sorted final bam file...")
merge_cmd <- paste(samtoolsexc,"merge -f -@",opt$num_threads,"-b",UB_mergelist,outbam)
#merge_cmd <- paste(samtoolsexc,"cat -b",UB_mergelist,"-o",outbam)
write(merge_cmd, file = paste0(opt$out_dir,"/",opt$project,".merge.sh"))
system(paste0("bash ",opt$out_dir,"/",opt$project,".merge.sh"))
return(outbam)
}

demultiplex_bam <- function(opt, bamfile, nBCs){
demultiplex_bam <- function(opt, bamfile, nBCs, samtoolsexc, bccount){
if(!dir.exists( paste0(opt$out_dir,"/zUMIs_output/demultiplexed/") )){
dir.create( paste0(opt$out_dir,"/zUMIs_output/demultiplexed/") )
}

installed_py <- try(system("pip freeze", intern = TRUE))

if(any(grepl("pysam==",installed_py))){
print("Using python implementation to demultiplex.")
print(Sys.time())
max_filehandles <- as.numeric(system("ulimit -n", intern = TRUE))
threads_perBC <- floor(max_filehandles/nBCs)

if(max_filehandles < nBCs | nBCs > 10000){
#print("Warning! You cannot open enough filehandles for demultiplexing! Increase ulimit -n")
#break up in several demultiplexing runs to avoid choke
nchunks <- ifelse(max_filehandles < nBCs, no = ceiling(nBCs/10000), yes = ceiling(nBCs/(max_filehandles-100)))
if(nchunks == 1){nchunks = 2}
print(paste("Breaking up demultiplexing in",nchunks,"chunks. This may be because you have >10000 cells or a too low filehandle limit (ulimit -n)."))

full_bclist <- paste0(opt$out_dir,"/zUMIs_output/",opt$project,"kept_barcodes.txt")
bcsplit_prefix <- paste0(opt$out_dir,"/zUMIs_output/.",opt$project,"kept_barcodes.")

split_cmd <- paste0("split -a 3 -n l/",nchunks," ",full_bclist, " ", bcsplit_prefix)
system(split_cmd)
bclist <- list.files(path = paste0(opt$out_dir,"/zUMIs_output/"), pattern = paste0(".",opt$project,"kept_barcodes."),all.files = TRUE, full.names = TRUE)
header_cmd <- paste('sed -i -e \'1s/^/XC,n,cellindex\\n/\'', bclist[-1], collapse = '; ', sep = ' ')
system(header_cmd)

if(max_filehandles < nBCs){threads_perBC <- 1}
}else{
bclist <- paste0(opt$out_dir,"/zUMIs_output/",opt$project,"kept_barcodes.txt")
}

if(threads_perBC > 2){
threads_perBC <- 2
}
threads_decompress <- opt$num_threads - threads_perBC
outstub <- paste0(opt$out_dir,"/zUMIs_output/demultiplexed/",opt$project,".")
py_script <- paste0(opt$zUMIs_directory,"/misc/demultiplex_BC.py")
demux_cmd <- paste(
"python3", py_script,
"--bam", bamfile,
"--out", outstub,
"--bc", bclist,
"--pout", threads_perBC,
"--pin", threads_decompress
, collapse = "; ")
print("Demultiplexing zUMIs bam file...")

if(threads_decompress > 10){ #if capacity is there, do demultiplexing parallelised per chromosome
collect_demultiplex = TRUE #set a flag to remember to collect the output chunks later
demux_cmd <- "sleep 1" #set a decoy system command
threads_decompress = 10
threads_chromosomes = ceiling(opt$num_threads/threads_decompress)
if(threads_chromosomes > 10){
threads_chromosomes <- 10 #prevent mayhem
}
chromosomes_todo <- Rsamtools::seqinfo(Rsamtools::BamFile(bamfile))
chromosomes_todo <- c(seqnames(chromosomes_todo), "zunmapped") #don't forget the unmapped reads :)
tmp_outdir <- paste0(opt$out_dir,"/zUMIs_output/demultiplexed/",opt$project,"/")
dir.create(tmp_outdir, showWarnings = FALSE)
xyz <- parallel::mclapply(chromosomes_todo, function(chr){
pysam_cmd <- paste(
"python3", py_script,
"--bam", bamfile,
"--out", tmp_outdir,
"--bc", bclist,
"--pout", threads_perBC,
"--pin", threads_decompress,
"--chr", chr
, collapse = "; ")
system(pysam_cmd)
}, mc.cores = threads_chromosomes)

}else{
outstub <- paste0(opt$out_dir,"/zUMIs_output/demultiplexed/",opt$project,".")
demux_cmd <- paste(
"python3", py_script,
"--bam", bamfile,
"--out", outstub,
"--bc", bclist,
"--pout", threads_perBC,
"--pin", threads_decompress,
"--chr", "allreads"
, collapse = "; ")
}

}else{
print("Using perl implementation to demultiplex.")
demux_cmd <- paste0(opt$zUMIs_directory,"/misc/demultiplex_BC.pl ",opt$out_dir," ",opt$project, " ", bamfile, " ", samtoolsexc )
}
system(demux_cmd)
if(collect_demultiplex){
xyz <- parallel::mclapply(bccount$XC, function(x){
cell_files <- paste0(tmp_outdir,x,".",chromosomes_todo,".demx.bam", collapse = " ")
output_file <- paste0(opt$out_dir,"/zUMIs_output/demultiplexed/",opt$project,".",x,".demx.bam")
cat_cmd <- paste(samtoolsexc, "cat", "-o", output_file, cell_files)
system(cat_cmd)
}, mc.cores = opt$num_threads)
#remove the temp folder
system(paste("rm -r", tmp_outdir))
}
print("Demultiplexing complete.")
print(Sys.time())
}

split_bam <- function(bam, cpu, samtoolsexc){
UMIbam <- paste0(bam,".UMI.bam")
internalbam <- paste0(bam,".internal.bam")
cpus <- floor((cpu-4)/2)
cpus <- floor((cpu-10)/2)
if(cpus<1){
cpus <- 2
}

cmd_umi <- paste(samtoolsexc, "view -h -@ 2", bam, " | grep -v -P 'UB:Z:\t' | ", samtoolsexc, "view -b -@",cpus,"-o",UMIbam,"&")
cmd_internal <- paste(samtoolsexc, "view -h -@ 2", bam, " | grep -v 'UB:Z:[A-Z]' | ", samtoolsexc, "view -b -@",cpus,"-o",internalbam,"&")
cmd_umi <- paste(samtoolsexc, "view -h -@ 5", bam, " | grep -v -P 'UB:Z:\t' | ", samtoolsexc, "view -b -@",cpus,"-o",UMIbam,"&")
cmd_internal <- paste(samtoolsexc, "view -h -@ 5", bam, " | grep -v 'UB:Z:[A-Z]' | ", samtoolsexc, "view -b -@",cpus,"-o",internalbam,"&")
system(paste(cmd_umi,cmd_internal,"wait"))
return(c(internalbam,UMIbam))
}
Expand Down Expand Up @@ -601,3 +650,9 @@ RPKM.calc <- function(exprmat, gene.length){

return (dt[,c("GE","RG","prob"), with = FALSE])
}

check_read_layout <- function(bamfile){
isbamPE <- suppressMessages(Rsamtools::testPairedEndBam(bamfile))
found_read_layout <- ifelse(isbamPE == TRUE, "PE", "SE")
return(found_read_layout)
}
28 changes: 21 additions & 7 deletions misc/demultiplex_BC.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@ def read_cellBCs (bcfile):
bclist.pop(0)
return(bclist)

def demultiplex_bam (bamfile, bcdict, outpath, pout, pin):
def demultiplex_bam (bamfile, bcdict, outpath, pout, pin, chr):
inbam = pysam.AlignmentFile(bamfile, 'rb', threads = pin)

if chr == 'zunmapped':
label = 'zunmapped'
chr = '*'
elif chr == 'allreads':
chr = None
else:
label = chr

bcfilehandles = {}
for bc in bcdict:
path = outpath + bc + '.demx.bam'
if chr is None:
path = outpath + bc + '.demx.bam'
else:
path = outpath + bc + "." + label + '.demx.bam'
bcfilehandles[bc] = pysam.AlignmentFile(path, "wb", template=inbam, threads = pout)

for read in inbam:
for read in inbam.fetch(contig=chr):
thisBC = read.get_tag("BC")
if thisBC in bcfilehandles:
bcfilehandles[thisBC].write(read)
Expand All @@ -41,18 +52,21 @@ def main():
parser.add_argument('--pout', type=int,
help='Number of processes for output bams')
parser.add_argument('--pin', type=int,
help='Number of processes for input bam')
help='Number of processes for input bam'),
parser.add_argument('--chr', type=str,
help='Chromosome to use')
args = parser.parse_args()

print("Demultiplexing zUMIs bam file...")
#print("Demultiplexing zUMIs bam file...")
bc_whitelist = read_cellBCs(args.bc)
demultiplex_bam(
bamfile = args.bam,
bcdict = bc_whitelist,
outpath = args.out,
pout = args.pout,
pin = args.pin)
print("Demultiplexing complete.")
pin = args.pin,
chr = args.chr)
#print("Demultiplexing complete.")

if __name__ == "__main__":
main()
6 changes: 6 additions & 0 deletions statsFUN.R
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,10 @@ sumGene <- function(counts){
})
outdt <- rbindlist(outlist)
return(outdt)
}

check_read_layout <- function(bamfile){
isbamPE <- suppressMessages(Rsamtools::testPairedEndBam(bamfile))
found_read_layout <- ifelse(isbamPE == TRUE, "PE", "SE")
return(found_read_layout)
}
31 changes: 22 additions & 9 deletions zUMIs-dge2.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ if(opt$barcodes$BarcodeBinning > 0){
}else{
bccount <- fread(paste0(opt$out_dir,"/zUMIs_output/",opt$project,"kept_barcodes.txt"))
}
bccount<-splitRG(bccount=bccount, mem= opt$mem_limit)
bccount<-splitRG(bccount=bccount, mem= opt$mem_limit, hamdist = opt$counting_opts$Ham_Dist)

##############################################################
##### featureCounts
Expand All @@ -67,6 +67,8 @@ try(data.table::fwrite(gene_name_mapping, file = paste0(opt$out_dir,"/zUMIs_outp

if(smart3_flag & opt$counting_opts$strand == 1){
#split bam in UMU ends and internal
print("Preparing Smart-seq3 data for stranded gene assignment...")
print(Sys.time())
tmp_bams <- split_bam(bam = abamfile, cpu = opt$num_threads, samtoolsexc=samtoolsexc)

#assign features with appropriate strand
Expand Down Expand Up @@ -119,8 +121,13 @@ if(is.null(opt$mem_limit)){
mempercpu <- max(round(opt$mem_limit/opt$num_threads,0),1)
}


if(opt$counting_opts$Ham_Dist == 0){
sortbamfile <-paste0(opt$out_dir,"/",opt$project,".filtered.Aligned.GeneTagged.sorted.bam")
print(Sys.time())
print("Coordinate sorting final bam file...")
sort_cmd <- paste0(samtoolsexc," sort -O 'BAM' -@ ",opt$num_threads," -m ",mempercpu,"G -o ",sortbamfile," ",outbamfile)
system(sort_cmd)
}else{
#run hamming distance collapsing here and write output into bam file
if(!dir.exists( paste0(opt$out_dir,"/zUMIs_output/molecule_mapping/") )){
Expand All @@ -138,6 +145,11 @@ if(opt$counting_opts$Ham_Dist == 0){
system(paste0("rm ",tmpbamfile))
print(Sys.time())

#check if PE / SE flag is set correctly
if(is.null(opt$read_layout)){
opt$read_layout <- check_read_layout(outbamfile)
}

for(i in unique(bccount$chunkID)){
print( paste( "Hamming distance collapse in barcode chunk", i, "out of",length(unique(bccount$chunkID)) ))
reads <- reads2genes_new(featfile = outbamfile,
Expand All @@ -149,20 +161,21 @@ if(opt$counting_opts$Ham_Dist == 0){
u <- umiCollapseHam(reads,bccount, HamDist=opt$counting_opts$Ham_Dist)
}
print("Demultiplexing output bam file by cell barcode...")
demultiplex_bam(opt, outbamfile, nBCs = length(unique(bccount$XC)))
demultiplex_bam(opt, outbamfile, nBCs = length(unique(bccount$XC)), bccount = bccount, samtoolsexc = samtoolsexc)
print("Correcting UMI barcode tags...")
outbamfile <- correct_UB_tags(bccount, samtoolsexc)
sortbamfile <-paste0(opt$out_dir,"/",opt$project,".filtered.Aligned.GeneTagged.UBcorrected.sorted.bam")
sortbamfile <- correct_UB_tags(bccount, samtoolsexc)
#sortbamfile <-paste0(opt$out_dir,"/",opt$project,".filtered.Aligned.GeneTagged.UBcorrected.sorted.bam")
bccount<-splitRG(bccount=bccount, mem= opt$mem_limit, hamdist = 0) # allow more reads to be in RAM fur subsequent steps
}
print(Sys.time())
print("Coordinate sorting final bam file...")
sort_cmd <- paste0(samtoolsexc," sort -O 'BAM' -@ ",opt$num_threads," -m ",mempercpu,"G -o ",sortbamfile," ",outbamfile)
system(sort_cmd)
index_cmd <- paste(samtoolsexc,"index -@",opt$num_threads,sortbamfile)
system(index_cmd)
system(paste0("rm ",outbamfile))
print(Sys.time())

#check if PE / SE flag is set correctly
if(is.null(opt$read_layout)){
opt$read_layout <- check_read_layout(sortbamfile)
}

##########################################
#set Downsampling ranges
Expand Down Expand Up @@ -263,7 +276,7 @@ if(opt$counting_opts$intronProb == TRUE){
#demultiplexing
if(opt$counting_opts$Ham_Dist == 0 && opt$barcodes$demultiplex == TRUE ){ #otherwise its already demultiplexed!
print("Demultiplexing output bam file by cell barcode...")
demultiplex_bam(opt, sortbamfile, nBCs = length(unique(bccount$XC)))
demultiplex_bam(opt, sortbamfile, nBCs = length(unique(bccount$XC)), bccount = bccount, samtoolsexc = samtoolsexc)
}

#################
Expand Down
Loading

0 comments on commit 98835fe

Please sign in to comment.