Skip to content

Commit

Permalink
sdjournal: adds a follow with fields
Browse files Browse the repository at this point in the history
this allows users to pull data from journal entries that aren't MESSAGE
  • Loading branch information
faiq committed Apr 25, 2016
1 parent 7b2428f commit 1b5d41c
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 0 deletions.
25 changes: 25 additions & 0 deletions sdjournal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,31 @@ func (j *Journal) GetData(field string) (string, error) {
return msg, nil
}

// EnumerateData may be used to iterate through all data fields used in the opened journal files
// the order of the returned field names is not defined.
func (j *Journal) EnumerateData() (string, error) {
var d unsafe.Pointer
var l C.size_t

j.mu.Lock()
r := C.sd_journal_enumerate_data(j.cjournal, &d, &l)
j.mu.Unlock()

if r < 0 {
return "", fmt.Errorf("failed to read message: %d", r)
}

msg := C.GoStringN((*C.char)(d), C.int(l))
return msg, nil
}

//RestartData resets the field name enumeration index to the beginning of the list.
func (j *Journal) RestartData() {
j.mu.Lock()
C.sd_journal_restart_data(j.cjournal)
j.mu.Unlock()
}

// GetDataValue gets the data object associated with a specific field from the
// current journal entry, returning only the value of the object.
func (j *Journal) GetDataValue(field string) (string, error) {
Expand Down
79 changes: 79 additions & 0 deletions sdjournal/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package sdjournal

import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"strings"
"time"
)

Expand Down Expand Up @@ -188,6 +190,83 @@ process:
return
}

func (r *JournalReader) FollowWithFields(until <-chan time.Time, fields map[string]struct{}, writer io.Writer) (err error) {
enc := json.NewEncoder(writer)
journal:
for {
kvMap := make(map[string]string)
select {
case <-until:
return ErrExpired
default:
var err error
var c int

// Advance the journal cursor
c, err = r.journal.Next()

// An unexpected error
if err != nil {
return err
}

// We have a new journal entry go over the fields
// get the data for what we care about and return
r.journal.RestartData()
if c > 0 {
fields:
for {
s, err := r.journal.EnumerateData()
if err != nil || len(s) == 0 {
break fields
}
s = s[:len(s)]
arr := strings.SplitN(s, "=", 2)
if _, ok := fields[arr[0]]; ok {
kvMap[arr[0]] = arr[1]
}

}
if err := enc.Encode(kvMap); err != nil {
return err
}
}
}

// we're at the tail, so wait for new events or time out.
// holds journal events to process. tightly bounded for now unless there's a
// reason to unblock the journal watch routine more quickly
events := make(chan int, 1)
pollDone := make(chan bool, 1)
go func() {
for {
select {
case <-pollDone:
return
default:
events <- r.journal.Wait(time.Duration(1) * time.Second)
}
}
}()

select {
case <-until:
pollDone <- true
return ErrExpired
case e := <-events:
pollDone <- true
switch e {
case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
// TODO: need to account for any of these?
default:
log.Printf("Received unknown event: %d\n", e)
}
continue journal
}
}
return
}

// buildMessage returns a string representing the current journal entry in a simple format which
// includes the entry timestamp and MESSAGE field.
func (r *JournalReader) buildMessage() (string, error) {
Expand Down

0 comments on commit 1b5d41c

Please sign in to comment.