-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathheartbeat.go
366 lines (316 loc) · 12.2 KB
/
heartbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package beater
import (
"context"
"errors"
"fmt"
"sync"
"syscall"
"time"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/hbregistry"
"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
"github.com/elastic/beats/v7/heartbeat/scheduler"
_ "github.com/elastic/beats/v7/heartbeat/security"
"github.com/elastic/beats/v7/heartbeat/tracer"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
)
// Heartbeat represents the root datastructure of this beat.
// It owns the scheduler that executes monitor jobs, the factory used to
// build monitor runners, and the optional reloader/autodiscover sources
// that feed additional monitor configs into that factory.
type Heartbeat struct {
	// done is closed (exactly once, guarded by stopOnce) by Stop to signal shutdown.
	done     chan struct{}
	stopOnce sync.Once
	// config is used for iterating over elements of the config.
	config *config.Config
	// scheduler runs the monitor jobs; its Add method is handed to
	// monitorFactory as the AddTask hook in New.
	scheduler *scheduler.Scheduler
	// monitorReloader is only set when heartbeat.config.monitors is enabled.
	monitorReloader *cfgfile.Reloader
	// monitorFactory creates every monitor runner (static, reloadable,
	// centrally managed and autodiscovered).
	monitorFactory *monitors.RunnerFactory
	// autodiscover is only set when autodiscover is configured.
	autodiscover *autodiscover.Autodiscover
	// replaceStateLoader swaps the loader behind the state loader that was
	// handed to monitorFactory in New, e.g. when a managed Elasticsearch
	// output is (re)configured.
	replaceStateLoader func(sl monitorstate.StateLoader)
	// trace reports lifecycle events to the optional socket tracer; it is a
	// no-op tracer when no socket trace is configured.
	trace tracer.Tracer
}
// New creates a new heartbeat.
//
// It parses rawConfig, optionally connects the socket tracer, wires up the
// monitor state loader and builds the scheduler and monitor factory:
//   - with an elasticsearch output and no agent management, an ES-backed
//     state loader is installed (a connection failure is fatal in run_once
//     mode and only logged otherwise);
//   - under agent management, a deferred state loader is installed that the
//     managed output reloader can later replace.
//
// Returns an error if the config cannot be unpacked, if run_once mode cannot
// reach Elasticsearch, or if the scheduler location is invalid.
func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
	parsedConfig := config.DefaultConfig()
	if err := rawConfig.Unpack(&parsedConfig); err != nil {
		return nil, fmt.Errorf("error reading config file: %w", err)
	}

	// The sock tracer should be setup before any other code to ensure its reliability
	// The ES Loader, for instance, can exit early
	var trace tracer.Tracer = tracer.NewNoopTracer()
	if stConfig := parsedConfig.SocketTrace; stConfig != nil {
		// Note this, intentionally, blocks until connected to the trace endpoint
		logp.L().Infof("Setting up sock tracer at %s (wait: %s)", stConfig.Path, stConfig.Wait)
		sockTrace, err := tracer.NewSockTracer(stConfig.Path, stConfig.Wait)
		if err != nil {
			// %v, not %w: logp's *f methods format like fmt.Sprintf, which does not support %w.
			logp.L().Warnf("could not connect to socket trace at path %s after %s timeout: %v", stConfig.Path, stConfig.Wait, err)
		} else {
			trace = sockTrace
		}
	}

	// Check if any of these can prevent using states client
	stateLoader, replaceStateLoader := monitorstate.AtomicStateLoader(monitorstate.NilStateLoader)
	if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() {
		// Connect to ES and setup the State loader if the output is not managed by agent
		// Note this, intentionally, blocks until connected or max attempts reached
		esClient, err := makeESClient(context.TODO(), b.Config.Output.Config(), 3, 2*time.Second)
		switch {
		case err == nil:
			replaceStateLoader(monitorstate.MakeESLoader(esClient, monitorstate.DefaultDataStreams, parsedConfig.RunFrom))
		case parsedConfig.RunOnce:
			// In run_once mode a missing state connection is fatal; notify the
			// tracer before bailing out since Run will never close it.
			trace.Abort()
			return nil, fmt.Errorf("run_once mode fatal error: %w", err)
		default:
			logp.L().Warnf("skipping monitor state management: %v", err)
		}
	} else if b.Manager.Enabled() {
		// Under management the ES output arrives later through the output
		// reloader registered in RunCentralMgmtMonitors, which calls
		// replaceStateLoader. The 15s argument presumably bounds how long
		// loads wait for that replacement — confirm in monitorstate.
		stateLoader, replaceStateLoader = monitorstate.DeferredStateLoader(monitorstate.NilStateLoader, 15*time.Second)
	}

	limit := parsedConfig.Scheduler.Limit
	schedLocationName := parsedConfig.Scheduler.Location
	if schedLocationName == "" {
		schedLocationName = "Local"
	}
	location, err := time.LoadLocation(schedLocationName)
	if err != nil {
		// Mirror the run_once failure path: the tracer may already be
		// connected and Run (which closes it) will never be called.
		trace.Abort()
		return nil, fmt.Errorf("could not load scheduler location %q: %w", schedLocationName, err)
	}

	jobConfig := parsedConfig.Jobs
	sched := scheduler.Create(limit, hbregistry.SchedulerRegistry, location, jobConfig, parsedConfig.RunOnce)

	pipelineClientFactory := func(p beat.Pipeline) (beat.Client, error) {
		return p.Connect()
	}

	bt := &Heartbeat{
		done:               make(chan struct{}),
		config:             parsedConfig,
		scheduler:          sched,
		replaceStateLoader: replaceStateLoader,
		// monitorFactory is the factory used for creating all monitor instances,
		// wiring them up to everything needed to actually execute.
		monitorFactory: monitors.NewFactory(monitors.FactoryParams{
			BeatInfo:              b.Info,
			AddTask:               sched.Add,
			StateLoader:           stateLoader,
			PluginsReg:            plugin.GlobalPluginsReg,
			PipelineClientFactory: pipelineClientFactory,
			BeatRunFrom:           parsedConfig.RunFrom,
		}),
		trace: trace,
	}

	runFromID := "<unknown location>"
	if parsedConfig.RunFrom != nil {
		runFromID = parsedConfig.RunFrom.ID
	}
	logp.L().Infof("heartbeat starting, running from: %v", runFromID)

	return bt, nil
}
// Run executes the beat.
//
// It starts the static monitors, then (depending on configuration) the
// centrally managed, reloadable and autodiscovered monitor sources, and
// blocks until run_once completes or Stop is called. On the way out a
// deferred SignalWait waits on Stop, the pipeline wrapper finishing, and
// (when publish_timeout > 0) a publish timeout — presumably whichever
// signals first.
func (bt *Heartbeat) Run(b *beat.Beat) error {
	bt.trace.Start()
	defer bt.trace.Close()

	// Adapt local pipeline to synchronized mode if run_once is enabled
	pipeline := b.Publisher
	var pipelineWrapper monitors.PipelineWrapper = &monitors.NoopPipelineWrapper{}
	if bt.config.RunOnce {
		// Named syncWrapper rather than sync so the sync package is not shadowed.
		syncWrapper := &monitors.SyncPipelineWrapper{}
		pipeline = monitors.WithSyncPipelineWrapper(pipeline, syncWrapper)
		pipelineWrapper = syncWrapper
	}

	logp.L().Info("heartbeat is running! Hit CTRL-C to stop it.")
	// The group list is informational only, so a failure to read it is
	// deliberately ignored and whatever was returned (possibly empty) is logged.
	groups, _ := syscall.Getgroups()
	logp.L().Infof("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

	waitMonitors := monitors.NewSignalWait()

	// It is important this appear before we check for run once mode
	// In run once mode we depend on these monitors being loaded, but not other more
	// dynamic types.
	stopStaticMonitors, err := bt.RunStaticMonitors(b, pipeline)
	if err != nil {
		return err
	}
	defer stopStaticMonitors()

	if bt.config.RunOnce {
		waitMonitors.Add(monitors.WithLog(bt.scheduler.WaitForRunOnce, "Ending run_once run."))
	}

	if b.Manager.Enabled() {
		bt.RunCentralMgmtMonitors(b)
	}

	if bt.config.ConfigMonitors.Enabled() {
		bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors)
		defer bt.monitorReloader.Stop()

		if err := bt.RunReloadableMonitors(); err != nil {
			return err
		}
	}

	// Configure the beats Manager to start after all the reloadable hooks are initialized
	// and shutdown when the function return.
	if err := b.Manager.Start(); err != nil {
		return err
	}
	defer b.Manager.Stop()

	if bt.config.Autodiscover != nil {
		bt.autodiscover, err = bt.makeAutodiscover(b)
		if err != nil {
			return err
		}
		bt.autodiscover.Start()
		defer bt.autodiscover.Stop()
	}

	defer bt.scheduler.Stop()

	// Wait until run_once ends or bt is being shut down
	waitMonitors.AddChan(bt.done)
	waitMonitors.Wait()

	logp.L().Info("Shutting down, waiting for output to complete")

	// Due to defer's LIFO execution order, waitPublished.Wait() has to be
	// located _after_ b.Manager.Stop() or else it will exit early
	waitPublished := monitors.NewSignalWait()
	defer waitPublished.Wait()

	// Three possible events: global beat, run_once pipeline done and publish timeout
	waitPublished.AddChan(bt.done)
	waitPublished.Add(monitors.WithLog(pipelineWrapper.Wait, "shutdown: finished publishing events."))
	if bt.config.PublishTimeout > 0 {
		logp.L().Infof("shutdown: output timer started. Waiting for max %v.", bt.config.PublishTimeout)
		waitPublished.Add(monitors.WithLog(monitors.WaitDuration(bt.config.PublishTimeout),
			"shutdown: timed out waiting for pipeline to publish events."))
	}

	return nil
}
// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
//
// Each configured monitor is created against pipeline and started
// immediately; monitors that report monitors.ErrMonitorDisabled are skipped.
// If any monitor fails to be created, the monitors already started by this
// call are stopped before the error is returned, so the caller is never
// left with running monitors it has no handle to stop. On success the
// returned stop func stops every started monitor.
//
// The b parameter is currently unused.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat, pipeline beat.Pipeline) (stop func(), err error) {
	runners := make([]cfgfile.Runner, 0, len(bt.config.Monitors))
	stopAll := func() {
		for _, runner := range runners {
			runner.Stop()
		}
	}

	for _, cfg := range bt.config.Monitors {
		created, createErr := bt.monitorFactory.Create(pipeline, cfg)
		if createErr != nil {
			if errors.Is(createErr, monitors.ErrMonitorDisabled) {
				logp.L().Infof("skipping disabled monitor: %s", createErr)
				continue // don't stop loading monitors just because they're disabled
			}
			// Don't leak the monitors started so far: the caller gets no stop func.
			stopAll()
			return nil, fmt.Errorf("could not create monitor: %w", createErr)
		}
		created.Start()
		runners = append(runners, created)
	}

	return stopAll, nil
}
// RunCentralMgmtMonitors loads any central management configured configs.
//
// It registers an output reloader on b. Whenever the managed output is
// (re)configured as elasticsearch, the reloader makes a single connection
// attempt and, on success, replaces the monitor state loader with an
// ES-backed one. It then registers a runner list backed by the monitor
// factory as an input with b.Registry.
func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
	// Register output reloader for managed outputs
	b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
		// Do not return error here, it will prevent libbeat output from processing the same event
		if r == nil {
			return nil
		}

		outCfg := conf.Namespace{}
		//nolint:nilerr // we are intentionally ignoring specific errors here
		if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
			return nil
		}

		// Backoff panics with 0 duration, set to smallest unit
		esClient, err := makeESClient(context.TODO(), outCfg.Config(), 1, 1*time.Nanosecond)
		if err != nil {
			// %v, not %w: logp's *f methods format like fmt.Sprintf, which does not support %w.
			logp.L().Warnf("skipping monitor state management during managed reload: %v", err)
			return nil
		}
		bt.replaceStateLoader(monitorstate.MakeESLoader(esClient, monitorstate.DefaultDataStreams, bt.config.RunFrom))
		return nil
	})

	inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher)
	b.Registry.MustRegisterInput(inputs)
}
// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
//
// The reloader first validates the current configs against the monitor
// factory. A validation failure is only logged, never returned. The
// reloader's Run loop is then started in its own goroutine. The returned
// error is always nil; the named result is kept for the existing signature.
func (bt *Heartbeat) RunReloadableMonitors() (err error) {
	reloader, factory := bt.monitorReloader, bt.monitorFactory

	// Check monitor configs
	checkErr := reloader.Check(factory)
	if checkErr != nil {
		logp.L().Error(fmt.Errorf("error loading reloadable monitors: %w", checkErr))
	}

	// Execute the monitor
	go reloader.Run(factory)

	return nil
}
// makeAutodiscover creates an autodiscover object ready to be started.
//
// Discovered configs are turned into monitors by bt.monitorFactory and
// publish through b.Publisher. A nil Autodiscover is returned alongside
// any construction error.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
	factory := bt.monitorFactory
	ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, factory, autodiscover.QueryConfig(), bt.config.Autodiscover, b.Keystore)
	if err == nil {
		return ad, nil
	}
	return nil, err
}
// Stop stops the beat.
//
// It closes bt.done to unblock Run. The close is guarded by stopOnce, so
// calling Stop more than once is safe.
func (bt *Heartbeat) Stop() {
	signalDone := func() {
		close(bt.done)
	}
	bt.stopOnce.Do(signalDone)
}
// makeESClient establishes an ES connection meant to load monitors' state.
//
// It clones cfg (so the change is not global) and overrides the request
// timeout to 10 seconds. It then tries to connect up to attempts times;
// values below 1 are treated as a single attempt. Between attempts it waits
// on an equal-jitter backoff built from wait, and that wait is interrupted
// when ctx is cancelled.
//
// On failure the returned error wraps the last connection error, or
// ctx.Err() if the context was cancelled while waiting between attempts.
func makeESClient(ctx context.Context, cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) {
	// Overriding the default ES request timeout:
	// Higher values of timeouts cannot be applied on the SAAS Service
	// where we are running in tight loops and want the next successive check to be run for a given monitor
	// within the next scheduled interval which could be 1m or 3m
	// Clone original config since we don't want this change to be global
	newCfg, err := conf.NewConfigFrom(cfg)
	if err != nil {
		return nil, fmt.Errorf("error cloning config: %w", err)
	}
	timeout := int64((10 * time.Second).Seconds())
	if err := newCfg.SetInt("timeout", -1, timeout); err != nil {
		return nil, fmt.Errorf("error setting the ES timeout in config: %w", err)
	}

	if attempts < 1 {
		attempts = 1
	}

	// ES client backoff. Using ctx.Done() (instead of a never-closing channel)
	// lets a cancelled context interrupt the wait.
	connectDelay := backoff.NewEqualJitterBackoff(ctx.Done(), wait, wait)

	var lastErr error
	for i := 0; i < attempts; i++ {
		// Only wait between attempts; there is no point sleeping after the last failure.
		if i > 0 && !connectDelay.Wait() {
			return nil, fmt.Errorf("could not establish states loader connection after %d attempts, with %s delay: %w", i, wait, ctx.Err())
		}

		client, connErr := eslegclient.NewConnectedClient(ctx, newCfg, "Heartbeat")
		if connErr == nil {
			return client, nil
		}
		lastErr = connErr
	}

	return nil, fmt.Errorf("could not establish states loader connection after %d attempts, with %s delay: %w", attempts, wait, lastErr)
}