From f692bccaa94009909439ecfb97ffca473d61a3af Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Sun, 1 Mar 2020 21:33:53 -0700 Subject: [PATCH] [Libbeat] Use *logp.Logger in libbeat processors (#16654) * Use *logp.Logger in libbeat processors (cherry picked from commit 755227da850196d1a96d0c2096ad524fd5a28ff3) --- libbeat/processors/actions/copy_fields.go | 4 +++- .../processors/actions/copy_fields_test.go | 5 ++++- .../processors/actions/decode_json_fields.go | 13 ++++++------ .../actions/decode_json_fields_test.go | 8 ++++---- libbeat/processors/actions/rename.go | 4 +++- libbeat/processors/actions/rename_test.go | 4 ++++ libbeat/processors/actions/truncate_fields.go | 4 +++- .../actions/truncate_fields_test.go | 4 ++++ .../add_cloud_metadata/add_cloud_metadata.go | 10 +++++----- .../add_cloud_metadata/providers.go | 20 +++++++++---------- .../add_host_metadata/add_host_metadata.go | 4 +++- .../processors/add_locale/add_locale_test.go | 5 ++--- .../add_observer_metadata.go | 4 +++- 13 files changed, 55 insertions(+), 34 deletions(-) diff --git a/libbeat/processors/actions/copy_fields.go b/libbeat/processors/actions/copy_fields.go index c67dd996385f..b81826a7ad1f 100644 --- a/libbeat/processors/actions/copy_fields.go +++ b/libbeat/processors/actions/copy_fields.go @@ -32,6 +32,7 @@ import ( type copyFields struct { config copyFieldsConfig + logger *logp.Logger } type copyFieldsConfig struct { @@ -62,6 +63,7 @@ func NewCopyFields(c *common.Config) (processors.Processor, error) { f := ©Fields{ config: config, + logger: logp.NewLogger("copy_fields"), } return f, nil } @@ -76,7 +78,7 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) { err := f.copyField(field.From, field.To, event.Fields) if err != nil { errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err) - logp.Debug("copy_fields", errMsg.Error()) + f.logger.Debug(errMsg.Error()) if f.config.FailOnError { event.Fields = backup event.PutValue("error.message", errMsg.Error()) diff --git a/libbeat/processors/actions/copy_fields_test.go b/libbeat/processors/actions/copy_fields_test.go index a3de4ae19470..7050e93a97a4 100644 --- a/libbeat/processors/actions/copy_fields_test.go +++ b/libbeat/processors/actions/copy_fields_test.go @@ -20,6 +20,8 @@ package actions import ( "testing" + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/beat" @@ -27,7 +29,7 @@ import ( ) func TestCopyFields(t *testing.T) { - + log := logp.NewLogger("copy_fields_test") var tests = map[string]struct { FromTo fromTo Input common.MapStr @@ -130,6 +132,7 @@ func TestCopyFields(t *testing.T) { test.FromTo, }, }, + log, } event := &beat.Event{ diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 078ec38f1ccc..dbff3364338e 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -42,6 +42,7 @@ type decodeJSONFields struct { processArray bool documentID string target *string + logger *logp.Logger } type config struct { @@ -62,8 +63,6 @@ var ( errProcessingSkipped = errors.New("processing skipped") ) -var debug = logp.MakeDebug("filters") - func init() { processors.RegisterPlugin("decode_json_fields", checks.ConfigChecked(NewDecodeJSONFields, @@ -76,10 +75,11 @@ func init() { // NewDecodeJSONFields construct a new decode_json_fields processor. func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) { config := defaultConfig + logger := logp.NewLogger("truncate_fields") err := c.Unpack(&config) if err != nil { - logp.Warn("Error unpacking config for decode_json_fields") + logger.Warn("Error unpacking config for decode_json_fields") return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) } @@ -91,6 +91,7 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) { processArray: config.ProcessArray, documentID: config.DocumentID, target: config.Target, + logger: logger, } return f, nil } @@ -101,7 +102,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { for _, field := range f.fields { data, err := event.GetValue(field) if err != nil && errors.Cause(err) != common.ErrKeyNotFound { - debug("Error trying to GetValue for field : %s in event : %v", field, event) + f.logger.Debugf("Error trying to GetValue for field : %s in event : %v", field, event) errs = append(errs, err.Error()) continue } @@ -115,7 +116,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { var output interface{} err = unmarshal(f.maxDepth, text, &output, f.processArray) if err != nil { - debug("Error trying to unmarshal %s", text) + f.logger.Debugf("Error trying to unmarshal %s", text) errs = append(errs, err.Error()) continue } @@ -149,7 +150,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { } if err != nil { - debug("Error trying to Put value %v for field : %s", output, field) + f.logger.Debugf("Error trying to Put value %v for field : %s", output, field) errs = append(errs, err.Error()) continue } diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 3e801d69e5e4..328cadab9f30 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -96,7 +96,7 @@ func TestInvalidJSONMultiple(t *testing.T) { } func TestDocumentID(t *testing.T) { - logp.TestingSetup() + log := logp.NewLogger("decode_json_fields_test") input := common.MapStr{ "msg": `{"log": "message", "myid": "myDocumentID"}`, @@ -109,7 +109,7 @@ func TestDocumentID(t *testing.T) { p, err := NewDecodeJSONFields(config) if err != nil { - logp.Err("Error initializing decode_json_fields") + log.Error("Error initializing decode_json_fields") t.Fatal(err) } @@ -401,11 +401,11 @@ func TestAddErrKeyOption(t *testing.T) { } func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { - logp.TestingSetup() + log := logp.NewLogger("decode_json_fields_test") p, err := NewDecodeJSONFields(config) if err != nil { - logp.Err("Error initializing decode_json_fields") + log.Error("Error initializing decode_json_fields") t.Fatal(err) } diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go index 89e92d2a3e6b..626d35219e8a 100644 --- a/libbeat/processors/actions/rename.go +++ b/libbeat/processors/actions/rename.go @@ -32,6 +32,7 @@ import ( type renameFields struct { config renameFieldsConfig + logger *logp.Logger } type renameFieldsConfig struct { @@ -66,6 +67,7 @@ func NewRenameFields(c *common.Config) (processors.Processor, error) { f := &renameFields{ config: config, + logger: logp.NewLogger("rename"), } return f, nil } @@ -81,7 +83,7 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) { err := f.renameField(field.From, field.To, event.Fields) if err != nil { errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err) - logp.Debug("rename", errMsg.Error()) + f.logger.Debug(errMsg.Error()) if f.config.FailOnError { event.Fields = backup event.PutValue("error.message", errMsg.Error()) diff --git a/libbeat/processors/actions/rename_test.go b/libbeat/processors/actions/rename_test.go index 125986e8a15e..646d83407e56 100644 --- a/libbeat/processors/actions/rename_test.go +++ b/libbeat/processors/actions/rename_test.go @@ -21,6 +21,8 @@ import ( "reflect" "testing" + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/beat" @@ -28,6 +30,7 @@ import ( ) func TestRenameRun(t *testing.T) { + log := logp.NewLogger("rename_test") var tests = []struct { description string Fields []fromTo @@ -234,6 +237,7 @@ func TestRenameRun(t *testing.T) { IgnoreMissing: test.IgnoreMissing, FailOnError: test.FailOnError, }, + logger: log, } event := &beat.Event{ Fields: test.Input, diff --git a/libbeat/processors/actions/truncate_fields.go b/libbeat/processors/actions/truncate_fields.go index 1c1256f69bcc..00289b84bcc9 100644 --- a/libbeat/processors/actions/truncate_fields.go +++ b/libbeat/processors/actions/truncate_fields.go @@ -44,6 +44,7 @@ type truncateFieldsConfig struct { type truncateFields struct { config truncateFieldsConfig truncate truncater + logger *logp.Logger } type truncater func(*truncateFields, []byte) ([]byte, bool, error) @@ -76,6 +77,7 @@ func NewTruncateFields(c *common.Config) (processors.Processor, error) { return &truncateFields{ config: config, truncate: truncateFunc, + logger: logp.NewLogger("truncate_fields"), }, nil } @@ -88,7 +90,7 @@ func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) { for _, field := range f.config.Fields { event, err := f.truncateSingleField(field, event) if err != nil { - logp.Debug("truncate_fields", "Failed to truncate fields: %s", err) + f.logger.Debugf("Failed to truncate fields: %s", err) if f.config.FailOnError { event.Fields = backup return event, err diff --git a/libbeat/processors/actions/truncate_fields_test.go b/libbeat/processors/actions/truncate_fields_test.go index 45d8fb642726..b6e041ca8ee8 100644 --- a/libbeat/processors/actions/truncate_fields_test.go +++ b/libbeat/processors/actions/truncate_fields_test.go @@ -20,6 +20,8 @@ package actions import ( "testing" + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/beat" @@ -27,6 +29,7 @@ import ( ) func TestTruncateFields(t *testing.T) { + log := logp.NewLogger("truncate_fields_test") var tests = map[string]struct { MaxBytes int MaxChars int @@ -158,6 +161,7 @@ func TestTruncateFields(t *testing.T) { FailOnError: true, }, truncate: test.TruncateFunc, + logger: log, } event := &beat.Event{ diff --git a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go index d4a8ea22baee..22146e5ebe67 100644 --- a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go +++ b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go @@ -37,8 +37,6 @@ const ( metadataHost = "169.254.169.254" ) -var debugf = logp.MakeDebug("filters") - // init registers the add_cloud_metadata processor. func init() { processors.RegisterPlugin("add_cloud_metadata", New) @@ -49,6 +47,7 @@ type addCloudMetadata struct { initOnce sync.Once initData *initData metadata common.MapStr + logger *logp.Logger } type initData struct { @@ -71,6 +70,7 @@ func New(c *common.Config) (processors.Processor, error) { } p := &addCloudMetadata{ initData: &initData{fetchers, config.Timeout, config.Overwrite}, + logger: logp.NewLogger("add_cloud_metadata"), } go p.init() @@ -84,13 +84,13 @@ func (r result) String() string { func (p *addCloudMetadata) init() { p.initOnce.Do(func() { - result := fetchMetadata(p.initData.fetchers, p.initData.timeout) + result := p.fetchMetadata() if result == nil { - logp.Info("add_cloud_metadata: hosting provider type not detected.") + p.logger.Info("add_cloud_metadata: hosting provider type not detected.") return } p.metadata = result.metadata - logp.Info("add_cloud_metadata: hosting provider type detected as %v, metadata=%v", + p.logger.Infof("add_cloud_metadata: hosting provider type detected as %v, metadata=%v", result.provider, result.metadata.String()) }) } diff --git a/libbeat/processors/add_cloud_metadata/providers.go b/libbeat/processors/add_cloud_metadata/providers.go index 301e7d4731f6..78ca9220e299 100644 --- a/libbeat/processors/add_cloud_metadata/providers.go +++ b/libbeat/processors/add_cloud_metadata/providers.go @@ -122,31 +122,31 @@ func setupFetchers(providers map[string]provider, c *common.Config) ([]metadataF // hosting providers supported by this processor. It wait for the results to // be returned or for a timeout to occur then returns the first result that // completed in time. -func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *result { - debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout) +func (p *addCloudMetadata) fetchMetadata() *result { + p.logger.Debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", p.initData.timeout) start := time.Now() defer func() { - debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start)) + p.logger.Debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start)) }() // Create HTTP client with our timeouts and keep-alive disabled. client := http.Client{ - Timeout: timeout, + Timeout: p.initData.timeout, Transport: &http.Transport{ DisableKeepAlives: true, DialContext: (&net.Dialer{ - Timeout: timeout, + Timeout: p.initData.timeout, KeepAlive: 0, }).DialContext, }, } // Create context to enable explicit cancellation of the http requests. - ctx, cancel := context.WithTimeout(context.TODO(), timeout) + ctx, cancel := context.WithTimeout(context.TODO(), p.initData.timeout) defer cancel() results := make(chan result) - for _, fetcher := range metadataFetchers { + for _, fetcher := range p.initData.fetchers { fetcher := fetcher go func() { select { @@ -156,17 +156,17 @@ func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *r }() } - for i := 0; i < len(metadataFetchers); i++ { + for i := 0; i < len(p.initData.fetchers); i++ { select { case result := <-results: - debugf("add_cloud_metadata: received disposition for %v after %v. %v", + p.logger.Debugf("add_cloud_metadata: received disposition for %v after %v. %v", result.provider, time.Since(start), result) // Bail out on first success. if result.err == nil && result.metadata != nil { return &result } case <-ctx.Done(): - debugf("add_cloud_metadata: timed-out waiting for all responses") + p.logger.Debugf("add_cloud_metadata: timed-out waiting for all responses") return nil } } diff --git a/libbeat/processors/add_host_metadata/add_host_metadata.go b/libbeat/processors/add_host_metadata/add_host_metadata.go index 6cfc938e5f0c..f9dd09db0d6e 100644 --- a/libbeat/processors/add_host_metadata/add_host_metadata.go +++ b/libbeat/processors/add_host_metadata/add_host_metadata.go @@ -47,6 +47,7 @@ type addHostMetadata struct { data common.MapStrPointer geoData common.MapStr config Config + logger *logp.Logger } const ( @@ -63,6 +64,7 @@ func New(cfg *common.Config) (processors.Processor, error) { p := &addHostMetadata{ config: config, data: common.NewMapStrPointer(nil), + logger: logp.NewLogger("add_host_metadata"), } p.loadData() @@ -122,7 +124,7 @@ func (p *addHostMetadata) loadData() error { // IP-address and MAC-address var ipList, hwList, err = util.GetNetInfo() if err != nil { - logp.Info("Error when getting network information %v", err) + p.logger.Infof("Error when getting network information %v", err) } if len(ipList) > 0 { diff --git a/libbeat/processors/add_locale/add_locale_test.go b/libbeat/processors/add_locale/add_locale_test.go index e1b11610984e..f75467b19eb7 100644 --- a/libbeat/processors/add_locale/add_locale_test.go +++ b/libbeat/processors/add_locale/add_locale_test.go @@ -85,11 +85,10 @@ func TestTimezoneFormat(t *testing.T) { } func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { - logp.TestingSetup() - + log := logp.NewLogger("add_locale_test") p, err := New(config) if err != nil { - logp.Err("Error initializing add_locale") + log.Error("Error initializing add_locale") t.Fatal(err) } diff --git a/libbeat/processors/add_observer_metadata/add_observer_metadata.go b/libbeat/processors/add_observer_metadata/add_observer_metadata.go index b1740df00670..464c051fb1ad 100644 --- a/libbeat/processors/add_observer_metadata/add_observer_metadata.go +++ b/libbeat/processors/add_observer_metadata/add_observer_metadata.go @@ -46,6 +46,7 @@ type observerMetadata struct { data common.MapStrPointer geoData common.MapStr config Config + logger *logp.Logger } const ( @@ -62,6 +63,7 @@ func New(cfg *common.Config) (processors.Processor, error) { p := &observerMetadata{ config: config, data: common.NewMapStrPointer(nil), + logger: logp.NewLogger("add_observer_metadata"), } p.loadData() @@ -135,7 +137,7 @@ func (p *observerMetadata) loadData() error { // IP-address and MAC-address var ipList, hwList, err = util.GetNetInfo() if err != nil { - logp.Info("Error when getting network information %v", err) + p.logger.Infof("Error when getting network information %v", err) } if len(ipList) > 0 {