Cherry-pick #16654 to 7.x: [Libbeat] Use *logp.Logger in libbeat processors #16719

Merged 1 commit on Mar 3, 2020
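For orientation, the sketch below shows the pattern this cherry-pick applies across the processors in the diff: each processor creates a named *logp.Logger with logp.NewLogger, stores it on its struct, and logs through that logger, replacing the package-level logp.Debug / logp.MakeDebug / logp.Info calls. It is a minimal standalone illustration, not code from the PR; the exampleProcessor type, the process helper, the "example" selector, and the use of logp.DevelopmentSetup are assumptions made for the sake of a runnable example.

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/logp"
)

// exampleProcessor stands in for a libbeat processor; the type, its fields,
// and the "example" selector below are illustrative, not part of libbeat.
type exampleProcessor struct {
	logger *logp.Logger
}

// newExampleProcessor mirrors the constructors in the diff: the named logger
// is created once and stored on the processor instead of relying on the
// package-level logp.Debug / logp.MakeDebug helpers.
func newExampleProcessor() *exampleProcessor {
	return &exampleProcessor{
		logger: logp.NewLogger("example"),
	}
}

// run shows the call-site change: debug output goes through the stored logger.
func (p *exampleProcessor) run(field string) {
	if err := process(field); err != nil {
		// Before this change: logp.Debug("example", "failed to process field %s: %s", field, err)
		p.logger.Debugf("failed to process field %s: %s", field, err)
	}
}

// process is a hypothetical helper used only to produce an error for the sketch.
func process(field string) error {
	if field == "" {
		return fmt.Errorf("field name is empty")
	}
	return nil
}

func main() {
	// DevelopmentSetup is assumed here to enable debug output on stderr for a
	// standalone run; the processors in the diff never configure logp themselves.
	if err := logp.DevelopmentSetup(); err != nil {
		panic(err)
	}
	p := newExampleProcessor()
	p.run("")        // logs a debug message through the named logger
	p.run("message") // succeeds silently
}
```

The per-processor logger keeps the selector (for example "copy_fields" or "rename") attached to every message without repeating it at each call site, which is what the call-site changes in the files below amount to.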
4 changes: 3 additions & 1 deletion libbeat/processors/actions/copy_fields.go
@@ -32,6 +32,7 @@ import (

type copyFields struct {
config copyFieldsConfig
logger *logp.Logger
}

type copyFieldsConfig struct {
@@ -62,6 +63,7 @@ func NewCopyFields(c *common.Config) (processors.Processor, error) {

f := &copyFields{
config: config,
logger: logp.NewLogger("copy_fields"),
}
return f, nil
}
@@ -76,7 +78,7 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.copyField(field.From, field.To, event.Fields)
if err != nil {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
logp.Debug("copy_fields", errMsg.Error())
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
5 changes: 4 additions & 1 deletion libbeat/processors/actions/copy_fields_test.go
@@ -20,14 +20,16 @@ package actions
import (
"testing"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestCopyFields(t *testing.T) {

log := logp.NewLogger("copy_fields_test")
var tests = map[string]struct {
FromTo fromTo
Input common.MapStr
@@ -130,6 +132,7 @@ func TestCopyFields(t *testing.T) {
test.FromTo,
},
},
log,
}

event := &beat.Event{
13 changes: 7 additions & 6 deletions libbeat/processors/actions/decode_json_fields.go
@@ -42,6 +42,7 @@ type decodeJSONFields struct {
processArray bool
documentID string
target *string
logger *logp.Logger
}

type config struct {
@@ -62,8 +63,6 @@ var (
errProcessingSkipped = errors.New("processing skipped")
)

var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields",
checks.ConfigChecked(NewDecodeJSONFields,
@@ -76,10 +75,11 @@ func init() {
// NewDecodeJSONFields construct a new decode_json_fields processor.
func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
config := defaultConfig
logger := logp.NewLogger("truncate_fields")

err := c.Unpack(&config)
if err != nil {
logp.Warn("Error unpacking config for decode_json_fields")
logger.Warn("Error unpacking config for decode_json_fields")
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

@@ -91,6 +91,7 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
processArray: config.ProcessArray,
documentID: config.DocumentID,
target: config.Target,
logger: logger,
}
return f, nil
}
@@ -101,7 +102,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
for _, field := range f.fields {
data, err := event.GetValue(field)
if err != nil && errors.Cause(err) != common.ErrKeyNotFound {
debug("Error trying to GetValue for field : %s in event : %v", field, event)
f.logger.Debugf("Error trying to GetValue for field : %s in event : %v", field, event)
errs = append(errs, err.Error())
continue
}
@@ -115,7 +116,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
var output interface{}
err = unmarshal(f.maxDepth, text, &output, f.processArray)
if err != nil {
debug("Error trying to unmarshal %s", text)
f.logger.Debugf("Error trying to unmarshal %s", text)
errs = append(errs, err.Error())
continue
}
@@ -149,7 +150,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
}

if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
f.logger.Debugf("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
continue
}
8 changes: 4 additions & 4 deletions libbeat/processors/actions/decode_json_fields_test.go
@@ -96,7 +96,7 @@ func TestInvalidJSONMultiple(t *testing.T) {
}

func TestDocumentID(t *testing.T) {
logp.TestingSetup()
log := logp.NewLogger("decode_json_fields_test")

input := common.MapStr{
"msg": `{"log": "message", "myid": "myDocumentID"}`,
@@ -109,7 +109,7 @@

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
log.Error("Error initializing decode_json_fields")
t.Fatal(err)
}

@@ -401,11 +401,11 @@ func TestAddErrKeyOption(t *testing.T) {
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
logp.TestingSetup()
log := logp.NewLogger("decode_json_fields_test")

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
log.Error("Error initializing decode_json_fields")
t.Fatal(err)
}

4 changes: 3 additions & 1 deletion libbeat/processors/actions/rename.go
@@ -32,6 +32,7 @@ import (

type renameFields struct {
config renameFieldsConfig
logger *logp.Logger
}

type renameFieldsConfig struct {
@@ -66,6 +67,7 @@ func NewRenameFields(c *common.Config) (processors.Processor, error) {

f := &renameFields{
config: config,
logger: logp.NewLogger("rename"),
}
return f, nil
}
@@ -81,7 +83,7 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.renameField(field.From, field.To, event.Fields)
if err != nil {
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
logp.Debug("rename", errMsg.Error())
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
4 changes: 4 additions & 0 deletions libbeat/processors/actions/rename_test.go
@@ -21,13 +21,16 @@ import (
"reflect"
"testing"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestRenameRun(t *testing.T) {
log := logp.NewLogger("rename_test")
var tests = []struct {
description string
Fields []fromTo
@@ -234,6 +237,7 @@
IgnoreMissing: test.IgnoreMissing,
FailOnError: test.FailOnError,
},
logger: log,
}
event := &beat.Event{
Fields: test.Input,
4 changes: 3 additions & 1 deletion libbeat/processors/actions/truncate_fields.go
@@ -44,6 +44,7 @@ type truncateFieldsConfig struct {
type truncateFields struct {
config truncateFieldsConfig
truncate truncater
logger *logp.Logger
}

type truncater func(*truncateFields, []byte) ([]byte, bool, error)
@@ -76,6 +77,7 @@ func NewTruncateFields(c *common.Config) (processors.Processor, error) {
return &truncateFields{
config: config,
truncate: truncateFunc,
logger: logp.NewLogger("truncate_fields"),
}, nil
}

@@ -88,7 +90,7 @@ func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) {
for _, field := range f.config.Fields {
event, err := f.truncateSingleField(field, event)
if err != nil {
logp.Debug("truncate_fields", "Failed to truncate fields: %s", err)
f.logger.Debugf("Failed to truncate fields: %s", err)
if f.config.FailOnError {
event.Fields = backup
return event, err
4 changes: 4 additions & 0 deletions libbeat/processors/actions/truncate_fields_test.go
@@ -20,13 +20,16 @@ package actions
import (
"testing"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestTruncateFields(t *testing.T) {
log := logp.NewLogger("truncate_fields_test")
var tests = map[string]struct {
MaxBytes int
MaxChars int
@@ -158,6 +161,7 @@
FailOnError: true,
},
truncate: test.TruncateFunc,
logger: log,
}

event := &beat.Event{
10 changes: 5 additions & 5 deletions libbeat/processors/add_cloud_metadata/add_cloud_metadata.go
@@ -37,8 +37,6 @@ const (
metadataHost = "169.254.169.254"
)

var debugf = logp.MakeDebug("filters")

// init registers the add_cloud_metadata processor.
func init() {
processors.RegisterPlugin("add_cloud_metadata", New)
@@ -49,6 +47,7 @@ type addCloudMetadata struct {
initOnce sync.Once
initData *initData
metadata common.MapStr
logger *logp.Logger
}

type initData struct {
@@ -71,6 +70,7 @@ func New(c *common.Config) (processors.Processor, error) {
}
p := &addCloudMetadata{
initData: &initData{fetchers, config.Timeout, config.Overwrite},
logger: logp.NewLogger("add_cloud_metadata"),
}

go p.init()
@@ -84,13 +84,13 @@ func (r result) String() string {

func (p *addCloudMetadata) init() {
p.initOnce.Do(func() {
result := fetchMetadata(p.initData.fetchers, p.initData.timeout)
result := p.fetchMetadata()
if result == nil {
logp.Info("add_cloud_metadata: hosting provider type not detected.")
p.logger.Info("add_cloud_metadata: hosting provider type not detected.")
return
}
p.metadata = result.metadata
logp.Info("add_cloud_metadata: hosting provider type detected as %v, metadata=%v",
p.logger.Infof("add_cloud_metadata: hosting provider type detected as %v, metadata=%v",
result.provider, result.metadata.String())
})
}
20 changes: 10 additions & 10 deletions libbeat/processors/add_cloud_metadata/providers.go
@@ -122,31 +122,31 @@ func setupFetchers(providers map[string]provider, c *common.Config) ([]metadataF
// hosting providers supported by this processor. It wait for the results to
// be returned or for a timeout to occur then returns the first result that
// completed in time.
func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *result {
debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout)
func (p *addCloudMetadata) fetchMetadata() *result {
p.logger.Debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", p.initData.timeout)
start := time.Now()
defer func() {
debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start))
p.logger.Debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start))
}()

// Create HTTP client with our timeouts and keep-alive disabled.
client := http.Client{
Timeout: timeout,
Timeout: p.initData.timeout,
Transport: &http.Transport{
DisableKeepAlives: true,
DialContext: (&net.Dialer{
Timeout: timeout,
Timeout: p.initData.timeout,
KeepAlive: 0,
}).DialContext,
},
}

// Create context to enable explicit cancellation of the http requests.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
ctx, cancel := context.WithTimeout(context.TODO(), p.initData.timeout)
defer cancel()

results := make(chan result)
for _, fetcher := range metadataFetchers {
for _, fetcher := range p.initData.fetchers {
fetcher := fetcher
go func() {
select {
@@ -156,17 +156,17 @@ func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *r
}()
}

for i := 0; i < len(metadataFetchers); i++ {
for i := 0; i < len(p.initData.fetchers); i++ {
select {
case result := <-results:
debugf("add_cloud_metadata: received disposition for %v after %v. %v",
p.logger.Debugf("add_cloud_metadata: received disposition for %v after %v. %v",
result.provider, time.Since(start), result)
// Bail out on first success.
if result.err == nil && result.metadata != nil {
return &result
}
case <-ctx.Done():
debugf("add_cloud_metadata: timed-out waiting for all responses")
p.logger.Debugf("add_cloud_metadata: timed-out waiting for all responses")
return nil
}
}
4 changes: 3 additions & 1 deletion libbeat/processors/add_host_metadata/add_host_metadata.go
@@ -47,6 +47,7 @@ type addHostMetadata struct {
data common.MapStrPointer
geoData common.MapStr
config Config
logger *logp.Logger
}

const (
@@ -63,6 +64,7 @@ func New(cfg *common.Config) (processors.Processor, error) {
p := &addHostMetadata{
config: config,
data: common.NewMapStrPointer(nil),
logger: logp.NewLogger("add_host_metadata"),
}
p.loadData()

@@ -122,7 +124,7 @@ func (p *addHostMetadata) loadData() error {
// IP-address and MAC-address
var ipList, hwList, err = util.GetNetInfo()
if err != nil {
logp.Info("Error when getting network information %v", err)
p.logger.Infof("Error when getting network information %v", err)
}

if len(ipList) > 0 {
5 changes: 2 additions & 3 deletions libbeat/processors/add_locale/add_locale_test.go
@@ -85,11 +85,10 @@ func TestTimezoneFormat(t *testing.T) {
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
logp.TestingSetup()

log := logp.NewLogger("add_locale_test")
p, err := New(config)
if err != nil {
logp.Err("Error initializing add_locale")
log.Error("Error initializing add_locale")
t.Fatal(err)
}
