Skip to content

Commit

Permalink
Issue 160 (#163)
Browse files Browse the repository at this point in the history
* Fixed timer and token

* Added a conditional for end=-1

* Update eventgentimer.py
  • Loading branch information
arctan5x authored Apr 18, 2019
1 parent 1a35eea commit 2a416a9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
29 changes: 22 additions & 7 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ def real_run(self):
end = False
previous_count_left = 0
raw_event_size = self.predict_event_size()
if self.end and int(self.end) == 0:
self.logger.info("End = 0, no events will be generated for sample '%s'" % self.sample.name)
end = True
if self.end:
if int(self.end) == 0:
self.logger.info("End = 0, no events will be generated for sample '%s'" % self.sample.name)
end = True
elif int(self.end) == -1:
self.logger.info("End is set to -1. Will be running without stopping for sample %s" % self.sample.name)
while not end:
# Need to be able to stop threads by the main thread or this thread. self.config will stop all threads
# referenced in the config object, while, self.stopping will only stop this one.
Expand All @@ -119,7 +122,6 @@ def real_run(self):
count = self.rater.rate()
# First run of the generator, see if we have any backfill work to do.
if self.countdown <= 0:

if self.sample.backfill and not self.sample.backfilldone:
realtime = self.sample.now(realnow=True)
if "-" in self.sample.backfill[0]:
Expand All @@ -136,6 +138,10 @@ def real_run(self):
backfillearliest = timeParserTimeMath(plusminus=mathsymbol, num=backfillnumber, unit=backfillletter,
ret=realtime)
while backfillearliest < realtime:
if self.executions == int(self.end):
self.logger.info("End executions %d reached, ending generation of sample '%s'" % (int(
self.end), self.sample.name))
break
et = backfillearliest
lt = timeParserTimeMath(plusminus="+", num=self.interval, unit="s", ret=et)
genPlugin = self.generatorPlugin(sample=self.sample)
Expand All @@ -144,9 +150,11 @@ def real_run(self):
genPlugin.updateCounts(count=count, start_time=et, end_time=lt)
try:
self.generatorQueue.put(genPlugin)
self.executions += 1
except Full:
self.logger.warning("Generator Queue Full. Skipping current generation.")
backfillearliest = lt

self.sample.backfilldone = True
else:
# 12/15/13 CS Moving the rating to a separate plugin architecture
Expand Down Expand Up @@ -181,15 +189,18 @@ def real_run(self):
# self.generatorPlugin is only an instance, now we need a real plugin. Make a copy of
# of the sample in case another generator corrupts it.
copy_sample = copy.copy(self.sample)
tokens = copy.deepcopy(self.sample.tokens)
copy_sample.tokens = tokens
copy_tokens = []
for token in self.sample.tokens:
copy_tokens.append(token.deepcopy(self.sample))
copy_sample.tokens = copy_tokens
genPlugin = self.generatorPlugin(sample=copy_sample)
# Adjust queue for threading mode
genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue)
genPlugin.updateCounts(count=count, start_time=et, end_time=lt)

try:
self.generatorQueue.put(genPlugin)
self.executions += 1
self.logger.info(("Worker# {0}: Put {1} MB of events in queue for sample '{2}'" +
"with et '{3}' and lt '{4}'").format(
worker_id, round((count / 1024.0 / 1024), 4),
Expand All @@ -204,10 +215,14 @@ def real_run(self):

# Sleep until we're supposed to wake up and generate more events
self.countdown = self.interval
self.executions += 1

# 8/20/15 CS Adding support for ending generation at a certain time

if self.end:
if int(self.end) == -1:
time.sleep(self.time)
self.countdown -= self.time
continue
# 3/16/16 CS Adding support for ending on a number of executions instead of time
# Should be fine with storing state in this sample object since each sample has it's own unique
# timer thread
Expand Down
13 changes: 12 additions & 1 deletion splunk_eventgen/lib/eventgentoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def __init__(self, sample=None):
self._earliestTime = (None, None)
self._latestTime = (None, None)

if sample:
self.sample = sample

def __str__(self):
"""Only used for debugging, outputs a pretty printed representation of this token"""
# Eliminate recursive going back to parent
Expand All @@ -65,10 +68,18 @@ def __getstate__(self):
def __setstate__(self, d):
self.__dict__ = d
self._setup_logging()

def deepcopy(self, sample=None):
# temp = dict([(key, value) for (key, value) in token_object.items() if key != 'sample' and key != 'logger'])
cp = Token()
cp.__setstate__(self.__getstate__())
if sample:
cp.sample = sample
return cp

def _setup_logging(self):
self.logger = logging.getLogger('eventgen')

def _match(self, event):
"""Executes regular expression match and returns the re.Match object"""
return re.match(self.token, event)
Expand Down

0 comments on commit 2a416a9

Please sign in to comment.