Skip to content

Commit

Permalink
main/flush: Support periodic flush logs
Browse files Browse the repository at this point in the history
Issue: 3449
  • Loading branch information
jlucovsky committed Nov 20, 2024
1 parent dc70b40 commit c41ed85
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ noinst_HEADERS = \
ippair-storage.h \
ippair-timeout.h \
log-cf-common.h \
log-flush.h \
log-httplog.h \
log-pcap.h \
log-stats.h \
Expand Down Expand Up @@ -900,6 +901,7 @@ libsuricata_c_a_SOURCES = \
ippair-storage.c \
ippair-timeout.c \
log-cf-common.c \
log-flush.c \
log-httplog.c \
log-pcap.c \
log-stats.c \
Expand Down
51 changes: 43 additions & 8 deletions src/detect-engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -2286,15 +2286,50 @@ int DetectEngineInspectPktBufferGeneric(
}

/** \internal
* \brief inject a pseudo packet into each detect thread that doesn't use the
* new det_ctx yet
* \brief inject a pseudo packet into each detect thread
* if the thread should flush its output logs.
*/
static void InjectPackets(ThreadVars **detect_tvs,
DetectEngineThreadCtx **new_det_ctx,
int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread isn't using the new ctx yet,
* this speeds up the process */
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread that needs it. This function
* is called when a heartbeat log-flush request has been made
* and it should process a pseudo packet and flush its output logs
* to speed the process. */
#if DEBUG
int count = 0;
#endif
for (int i = 0; i < no_of_detect_tvs; i++) {
if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) {
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++);
p->flags |= PKT_PSEUDO_STREAM_END;
p->flags |= PKT_PSEUDO_LOG_FLUSH;
PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
PacketQueue *q = detect_tvs[i]->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
}
}
SCLogDebug("leaving: thread notification count = %d", count);
}

/** \internal
* \brief inject a pseudo packet into each detect thread
* -that doesn't use the new det_ctx yet
* -*or*, if the thread should flush its output logs.
*/
static void InjectPackets(
ThreadVars **detect_tvs, DetectEngineThreadCtx **new_det_ctx, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread that needs it. This function
* is called if
* - A thread isn't using a DE ctx and should
* - Or, it should process a pseudo packet and flush its output logs.
* to speed the process. */
for (int i = 0; i < no_of_detect_tvs; i++) {
if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (detect_tvs[i]->inq != NULL) {
Expand Down
2 changes: 2 additions & 0 deletions src/detect-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,6 @@ void DetectEngineStateResetTxs(Flow *f);

void DeStateRegisterTests(void);

/* packet injection */
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs);
#endif /* SURICATA_DETECT_ENGINE_H */
29 changes: 28 additions & 1 deletion src/flow-worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ typedef struct FlowWorkerThreadData_ {

SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

SC_ATOMIC_DECLARE(bool, flush_ack);

void *output_thread; /* Output thread data. */
void *output_thread_flow; /* Output thread data. */

Expand Down Expand Up @@ -555,9 +557,17 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
SCLogDebug("packet %"PRIu64, p->pcap_cnt);

/* update time */
if (!(PKT_IS_PSEUDOPKT(p))) {
if (!(PKT_IS_PSEUDOPKT(p) || PKT_IS_FLUSHPKT(p))) {
TimeSetByThread(tv->id, p->ts);
}
if ((PKT_IS_FLUSHPKT(p))) {
SCLogDebug("thread %s flushing", tv->printable_name);
OutputLoggerFlush(tv, p, fw->output_thread);
/* Ack if a flush was requested */
bool notset = false;
SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
return TM_ECODE_OK;
}

/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
Expand Down Expand Up @@ -719,6 +729,23 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
return SC_ATOMIC_GET(fw->detect_thread);
}

void *FlowWorkerGetThreadData(void *flow_worker)
{
return (FlowWorkerThreadData *)flow_worker;
}

bool FlowWorkerGetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
return SC_ATOMIC_GET(fw->flush_ack) == true;
}

void FlowWorkerSetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
SC_ATOMIC_SET(fw->flush_ack, false);
}

const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
{
switch (fwi) {
Expand Down
3 changes: 3 additions & 0 deletions src/flow-worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi);

void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
void *FlowWorkerGetThreadData(void *flow_worker);
bool FlowWorkerGetFlushAck(void *flow_worker);
void FlowWorkerSetFlushAck(void *flow_worker);

void TmModuleFlowWorkerRegister (void);

Expand Down
199 changes: 199 additions & 0 deletions src/log-flush.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/* Copyright (C) 2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/

/**
* \file
*
* \author Jeff Lucovsky <[email protected]>
*/

#include "suricata-common.h"
#include "suricata.h"
#include "detect.h"
#include "detect-engine.h"
#include "flow-worker.h"
#include "log-flush.h"
#include "tm-threads.h"
#include "conf.h"
#include "conf-yaml-loader.h"
#include "util-privs.h"

/**
* \brief Trigger detect threads to flush their output logs
*
* This function is intended to be called at regular intervals to force
* buffered log data to be persisted
*/
static void WorkerFlushLogs(void)
{
SCEnter();

/* count detect threads in use */
uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_DETECT_TM);
/* can be zero in unix socket mode */
if (no_of_detect_tvs == 0) {
return;
}

/* prepare swap structures */
void *fw_threads[no_of_detect_tvs];
ThreadVars *detect_tvs[no_of_detect_tvs];
memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));

/* start by initiating the log flushes */

uint32_t i = 0;
SCMutexLock(&tv_root_lock);
/* get reference to tv's and setup fw_threads array */
for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
if ((tv->tmm_flags & TM_FLAG_DETECT_TM) == 0) {
continue;
}
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
TmModule *tm = TmModuleGetById(s->tm_id);
if (!(tm->flags & TM_FLAG_DETECT_TM)) {
continue;
}

if (suricata_ctl_flags != 0) {
SCMutexUnlock(&tv_root_lock);
goto error;
}

fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
if (fw_threads[i]) {
FlowWorkerSetFlushAck(fw_threads[i]);
SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
detect_tvs[i] = tv;
}

i++;
break;
}
}
BUG_ON(i != no_of_detect_tvs);

SCMutexUnlock(&tv_root_lock);

SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);

uint32_t threads_done = 0;
retry:
for (i = 0; i < no_of_detect_tvs; i++) {
if (suricata_ctl_flags != 0) {
threads_done = no_of_detect_tvs;
break;
}
usleep(1000);
if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
SCLogDebug("thread slot %d has ack'd flush request", i);
threads_done++;
} else if (detect_tvs[i]) {
SCLogDebug("thread slot %d not yet ack'd flush request", i);
TmThreadsCaptureBreakLoop(detect_tvs[i]);
}
}
if (threads_done < no_of_detect_tvs) {
threads_done = 0;
SleepMsec(250);
goto retry;
}

error:
return;
}

static int OutputFlushInterval(void)
{
intmax_t output_flush_interval = 0;
if (ConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
output_flush_interval = 0;
}
if (output_flush_interval < 0 || output_flush_interval > 60) {
SCLogConfig("flush_interval must be 0 or less than 60; using 0");
output_flush_interval = 0;
}

return (int)output_flush_interval;
}

static void *LogFlusherWakeupThread(void *arg)
{
int output_flush_interval = OutputFlushInterval();
/* This was checked by the logic creating this thread */
BUG_ON(output_flush_interval == 0);

SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
/*
* Calculate the number of sleep intervals based on the output flush interval. This is necessary
* because this thread pauses a fixed amount of time to react to shutdown situations more
* quickly.
*/
const int log_flush_sleep_time = 500; /* milliseconds */
const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;

ThreadVars *tv_local = (ThreadVars *)arg;
SCSetThreadName(tv_local->name);

if (tv_local->thread_setup_flags != 0)
TmThreadSetupOptions(tv_local);

/* Set the threads capability */
tv_local->cap_flags = 0;
SCDropCaps(tv_local);

TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);

int wait_count = 0;
uint64_t worker_flush_count = 0;
bool run = TmThreadsWaitForUnpause(tv_local);
while (run) {
usleep(log_flush_sleep_time * 1000);

if (++wait_count == flush_wait_count) {
worker_flush_count++;
WorkerFlushLogs();
wait_count = 0;
}

if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
break;
}
}

TmThreadsSetFlag(tv_local, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv_local, THV_DEINIT);
TmThreadsSetFlag(tv_local, THV_CLOSED);
SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
return NULL;
}

void LogFlushThreads(void)
{
if (0 == OutputFlushInterval()) {
SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
return;
}

ThreadVars *tv_log_flush =
TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
FatalError("Unable to create and start log flush thread");
}
}
26 changes: 26 additions & 0 deletions src/log-flush.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* Copyright (C) 2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/

/**
* \file
*
* \author Jeff Lucovsky <[email protected]>
*/
#ifndef SURICATA_LOG_FLUSH_H__
#define SURICATA_LOG_FLUSH_H__
void LogFlushThreads(void);
#endif /* SURICATA_LOG_FLUSH_H__ */
15 changes: 15 additions & 0 deletions src/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,21 @@ void OutputNotifyFileRotation(void) {
}
}

TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
RootLogger *logger = TAILQ_FIRST(&active_loggers);
LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);
while (logger && thread_store_node) {
if (logger->FlushFunc)
logger->FlushFunc(tv, p, thread_store_node->thread_data);

logger = TAILQ_NEXT(logger, entries);
thread_store_node = TAILQ_NEXT(thread_store_node, entries);
}
return TM_ECODE_OK;
}

TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
Expand Down
Loading

0 comments on commit c41ed85

Please sign in to comment.