Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce global logging in some docker/k8s + AD related packages #16003

Merged
merged 4 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ type Autodiscover struct {

// NewAutodiscover instantiates and returns a new Autodiscover manager
func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, config *Config) (*Autodiscover, error) {
// Init Event bus
bus := bus.New(name)

logger := logp.NewLogger("autodiscover")

// Init Event bus
bus := bus.New(logger, name)

// Init providers
var providers []Provider
for _, providerCfg := range config.Providers {
Expand Down
6 changes: 4 additions & 2 deletions libbeat/autodiscover/providers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Provider struct {

// AutodiscoverBuilder builds and returns an autodiscover provider
func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) {
logger := logp.NewLogger("docker")

cfgwarn.Beta("The docker autodiscover is beta")

errWrap := func(err error) error {
Expand All @@ -72,7 +74,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
return nil, errWrap(err)
}

watcher, err := docker.NewWatcher(config.Host, config.TLS, false)
watcher, err := docker.NewWatcher(logger, config.Host, config.TLS, false)
if err != nil {
return nil, errWrap(err)
}
Expand Down Expand Up @@ -115,7 +117,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
stopListener: stop,
stoppers: make(map[string]*time.Timer),
stopTrigger: make(chan *dockerContainerMetadata),
logger: logp.NewLogger("docker"),
logger: logger,
}, nil
}

Expand Down
29 changes: 19 additions & 10 deletions libbeat/autodiscover/providers/jolokia/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type Instance struct {
type Discovery struct {
sync.Mutex

Logger *logp.Logger
urso marked this conversation as resolved.
Show resolved Hide resolved

ProviderUUID uuid.UUID

Interfaces []InterfaceConfig
Expand All @@ -121,6 +123,9 @@ func (d *Discovery) Start() {
d.instances = make(map[string]*Instance)
d.events = make(chan Event)
d.stop = make(chan struct{})
if d.Logger == nil {
d.Logger = logp.NewLogger("jolokia")
}
go d.run()
}

Expand Down Expand Up @@ -198,17 +203,19 @@ var discoveryAddress = net.UDPAddr{IP: net.IPv4(239, 192, 48, 84), Port: 24884}
var queryMessage = []byte(`{"type":"query"}`)

func (d *Discovery) sendProbe(config InterfaceConfig) {
log := d.Logger

interfaces, err := d.interfaces(config.Name)
if err != nil {
logp.Err("failed to get interfaces: %+v", err)
log.Errorf("failed to get interfaces: %+v", err)
return
}

var wg sync.WaitGroup
for _, i := range interfaces {
ip, err := getIPv4Addr(i)
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
continue
}
if ip == nil {
Expand All @@ -221,7 +228,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {

conn, err := net.ListenPacket("udp4", net.JoinHostPort(ip.String(), "0"))
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
return
}
defer conn.Close()
Expand All @@ -234,7 +241,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {
conn.SetDeadline(time.Now().Add(timeout))

if _, err := conn.WriteTo(queryMessage, &discoveryAddress); err != nil {
logp.Err(err.Error())
log.Error(err.Error())
return
}

Expand All @@ -243,19 +250,19 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {
n, _, err := conn.ReadFrom(b)
if err != nil {
if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
logp.Err(err.Error())
log.Error(err.Error())
}
return
}
m := make(map[string]interface{})
err = json.Unmarshal(b[:n], &m)
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
continue
}
message, err := messageSchema.Apply(m, s.FailOnRequired)
if err != nil {
logp.Err(err.Error())
log.Error(err.Error())
continue
}
d.update(config, message)
Expand All @@ -266,22 +273,24 @@ func (d *Discovery) sendProbe(config InterfaceConfig) {
}

func (d *Discovery) update(config InterfaceConfig, message common.MapStr) {
log := d.Logger

v, err := message.GetValue("agent.id")
if err != nil {
logp.Err("failed to update agent without id: " + err.Error())
log.Error("failed to update agent without id: ", err)
return
}
agentID, ok := v.(string)
if len(agentID) == 0 || !ok {
logp.Err("incorrect or empty agent id in Jolokia Discovery response")
log.Error("incorrect or empty agent id in Jolokia Discovery response")
return
}

url, err := message.GetValue("url")
if err != nil || url == nil {
// This can happen if Jolokia agent is initializing and it still
// doesn't know its URL
logp.Info("agent %s without url, ignoring by now", agentID)
log.Infof("agent %s without url, ignoring by now", agentID)
return
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
config.Node = kubernetes.DiscoverKubernetesNode(config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
config.Node = kubernetes.DiscoverKubernetesNode(logger, config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
} else {
config.Node = ""
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestEmitEvent_Node(t *testing.T) {
metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
config.Node = kubernetes.DiscoverKubernetesNode(config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
config.Node = kubernetes.DiscoverKubernetesNode(logger, config.Node, kubernetes.IsInCluster(config.KubeConfig), client)
} else {
config.Node = ""
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestEmitEvent(t *testing.T) {
metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestEmitEvent_Service(t *testing.T) {

p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
bus: bus.New(logp.NewLogger("bus"), "test"),
templates: mapper,
logger: logp.NewLogger("kubernetes"),
}
Expand Down
17 changes: 11 additions & 6 deletions libbeat/common/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Listener interface {

type bus struct {
sync.RWMutex
name string
log *logp.Logger
listeners []*listener
store chan Event
}
Expand All @@ -59,28 +59,33 @@ type listener struct {
}

// New initializes a new bus with the given name and returns it
func New(name string) Bus {
func New(log *logp.Logger, name string) Bus {
return &bus{
name: name,
log: createLogger(log, name),
listeners: make([]*listener, 0),
}
}

// NewBusWithStore allows to create a buffered bus when producers send data without
// listeners being subscribed to them. size determines the size of the buffer.
func NewBusWithStore(name string, size int) Bus {
func NewBusWithStore(log *logp.Logger, name string, size int) Bus {
return &bus{
name: name,
log: createLogger(log, name),
listeners: make([]*listener, 0),
store: make(chan Event, size),
}
}

func createLogger(log *logp.Logger, name string) *logp.Logger {
selector := "bus-" + name
return log.Named(selector).With("libbeat.bus", name)
}

func (b *bus) Publish(e Event) {
b.RLock()
defer b.RUnlock()

logp.Debug("bus", "%s: %+v", b.name, e)
b.log.Debugf("%+v", e)
if len(b.listeners) == 0 && b.store != nil {
b.store <- e
return
Expand Down
14 changes: 8 additions & 6 deletions libbeat/common/bus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
)

func TestEmit(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe()

bus.Publish(Event{
Expand All @@ -36,7 +38,7 @@ func TestEmit(t *testing.T) {
}

func TestEmitOrder(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe()
bus.Publish(Event{"first": "event"})
bus.Publish(Event{"second": "event"})
Expand All @@ -48,7 +50,7 @@ func TestEmitOrder(t *testing.T) {
}

func TestSubscribeFilter(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe("second")

bus.Publish(Event{"first": "event"})
Expand All @@ -59,7 +61,7 @@ func TestSubscribeFilter(t *testing.T) {
}

func TestMultipleListeners(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener1 := bus.Subscribe("a")
listener2 := bus.Subscribe("a", "b")

Expand All @@ -82,7 +84,7 @@ func TestMultipleListeners(t *testing.T) {
}

func TestListenerClose(t *testing.T) {
bus := New("name")
bus := New(logp.L(), "name")
listener := bus.Subscribe()

bus.Publish(Event{"first": "event"})
Expand All @@ -103,7 +105,7 @@ func TestListenerClose(t *testing.T) {
}

func TestUnsubscribedBus(t *testing.T) {
bus := NewBusWithStore("name", 2)
bus := NewBusWithStore(logp.L(), "name", 2)
bus.Publish(Event{"first": "event"})

listener := bus.Subscribe()
Expand Down
Loading