Skip to content

Commit

Permalink
Merge pull request #9718 from rouault/coverity_locks
Browse files Browse the repository at this point in the history
Fix/suppress Coverity Scan warnings about locking
  • Loading branch information
rouault authored Apr 23, 2024
2 parents 2b14d96 + fe55778 commit d14f23c
Show file tree
Hide file tree
Showing 15 changed files with 78 additions and 46 deletions.
1 change: 1 addition & 0 deletions alg/gdalgrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2602,6 +2602,7 @@ static int GDALGridProgressMultiThread(GDALGridJob *psJob)
static int GDALGridProgressMonoThread(GDALGridJob *psJob)
{
const int nCounter = ++(*psJob->pnCounter);
// coverity[missing_lock]
if (!psJob->pfnRealProgress(nCounter / static_cast<double>(psJob->nYSize),
"", psJob->pRealProgressArg))
{
Expand Down
1 change: 1 addition & 0 deletions alg/gdalwarpkernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ static int GWKProgressThread(GWKJobStruct *psJob)
static int GWKProgressMonoThread(GWKJobStruct *psJob)
{
GDALWarpKernel *poWK = psJob->poWK;
// coverity[missing_lock]
if (!poWK->pfnProgress(
poWK->dfProgressBase +
poWK->dfProgressScale *
Expand Down
1 change: 1 addition & 0 deletions autotest/cpp/test_cpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4616,6 +4616,7 @@ TEST_F(test_cpl, CPLWorkerThreadPool_recursion)
// takes sufficiently long that job 2 has been submitted
// before it completes
std::unique_lock<std::mutex> guard(psData2->psCtxt->mutex);
// coverity[missing_lock:FALSE]
while (!psData2->psCtxt->you_can_leave)
{
psData2->psCtxt->cv.wait(guard);
Expand Down
2 changes: 0 additions & 2 deletions frmts/gtiff/gtiffdataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,6 @@ std::tuple<CPLErr, bool> GTiffDataset::Finalize()
CPLFree(m_asCompressionJobs[i].pszTmpFilename);
}
}
CPLDestroyMutex(m_hCompressThreadPoolMutex);
m_hCompressThreadPoolMutex = nullptr;
m_poCompressQueue.reset();
}

Expand Down
3 changes: 2 additions & 1 deletion frmts/gtiff/gtiffdataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "gdal_pam.h"

#include <mutex>
#include <queue>

#include "cpl_mem_cache.h"
Expand Down Expand Up @@ -164,7 +165,7 @@ class GTiffDataset final : public GDALPamDataset
CPLVirtualMem *m_psVirtualMemIOMapping = nullptr;
CPLWorkerThreadPool *m_poThreadPool = nullptr;
std::unique_ptr<CPLJobQueue> m_poCompressQueue{};
CPLMutex *m_hCompressThreadPoolMutex = nullptr;
std::mutex m_oCompressThreadPoolMutex{};

lru11::Cache<int, std::pair<vsi_l_offset, vsi_l_offset>>
m_oCacheStrileToOffsetByteCount{1024};
Expand Down
42 changes: 21 additions & 21 deletions frmts/gtiff/gtiffdataset_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,6 @@ void GTiffDataset::InitCompressionThreads(bool bUpdateMode,
&m_asCompressionJobs[i]));
m_asCompressionJobs[i].nStripOrTile = -1;
}
m_hCompressThreadPoolMutex = CPLCreateMutex();
CPLReleaseMutex(m_hCompressThreadPoolMutex);

// This is kind of a hack, but basically using
// TIFFWriteRawStrip/Tile and then TIFFReadEncodedStrip/Tile
Expand Down Expand Up @@ -1053,13 +1051,11 @@ void GTiffDataset::ThreadCompressionFunc(void *pData)
psJob->nCompressedBufferSize = 0;
}

auto mutex = poDS->m_poBaseDS ? poDS->m_poBaseDS->m_hCompressThreadPoolMutex
: poDS->m_hCompressThreadPoolMutex;
if (mutex)
auto poMainDS = poDS->m_poBaseDS ? poDS->m_poBaseDS : poDS;
if (poMainDS->m_poCompressQueue)
{
CPLAcquireMutex(mutex, 1000.0);
std::lock_guard oLock(poMainDS->m_oCompressThreadPoolMutex);
psJob->bReady = true;
CPLReleaseMutex(mutex);
}
}

Expand Down Expand Up @@ -1223,13 +1219,11 @@ void GTiffDataset::WriteRawStripOrTile(int nStripOrTile,

void GTiffDataset::WaitCompletionForJobIdx(int i)
{
auto poQueue = m_poBaseDS ? m_poBaseDS->m_poCompressQueue.get()
: m_poCompressQueue.get();
auto &oQueue = m_poBaseDS ? m_poBaseDS->m_asQueueJobIdx : m_asQueueJobIdx;
auto &asJobs =
m_poBaseDS ? m_poBaseDS->m_asCompressionJobs : m_asCompressionJobs;
auto mutex = m_poBaseDS ? m_poBaseDS->m_hCompressThreadPoolMutex
: m_hCompressThreadPoolMutex;
auto poMainDS = m_poBaseDS ? m_poBaseDS : this;
auto poQueue = poMainDS->m_poCompressQueue.get();
auto &oQueue = poMainDS->m_asQueueJobIdx;
auto &asJobs = poMainDS->m_asCompressionJobs;
auto &mutex = poMainDS->m_oCompressThreadPoolMutex;

CPLAssert(i >= 0 && static_cast<size_t>(i) < asJobs.size());
CPLAssert(asJobs[i].nStripOrTile >= 0);
Expand All @@ -1238,9 +1232,11 @@ void GTiffDataset::WaitCompletionForJobIdx(int i)
bool bHasWarned = false;
while (true)
{
CPLAcquireMutex(mutex, 1000.0);
const bool bReady = asJobs[i].bReady;
CPLReleaseMutex(mutex);
bool bReady;
{
std::lock_guard oLock(mutex);
bReady = asJobs[i].bReady;
}
if (!bReady)
{
if (!bHasWarned)
Expand All @@ -1266,7 +1262,11 @@ void GTiffDataset::WaitCompletionForJobIdx(int i)
}
asJobs[i].pabyCompressedBuffer = nullptr;
asJobs[i].nBufferSize = 0;
asJobs[i].bReady = false;
{
// Likely useless, but makes Coverity happy
std::lock_guard oLock(mutex);
asJobs[i].bReady = false;
}
asJobs[i].nStripOrTile = -1;
oQueue.pop();
}
Expand Down Expand Up @@ -1395,9 +1395,9 @@ bool GTiffDataset::SubmitCompressionJob(int nStripOrTile, GByte *pabyData,
return false;
}

auto &oQueue = m_poBaseDS ? m_poBaseDS->m_asQueueJobIdx : m_asQueueJobIdx;
auto &asJobs =
m_poBaseDS ? m_poBaseDS->m_asCompressionJobs : m_asCompressionJobs;
auto poMainDS = m_poBaseDS ? m_poBaseDS : this;
auto &oQueue = poMainDS->m_asQueueJobIdx;
auto &asJobs = poMainDS->m_asCompressionJobs;

int nNextCompressionJobAvail = -1;

Expand Down
14 changes: 12 additions & 2 deletions gcore/gdal_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@

#include <mutex>

static std::mutex gMutexThreadPool;
// For unclear reasons, attempts at making this a std::unique_ptr<>, even
// through a GetCompressThreadPool() method like GetMutexThreadPool(), lead
// to "ctest -R autotest_alg" (and other autotest components as well)
// to hang forever once the tests have terminated.
static CPLWorkerThreadPool *gpoCompressThreadPool = nullptr;

/** Return the process-wide mutex guarding the global compress thread pool.
 *
 * Construct-on-first-use idiom: the function-local static is initialized
 * thread-safely on first call (C++11 magic statics), so the mutex is usable
 * regardless of static initialization order across translation units.
 */
static std::mutex &GetMutexThreadPool()
{
    static std::mutex oMutexThreadPool;
    return oMutexThreadPool;
}

CPLWorkerThreadPool *GDALGetGlobalThreadPool(int nThreads)
{
std::lock_guard<std::mutex> oGuard(gMutexThreadPool);
std::lock_guard oGuard(GetMutexThreadPool());
if (gpoCompressThreadPool == nullptr)
{
gpoCompressThreadPool = new CPLWorkerThreadPool();
Expand All @@ -55,6 +64,7 @@ CPLWorkerThreadPool *GDALGetGlobalThreadPool(int nThreads)

/** Destroy the global worker thread pool, if one was created.
 *
 * Takes the same mutex as GDALGetGlobalThreadPool() so that destruction
 * cannot race with a concurrent lazy creation of the pool.  Resetting the
 * pointer to nullptr allows a later GDALGetGlobalThreadPool() call to
 * re-create the pool.
 */
void GDALDestroyGlobalThreadPool()
{
    std::lock_guard oGuard(GetMutexThreadPool());
    delete gpoCompressThreadPool;
    gpoCompressThreadPool = nullptr;
}
2 changes: 2 additions & 0 deletions gcore/overview.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4535,6 +4535,7 @@ CPLErr GDALRegenerateOverviewsEx(GDALRasterBandH hSrcBand, int nOverviewCount,
auto poOldestJob = jobList.front().get();
{
std::unique_lock<std::mutex> oGuard(poOldestJob->mutex);
// coverity[missing_lock:FALSE]
while (!poOldestJob->bFinished)
{
poOldestJob->cv.wait(oGuard);
Expand Down Expand Up @@ -5283,6 +5284,7 @@ CPLErr GDALRegenerateOverviewsMultiBand(
auto poOldestJob = jobList.front().get();
{
std::unique_lock<std::mutex> oGuard(poOldestJob->mutex);
// coverity[missing_lock:FALSE]
while (!poOldestJob->bFinished)
{
poOldestJob->cv.wait(oGuard);
Expand Down
21 changes: 13 additions & 8 deletions ogr/ogrsf_frmts/gpkg/ogrgeopackagetablelayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ void OGRGeoPackageTableLayer::CancelAsyncNextArrowArray()
{
if (m_poFillArrowArray)
{
std::lock_guard<std::mutex> oLock(m_poFillArrowArray->oMutex);
std::lock_guard oLock(m_poFillArrowArray->oMutex);
m_poFillArrowArray->nCountRows = -1;
m_poFillArrowArray->oCV.notify_one();
}
Expand All @@ -1572,7 +1572,7 @@ void OGRGeoPackageTableLayer::CancelAsyncNextArrowArray()
m_oQueueArrowArrayPrefetchTasks.pop();

{
std::lock_guard<std::mutex> oLock(task->m_oMutex);
std::lock_guard oLock(task->m_oMutex);
task->m_bStop = true;
task->m_oCV.notify_one();
}
Expand Down Expand Up @@ -8260,7 +8260,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArrayAsynchronous(

if (m_poFillArrowArray)
{
std::lock_guard<std::mutex> oLock(m_poFillArrowArray->oMutex);
std::lock_guard oLock(m_poFillArrowArray->oMutex);
if (m_poFillArrowArray->bIsFinished)
{
return 0;
Expand Down Expand Up @@ -8341,7 +8341,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArrayAsynchronous(
}
else
{
std::lock_guard<std::mutex> oLock(m_poFillArrowArray->oMutex);
std::lock_guard oLock(m_poFillArrowArray->oMutex);
if (m_poFillArrowArray->bErrorOccurred)
{
CPLError(CE_Failure, CPLE_AppDefined, "%s",
Expand Down Expand Up @@ -8514,7 +8514,7 @@ void OGRGeoPackageTableLayer::GetNextArrowArrayAsynchronousWorker()
-1, SQLITE_UTF8 | SQLITE_DETERMINISTIC, nullptr,
nullptr, nullptr, nullptr);

std::lock_guard<std::mutex> oLock(m_poFillArrowArray->oMutex);
std::lock_guard oLock(m_poFillArrowArray->oMutex);
m_poFillArrowArray->bIsFinished = true;
if (m_poFillArrowArray->nCountRows >= 0)
{
Expand Down Expand Up @@ -8625,7 +8625,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
const auto stopThread = [&task]()
{
{
std::lock_guard<std::mutex> oLock(task->m_oMutex);
std::lock_guard oLock(task->m_oMutex);
task->m_bStop = true;
task->m_oCV.notify_one();
}
Expand Down Expand Up @@ -8675,7 +8675,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
{
// Wake-up thread with new task
{
std::lock_guard<std::mutex> oLock(task->m_oMutex);
std::lock_guard oLock(task->m_oMutex);
task->m_bFetchRows = true;
task->m_oCV.notify_one();
}
Expand Down Expand Up @@ -8777,7 +8777,7 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
auto taskPtr = task.get();
auto taskRunner = [taskPtr]()
{
std::unique_lock<std::mutex> oLock(taskPtr->m_oMutex);
std::unique_lock oLock(taskPtr->m_oMutex);
do
{
taskPtr->m_bFetchRows = false;
Expand All @@ -8789,6 +8789,11 @@ int OGRGeoPackageTableLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
if (taskPtr->m_bMemoryLimitReached)
break;
// cppcheck-suppress knownConditionTrueFalse
// Coverity apparently is confused by the fact that we
// use unique_lock here to guard access for m_bStop whereas
// in other places we use a lock_guard, but there's nothing
// wrong.
// coverity[missing_lock:FALSE]
while (!taskPtr->m_bStop && !taskPtr->m_bFetchRows)
{
taskPtr->m_oCV.wait(oLock);
Expand Down
10 changes: 4 additions & 6 deletions ogr/ogrsf_frmts/mvt/ogrmvtdataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4265,8 +4265,9 @@ OGRErr OGRMVTWriterDataset::PreGenerateForTileReal(
oBuffer.assign(static_cast<char *>(pCompressed), nCompressedSize);
CPLFree(pCompressed);

std::unique_ptr<std::lock_guard<std::mutex>> poLockGuard;
if (m_bThreadPoolOK)
m_oDBMutex.lock();
poLockGuard = std::make_unique<std::lock_guard<std::mutex>>(m_oDBMutex);

m_nTempTiles++;
sqlite3_bind_int(m_hInsertStmt, 1, nZ);
Expand All @@ -4283,9 +4284,6 @@ OGRErr OGRMVTWriterDataset::PreGenerateForTileReal(
int rc = sqlite3_step(m_hInsertStmt);
sqlite3_reset(m_hInsertStmt);

if (m_bThreadPoolOK)
m_oDBMutex.unlock();

if (!(rc == SQLITE_OK || rc == SQLITE_DONE))
{
return OGRERR_FAILURE;
Expand Down Expand Up @@ -4326,9 +4324,8 @@ void OGRMVTWriterDataset::WriterTaskFunc(void *pParam)
poTask->nSerial, poTask->poGeom.get(), poTask->sEnvelope);
if (eErr != OGRERR_NONE)
{
poTask->poDS->m_oDBMutex.lock();
std::lock_guard oLock(poTask->poDS->m_oDBMutex);
poTask->poDS->m_bWriteFeatureError = true;
poTask->poDS->m_oDBMutex.unlock();
}
delete poTask;
}
Expand Down Expand Up @@ -4367,6 +4364,7 @@ OGRErr OGRMVTWriterDataset::PreGenerateForTile(
// Do not queue more than 1000 jobs to avoid memory exhaustion
m_oThreadPool.WaitCompletion(1000);

std::lock_guard oLock(m_oDBMutex);
return m_bWriteFeatureError ? OGRERR_FAILURE : OGRERR_NONE;
}
}
Expand Down
19 changes: 14 additions & 5 deletions port/cpl_multiproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,23 @@ CPLMutexHolder::CPLMutexHolder(CPLMutex * /* hMutexIn */,
{
}
#else
CPLMutexHolder::CPLMutexHolder(CPLMutex *hMutexIn, double dfWaitInSeconds,
const char *pszFileIn, int nLineIn)
: hMutex(hMutexIn), pszFile(pszFileIn), nLine(nLineIn)

/** Acquire hMutexIn (waiting up to dfWaitInSeconds) on behalf of a
 * CPLMutexHolder under construction.
 *
 * Returns hMutexIn on success so it can be stored directly in the holder's
 * member initializer list; returns nullptr (after reporting the failure on
 * stderr) when acquisition fails, so the holder records that it owns no
 * mutex.  A null hMutexIn is passed through as nullptr without any
 * acquisition attempt.
 */
static CPLMutex *GetMutexHolderMutexMember(CPLMutex *hMutexIn,
                                           double dfWaitInSeconds)
{
    if (hMutexIn && !CPLAcquireMutex(hMutexIn, dfWaitInSeconds))
    {
        fprintf(stderr, "CPLMutexHolder: Failed to acquire mutex!\n");
        return nullptr;
    }
    return hMutexIn;
}

/** Scoped holder: acquire hMutexIn (waiting up to dfWaitInSeconds) via the
 * helper before storing it, so hMutex is only non-null when the mutex was
 * actually acquired — presumably the destructor releases it only in that
 * case (destructor not visible here; confirm).  pszFileIn/nLineIn record
 * the call site, apparently for debug/tracing purposes.
 */
CPLMutexHolder::CPLMutexHolder(CPLMutex *hMutexIn, double dfWaitInSeconds,
                               const char *pszFileIn, int nLineIn)
    : hMutex(GetMutexHolderMutexMember(hMutexIn, dfWaitInSeconds)),
      pszFile(pszFileIn), nLine(nLineIn)
{
}
#endif // ndef MUTEX_NONE

Expand Down Expand Up @@ -2587,6 +2595,7 @@ void CPLReleaseLock(CPLLock *psLock)
if (psLock->bDebugPerf && CPLAtomicDec(&(psLock->nCurrentHolders)) == 0)
{
const GUIntBig nStopTime = CPLrdtscp();
// coverity[missing_lock:FALSE]
const GIntBig nDiffTime =
static_cast<GIntBig>(nStopTime - psLock->nStartTime);
if (nDiffTime > psLock->nMaxDiff)
Expand Down
1 change: 1 addition & 0 deletions port/cpl_vsi_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ size_t VSIMemHandle::Write(const void *pBuffer, size_t nSize, size_t nCount)
/** Return the end-of-file flag of this handle.
 *
 * The owning file's mutex is locked (CPL_SHARED_LOCK — presumably a
 * reader/shared lock; confirm against the macro definition) while reading
 * the flag, since other threads can apparently touch state reachable
 * through poFile concurrently.
 */
int VSIMemHandle::Eof()

{
    CPL_SHARED_LOCK oLock(poFile->m_oMutex);
    return bEOF;
}

Expand Down
2 changes: 2 additions & 0 deletions port/cpl_vsil_curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3000,6 +3000,7 @@ size_t VSICurlHandle::PRead(void *pBuffer, size_t nSize,
{
{
std::unique_lock<std::mutex> oLock(poRange->oMutex);
// coverity[missing_lock:FALSE]
while (!poRange->bDone)
{
poRange->oCV.wait(oLock);
Expand Down Expand Up @@ -3472,6 +3473,7 @@ void VSICurlHandle::AdviseRead(int nRanges, const vsi_l_offset *panOffsets,

for (size_t i = 0; i < m_aoAdviseReadRanges.size(); ++i)
{
// coverity[missing_lock]
if (!m_aoAdviseReadRanges[i]->bDone)
{
DealWithRequest(aHandles[i]);
Expand Down
4 changes: 3 additions & 1 deletion port/cpl_vsil_gzip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2331,10 +2331,12 @@ bool VSIGZipWriteHandleMT::ProcessCompletedJobs()
{
apoFinishedJobs_.erase(iter);

const bool bIsSeqNumberExpectedZero =
(nSeqNumberExpected_ == 0);
sMutex_.unlock();

const size_t nToWrite = psJob->sCompressedData_.size();
if (panSOZIPIndex_ && nSeqNumberExpected_ != 0 &&
if (panSOZIPIndex_ && !bIsSeqNumberExpectedZero &&
!psJob->pBuffer_->empty())
{
uint64_t nOffset = poBaseHandle_->Tell() - nStartOffset_;
Expand Down
1 change: 1 addition & 0 deletions port/cpl_worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ bool CPLJobQueue::SubmitJob(CPLThreadFunc pfnFunc, void *pData)
void CPLJobQueue::WaitCompletion(int nMaxRemainingJobs)
{
std::unique_lock<std::mutex> oGuard(m_mutex);
// coverity[missing_lock:FALSE]
while (m_nPendingJobs > nMaxRemainingJobs)
{
m_cv.wait(oGuard);
Expand Down

0 comments on commit d14f23c

Please sign in to comment.