Skip to content

Commit

Permalink
makes follow wrap async follow
Browse files Browse the repository at this point in the history
  • Loading branch information
faiq committed Apr 18, 2016
1 parent c95371b commit a9129f5
Showing 1 changed file with 39 additions and 43 deletions.
82 changes: 39 additions & 43 deletions sdjournal/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,73 +132,53 @@ func (r *JournalReader) Close() error {
// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
// follow will continue until a single time.Time is received on the until channel.
func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {

// Process journal entries and events. Entries are flushed until the tail or
// timeout is reached, and then we wait for new events or the timeout.
var msg = make([]byte, 64*1<<(10))
process:
msgChan := make(chan []byte)
errChan := r.FollowAsync(until, msgChan)
for {
c, err := r.Read(msg)
if err != nil && err != io.EOF {
break process
}

select {
case err := <-errChan:
if err != nil && err != io.EOF {
return err
}
case <-until:
return ErrExpired
default:
case msg := <-msgChan:
c := len(msg)
if c > 0 {
writer.Write(msg[:c])
continue process
}
}

// We're at the tail, so wait for new events or time out.
// Holds journal events to process. Tightly bounded for now unless there's a
// reason to unblock the journal watch routine more quickly.
events := make(chan int, 1)
pollDone := make(chan bool, 1)
go func() {
for {
select {
case <-pollDone:
return
default:
events <- r.journal.Wait(time.Duration(1) * time.Second)
}
}
}()

select {
case <-until:
pollDone <- true
return ErrExpired
case e := <-events:
pollDone <- true
switch e {
case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
// TODO: need to account for any of these?
default:
log.Printf("Received unknown event: %d\n", e)
}
continue process
}
}

return
}

// FollowAsync asynchronously follows the JournalReader, writing each new journal entry to the provided receiving byte channel.
// The follow will continue until a single time.Time is received on the until channel.
// NOTE: The until channel, must be closed or this method will leak a Go routine
func (r *JournalReader) FollowAsync(until <-chan time.Time, recieve chan<- []byte) <-chan error {
errCh := make(chan error, 1)
events := make(chan int, 1)
pollDone := make(chan bool, 1)

go func() {
for {
select {
case <-pollDone:
return
default:
events <- r.journal.Wait(time.Duration(1) * time.Second)
}
}
}()

go func() {
var msg = make([]byte, 64*1<<(10))
for {
c, err := r.Read(msg)
if err != nil && err != io.EOF {
errCh <- err
return
}
select {
case <-until:
Expand All @@ -209,6 +189,22 @@ func (r *JournalReader) FollowAsync(until <-chan time.Time, recieve chan<- []byt
recieve <- msg[:c]
}
}

}

select {
case <-until:
pollDone <- true
errCh <- ErrExpired
return
case e := <-events:
pollDone <- true
switch e {
case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
// TODO: need to account for any of these?
default:
log.Printf("Received unknown event: %d\n", e)
}
}
}()
return errCh
Expand Down

0 comments on commit a9129f5

Please sign in to comment.