Skip to content

Commit

Permalink
Reduce global logging in some docker/k8s + AD related packages (elast…
Browse files Browse the repository at this point in the history
…ic#16003)

* Reduce global logging in some docker/k8s + AD related packages

Reduce/Remove usage of global logging functions in docker and kubernetes related processors and Autodiscovery providers.

The change adds the structured logging field `libbeat.processor: <name>` to the `add_kubernetes_metadata` and `add_docker_metadata` processors. Structured logging is transitive. Error/debug logs produced by the `libbeat/common/docker`, and `libbeat/common/kubernetes` packages will also mention the processor name. 

(cherry picked from commit bc71069)
  • Loading branch information
Steffen Siering committed Feb 4, 2020
1 parent 32f1a9e commit d8942d8
Show file tree
Hide file tree
Showing 19 changed files with 139 additions and 94 deletions.
6 changes: 3 additions & 3 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ type Autodiscover struct {

// NewAutodiscover instantiates and returns a new Autodiscover manager
func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, config *Config) (*Autodiscover, error) {
// Init Event bus
bus := bus.New(name)

logger := logp.NewLogger("autodiscover")

// Init Event bus
bus := bus.New(logger, name)

// Init providers
var providers []Provider
for _, providerCfg := range config.Providers {
Expand Down
6 changes: 4 additions & 2 deletions libbeat/autodiscover/providers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Provider struct {

// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) {
logger := logp.NewLogger("docker")

cfgwarn.Beta("The docker autodiscover is beta")

errWrap := func(err error) error {
Expand All @@ -72,7 +74,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
return nil, errWrap(err)
}

watcher, err := docker.NewWatcher(config.Host, config.TLS, false)
watcher, err := docker.NewWatcher(logger, config.Host, config.TLS, false)
if err != nil {
return nil, errWrap(err)
}
Expand Down Expand Up @@ -115,7 +117,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
stopListener: stop,
stoppers: make(map[string]*time.Timer),
stopTrigger: make(chan *dockerContainerMetadata),
logger: logp.NewLogger("docker"),
logger: logger,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/logp"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand All @@ -35,6 +36,8 @@ import (

// Test docker start emits an autodiscover event
func TestDockerStart(t *testing.T) {
log := logp.NewLogger("docker")

d, err := dk.NewClient()
if err != nil {
t.Fatal(err)
Expand All @@ -44,7 +47,7 @@ func TestDockerStart(t *testing.T) {
if err != nil {
t.Fatal(err)
}
bus := bus.New("test")
bus := bus.New(log, "test")
config := defaultConfig()
config.CleanupTimeout = 0

Expand Down
29 changes: 19 additions & 10 deletions libbeat/autodiscover/providers/jolokia/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type Instance struct {
type Discovery struct {
sync.Mutex

log *logp.Logger

ProviderUUID uuid.UUID

Interfaces []InterfaceConfig
Expand All @@ -121,6 +123,9 @@ func (d *Discovery) Start() {
d.instances = make(map[string]*Instance)
d.events = make(chan Event)
d.stop = make(chan struct{})
if d.log == nil {
d.log = logp.NewLogger("jolokia")
}
go d.run()
}

Expand Down Expand Up @@ -198,17 +203,19 @@ var discoveryAddress = net.UDPAddr{IP: net.IPv4(239, 192, 48, 84), Port: 24884}
var queryMessage = []byte(`{"type":"query"}`)

func (d *Discovery) sendProbe(config InterfaceConfig) {
log := d.log

interfaces, err := d.interfaces(config.Name)
if err != nil {
logp.Err("failed to get interfaces: %+v", err)
log.Errorf("failed to get interfaces: %+v", err)
return
}

var wg sync.WaitGroup
for _, i := range interfaces {
ip, err := getIPv4Addr(i)
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
continue
}
if ip == nil {
Expand All @@ -221,7 +228,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {

conn, err := net.ListenPacket("udp4", net.JoinHostPort(ip.String(), "0"))
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
return
}
defer conn.Close()
Expand All @@ -234,7 +241,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {
conn.SetDeadline(time.Now().Add(timeout))

if _, err := conn.WriteTo(queryMessage, &discoveryAddress); err != nil {
logp.Err(err.Error())
log.Error(err.Error())
return
}

Expand All @@ -243,19 +250,19 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {
n, _, err := conn.ReadFrom(b)
if err != nil {
if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
logp.Err(err.Error())
log.Error(err.Error())
}
return
}
m := make(map[string]interface{})
err = json.Unmarshal(b[:n], &m)
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
continue
}
message, err := messageSchema.Apply(m, s.FailOnRequired)
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
continue
}
d.update(config, message)
Expand All @@ -266,22 +273,24 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {
}

func (d *Discovery) update(config InterfaceConfig, message common.MapStr) {
log := d.log

v, err := message.GetValue("agent.id")
if err != nil {
logp.Err("failed to update agent without id: " + err.Error())
log.Error("failed to update agent without id: ", err)
return
}
agentID, ok := v.(string)
if len(agentID) == 0 || !ok {
logp.Err("incorrect or empty agent id in Jolokia Discovery response")
log.Error("incorrect or empty agent id in Jolokia Discovery response")
return
}

url, err := message.GetValue("url")
if err != nil || url == nil {
// This can happen if Jolokia agent is initializing and it still
// doesn't know its URL
logp.Info("agent %s without url, ignoring by now", agentID)
log.Infof("agent %s without url, ignoring by now", agentID)
return
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
config.Node = kubernetes.DiscoverKubernetesNode(config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
config.Node = kubernetes.DiscoverKubernetesNode(logger, config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
} else {
config.Node = ""
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestEmitEvent_Node(t *testing.T) {
metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
config.Node = kubernetes.DiscoverKubernetesNode(config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
config.Node = kubernetes.DiscoverKubernetesNode(logger, config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
} else {
config.Node = ""
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestEmitEvent(t *testing.T) {
metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestEmitEvent_Service(t *testing.T) {

p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}
Expand Down
17 changes: 11 additions & 6 deletions libbeat/common/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Listener interface {

type bus struct {
sync.RWMutex
name string
log *logp.Logger
listeners []*listener
store chan Event
}
Expand All @@ -59,28 +59,33 @@ type listener struct {
}

// New initializes a new bus with the given name and returns it
func New(name string) Bus {
func New(log *logp.Logger, name string) Bus {
return &bus{
name: name,
log: createLogger(log, name),
listeners: make([]*listener, 0),
}
}

// NewBusWithStore allows to create a buffered bus when producers send data without
// listeners being subscribed to them. size determines the size of the buffer.
func NewBusWithStore(name string, size int) Bus {
func NewBusWithStore(log *logp.Logger, name string, size int) Bus {
return &bus{
name: name,
log: createLogger(log, name),
listeners: make([]*listener, 0),
store: make(chan Event, size),
}
}

func createLogger(log *logp.Logger, name string) *logp.Logger {
selector := "bus-" + name
return log.Named(selector).With("libbeat.bus", name)
}

func (b *bus) Publish(e Event) {
b.RLock()
defer b.RUnlock()

logp.Debug("bus", "%s: %+v", b.name, e)
b.log.Debugf("%+v", e)
if len(b.listeners) == 0 && b.store != nil {
b.store <- e
return
Expand Down
14 changes: 8 additions & 6 deletions libbeat/common/bus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
)

func TestEmit(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe()

bus.Publish(Event{
Expand All @@ -36,7 +38,7 @@ func TestEmit(t *testing.T) {
}

func TestEmitOrder(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe()
bus.Publish(Event{"first": "event"})
bus.Publish(Event{"second": "event"})
Expand All @@ -48,7 +50,7 @@ func TestEmitOrder(t *testing.T) {
}

func TestSubscribeFilter(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe("second")

bus.Publish(Event{"first": "event"})
Expand All @@ -59,7 +61,7 @@ func TestSubscribeFilter(t *testing.T) {
}

func TestMultipleListeners(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener1 := bus.Subscribe("a")
listener2 := bus.Subscribe("a", "b")

Expand All @@ -82,7 +84,7 @@ func TestMultipleListeners(t *testing.T) {
}

func TestListenerClose(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe()

bus.Publish(Event{"first": "event"})
Expand All @@ -103,7 +105,7 @@ func TestListenerClose(t *testing.T) {
}

func TestUnsubscribedBus(t *testing.T) {
bus := NewBusWithStore("name", 2)
bus := NewBusWithStore(logp.L(), "name", 2)
bus.Publish(Event{"first": "event"})

listener := bus.Subscribe()
Expand Down
Loading

0 comments on commit d8942d8

Please sign in to comment.