From c5ccf1c72a4523d0b211241a45f3f412fe58967d Mon Sep 17 00:00:00 2001 From: Jeff Lucovsky Date: Sat, 7 Oct 2023 17:08:27 -0400 Subject: [PATCH] main/flush: Support periodic flush logs Issue: 3449 --- src/Makefile.am | 2 + src/detect-engine.c | 51 ++++++++++-- src/detect-engine.h | 2 + src/flow-worker.c | 29 ++++++- src/flow-worker.h | 3 + src/log-flush.c | 199 ++++++++++++++++++++++++++++++++++++++++++++ src/log-flush.h | 26 ++++++ src/output.c | 15 ++++ src/output.h | 1 + src/runmodes.c | 3 + src/runmodes.h | 1 + src/suricata.c | 2 +- src/suricata.h | 1 + suricata.yaml.in | 15 ++-- 14 files changed, 333 insertions(+), 17 deletions(-) create mode 100644 src/log-flush.c create mode 100644 src/log-flush.h diff --git a/src/Makefile.am b/src/Makefile.am index b0f841cfd0c2..f173aaff8866 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -340,6 +340,7 @@ noinst_HEADERS = \ ippair-storage.h \ ippair-timeout.h \ log-cf-common.h \ + log-flush.h \ log-httplog.h \ log-pcap.h \ log-stats.h \ @@ -900,6 +901,7 @@ libsuricata_c_a_SOURCES = \ ippair-storage.c \ ippair-timeout.c \ log-cf-common.c \ + log-flush.c \ log-httplog.c \ log-pcap.c \ log-stats.c \ diff --git a/src/detect-engine.c b/src/detect-engine.c index 77c25a1cf3a9..afbc42a5876f 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -2286,15 +2286,50 @@ int DetectEngineInspectPktBufferGeneric( } /** \internal - * \brief inject a pseudo packet into each detect thread that doesn't use the - * new det_ctx yet + * \brief inject a pseudo packet into each detect thread + * if the thread should flush its output logs. */ -static void InjectPackets(ThreadVars **detect_tvs, - DetectEngineThreadCtx **new_det_ctx, - int no_of_detect_tvs) -{ - /* inject a fake packet if the detect thread isn't using the new ctx yet, - * this speeds up the process */ +void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs) +{ + /* inject a fake packet if the detect thread that needs it. 
This function + * is called when a heartbeat log-flush request has been made + * and it should process a pseudo packet and flush its output logs + * to speed the process. */ +#if DEBUG + int count = 0; +#endif + for (int i = 0; i < no_of_detect_tvs; i++) { + if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) { + Packet *p = PacketGetFromAlloc(); + if (p != NULL) { + SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++); + p->flags |= PKT_PSEUDO_STREAM_END; + p->flags |= PKT_PSEUDO_LOG_FLUSH; + PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH); + PacketQueue *q = detect_tvs[i]->stream_pq; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); + } + } + } + SCLogDebug("leaving: thread notification count = %d", count); +} + +/** \internal + * \brief inject a pseudo packet into each detect thread + * -that doesn't use the new det_ctx yet + * -*or*, if the thread should flush its output logs. + */ +static void InjectPackets( + ThreadVars **detect_tvs, DetectEngineThreadCtx **new_det_ctx, int no_of_detect_tvs) +{ + /* inject a fake packet if the detect thread that needs it. This function + * is called if + * - A thread isn't using a DE ctx and should + * - Or, it should process a pseudo packet and flush its output logs. + * to speed the process. 
*/ for (int i = 0; i < no_of_detect_tvs; i++) { if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) { if (detect_tvs[i]->inq != NULL) { diff --git a/src/detect-engine.h b/src/detect-engine.h index b75d124f9cd4..dfba0b374b5d 100644 --- a/src/detect-engine.h +++ b/src/detect-engine.h @@ -209,4 +209,6 @@ void DetectEngineStateResetTxs(Flow *f); void DeStateRegisterTests(void); +/* packet injection */ +void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs); #endif /* SURICATA_DETECT_ENGINE_H */ diff --git a/src/flow-worker.c b/src/flow-worker.c index 63de42a26650..764383d00631 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -73,6 +73,8 @@ typedef struct FlowWorkerThreadData_ { SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread); + SC_ATOMIC_DECLARE(bool, flush_ack); + void *output_thread; /* Output thread data. */ void *output_thread_flow; /* Output thread data. */ @@ -555,9 +557,17 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) SCLogDebug("packet %"PRIu64, p->pcap_cnt); /* update time */ - if (!(PKT_IS_PSEUDOPKT(p))) { + if (!(PKT_IS_PSEUDOPKT(p) || PKT_IS_FLUSHPKT(p))) { TimeSetByThread(tv->id, p->ts); } + if ((PKT_IS_FLUSHPKT(p))) { + SCLogDebug("thread %s flushing", tv->printable_name); + OutputLoggerFlush(tv, p, fw->output_thread); + /* Ack if a flush was requested */ + bool notset = false; + SC_ATOMIC_CAS(&fw->flush_ack, notset, true); + return TM_ECODE_OK; + } /* handle Flow */ if (p->flags & PKT_WANTS_FLOW) { @@ -719,6 +729,23 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker) return SC_ATOMIC_GET(fw->detect_thread); } +void *FlowWorkerGetThreadData(void *flow_worker) +{ + return (FlowWorkerThreadData *)flow_worker; +} + +bool FlowWorkerGetFlushAck(void *flow_worker) +{ + FlowWorkerThreadData *fw = flow_worker; + return SC_ATOMIC_GET(fw->flush_ack) == true; +} + +void FlowWorkerSetFlushAck(void *flow_worker) +{ + FlowWorkerThreadData *fw = flow_worker; + SC_ATOMIC_SET(fw->flush_ack, false); 
+} + const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi) { switch (fwi) { diff --git a/src/flow-worker.h b/src/flow-worker.h index 951878eb062e..6bdea551935d 100644 --- a/src/flow-worker.h +++ b/src/flow-worker.h @@ -32,6 +32,9 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi); void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx); void *FlowWorkerGetDetectCtxPtr(void *flow_worker); +void *FlowWorkerGetThreadData(void *flow_worker); +bool FlowWorkerGetFlushAck(void *flow_worker); +void FlowWorkerSetFlushAck(void *flow_worker); void TmModuleFlowWorkerRegister (void); diff --git a/src/log-flush.c b/src/log-flush.c new file mode 100644 index 000000000000..a8c94e5b3e66 --- /dev/null +++ b/src/log-flush.c @@ -0,0 +1,199 @@ +/* Copyright (C) 2024 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. 
+ */ + +/** + * \file + * + * \author Jeff Lucovsky + */ + +#include "suricata-common.h" +#include "suricata.h" +#include "detect.h" +#include "detect-engine.h" +#include "flow-worker.h" +#include "log-flush.h" +#include "tm-threads.h" +#include "conf.h" +#include "conf-yaml-loader.h" +#include "util-privs.h" + +/** + * \brief Trigger detect threads to flush their output logs + * + * This function is intended to be called at regular intervals to force + * buffered log data to be persisted + */ +static void WorkerFlushLogs(void) +{ + SCEnter(); + + /* count detect threads in use */ + uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_DETECT_TM); + /* can be zero in unix socket mode */ + if (no_of_detect_tvs == 0) { + return; + } + + /* prepare swap structures */ + void *fw_threads[no_of_detect_tvs]; + ThreadVars *detect_tvs[no_of_detect_tvs]; + memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *))); + memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *))); + + /* start by initiating the log flushes */ + + uint32_t i = 0; + SCMutexLock(&tv_root_lock); + /* get reference to tv's and setup fw_threads array */ + for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) { + if ((tv->tmm_flags & TM_FLAG_DETECT_TM) == 0) { + continue; + } + for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) { + TmModule *tm = TmModuleGetById(s->tm_id); + if (!(tm->flags & TM_FLAG_DETECT_TM)) { + continue; + } + + if (suricata_ctl_flags != 0) { + SCMutexUnlock(&tv_root_lock); + goto error; + } + + fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data)); + if (fw_threads[i]) { + FlowWorkerSetFlushAck(fw_threads[i]); + SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i); + detect_tvs[i] = tv; + } + + i++; + break; + } + } + BUG_ON(i != no_of_detect_tvs); + + SCMutexUnlock(&tv_root_lock); + + SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs); + InjectPacketsForFlush(detect_tvs, 
no_of_detect_tvs); + + uint32_t threads_done = 0; +retry: + for (i = 0; i < no_of_detect_tvs; i++) { + if (suricata_ctl_flags != 0) { + threads_done = no_of_detect_tvs; + break; + } + usleep(1000); + if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) { + SCLogDebug("thread slot %d has ack'd flush request", i); + threads_done++; + } else if (detect_tvs[i]) { + SCLogDebug("thread slot %d not yet ack'd flush request", i); + TmThreadsCaptureBreakLoop(detect_tvs[i]); + } + } + if (threads_done < no_of_detect_tvs) { + threads_done = 0; + SleepMsec(250); + goto retry; + } + +error: + return; +} + +static int OutputFlushInterval(void) +{ + intmax_t output_flush_interval = 0; + if (ConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) { + output_flush_interval = 0; + } + if (output_flush_interval < 0 || output_flush_interval > 60) { + SCLogConfig("flush_interval must be 0 or less than 60; using 0"); + output_flush_interval = 0; + } + + return (int)output_flush_interval; +} + +static void *LogFlusherWakeupThread(void *arg) +{ + int output_flush_interval = OutputFlushInterval(); + /* This was checked by the logic creating this thread */ + BUG_ON(output_flush_interval == 0); + + SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval); + /* + * Calculate the number of sleep intervals based on the output flush interval. This is necessary + * because this thread pauses a fixed amount of time to react to shutdown situations more + * quickly. 
+ */ + const int log_flush_sleep_time = 500; /* milliseconds */ + const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time; + + ThreadVars *tv_local = (ThreadVars *)arg; + SCSetThreadName(tv_local->name); + + if (tv_local->thread_setup_flags != 0) + TmThreadSetupOptions(tv_local); + + /* Set the threads capability */ + tv_local->cap_flags = 0; + SCDropCaps(tv_local); + + TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING); + + int wait_count = 0; + uint64_t worker_flush_count = 0; + bool run = TmThreadsWaitForUnpause(tv_local); + while (run) { + usleep(log_flush_sleep_time * 1000); + + if (++wait_count == flush_wait_count) { + worker_flush_count++; + WorkerFlushLogs(); + wait_count = 0; + } + + if (TmThreadsCheckFlag(tv_local, THV_KILL)) { + break; + } + } + + TmThreadsSetFlag(tv_local, THV_RUNNING_DONE); + TmThreadWaitForFlag(tv_local, THV_DEINIT); + TmThreadsSetFlag(tv_local, THV_CLOSED); + SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count); + return NULL; +} + +void LogFlushThreads(void) +{ + if (0 == OutputFlushInterval()) { + SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0"); + return; + } + + ThreadVars *tv_log_flush = + TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1); + if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) { + FatalError("Unable to create and start log flush thread"); + } +} diff --git a/src/log-flush.h b/src/log-flush.h new file mode 100644 index 000000000000..e201942b2eb6 --- /dev/null +++ b/src/log-flush.h @@ -0,0 +1,26 @@ +/* Copyright (C) 2024 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Jeff Lucovsky + */ +#ifndef SURICATA_LOG_FLUSH_H__ +#define SURICATA_LOG_FLUSH_H__ +void LogFlushThreads(void); +#endif /* SURICATA_LOG_FLUSH_H__ */ diff --git a/src/output.c b/src/output.c index 748df0d08d95..512f2d175bac 100644 --- a/src/output.c +++ b/src/output.c @@ -708,6 +708,21 @@ void OutputNotifyFileRotation(void) { } } +TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data) +{ + LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data; + RootLogger *logger = TAILQ_FIRST(&active_loggers); + LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store); + while (logger && thread_store_node) { + if (logger->FlushFunc) + logger->FlushFunc(tv, p, thread_store_node->thread_data); + + logger = TAILQ_NEXT(logger, entries); + thread_store_node = TAILQ_NEXT(thread_store_node, entries); + } + return TM_ECODE_OK; +} + TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data) { LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data; diff --git a/src/output.h b/src/output.h index e6a41fb1d51a..1f834c76a75f 100644 --- a/src/output.h +++ b/src/output.h @@ -164,6 +164,7 @@ void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc Thread void TmModuleLoggerRegister(void); TmEcode OutputLoggerLog(ThreadVars *, Packet *, void *); +TmEcode OutputLoggerFlush(ThreadVars *, Packet *, void *); TmEcode OutputLoggerThreadInit(ThreadVars *, const void *, void **); TmEcode 
OutputLoggerThreadDeinit(ThreadVars *, void *); void OutputLoggerExitPrintStats(ThreadVars *, void *); diff --git a/src/runmodes.c b/src/runmodes.c index b326a96e3a67..1c46882f014d 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -28,6 +28,7 @@ #include "util-debug.h" #include "util-affinity.h" #include "conf.h" +#include "log-flush.h" #include "runmodes.h" #include "runmode-af-packet.h" #include "runmode-af-xdp.h" @@ -72,6 +73,7 @@ const char *thread_name_unix_socket = "US"; const char *thread_name_detect_loader = "DL"; const char *thread_name_counter_stats = "CS"; const char *thread_name_counter_wakeup = "CW"; +const char *thread_name_heartbeat = "HB"; /** * \brief Holds description for a runmode. @@ -436,6 +438,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p BypassedFlowManagerThreadSpawn(); } StatsSpawnThreads(); + LogFlushThreads(); } } diff --git a/src/runmodes.h b/src/runmodes.h index cce5fcbbaa42..18fd5886d3e6 100644 --- a/src/runmodes.h +++ b/src/runmodes.h @@ -73,6 +73,7 @@ extern const char *thread_name_unix_socket; extern const char *thread_name_detect_loader; extern const char *thread_name_counter_stats; extern const char *thread_name_counter_wakeup; +extern const char *thread_name_heartbeat; char *RunmodeGetActive(void); const char *RunModeGetMainMode(void); diff --git a/src/suricata.c b/src/suricata.c index ee9dfc0b5b69..9ee9e4c2a585 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -3019,7 +3019,7 @@ void SuricataPostInit(void) #if defined(HAVE_SYS_RESOURCE_H) #ifdef linux if (geteuid() == 0) { - SCLogWarning("setrlimit has no effet when running as root."); + SCLogWarning("setrlimit has no effect when running as root."); } #endif struct rlimit r = { 0, 0 }; diff --git a/src/suricata.h b/src/suricata.h index 70393541f6fc..8c6488d06942 100644 --- a/src/suricata.h +++ b/src/suricata.h @@ -152,6 +152,7 @@ typedef struct SCInstance_ { int offline; int verbose; int checksum_validation; + int 
output_flush_interval; struct timeval start_time; diff --git a/suricata.yaml.in b/suricata.yaml.in index e3e95063a126..5cc98c11a7cb 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -563,19 +563,20 @@ outputs: scripts: # - script1.lua -# Logging configuration. This is not about logging IDS alerts/events, but -# output about what Suricata is doing, like startup messages, errors, etc. -logging: - # The flush-interval governs how often Suricata will instruct the detection - # threads to flush their EVE output. Specify the value in seconds [1-60] +heartbeat: + # The output-flush-interval value governs how often Suricata will instruct the + # detection threads to flush their EVE output. Specify the value in seconds [1-60] # and Suricata will initiate EVE log output flushes at that interval. A value # of 0 means no EVE log output flushes are initiated. When the EVE output # buffer-size value is non-zero, some EVE output that was written may remain - # buffered. The flush-interval governs how much buffered data exists. + # buffered. The output-flush-interval limits how long that data stays buffered. # # The default value is: 0 (never instruct detection threads to flush output) - #flush-interval: 0 + #output-flush-interval: 0 +# Logging configuration. This is not about logging IDS alerts/events, but +# output about what Suricata is doing, like startup messages, errors, etc. +logging: # The default log level: can be overridden in an output section. # Note that debug level logging will only be emitted if Suricata was # compiled with the --enable-debug configure option.