From a36faf3d1cb46fe69ee571b3e9fa9becfda21921 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 6 Mar 2020 11:48:57 -0700 Subject: [PATCH] Remove global logger from libbeat publisher and reader --- libbeat/publisher/pipeline/output.go | 13 +++++++------ libbeat/publisher/pipeline/stress/gen.go | 12 +++++++----- libbeat/publisher/pipeline/stress/run.go | 6 +++--- libbeat/publisher/processing/processors.go | 3 ++- libbeat/reader/multiline/multiline.go | 5 +++-- libbeat/reader/readfile/line.go | 9 +++++---- libbeat/reader/readjson/docker_json.go | 6 ++++-- libbeat/reader/readjson/json.go | 3 ++- 8 files changed, 33 insertions(+), 24 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 405fc7cc432b..435838caa12d 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -40,6 +40,7 @@ type netClientWorker struct { batchSize int batchSizer func() int + logger *logp.Logger } func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { @@ -85,24 +86,24 @@ func (w *netClientWorker) run() { batch.Cancelled() if w.closed.Load() { - logp.Info("Closed connection to %v", w.client) + w.logger.Infof("Closed connection to %v", w.client) return } if reconnectAttempts > 0 { - logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) + w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) } else { - logp.Info("Connecting to %v", w.client) + w.logger.Infof("Connecting to %v", w.client) } err := w.client.Connect() if err != nil { - logp.Err("Failed to connect to %v: %v", w.client, err) + w.logger.Errorf("Failed to connect to %v: %v", w.client, err) reconnectAttempts++ continue } - logp.Info("Connection to %v established", w.client) + w.logger.Infof("Connection to %v established", w.client) reconnectAttempts = 0 break } @@ -118,7 +119,7 @@ func (w *netClientWorker) run() { err := w.client.Publish(batch) if err != nil { - logp.Err("Failed to publish events: %v", err) + w.logger.Errorf("Failed to publish events: %v", err) // on error return to connect loop break } diff --git a/libbeat/publisher/pipeline/stress/gen.go b/libbeat/publisher/pipeline/stress/gen.go index e5a18ceaef62..149278304eac 100644 --- a/libbeat/publisher/pipeline/stress/gen.go +++ b/libbeat/publisher/pipeline/stress/gen.go @@ -24,10 +24,11 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/atomic" - "github.com/elastic/beats/v7/libbeat/logp" ) type generateConfig struct { @@ -65,9 +66,10 @@ func generate( WaitClose: config.WaitClose, } + logger := logp.NewLogger("publisher_pipeline_stress_generate") if config.ACK { settings.ACKCount = func(n int) { - logp.Info("Pipeline client (%v) ACKS; %v", id, n) + logger.Infof("Pipeline client (%v) ACKS; %v", id, n) } } @@ -89,7 +91,7 @@ func generate( panic(err) } - defer logp.Info("client (%v) closed: %v", id, time.Now()) + defer logger.Infof("client (%v) closed: %v", id, time.Now()) done := make(chan struct{}) defer close(done) @@ -136,8 +138,8 @@ func generate( }) } - logp.Info("start (%v) generator: %v", id, time.Now()) - defer logp.Info("stop (%v) generator: %v", id, time.Now()) + logger.Infof("start (%v) generator: %v", id, time.Now()) + defer logger.Infof("stop (%v) generator: %v", id, time.Now()) for cs.Active() { event := beat.Event{ diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 8a46489ae6cf..80e61146cc34 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -84,9 +84,9 @@ func RunTests( return fmt.Errorf("loading pipeline failed: %+v", err) } defer func() { - logp.Info("Stop pipeline") + log.Info("Stop pipeline") pipeline.Close() - logp.Info("pipeline closed") + log.Info("pipeline closed") }() cs := newCloseSignaler() @@ -100,7 +100,7 @@ func RunTests( withWG(&genWG, func() { err := generate(cs, pipeline, config.Generate, i, errors) if err != nil { - logp.Err("Generator failed with: %v", err) + log.Errorf("Generator failed with: %v", err) } }) } diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index 0125008c39fc..e994eef48cc6 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -41,6 +41,7 @@ type processorFn struct { } func newGeneralizeProcessor(keepNull bool) *processorFn { + logger := logp.NewLogger("publisher_processing") return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) { // Filter out empty events. Empty events are still reported by ACK callbacks. if len(event.Fields) == 0 { @@ -50,7 +51,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn { g := common.NewGenericEventConverter(keepNull) fields := g.Convert(event.Fields) if fields == nil { - logp.Err("fail to convert to generic event") + logger.Error("fail to convert to generic event") return nil, nil } diff --git a/libbeat/reader/multiline/multiline.go b/libbeat/reader/multiline/multiline.go index a6054c765502..ec20362fc170 100644 --- a/libbeat/reader/multiline/multiline.go +++ b/libbeat/reader/multiline/multiline.go @@ -52,6 +52,7 @@ type Reader struct { err error // last seen error state func(*Reader) (reader.Message, error) message reader.Message + logger *logp.Logger } const ( @@ -143,7 +144,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) { continue } - logp.Debug("multiline", "Multiline event flushed because timeout reached.") + mlr.logger.Debug("Multiline event flushed because timeout reached.") // pass error to caller (next layer) for handling return message, err @@ -172,7 +173,7 @@ func (mlr *Reader) readNext() (reader.Message, error) { continue } - logp.Debug("multiline", "Multiline event flushed because timeout reached.") + mlr.logger.Debug("Multiline event flushed because timeout reached.") // return collected multiline event and // empty buffer for new multiline event diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 21d902c6eb84..1c801ae672f5 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -41,6 +41,7 @@ type LineReader struct { inOffset int // input buffer read offset byteCount int // number of bytes decoded from input buffer into output buffer decoder transform.Transformer + logger *logp.Logger } // New creates a new reader object @@ -86,15 +87,15 @@ func (r *LineReader) Next() ([]byte, int, error) { // This can happen if something goes wrong during decoding if len(buf) == 0 { - logp.Err("Empty buffer returned by advance") + r.logger.Error("Empty buffer returned by advance") continue } if bytes.HasSuffix(buf, r.decodedNl) { break } else { - logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1]) - logp.Debug("line", "In %s", string(buf)) + r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1]) + r.logger.Debugf("In %s", string(buf)) } } @@ -151,7 +152,7 @@ func (r *LineReader) advance() error { // -> decode input sequence into outBuffer sz, err := r.decode(idx + len(r.nl)) if err != nil { - logp.Err("Error decoding line: %s", err) + r.logger.Errorf("Error decoding line: %s", err) // In case of error increase size by unencoded length sz = idx + len(r.nl) } diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 95c98bddbb88..6d587a55aa86 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -46,6 +46,8 @@ type DockerJSONReader struct { parseLine func(message *reader.Message, msg *logLine) error stripNewLine func(msg *reader.Message) + + logger *logp.Logger } type logLine struct { @@ -198,7 +200,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { var logLine logLine err = p.parseLine(&message, &logLine) if err != nil { - logp.Err("Parse line error: %v", err) + p.logger.Errorf("Parse line error: %v", err) return message, reader.ErrLineUnparsable } @@ -215,7 +217,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { } err = p.parseLine(&next, &logLine) if err != nil { - logp.Err("Parse line error: %v", err) + p.logger.Errorf("Parse line error: %v", err) return message, reader.ErrLineUnparsable } message.Content = append(message.Content, next.Content...) diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index 3567c372ed81..a404d611056f 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -34,6 +34,7 @@ import ( type JSONReader struct { reader reader.Reader cfg *Config + logger *logp.Logger } // NewJSONReader creates a new reader that can decode JSON. @@ -49,7 +50,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) { err := unmarshal(text, &jsonFields) if err != nil || jsonFields == nil { if !r.cfg.IgnoreDecodingError { - logp.Err("Error decoding JSON: %v", err) + r.logger.Errorf("Error decoding JSON: %v", err) } if r.cfg.AddErrorKey { jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}