From 1b5d41c39ccd2607b9496fb534db5b89247b1cc4 Mon Sep 17 00:00:00 2001 From: faiq Date: Fri, 22 Apr 2016 16:31:21 -0700 Subject: [PATCH] sdjournal: adds a follow with fields this allows users to pull data from journal entries that aren't MESSAGE --- sdjournal/journal.go | 25 ++++++++++++++ sdjournal/read.go | 79 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/sdjournal/journal.go b/sdjournal/journal.go index 0324983a..78773a5e 100644 --- a/sdjournal/journal.go +++ b/sdjournal/journal.go @@ -251,6 +251,31 @@ func (j *Journal) GetData(field string) (string, error) { return msg, nil } +// EnumerateData may be used to iterate through all data fields used in the opened journal files +// the order of the returned field names is not defined. +func (j *Journal) EnumerateData() (string, error) { + var d unsafe.Pointer + var l C.size_t + + j.mu.Lock() + r := C.sd_journal_enumerate_data(j.cjournal, &d, &l) + j.mu.Unlock() + + if r < 0 { + return "", fmt.Errorf("failed to read message: %d", r) + } + + msg := C.GoStringN((*C.char)(d), C.int(l)) + return msg, nil +} + +//RestartData resets the field name enumeration index to the beginning of the list. +func (j *Journal) RestartData() { + j.mu.Lock() + C.sd_journal_restart_data(j.cjournal) + j.mu.Unlock() +} + // GetDataValue gets the data object associated with a specific field from the // current journal entry, returning only the value of the object. func (j *Journal) GetDataValue(field string) (string, error) { diff --git a/sdjournal/read.go b/sdjournal/read.go index 8944448c..ef93cb94 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -16,10 +16,12 @@ package sdjournal import ( + "encoding/json" "errors" "fmt" "io" "log" + "strings" "time" ) @@ -188,6 +190,83 @@ process: return } +func (r *JournalReader) FollowWithFields(until <-chan time.Time, fields map[string]struct{}, writer io.Writer) (err error) { + enc := json.NewEncoder(writer) +journal: + for { + kvMap := make(map[string]string) + select { + case <-until: + return ErrExpired + default: + var err error + var c int + + // Advance the journal cursor + c, err = r.journal.Next() + + // An unexpected error + if err != nil { + return err + } + + // We have a new journal entry go over the fields + // get the data for what we care about and return + r.journal.RestartData() + if c > 0 { + fields: + for { + s, err := r.journal.EnumerateData() + if err != nil || len(s) == 0 { + break fields + } + s = s[:len(s)] + arr := strings.SplitN(s, "=", 2) + if _, ok := fields[arr[0]]; ok { + kvMap[arr[0]] = arr[1] + } + + } + if err := enc.Encode(kvMap); err != nil { + return err + } + } + } + + // we're at the tail, so wait for new events or time out. + // holds journal events to process. tightly bounded for now unless there's a + // reason to unblock the journal watch routine more quickly + events := make(chan int, 1) + pollDone := make(chan bool, 1) + go func() { + for { + select { + case <-pollDone: + return + default: + events <- r.journal.Wait(time.Duration(1) * time.Second) + } + } + }() + + select { + case <-until: + pollDone <- true + return ErrExpired + case e := <-events: + pollDone <- true + switch e { + case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: + // TODO: need to account for any of these? + default: + log.Printf("Received unknown event: %d\n", e) + } + continue journal + } + } + return +} + // buildMessage returns a string representing the current journal entry in a simple format which // includes the entry timestamp and MESSAGE field. func (r *JournalReader) buildMessage() (string, error) {