Skip to content

Commit

Permalink
oneshot: continue reading events
Browse files Browse the repository at this point in the history
After EOF on the last file, continue reading from the last file
to pickup new events that may be logged.
  • Loading branch information
jasonish committed Jul 13, 2017
1 parent a451de6 commit 2305829
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- Commenting support for PostgreSQL.
- With "has:comment" query string support.
- And "comment:SOME_STRING" for search comments.
- In oneshot mode, continue reading the last file to pickup new events
(https://github.com/jasonish/evebox/issues/54).

**Fixed**
- Fix an issue with updating the "active" row after archiving events.
Expand Down
78 changes: 57 additions & 21 deletions cmd/oneshot/oneshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,63 +163,99 @@ Example:
&useragent.EveUserAgentFilter{},
}
Loop:
for _, filename := range flagset.Args() {
for i, filename := range flagset.Args() {
last := len(flagset.Args()) == i+1
done := false
if last {
log.Info("Last file...")
}
reader, err := evereader.NewBasicReader(filename)
if err != nil {
log.Fatal(err)
}
size, _ := reader.FileSize()
log.Info("Reading %s (%d bytes)", filename, size)
lastPercent := 0

// The number of events queued to be committed.
queued := 0

for {
select {
case <-stopReading:
break Loop
default:
}

eof := false

event, err := reader.Next()
if err != nil {
if err == io.EOF {
break
if !last {
break
} else {
eof = true
}
} else {
log.Fatal(err)
}
log.Fatal(err)
}

for _, filter := range filters {
filter.Filter(event)
}
if event != nil {
for _, filter := range filters {
filter.Filter(event)
}

if err := eventSink.Submit(event); err != nil {
log.Fatal(err)
if err := eventSink.Submit(event); err != nil {
log.Fatal(err)
}
queued++
}

// Commit every 10000 events...
if count > 0 && count%10000 == 0 {
// Commit every 10000 events, or an EOF.
if (eof && queued > 0) || count > 0 && count%10000 == 0 {
// Only log when we are in the following mode of the
// last file.
if eof && done {
log.Info("Adding %d events.", queued)
}
if _, err := eventSink.Commit(); err != nil {
log.Fatal(err)
}
queued = 0
}

// But only log when the percentage goes up a full percent.
offset, _ := reader.FileOffset()
percent := int((float64(offset) / float64(size)) * 100.0)
if percent > lastPercent {
log.Info("%s: %d events (%d%%)", filename, count, percent)
lastPercent = percent
// But only log when the percentage goes up a full percent. And
// when we are actively processing a log file to the end.
if !done {
offset, _ := reader.FileOffset()
percent := int((float64(offset) / float64(size)) * 100.0)
if percent > lastPercent {
log.Info("%s: %d events (%d%%)", filename, count, percent)
lastPercent = percent
}
}

if eof {
time.Sleep(100 * time.Millisecond)
}

count++

if !done && last && eof {
if !nowait {
log.Debug("Sending done signal.")
doneReading <- 1
}
done = true
}
}

log.Info("%s: %d events (100%%)", filename, count)
if _, err := eventSink.Commit(); err != nil {
log.Fatal(err)
}
}

if !nowait {
doneReading <- 1
log.Info("%s: %d events (100%%)", filename, count)
}
}()
if !nowait {
Expand Down

0 comments on commit 2305829

Please sign in to comment.