From a9129f5f59951b20031df2c08b18dfd28b0d9e90 Mon Sep 17 00:00:00 2001 From: faiq Date: Mon, 18 Apr 2016 11:33:43 -0700 Subject: [PATCH] makes follow wrap async follow --- sdjournal/read.go | 82 ++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 43 deletions(-) diff --git a/sdjournal/read.go b/sdjournal/read.go index 172603d5..70c5052c 100644 --- a/sdjournal/read.go +++ b/sdjournal/read.go @@ -132,60 +132,25 @@ func (r *JournalReader) Close() error { // Follow synchronously follows the JournalReader, writing each new journal entry to writer. The // follow will continue until a single time.Time is received on the until channel. func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) { - // Process journal entries and events. Entries are flushed until the tail or // timeout is reached, and then we wait for new events or the timeout. - var msg = make([]byte, 64*1<<(10)) -process: + msgChan := make(chan []byte) + errChan := r.FollowAsync(until, msgChan) for { - c, err := r.Read(msg) - if err != nil && err != io.EOF { - break process - } - select { + case err := <-errChan: + if err != nil && err != io.EOF { + return err + } case <-until: return ErrExpired - default: + case msg := <-msgChan: + c := len(msg) if c > 0 { writer.Write(msg[:c]) - continue process - } - } - - // We're at the tail, so wait for new events or time out. - // Holds journal events to process. Tightly bounded for now unless there's a - // reason to unblock the journal watch routine more quickly. - events := make(chan int, 1) - pollDone := make(chan bool, 1) - go func() { - for { - select { - case <-pollDone: - return - default: - events <- r.journal.Wait(time.Duration(1) * time.Second) - } - } - }() - - select { - case <-until: - pollDone <- true - return ErrExpired - case e := <-events: - pollDone <- true - switch e { - case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: - // TODO: need to account for any of these? - default: - log.Printf("Received unknown event: %d\n", e) } - continue process } } - - return } // FollowAsync asynchronously follows the JournalReader, writing each new journal entry to the provided receiving byte channel. @@ -193,12 +158,27 @@ process: // NOTE: The until channel, must be closed or this method will leak a Go routine func (r *JournalReader) FollowAsync(until <-chan time.Time, recieve chan<- []byte) <-chan error { errCh := make(chan error, 1) + events := make(chan int, 1) + pollDone := make(chan bool, 1) + + go func() { + for { + select { + case <-pollDone: + return + default: + events <- r.journal.Wait(time.Duration(1) * time.Second) + } + } + }() + go func() { var msg = make([]byte, 64*1<<(10)) for { c, err := r.Read(msg) if err != nil && err != io.EOF { errCh <- err + return } select { case <-until: @@ -209,6 +189,22 @@ func (r *JournalReader) FollowAsync(until <-chan time.Time, recieve chan<- []byt recieve <- msg[:c] } } + + } + + select { + case <-until: + pollDone <- true + errCh <- ErrExpired + return + case e := <-events: + pollDone <- true + switch e { + case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: + // TODO: need to account for any of these? + default: + log.Printf("Received unknown event: %d\n", e) + } } }() return errCh