Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #166 #192

Merged
merged 5 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions splunk_eventgen/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def parse_args():
help="Use multiprocesing instead of threading")
generate_subparser.add_argument("--profiler", action="store_true", help="Turn on cProfiler")
generate_subparser.add_argument("--log-path", type=str, default="{0}/logs".format(FILE_LOCATION))
generate_subparser.add_argument(
"--generator-queue-size", type=int, default=500, help="the max queue size for the "
"generator queue, timer object puts all the generator tasks into this queue, default max size is 500")
# Build subparser
build_subparser = subparsers.add_parser('build', help="Will build different forms of sa-eventgen")
build_subparser.add_argument("--mode", type=str, default="splunk-app",
Expand Down
10 changes: 8 additions & 2 deletions splunk_eventgen/eventgen_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,15 @@ def __init__(self, args=None):
# attach to the logging queue
self.logger.info("Logging Setup Complete.")

self._generator_queue_size = getattr(self.args, 'generator_queue_size', 500)
if self._generator_queue_size < 0:
self._generator_queue_size = 0
self.logger.info("set generator queue size to %d", self._generator_queue_size)

if self.args and 'configfile' in self.args and self.args.configfile:
self._load_config(self.args.configfile, args=args)


def _load_config(self, configfile, **kwargs):
'''
This method will use a configfile and set self.confg as a processeded config object,
Expand Down Expand Up @@ -216,11 +222,11 @@ def _create_generator_pool(self, workercount=20):
self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread")
self.logging_pool.start()
# since we're now in multiprocess, we need to use better queues.
self.workerQueue = multiprocessing.JoinableQueue(maxsize=500)
self.workerQueue = multiprocessing.JoinableQueue(maxsize=self._generator_queue_size)
self.genconfig = self.manager.dict()
self.genconfig["stopping"] = False
else:
self.workerQueue = Queue(maxsize=500)
self.workerQueue = Queue(maxsize=self._generator_queue_size)
worker_threads = workercount
if hasattr(self.config, 'outputCounter') and self.config.outputCounter:
self.output_counters = []
Expand Down
21 changes: 14 additions & 7 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ def predict_event_size(self):
except TypeError:
self.logger.error("Error loading sample file for sample '%s'" % self.sample.name)
return
return len(self.sample.sampleDict[0]['_raw'])
total_len = sum([len(e['_raw']) for e in self.sample.sampleDict])
sample_count = len(self.sample.sampleDict)
if sample_count == 0:
return 0
else:
return total_len/sample_count

def run(self):
"""
Expand Down Expand Up @@ -135,9 +140,9 @@ def real_run(self):
elif char != "-":
backfillletter += char
backfillearliest = timeParserTimeMath(plusminus=mathsymbol, num=backfillnumber, unit=backfillletter,
ret=realtime)
ret=realtime)
while backfillearliest < realtime:
if self.executions == int(self.end):
if self.end and self.executions == int(self.end):
self.logger.info("End executions %d reached, ending generation of sample '%s'" % (int(
self.end), self.sample.name))
break
Expand All @@ -148,12 +153,14 @@ def real_run(self):
genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue)
genPlugin.updateCounts(count=count, start_time=et, end_time=lt)
try:
self.generatorQueue.put(genPlugin)
self.generatorQueue.put(genPlugin, True, 3)
self.executions += 1
backfillearliest = lt
except Full:
self.logger.warning("Generator Queue Full. Skipping current generation.")
backfillearliest = lt

self.logger.warning("Generator Queue Full. Reput the backfill generator task later. %d backfill generators are dispatched.", self.executions)
backfillearliest = et
jmeixensperger marked this conversation as resolved.
Show resolved Hide resolved
realtime = self.sample.now(realnow=True)

self.sample.backfilldone = True
else:
# 12/15/13 CS Moving the rating to a separate plugin architecture
Expand Down
3 changes: 3 additions & 0 deletions splunk_eventgen/lib/outputplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def run(self):
if self.output_counter is not None:
self.output_counter.collect(len(self.events), sum([len(e['_raw']) for e in self.events]))
self.events = None
self._output_end()

def _output_end(self):
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this callback hook is added as a exit hook point for output plugin.
This hook is used in the counter output plugin to collect information.
The counter plugin is used for debug and issue triaging scenario.


def load():
return OutputPlugin
49 changes: 49 additions & 0 deletions splunk_eventgen/lib/plugins/output/counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
import datetime
import pprint
import sys

from outputplugin import OutputPlugin


class CounterOutputPlugin(OutputPlugin):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we use this good stuff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to use this dummy counter plugin, we can set outputMode=counter in eventgen conf stanza.
I did not update the readme doc because I think this is only used for debugging purpose. Do we have any place to hold the docs for developers? I can add some content to that place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's wrong with exposing this option to customers?

name = 'counter'
MAXQUEUELENGTH = 1000
useOutputQueue = True

dataSizeHistogram = {}
eventCountHistogram = {}
flushCount = 0
lastPrintAt = 0

def __init__(self, sample, output_counter=None):
OutputPlugin.__init__(self, sample, output_counter)

def flush(self, q):
CounterOutputPlugin.flushCount += 1
for e in q:
ts = datetime.datetime.fromtimestamp(int(e['_time']))
text = e['_raw']
day = ts.strftime('%Y-%m-%d')
CounterOutputPlugin.dataSizeHistogram[day] = CounterOutputPlugin.dataSizeHistogram.get(day, 0) + len(text)
CounterOutputPlugin.eventCountHistogram[day] = CounterOutputPlugin.eventCountHistogram.get(day, 0) + 1

def _output_end(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where the output_end hook is used.

if CounterOutputPlugin.flushCount - CounterOutputPlugin.lastPrintAt > 0:
self._print_info('----- print the output histogram -----')
self._print_info('--- data size histogram ---')
self._print_info(pprint.pformat(CounterOutputPlugin.dataSizeHistogram))
self._print_info('--- event count histogram ---')
self._print_info(pprint.pformat(CounterOutputPlugin.eventCountHistogram))
CounterOutputPlugin.lastPrintAt = CounterOutputPlugin.flushCount

def _print_info(self, msg):
print >> sys.stderr, '{} {}'.format(datetime.datetime.now(), msg)

def _setup_logging(self):
self.logger = logging.getLogger('eventgen_counter_out')


def load():
"""Returns an instance of the plugin"""
return CounterOutputPlugin