diff --git a/src/tm-threads.c b/src/tm-threads.c index bd1a856988d0..08bce9f6ad6f 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1422,6 +1422,36 @@ void TmThreadRemove(ThreadVars *tv, int type) return; } +static bool ThreadStillHasPackets(ThreadVars *tv) +{ + if (tv->inq != NULL) { + /* we wait till we dry out all the inq packets, before we + * kill this thread. Do note that you should have disabled + * packet acquire by now using TmThreadDisableReceiveThreads()*/ + if (!(strlen(tv->inq->name) == strlen("packetpool") && + strcasecmp(tv->inq->name, "packetpool") == 0)) { + PacketQueue *q = &trans_q[tv->inq->id]; + SCMutexLock(&q->mutex_q); + uint32_t len = q->len; + SCMutexUnlock(&q->mutex_q); + if (len != 0) { + return true; + } + } + } + + if (tv->stream_pq != NULL) { + SCMutexLock(&tv->stream_pq->mutex_q); + uint32_t len = tv->stream_pq->len; + SCMutexUnlock(&tv->stream_pq->mutex_q); + + if (len != 0) { + return true; + } + } + return false; +} + /** * \brief Kill a thread. * @@ -1442,19 +1472,6 @@ static int TmThreadKillThread(ThreadVars *tv) return 1; } - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - return 0; - } - } - } - /* set the thread flag informing the thread that it needs to be * terminated */ TmThreadsSetFlag(tv, THV_KILL); @@ -1499,7 +1516,7 @@ static int TmThreadKillThread(ThreadVars *tv) /** \internal * * \brief make sure that all packet threads are done processing their - * in-flight packets + * in-flight packets, including 'injected' flow packets. */ static void TmThreadDrainPacketThreads(void) { @@ -1521,23 +1538,16 @@ static void TmThreadDrainPacketThreads(void) /* all receive threads are part of packet processing threads */ tv = tv_root[TVT_PPT]; while (tv) { - if (tv->inq != NULL) { + if (ThreadStillHasPackets(tv)) { /* we wait till we dry out all the inq packets, before we * kill this thread. Do note that you should have disabled * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); + SCMutexUnlock(&tv_root_lock); - /* sleep outside lock */ - SleepMsec(1); - goto again; - } - } + /* sleep outside lock */ + SleepMsec(1); + goto again; } - tv = tv->next; } @@ -1593,20 +1603,14 @@ void TmThreadDisableReceiveThreads(void) } if (disable) { - if (tv->inq != NULL) { + if (ThreadStillHasPackets(tv)) { /* we wait till we dry out all the inq packets, before we * kill this thread. Do note that you should have disabled * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; - } - } + SCMutexUnlock(&tv_root_lock); + /* don't sleep while holding a lock */ + SleepMsec(1); + goto again; } /* we found a receive TV. Send it a KILL_PKTACQ signal. */ @@ -1676,22 +1680,6 @@ void TmThreadDisablePacketThreads(void) * receive TM amongst the slots in a tv, it indicates we are done * with all receive threads */ while (tv) { - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - SleepMsec(1); - goto again; - } - } - } - /* we found our receive TV. Send it a KILL signal. This is all * we need to do to kill receive threads */ TmThreadsSetFlag(tv, THV_KILL);