Skip to content

Commit

Permalink
Cherry-pick #16886 to 7.x: [Libbeat] Remove global logger from libbea…
Browse files Browse the repository at this point in the history
…t publisher and reader (#16912)

* Remove global logger from libbeat publisher and reader (#16886)


(cherry picked from commit ad2672d)

* backport #16915 in this PR
  • Loading branch information
kaiyan-sheng authored Mar 17, 2020
1 parent 0eeb109 commit 02d3c70
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 26 deletions.
20 changes: 13 additions & 7 deletions libbeat/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ type netClientWorker struct {

batchSize int
batchSizer func() int
logger *logp.Logger
}

func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
if nc, ok := client.(outputs.NetworkClient); ok {
c := &netClientWorker{observer: observer, qu: qu, client: nc}
c := &netClientWorker{
observer: observer,
qu: qu,
client: nc,
logger: logp.NewLogger("publisher_pipeline_output"),
}
go c.run()
return c
}
Expand Down Expand Up @@ -85,24 +91,24 @@ func (w *netClientWorker) run() {
batch.Cancelled()

if w.closed.Load() {
logp.Info("Closed connection to %v", w.client)
w.logger.Infof("Closed connection to %v", w.client)
return
}

if reconnectAttempts > 0 {
logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
} else {
logp.Info("Connecting to %v", w.client)
w.logger.Infof("Connecting to %v", w.client)
}

err := w.client.Connect()
if err != nil {
logp.Err("Failed to connect to %v: %v", w.client, err)
w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
reconnectAttempts++
continue
}

logp.Info("Connection to %v established", w.client)
w.logger.Infof("Connection to %v established", w.client)
reconnectAttempts = 0
break
}
Expand All @@ -118,7 +124,7 @@ func (w *netClientWorker) run() {

err := w.client.Publish(batch)
if err != nil {
logp.Err("Failed to publish events: %v", err)
w.logger.Errorf("Failed to publish events: %v", err)
// on error return to connect loop
break
}
Expand Down
12 changes: 7 additions & 5 deletions libbeat/publisher/pipeline/stress/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
)

type generateConfig struct {
Expand Down Expand Up @@ -65,9 +66,10 @@ func generate(
WaitClose: config.WaitClose,
}

logger := logp.NewLogger("publisher_pipeline_stress_generate")
if config.ACK {
settings.ACKCount = func(n int) {
logp.Info("Pipeline client (%v) ACKS; %v", id, n)
logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
}
}

Expand All @@ -89,7 +91,7 @@ func generate(
panic(err)
}

defer logp.Info("client (%v) closed: %v", id, time.Now())
defer logger.Infof("client (%v) closed: %v", id, time.Now())

done := make(chan struct{})
defer close(done)
Expand Down Expand Up @@ -136,8 +138,8 @@ func generate(
})
}

logp.Info("start (%v) generator: %v", id, time.Now())
defer logp.Info("stop (%v) generator: %v", id, time.Now())
logger.Infof("start (%v) generator: %v", id, time.Now())
defer logger.Infof("stop (%v) generator: %v", id, time.Now())

for cs.Active() {
event := beat.Event{
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func RunTests(
return fmt.Errorf("loading pipeline failed: %+v", err)
}
defer func() {
logp.Info("Stop pipeline")
log.Info("Stop pipeline")
pipeline.Close()
logp.Info("pipeline closed")
log.Info("pipeline closed")
}()

cs := newCloseSignaler()
Expand All @@ -100,7 +100,7 @@ func RunTests(
withWG(&genWG, func() {
err := generate(cs, pipeline, config.Generate, i, errors)
if err != nil {
logp.Err("Generator failed with: %v", err)
log.Errorf("Generator failed with: %v", err)
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/publisher/processing/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type processorFn struct {
}

func newGeneralizeProcessor(keepNull bool) *processorFn {
logger := logp.NewLogger("publisher_processing")
return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
// Filter out empty events. Empty events are still reported by ACK callbacks.
if len(event.Fields) == 0 {
Expand All @@ -50,7 +51,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn {
g := common.NewGenericEventConverter(keepNull)
fields := g.Convert(event.Fields)
if fields == nil {
logp.Err("fail to convert to generic event")
logger.Error("fail to convert to generic event")
return nil, nil
}

Expand Down
6 changes: 4 additions & 2 deletions libbeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Reader struct {
err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
logger *logp.Logger
}

const (
Expand Down Expand Up @@ -125,6 +126,7 @@ func New(
maxLines: maxLines,
separator: []byte(separator),
message: reader.Message{},
logger: logp.NewLogger("reader_multiline"),
}
return mlr, nil
}
Expand All @@ -143,7 +145,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) {
continue
}

logp.Debug("multiline", "Multiline event flushed because timeout reached.")
mlr.logger.Debug("Multiline event flushed because timeout reached.")

// pass error to caller (next layer) for handling
return message, err
Expand Down Expand Up @@ -172,7 +174,7 @@ func (mlr *Reader) readNext() (reader.Message, error) {
continue
}

logp.Debug("multiline", "Multiline event flushed because timeout reached.")
mlr.logger.Debug("Multiline event flushed because timeout reached.")

// return collected multiline event and
// empty buffer for new multiline event
Expand Down
10 changes: 6 additions & 4 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type LineReader struct {
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
logger *logp.Logger
}

// New creates a new reader object
Expand All @@ -66,6 +67,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
decodedNl: terminator,
inBuffer: streambuf.New(nil),
outBuffer: streambuf.New(nil),
logger: logp.NewLogger("reader_line"),
}, nil
}

Expand All @@ -86,15 +88,15 @@ func (r *LineReader) Next() ([]byte, int, error) {

// This can happen if something goes wrong during decoding
if len(buf) == 0 {
logp.Err("Empty buffer returned by advance")
r.logger.Error("Empty buffer returned by advance")
continue
}

if bytes.HasSuffix(buf, r.decodedNl) {
break
} else {
logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
logp.Debug("line", "In %s", string(buf))
r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1])
r.logger.Debugf("In %s", string(buf))
}
}

Expand Down Expand Up @@ -151,7 +153,7 @@ func (r *LineReader) advance() error {
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
if err != nil {
logp.Err("Error decoding line: %s", err)
r.logger.Errorf("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = idx + len(r.nl)
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type DockerJSONReader struct {
parseLine func(message *reader.Message, msg *logLine) error

stripNewLine func(msg *reader.Message)

logger *logp.Logger
}

type logLine struct {
Expand All @@ -64,6 +66,7 @@ func New(r reader.Reader, stream string, partial bool, format string, CRIFlags b
partial: partial,
reader: r,
criflags: CRIFlags,
logger: logp.NewLogger("reader_docker_json"),
}

switch strings.ToLower(format) {
Expand Down Expand Up @@ -198,7 +201,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
var logLine logLine
err = p.parseLine(&message, &logLine)
if err != nil {
logp.Err("Parse line error: %v", err)
p.logger.Errorf("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
}

Expand All @@ -215,7 +218,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
}
err = p.parseLine(&next, &logLine)
if err != nil {
logp.Err("Parse line error: %v", err)
p.logger.Errorf("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
}
message.Content = append(message.Content, next.Content...)
Expand Down
9 changes: 7 additions & 2 deletions libbeat/reader/readjson/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@ import (
type JSONReader struct {
reader reader.Reader
cfg *Config
logger *logp.Logger
}

// NewJSONReader creates a new reader that can decode JSON.
func NewJSONReader(r reader.Reader, cfg *Config) *JSONReader {
return &JSONReader{reader: r, cfg: cfg}
return &JSONReader{
reader: r,
cfg: cfg,
logger: logp.NewLogger("reader_json"),
}
}

// decodeJSON unmarshals the text parameter into a MapStr and
Expand All @@ -49,7 +54,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) {
err := unmarshal(text, &jsonFields)
if err != nil || jsonFields == nil {
if !r.cfg.IgnoreDecodingError {
logp.Err("Error decoding JSON: %v", err)
r.logger.Errorf("Error decoding JSON: %v", err)
}
if r.cfg.AddErrorKey {
jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}
Expand Down

0 comments on commit 02d3c70

Please sign in to comment.