Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log errors from v2 client errors channel #34392

Merged
merged 4 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Log errors from the Elastic Agent V2 client errors channel. Avoids blocking when error occurs communicating with the Elastic Agent. {pull}34392[34392]

*Auditbeat*

Expand Down
29 changes: 27 additions & 2 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type BeatV2Manager struct {

logger *logp.Logger

// handles client errors
errCanceller context.CancelFunc

// track individual units given to us by the V2 API
mx sync.Mutex
units map[unitKey]*client.Unit
Expand Down Expand Up @@ -175,11 +178,18 @@ func (cm *BeatV2Manager) Start() error {
if !cm.Enabled() {
return fmt.Errorf("V2 Manager is disabled")
}
err := cm.client.Start(context.Background())
if cm.errCanceller != nil {
cm.errCanceller()
cm.errCanceller = nil
}
ctx := context.Background()
err := cm.client.Start(ctx)
if err != nil {
return fmt.Errorf("error starting connection to client")
}

ctx, canceller := context.WithCancel(ctx)
cm.errCanceller = canceller
go cm.watchErrChan(ctx)
cm.client.RegisterDiagnosticHook("beat-rendered-config", "the rendered config used by the beat", "beat-rendered-config.yml", "application/yaml", cm.handleDebugYaml)
go cm.unitListen()
cm.isRunning = true
Expand Down Expand Up @@ -325,6 +335,17 @@ func (cm *BeatV2Manager) deleteUnit(unit *client.Unit) {
// Private V2 implementation
// ================================

func (cm *BeatV2Manager) watchErrChan(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case err := <-cm.client.Errors():
cm.logger.Errorf("elastic-agent-client error: %s", err)
}
}
}

func (cm *BeatV2Manager) unitListen() {
const changeDebounce = 100 * time.Millisecond

Expand Down Expand Up @@ -405,6 +426,10 @@ func (cm *BeatV2Manager) stopBeat() {
}
cm.client.Stop()
cm.UpdateStatus(lbmanagement.Stopped, "Stopped")
if cm.errCanceller != nil {
cm.errCanceller()
cm.errCanceller = nil
}
}

func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
Expand Down