-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix harvester shutdown for prospector reloading #3563
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package channel | ||
|
||
import ( | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/elastic/beats/filebeat/input" | ||
) | ||
|
||
type Outlet struct { | ||
wg *sync.WaitGroup | ||
done <-chan struct{} | ||
signal <-chan struct{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make difference between signal and done clear as godoc here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding a comment. |
||
channel chan *input.Event | ||
isOpen int32 // atomic indicator | ||
} | ||
|
||
func NewOutlet( | ||
done <-chan struct{}, | ||
c chan *input.Event, | ||
wg *sync.WaitGroup, | ||
) *Outlet { | ||
return &Outlet{ | ||
done: done, | ||
channel: c, | ||
wg: wg, | ||
isOpen: 1, | ||
} | ||
} | ||
|
||
// SetSignal sets the signal channel for OnEventSignal | ||
func (o *Outlet) SetSignal(signal <-chan struct{}) { | ||
o.signal = signal | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why we need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need it to make a difference between the harvester stopping itself and the state.Finished must always be reported and beater.done closing, which means it should directly shut down. I added a note in the godocs. |
||
|
||
func (o *Outlet) OnEvent(event *input.Event) bool { | ||
open := atomic.LoadInt32(&o.isOpen) == 1 | ||
if !open { | ||
return false | ||
} | ||
|
||
if o.wg != nil { | ||
o.wg.Add(1) | ||
} | ||
|
||
select { | ||
case <-o.done: | ||
if o.wg != nil { | ||
o.wg.Done() | ||
} | ||
atomic.StoreInt32(&o.isOpen, 0) | ||
return false | ||
case o.channel <- event: | ||
return true | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if outlet is used by multiple producers this can race I think. If it's used by one single producer only, there is no need for the atomics. Also, setting a channel to
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer to keep the current implementation to make it possible to use the same Outlet also for prospector to spooler in the future. So we only have one implementation which supports both use cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's time to get rid of the spooler... |
||
} | ||
|
||
// OnEventSignal can be stopped by the signal that is set with SetSignal | ||
// This does not close the outlet. Only OnEvent does close the outlet. | ||
func (o *Outlet) OnEventSignal(event *input.Event) bool { | ||
open := atomic.LoadInt32(&o.isOpen) == 1 | ||
if !open { | ||
return false | ||
} | ||
|
||
if o.wg != nil { | ||
o.wg.Add(1) | ||
} | ||
|
||
select { | ||
case <-o.signal: | ||
if o.wg != nil { | ||
o.wg.Done() | ||
} | ||
o.signal = nil | ||
return false | ||
case o.channel <- event: | ||
return true | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wg used for counting active events?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, added a comment.