Skip to content

Commit

Permalink
fixed a bug causing segmentation fault in Long-Mate-Pair mode
Browse files Browse the repository at this point in the history
  • Loading branch information
relipmoc committed Jun 22, 2015
1 parent d092607 commit e8e05fc
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 100 deletions.
1 change: 1 addition & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const int MAX_ADAPTER_CNT = 96;

// Per-read trimming result.
// Field meanings below are inferred from how the single-end worker
// (mt_worker_ap) uses them; NOTE(review): confirm against the other workers.
typedef struct tag_INDEX{
int pos; // trim position; the writer keeps (pos - pos2) bases of the read
int pos2; // second trim position; used as the start offset into seq/qual when not writing from the 5' end
int bc; // index into the per-barcode output files; a negative value sends the read to the untrimmed output
}INDEX;

Expand Down
251 changes: 231 additions & 20 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,21 +695,21 @@ friend class cData;
for(i=0; i<int(pAdapters->size()); i++){
cMatrix::AddAdapter(cMatrix::firstAdapters, (char *)(*pAdapters)[i].c_str(), (*pAdapters)[i].length(), trimMode);
}
if(bPaired){
if(!pParameter->bShareAdapter){
pAdapters = &pParameter->adapters2;
for(i=0; i<int(pAdapters->size()); i++){
cMatrix::AddAdapter(cMatrix::secondAdapters,(char *)(*pAdapters)[i].c_str(), (*pAdapters)[i].length(), trimMode);
}
if(!pParameter->bShareAdapter){
pAdapters = &pParameter->adapters2;
for(i=0; i<int(pAdapters->size()); i++){
cMatrix::AddAdapter(cMatrix::secondAdapters,(char *)(*pAdapters)[i].c_str(), (*pAdapters)[i].length(), trimMode);
}
}
cMatrix::CalculateIndices(pParameter->bMatrix, pParameter->rowNames.size(), pParameter->colNames.size());
if(bPaired){
if( (pParameter->trimMode & TRIM_MP) != 0 ){
pAdapters = &pParameter->juncAdapters;
for(i=0; i<int(pAdapters->size()); i++){
cMatrix::AddAdapter(cMatrix::junctionAdapters,(char *)(*pAdapters)[i].c_str(), (*pAdapters)[i].length(), TRIM_ANY);
}
cMatrix::CalculateJunctionLengths();
}
cMatrix::CalculateIndices(pParameter->bMatrix, pParameter->rowNames.size(), pParameter->colNames.size());
if( (pParameter->trimMode & TRIM_AP) != 0 ){
if(pStats->fpMapfile.fp != NULL){
cMatrix::InitBarcodes(cMatrix::firstAdapters, pParameter->iCutF, (pParameter->bShareAdapter ? cMatrix::firstAdapters : cMatrix::secondAdapters), pParameter->iCutR);
Expand Down Expand Up @@ -910,6 +910,205 @@ void * mt_worker(void * data)
return NULL;
}

/**
 * Worker routine for single-file processing; processFile() starts it (via
 * pthread_create and directly for worker 0) when TRIM_AP is set in trimMode.
 *
 * The worker loops over tasks obtained from the task manager:
 *   - TASK_READ : reads up to task.nBlockSize records into the shared buffer
 *                 at slot (startId % size), re-queues the next read task,
 *                 tags/filter each record, locates adapters and computes the
 *                 trimming positions (idx.pos / idx.pos2), then hands the
 *                 block over for writing.
 *   - TASK_WRITE: writes the kept part of each record of the block to the
 *                 selected output file and updates the statistics, then
 *                 continues with following blocks while
 *                 getItemCnt() reports records for them.
 *   - TASK_END  : leaves the loop.
 *
 * @param data  pointer to this worker's cData (task manager, statistics,
 *              record buffer and buffer size).
 * @return always NULL.
 */
void * mt_worker_ap(void * data)
{
	cData * pData = (cData *)data;
	cTaskManager *pTaskMan = pData->pTaskMan;
	cStats * pStats = pData->pStats;
	int64 file_length = pStats->total_file_length;
	cFQ * pfq = pStats->pfq;
	FILE *fpOut = pStats->fpOut;
	int minAverageQual = pStats->minAverageQual;
	int minEndQual = pStats->minEndQual;
	int minLen = pStats->getMinLen();
	int maxLen = pStats->getMaxLen();
	bool bFivePrimeEnd = pStats->bFivePrimeEnd;
	bool bBarcode = pStats->bBarcode;
	int iCutF = pStats->iCutF;
	int iCutR = pStats->iCutR;
	// Limit on the combined cut lengths, kept unchanged from the original code.
	// NOTE(review): the bound 24 presumably matches 25-byte barcode buffers that
	// this worker no longer declares (they were never read here) -- confirm
	// whether the check is still wanted for this mode.
	assert( (iCutF + iCutR) <= 24 );

	RECORD *pBuffer, *pRecord;
	TASK task;
	int size, rc, nItemCnt, nCnt;
	int flag;
	int64 startId;

	pBuffer = pData->pBuffer;
	size = pData->size;
	rc = 0;

	int64 cur_pos;
	double cur_ratio;
	int pos;

	while(true){
		// wait for a task; give up once the task manager reports completion
		while(!pTaskMan->getTask(task)){
			if(pTaskMan->IsFinished()){
				task.type = TASK_END;
				break;
			}
			usleep(1);
		}
		if(task.type == TASK_END){
			break;
		}
		startId = task.startId;
		if(task.type == TASK_READ){
			if(!pTaskMan->increaseCnt()){ // reach the buffer size
				pTaskMan->addTask(task); // perform reading later
				usleep(1);
				continue;
			}
			// read records from input file to buffer
			for(pRecord=&pBuffer[startId % size], nItemCnt=0; nItemCnt<task.nBlockSize; nItemCnt++, pRecord++){
				rc = pfq->readRecord(pRecord);
				if(rc < 0){
					break;
				}
			}
			if(!pStats->bQuiet){
				cur_pos = pfq->tell();
				// file_length is the divisor below: skip the progress update when it
				// is zero instead of performing an integer division by zero
				if( (file_length != 0) && (cur_pos >= pfq->next_pos) ){
					// progress ratio truncated to 4 decimal places
					cur_ratio = int64(cur_pos * 10000 / file_length) / 10000.0;
					pStats->progress(cur_ratio, 50);
					// file position at which the ratio advances by another 1/10000
					pfq->next_pos = int64(((cur_ratio * 10000 + 1) * file_length + 9999)/10000);
				}
			}
			if(rc < 0){ // error or end of file
				pTaskMan->finish();
				if(rc < -1) continue; // error
				if(nItemCnt == 0) continue; // no record read
			}
			task.startId += task.nBlockSize;
			pTaskMan->addTask(task); // save next task for parallelism

			// process the records
			for(pRecord=&pBuffer[startId % size], nCnt=0; nCnt < nItemCnt; nCnt++, pRecord++){
				if( pStats->bFilterNs && cMatrix::isBlurry(pRecord->seq.s, pRecord->seq.n)){
					pRecord->tag = TAG_BLURRY;
					continue;
				}
				if( (minAverageQual > 0) && !cMatrix::checkQualities((uchar *)pRecord->qual.s, pRecord->qual.n, minAverageQual) ){
					pRecord->tag = TAG_BADQUAL;
					continue;
				}
				pRecord->tag = TAG_NORMAL;
				flag = cMatrix::findAdaptersInARead(pRecord->seq.s, pRecord->seq.n, pRecord->idx);
				// TODO:
				if(flag >= 0){
					// flag == 1 marks the record as exchanged and swaps which cut
					// length is applied to which end of the read
					pRecord->bExchange = (flag == 1);
					if(flag == 0){
						pRecord->idx.pos = pRecord->seq.n - iCutF;
						pRecord->idx.pos2 = iCutR;
					}
					else{
						pRecord->idx.pos = pRecord->seq.n - iCutR;
						pRecord->idx.pos2 = iCutF;
					}
					if(pRecord->idx.pos < 0){ // read shorter than the cut length
						pRecord->idx.pos = 0;
					}
				}
				if( minEndQual > 0 ){ // TODO: quality trimming from 5' end
					if( (pRecord->idx.pos > 0) && (pRecord->qual.n > 0) ){
						pRecord->idx.pos = cMatrix::trimByQuality((uchar *)pRecord->qual.s, min(pRecord->idx.pos, pRecord->qual.n), minEndQual);
					}
				}
			}

			pRecord = &pBuffer[startId % size];
			// when setItemCnt() returns false this worker queues the write task itself
			if(!pTaskMan->setItemCnt(startId, pRecord, nItemCnt)){
				task.type = TASK_WRITE;
				task.startId = startId;
				task.nItemCnt = nItemCnt;
				if(pTaskMan->bSingleBlock)
					pTaskMan->insertTask(task);
				else
					pTaskMan->addTask(task);
			}
			continue;
		}
		// task.type == TASK_WRITE
		pRecord = &pBuffer[startId % size];
		nItemCnt = task.nItemCnt;
		do{
			// write to file
			pRecord->nCnt = 0; // reset
			for(nCnt=0; nCnt<nItemCnt; nCnt++, pRecord++){
				if(pRecord->tag == TAG_BLURRY){
					pStats->nBlurry++;
					continue;
				}
				if(pRecord->tag == TAG_BADQUAL){
					pStats->nBad++;
					continue;
				}
				// TAG_NORMAL
				// number of bases kept after trimming
				pos = pRecord->idx.pos - pRecord->idx.pos2;
				if(pos < minLen){
					if(pos <= 0)
						pStats->nEmpty++;
					else
						pStats->nShort++;
					continue;
				}
				if(pos > maxLen){
					pStats->nLong++;
					continue;
				}
				// choose the output file
				// NOTE(review): the selected FILE* is not checked for NULL before
				// fprintf -- confirm these files are always open in this mode.
				if(pRecord->idx.bc < 0){
					fpOut = pStats->fpUntrim.fp;
				}
				else{
					if(bBarcode){
						fpOut = pStats->fpOuts[pRecord->idx.bc].fp;
						pStats->incrementBarcode(pRecord->idx.bc);
					}
					else{
						fpOut = pStats->fpOut;
					}
				}
				if(bFivePrimeEnd){
					// output starts idx.pos characters before the end of the read
					if( pRecord->qual.n > 0 ){ // fastq
						fprintf(fpOut, "@%s%.*s\n+\n%.*s\n", pRecord->id.s, pos, pRecord->seq.s + pRecord->seq.n - pRecord->idx.pos, pos, pRecord->qual.s + pRecord->qual.n - pRecord->idx.pos);
					}
					else{ // fasta
						fprintf(fpOut, ">%s%.*s\n", pRecord->id.s, pos, pRecord->seq.s + pRecord->seq.n - pRecord->idx.pos);
					}
				}
				else{
					// output starts at offset idx.pos2 of the read
					if( pRecord->qual.n > 0 ){ // fastq
						fprintf(fpOut, "@%s%.*s\n+\n%.*s\n", pRecord->id.s, pos, pRecord->seq.s + pRecord->idx.pos2, pos, pRecord->qual.s + pRecord->idx.pos2);
					}
					else{ // fasta
						fprintf(fpOut, ">%s%.*s\n", pRecord->id.s, pos, pRecord->seq.s + pRecord->idx.pos2);
					}
				}
				if(pRecord->idx.bc < 0){ // written to the untrimmed output
					pStats->nUntrimAvail++;
				}
				else{
					pStats->nTrimAvail++;
				}
				pStats->incrementCount(size_t(pos));
			}
			pTaskMan->decreaseCnt();
			// move on to the following block if it is already available for writing
			startId += task.nBlockSize;
			pRecord = &pBuffer[startId % size];
			nItemCnt = pTaskMan->getItemCnt(startId, pRecord);
		}while(nItemCnt > 0);
	}
	return NULL;
}

void * mt_worker2(void * data)
{
cData * pData = (cData *)data;
Expand Down Expand Up @@ -1345,7 +1544,7 @@ void * mt_worker2_sep(void * data)
return NULL;
}

void * mt_worker2_amp(void * data)
void * mt_worker2_ap(void * data)
{
cData * pData = (cData *)data;
cTaskManager *pTaskMan = pData->pTaskMan;
Expand Down Expand Up @@ -1446,7 +1645,7 @@ void * mt_worker2_amp(void * data)
}
}
pRecord->tag = TAG_NORMAL;
flag = cMatrix::findAdaptersBidirectionally(pRecord->seq.s, pRecord->seq.n, (uchar *)pRecord->qual.s, pRecord->qual.n, pRecord2->seq.s, pRecord2->seq.n, (uchar *)pRecord2->qual.s, pRecord2->qual.n, pRecord->idx, pRecord2->idx);
flag = cMatrix::findAdaptersBidirectionally(pRecord->seq.s, pRecord->seq.n, pRecord2->seq.s, pRecord2->seq.n, pRecord->idx, pRecord2->idx);
if(flag >= 0){
pRecord->bExchange = (flag == 1);
if(flag == 0){
Expand Down Expand Up @@ -1609,7 +1808,7 @@ void * mt_worker2_amp(void * data)
return NULL;
}

void * mt_worker3(void * data)
void * mt_worker2_mp(void * data)
{
cData * pData = (cData *)data;
cTaskManager *pTaskMan = pData->pTaskMan;
Expand Down Expand Up @@ -1961,14 +2160,26 @@ int processFile(cParameter * pParameter, cStats * pStats)

int rc;
void *status;
for(i=1; i<mt->n_threads; i++){ // worker 0 is effectively launched by the master thread
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker, &mt->w[i]);
if(rc != 0){
fprintf(stderr, "Can not create thread %d\n", i);
break;
if( (pParameter->trimMode & TRIM_AP) != 0 ){
for(i=1; i<mt->n_threads; i++){ // worker 0 is effectively launched by the master thread
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker_ap, &mt->w[i]);
if(rc != 0){
fprintf(stderr, "Can not create thread %d\n", i);
break;
}
}
mt_worker_ap(&mt->w[0]);
}
else{
for(i=1; i<mt->n_threads; i++){ // worker 0 is effectively launched by the master thread
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker, &mt->w[i]);
if(rc != 0){
fprintf(stderr, "Can not create thread %d\n", i);
break;
}
}
mt_worker(&mt->w[0]);
}
mt_worker(&mt->w[0]);
for(i=1; i<mt->n_threads; ++i){ // waits for termination of other threads
rc = pthread_join(mt->tid[i], &status);
}
Expand Down Expand Up @@ -2034,23 +2245,23 @@ int processPairedFiles(cParameter * pParameter, cStats * pStats)
}
else if( (pParameter->trimMode & TRIM_AP) != 0 ){
for(i=1; i<mt->n_threads; i++){ // worker 0 is effectively launched by the master thread
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker2_amp, &mt->w[i]);
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker2_ap, &mt->w[i]);
if(rc != 0){
fprintf(stderr, "Can not create thread %d\n", i);
break;
}
}
mt_worker2_amp(&mt->w[0]);
mt_worker2_ap(&mt->w[0]);
}
else{ // TRIM_MP
for(i=1; i<mt->n_threads; i++){ // worker 0 is effectively launched by the master thread
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker3, &mt->w[i]);
rc = pthread_create(&mt->tid[i], &wk.attr, mt_worker2_mp, &mt->w[i]);
if(rc != 0){
fprintf(stderr, "Can not create thread %d\n", i);
break;
}
}
mt_worker3(&mt->w[0]);
mt_worker2_mp(&mt->w[0]);
}
for(i=1; i<mt->n_threads; ++i){ // waits for termination of other threads
rc = pthread_join(mt->tid[i], &status);
Expand Down
Loading

0 comments on commit e8e05fc

Please sign in to comment.