diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 86a1c209a971..5ddeb21a0ed7 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -67,11 +67,11 @@ type Autodiscover struct { // NewAutodiscover instantiates and returns a new Autodiscover manager func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, config *Config) (*Autodiscover, error) { - // Init Event bus - bus := bus.New(name) - logger := logp.NewLogger("autodiscover") + // Init Event bus + bus := bus.New(logger, name) + // Init providers var providers []Provider for _, providerCfg := range config.Providers { diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index e0febbdf8803..1189137f608a 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -60,6 +60,8 @@ type Provider struct { // AutodiscoverBuilder builds and returns an autodiscover provider func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodiscover.Provider, error) { + logger := logp.NewLogger("docker") + cfgwarn.Beta("The docker autodiscover is beta") errWrap := func(err error) error { @@ -72,7 +74,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis return nil, errWrap(err) } - watcher, err := docker.NewWatcher(config.Host, config.TLS, false) + watcher, err := docker.NewWatcher(logger, config.Host, config.TLS, false) if err != nil { return nil, errWrap(err) } @@ -115,7 +117,7 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis stopListener: stop, stoppers: make(map[string]*time.Timer), stopTrigger: make(chan *dockerContainerMetadata), - logger: logp.NewLogger("docker"), + logger: logger, }, nil } diff --git a/libbeat/autodiscover/providers/docker/docker_integration_test.go 
b/libbeat/autodiscover/providers/docker/docker_integration_test.go index fd5c4f6f86e5..288b004bd468 100644 --- a/libbeat/autodiscover/providers/docker/docker_integration_test.go +++ b/libbeat/autodiscover/providers/docker/docker_integration_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/logp" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -35,6 +36,8 @@ import ( // Test docker start emits an autodiscover event func TestDockerStart(t *testing.T) { + log := logp.NewLogger("docker") + d, err := dk.NewClient() if err != nil { t.Fatal(err) @@ -44,7 +47,7 @@ func TestDockerStart(t *testing.T) { if err != nil { t.Fatal(err) } - bus := bus.New("test") + bus := bus.New(log, "test") config := defaultConfig() config.CleanupTimeout = 0 diff --git a/libbeat/autodiscover/providers/jolokia/discovery.go b/libbeat/autodiscover/providers/jolokia/discovery.go index ee852e745a7b..ed026ec3b078 100644 --- a/libbeat/autodiscover/providers/jolokia/discovery.go +++ b/libbeat/autodiscover/providers/jolokia/discovery.go @@ -106,6 +106,8 @@ type Instance struct { type Discovery struct { sync.Mutex + log *logp.Logger + ProviderUUID uuid.UUID Interfaces []InterfaceConfig @@ -121,6 +123,9 @@ func (d *Discovery) Start() { d.instances = make(map[string]*Instance) d.events = make(chan Event) d.stop = make(chan struct{}) + if d.log == nil { + d.log = logp.NewLogger("jolokia") + } go d.run() } @@ -198,9 +203,11 @@ var discoveryAddress = net.UDPAddr{IP: net.IPv4(239, 192, 48, 84), Port: 24884} var queryMessage = []byte(`{"type":"query"}`) func (d *Discovery) sendProbe(config InterfaceConfig) { + log := d.log + interfaces, err := d.interfaces(config.Name) if err != nil { - logp.Err("failed to get interfaces: %+v", err) + log.Errorf("failed to get interfaces: %+v", err) return } @@ -208,7 +215,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) { for _, i := range interfaces { ip, 
err := getIPv4Addr(i) if err != nil { - logp.Err(err.Error()) + log.Error(err.Error()) continue } if ip == nil { @@ -221,7 +228,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) { conn, err := net.ListenPacket("udp4", net.JoinHostPort(ip.String(), "0")) if err != nil { - logp.Err(err.Error()) + log.Error(err.Error()) return } defer conn.Close() @@ -234,7 +241,7 @@ func (d *Discovery) sendProbe(config InterfaceConfig) { conn.SetDeadline(time.Now().Add(timeout)) if _, err := conn.WriteTo(queryMessage, &discoveryAddress); err != nil { - logp.Err(err.Error()) + log.Error(err.Error()) return } @@ -243,19 +250,19 @@ func (d *Discovery) sendProbe(config InterfaceConfig) { n, _, err := conn.ReadFrom(b) if err != nil { if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() { - logp.Err(err.Error()) + log.Error(err.Error()) } return } m := make(map[string]interface{}) err = json.Unmarshal(b[:n], &m) if err != nil { - logp.Err(err.Error()) + log.Error(err.Error()) continue } message, err := messageSchema.Apply(m, s.FailOnRequired) if err != nil { - logp.Err(err.Error()) + log.Error(err.Error()) continue } d.update(config, message) @@ -266,14 +273,16 @@ func (d *Discovery) sendProbe(config InterfaceConfig) { } func (d *Discovery) update(config InterfaceConfig, message common.MapStr) { + log := d.log + v, err := message.GetValue("agent.id") if err != nil { - logp.Err("failed to update agent without id: " + err.Error()) + log.Error("failed to update agent without id: ", err) return } agentID, ok := v.(string) if len(agentID) == 0 || !ok { - logp.Err("incorrect or empty agent id in Jolokia Discovery response") + log.Error("incorrect or empty agent id in Jolokia Discovery response") return } @@ -281,7 +290,7 @@ func (d *Discovery) update(config InterfaceConfig, message common.MapStr) { if err != nil || url == nil { // This can happen if Jolokia agent is initializing and it still // doesn't know its URL - logp.Info("agent %s without url, ignoring by now", agentID) + 
log.Infof("agent %s without url, ignoring by now", agentID) return } diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index c643ae4f30c4..4d83294cd48d 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -56,7 +56,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu // Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty // when cluster scope is enforced. if config.Scope == "node" { - config.Node = kubernetes.DiscoverKubernetesNode(config.Node, kubernetes.IsInCluster(config.KubeConfig), client) + config.Node = kubernetes.DiscoverKubernetesNode(logger, config.Node, kubernetes.IsInCluster(config.KubeConfig), client) } else { config.Node = "" } diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index aa4cd3476a94..8b0dd7f9a5b7 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -249,7 +249,7 @@ func TestEmitEvent_Node(t *testing.T) { metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil) p := &Provider{ config: defaultConfig(), - bus: bus.New("test"), + bus: bus.New(logp.NewLogger("bus"), "test"), templates: mapper, logger: logp.NewLogger("kubernetes"), } diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index ee71f0a76736..9cd5094fa5ab 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -57,7 +57,7 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub // Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty // when cluster scope is enforced. 
if config.Scope == "node" { - config.Node = kubernetes.DiscoverKubernetesNode(config.Node, kubernetes.IsInCluster(config.KubeConfig), client) + config.Node = kubernetes.DiscoverKubernetesNode(logger, config.Node, kubernetes.IsInCluster(config.KubeConfig), client) } else { config.Node = "" } diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index a5672c7e55d2..1d3d11b61bd8 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -474,7 +474,7 @@ func TestEmitEvent(t *testing.T) { metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) p := &Provider{ config: defaultConfig(), - bus: bus.New("test"), + bus: bus.New(logp.NewLogger("bus"), "test"), templates: mapper, logger: logp.NewLogger("kubernetes"), } diff --git a/libbeat/autodiscover/providers/kubernetes/service_test.go b/libbeat/autodiscover/providers/kubernetes/service_test.go index 04ab7ec725b9..aa056fc11499 100644 --- a/libbeat/autodiscover/providers/kubernetes/service_test.go +++ b/libbeat/autodiscover/providers/kubernetes/service_test.go @@ -281,7 +281,7 @@ func TestEmitEvent_Service(t *testing.T) { p := &Provider{ config: defaultConfig(), - bus: bus.New("test"), + bus: bus.New(logp.NewLogger("bus"), "test"), templates: mapper, logger: logp.NewLogger("kubernetes"), } diff --git a/libbeat/common/bus/bus.go b/libbeat/common/bus/bus.go index 3d1c2652e14d..cb7393b0c5cb 100644 --- a/libbeat/common/bus/bus.go +++ b/libbeat/common/bus/bus.go @@ -47,7 +47,7 @@ type Listener interface { type bus struct { sync.RWMutex - name string + log *logp.Logger listeners []*listener store chan Event } @@ -59,28 +59,33 @@ type listener struct { } // New initializes a new bus with the given name and returns it -func New(name string) Bus { +func New(log *logp.Logger, name string) Bus { return &bus{ - name: name, + log: createLogger(log, name), listeners: 
make([]*listener, 0), } } // NewBusWithStore allows to create a buffered bus when producers send data without // listeners being subscribed to them. size determines the size of the buffer. -func NewBusWithStore(name string, size int) Bus { +func NewBusWithStore(log *logp.Logger, name string, size int) Bus { return &bus{ - name: name, + log: createLogger(log, name), listeners: make([]*listener, 0), store: make(chan Event, size), } } +func createLogger(log *logp.Logger, name string) *logp.Logger { + selector := "bus-" + name + return log.Named(selector).With("libbeat.bus", name) +} + func (b *bus) Publish(e Event) { b.RLock() defer b.RUnlock() - logp.Debug("bus", "%s: %+v", b.name, e) + b.log.Debugf("%+v", e) if len(b.listeners) == 0 && b.store != nil { b.store <- e return diff --git a/libbeat/common/bus/bus_test.go b/libbeat/common/bus/bus_test.go index 2a5c9bb928dc..c80d3d71a7e2 100644 --- a/libbeat/common/bus/bus_test.go +++ b/libbeat/common/bus/bus_test.go @@ -21,10 +21,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/logp" ) func TestEmit(t *testing.T) { - bus := New("name") + bus := New(logp.L(), "name") listener := bus.Subscribe() bus.Publish(Event{ @@ -36,7 +38,7 @@ func TestEmit(t *testing.T) { } func TestEmitOrder(t *testing.T) { - bus := New("name") + bus := New(logp.L(), "name") listener := bus.Subscribe() bus.Publish(Event{"first": "event"}) bus.Publish(Event{"second": "event"}) @@ -48,7 +50,7 @@ func TestEmitOrder(t *testing.T) { } func TestSubscribeFilter(t *testing.T) { - bus := New("name") + bus := New(logp.L(), "name") listener := bus.Subscribe("second") bus.Publish(Event{"first": "event"}) @@ -59,7 +61,7 @@ func TestSubscribeFilter(t *testing.T) { } func TestMultipleListeners(t *testing.T) { - bus := New("name") + bus := New(logp.L(), "name") listener1 := bus.Subscribe("a") listener2 := bus.Subscribe("a", "b") @@ -82,7 +84,7 @@ func TestMultipleListeners(t *testing.T) { } func 
TestListenerClose(t *testing.T) { - bus := New("name") + bus := New(logp.L(), "name") listener := bus.Subscribe() bus.Publish(Event{"first": "event"}) @@ -103,7 +105,7 @@ func TestListenerClose(t *testing.T) { } func TestUnsubscribedBus(t *testing.T) { - bus := NewBusWithStore("name", 2) + bus := NewBusWithStore(logp.L(), "name", 2) bus.Publish(Event{"first": "event"}) listener := bus.Subscribe() diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index ef01f2634e75..388aa2589a46 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -74,6 +74,7 @@ type TLSConfig struct { type watcher struct { sync.RWMutex + log *logp.Logger client Client ctx context.Context stop context.CancelFunc @@ -105,10 +106,10 @@ type Client interface { } // WatcherConstructor represent a function that creates a new Watcher from giving parameters -type WatcherConstructor func(host string, tls *TLSConfig, storeShortID bool) (Watcher, error) +type WatcherConstructor func(logp *logp.Logger, host string, tls *TLSConfig, storeShortID bool) (Watcher, error) // NewWatcher returns a watcher running for the given settings -func NewWatcher(host string, tls *TLSConfig, storeShortID bool) (Watcher, error) { +func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool) (Watcher, error) { var httpClient *http.Client if tls != nil { options := tlsconfig.Options{ @@ -140,20 +141,23 @@ func NewWatcher(host string, tls *TLSConfig, storeShortID bool) (Watcher, error) return nil, err } - return NewWatcherWithClient(client, 60*time.Second, storeShortID) + return NewWatcherWithClient(log, client, 60*time.Second, storeShortID) } // NewWatcherWithClient creates a new Watcher from a given Docker client -func NewWatcherWithClient(client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) { +func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) 
{ + log = log.Named("docker") + ctx, cancel := context.WithCancel(context.Background()) return &watcher{ + log: log, client: client, ctx: ctx, stop: cancel, containers: make(map[string]*Container), deleted: make(map[string]time.Time), cleanupTimeout: cleanupTimeout, - bus: bus.New("docker"), + bus: bus.New(log, "docker"), shortID: storeShortID, }, nil } @@ -195,7 +199,7 @@ func (w *watcher) Containers() map[string]*Container { // Start watching docker API for new containers func (w *watcher) Start() error { // Do initial scan of existing containers - logp.Debug("docker", "Start docker containers scanner") + w.log.Debug("Start docker containers scanner") w.lastValidTimestamp = time.Now().Unix() w.Lock() @@ -234,6 +238,8 @@ func (w *watcher) Stop() { } func (w *watcher) watch() { + log := w.log + filter := filters.NewArgs() filter.Add("type", "container") @@ -243,7 +249,7 @@ func (w *watcher) watch() { Filters: filter, } - logp.Debug("docker", "Fetching events since %s", options.Since) + log.Debugf("Fetching events since %s", options.Since) ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout) defer cancel() @@ -258,7 +264,7 @@ func (w *watcher) watch() { for { select { case event := <-events: - logp.Debug("docker", "Got a new docker event: %v", event) + log.Debugf("Got a new docker event: %v", event) w.lastValidTimestamp = event.Time w.lastWatchReceivedEventTime = time.Now() @@ -271,7 +277,7 @@ func (w *watcher) watch() { Filters: filter, }) if err != nil || len(containers) != 1 { - logp.Err("Error getting container info: %v", err) + log.Errorf("Error getting container info: %v", err) continue } container := containers[0] @@ -309,9 +315,9 @@ func (w *watcher) watch() { case err := <-errors: // Restart watch call if err == context.DeadlineExceeded { - logp.Info("Context deadline exceeded for docker request, restarting watch call") + log.Info("Context deadline exceeded for docker request, restarting watch call") } else { - logp.Err("Error watching for 
docker events: %v", err) + log.Errorf("Error watching for docker events: %+v", err) } time.Sleep(1 * time.Second) @@ -319,13 +325,13 @@ func (w *watcher) watch() { case <-tickChan.C: if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout { - logp.Info("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) + log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) time.Sleep(1 * time.Second) break WATCH } case <-w.ctx.Done(): - logp.Debug("docker", "Watcher stopped") + log.Debug("Watcher stopped") w.stopped.Done() return } @@ -335,7 +341,9 @@ func (w *watcher) watch() { } func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Container, error) { - logp.Debug("docker", "List containers") + log := w.log + + log.Debug("List containers") ctx, cancel := context.WithTimeout(w.ctx, dockerRequestTimeout) defer cancel() @@ -359,14 +367,14 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain // If there are no network interfaces, assume that the container is on host network // Inspect the container directly and use the hostname as the IP address in order if len(ipaddresses) == 0 { - logp.Debug("docker", "Inspect container %s", c.ID) + log.Debugf("Inspect container %s", c.ID) ctx, cancel := context.WithTimeout(w.ctx, dockerRequestTimeout) defer cancel() info, err := w.client.ContainerInspect(ctx, c.ID) if err == nil { ipaddresses = append(ipaddresses, info.Config.Hostname) } else { - logp.Warn("unable to inspect container %s due to error %v", c.ID, err) + log.Warnf("unable to inspect container %s due to error %+v", c.ID, err) } } result = append(result, &Container{ @@ -384,6 +392,8 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain // Clean up deleted containers after they are not used anymore func (w *watcher) cleanupWorker() { + log := w.log + for { // Wait a full period 
time.Sleep(w.cleanupTimeout) @@ -399,7 +409,7 @@ func (w *watcher) cleanupWorker() { w.RLock() for key, lastSeen := range w.deleted { if lastSeen.Before(timeout) { - logp.Debug("docker", "Removing container %s after cool down timeout", key) + log.Debugf("Removing container %s after cool down timeout", key) toDelete = append(toDelete, key) } } diff --git a/libbeat/common/docker/watcher_test.go b/libbeat/common/docker/watcher_test.go index faed944401dd..31f33dde9157 100644 --- a/libbeat/common/docker/watcher_test.go +++ b/libbeat/common/docker/watcher_test.go @@ -459,7 +459,7 @@ func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, done: make(chan interface{}), } - w, err := NewWatcherWithClient(client, 200*time.Millisecond, enable) + w, err := NewWatcherWithClient(logp.L(), client, 200*time.Millisecond, enable) if err != nil { t.Fatal(err) } diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index 7d6f7113a26d..a8f1254b64a2 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -74,52 +74,53 @@ func IsInCluster(kubeconfig string) bool { // If host is provided in the config use it directly. // If beat is deployed in k8s cluster, use hostname of pod which is pod name to query pod meta for node name. // If beat is deployed outside k8s cluster, use machine-id to match against k8s nodes for node name. 
-func DiscoverKubernetesNode(host string, inCluster bool, client kubernetes.Interface) (node string) { +func DiscoverKubernetesNode(log *logp.Logger, host string, inCluster bool, client kubernetes.Interface) (node string) { if host != "" { - logp.Info("kubernetes: Using node %s provided in the config", host) + log.Infof("kubernetes: Using node %s provided in the config", host) return host } if inCluster { ns, err := inClusterNamespace() if err != nil { - logp.Err("kubernetes: Couldn't get namespace when beat is in cluster with error: %+v", err.Error()) + log.Errorf("kubernetes: Couldn't get namespace when beat is in cluster with error: %+v", err.Error()) return defaultNode } podName, err := os.Hostname() if err != nil { - logp.Err("kubernetes: Couldn't get hostname as beat pod name in cluster with error: %+v", err.Error()) + log.Errorf("kubernetes: Couldn't get hostname as beat pod name in cluster with error: %+v", err.Error()) return defaultNode } - logp.Info("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns) + log.Infof("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns) pod, err := client.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{}) if err != nil { - logp.Err("kubernetes: Querying for pod failed with error: %+v", err.Error()) + log.Errorf("kubernetes: Querying for pod failed with error: %+v", err) return defaultNode } - logp.Info("kubernetes: Using node %s discovered by in cluster pod node query", pod.Spec.NodeName) + log.Info("kubernetes: Using node %s discovered by in cluster pod node query", pod.Spec.NodeName) return pod.Spec.NodeName } mid := machineID() if mid == "" { - logp.Err("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id") + log.Error("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id") return defaultNode } nodes, err := 
client.CoreV1().Nodes().List(metav1.ListOptions{}) if err != nil { - logp.Err("kubernetes: Querying for nodes failed with error: %+v", err.Error()) + log.Errorf("kubernetes: Querying for nodes failed with error: %+v", err) return defaultNode } for _, n := range nodes.Items { if n.Status.NodeInfo.MachineID == mid { - logp.Info("kubernetes: Using node %s discovered by machine-id matching", n.GetObjectMeta().GetName()) - return n.GetObjectMeta().GetName() + name := n.GetObjectMeta().GetName() + log.Infof("kubernetes: Using node %s discovered by machine-id matching", name) + return name } } - logp.Warn("kubernetes: Couldn't discover node, using localhost as default") + log.Warn("kubernetes: Couldn't discover node, using localhost as default") return defaultNode } diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index 6148a4be2eb0..8020955dd591 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -68,12 +68,14 @@ type addDockerMetadata struct { dockerAvailable bool // If Docker exists in env, then it is set to true } +const selector = "add_docker_metadata" + // New constructs a new add_docker_metadata processor. 
func New(cfg *common.Config) (processors.Processor, error) { - return buildDockerMetadataProcessor(cfg, docker.NewWatcher) + return buildDockerMetadataProcessor(logp.NewLogger(selector), cfg, docker.NewWatcher) } -func buildDockerMetadataProcessor(cfg *common.Config, watcherConstructor docker.WatcherConstructor) (processors.Processor, error) { +func buildDockerMetadataProcessor(log *logp.Logger, cfg *common.Config, watcherConstructor docker.WatcherConstructor) (processors.Processor, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName) @@ -81,14 +83,13 @@ func buildDockerMetadataProcessor(cfg *common.Config, watcherConstructor docker. var dockerAvailable bool - watcher, err := watcherConstructor(config.Host, config.TLS, config.MatchShortID) + watcher, err := watcherConstructor(log, config.Host, config.TLS, config.MatchShortID) if err != nil { dockerAvailable = false - errorMsg := fmt.Sprintf("%v: docker environment not detected: %v", processorName, err) - logp.Debug("add_docker_metadata", errorMsg) + log.Debugf("%v: docker environment not detected: %+v", processorName, err) } else { dockerAvailable = true - logp.Debug("add_docker_metadata", "%v: docker environment detected", processorName) + log.Debugf("%v: docker environment detected", processorName) if err = watcher.Start(); err != nil { return nil, errors.Wrap(err, "failed to start watcher") } @@ -110,7 +111,7 @@ func buildDockerMetadataProcessor(cfg *common.Config, watcherConstructor docker. 
} return &addDockerMetadata{ - log: logp.NewLogger(processorName), + log: log, watcher: watcher, fields: config.Fields, sourceProcessor: sourceProcessor, diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go index 9a99ac1cd02b..7ed4f486cb59 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" "github.com/elastic/beats/libbeat/common/docker" + "github.com/elastic/beats/libbeat/logp" ) func init() { @@ -58,7 +59,7 @@ func TestInitializationNoDocker(t *testing.T) { var testConfig = common.NewConfig() testConfig.SetString("host", -1, "unix:///var/run42/docker.sock") - p, err := buildDockerMetadataProcessor(testConfig, docker.NewWatcher) + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, docker.NewWatcher) assert.NoError(t, err, "initializing add_docker_metadata processor") input := common.MapStr{} @@ -71,7 +72,7 @@ func TestInitializationNoDocker(t *testing.T) { func TestInitialization(t *testing.T) { var testConfig = common.NewConfig() - p, err := buildDockerMetadataProcessor(testConfig, MockWatcherFactory(nil)) + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory(nil)) assert.NoError(t, err, "initializing add_docker_metadata processor") input := common.MapStr{} @@ -87,7 +88,7 @@ func TestNoMatch(t *testing.T) { }) assert.NoError(t, err) - p, err := buildDockerMetadataProcessor(testConfig, MockWatcherFactory(nil)) + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory(nil)) assert.NoError(t, err, "initializing add_docker_metadata processor") input := common.MapStr{ @@ -105,7 +106,7 @@ func TestMatchNoContainer(t *testing.T) { }) assert.NoError(t, err) - p, err := 
buildDockerMetadataProcessor(testConfig, MockWatcherFactory(nil)) + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory(nil)) assert.NoError(t, err, "initializing add_docker_metadata processor") input := common.MapStr{ @@ -124,7 +125,7 @@ func TestMatchContainer(t *testing.T) { }) assert.NoError(t, err) - p, err := buildDockerMetadataProcessor(testConfig, MockWatcherFactory( + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory( map[string]*docker.Container{ "container_id": &docker.Container{ ID: "container_id", @@ -172,7 +173,7 @@ func TestMatchContainerWithDedot(t *testing.T) { }) assert.NoError(t, err) - p, err := buildDockerMetadataProcessor(testConfig, MockWatcherFactory( + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory( map[string]*docker.Container{ "container_id": &docker.Container{ ID: "container_id", @@ -215,7 +216,7 @@ func TestMatchSource(t *testing.T) { testConfig, err := common.NewConfigFrom(map[string]interface{}{}) assert.NoError(t, err) - p, err := buildDockerMetadataProcessor(testConfig, MockWatcherFactory( + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory( map[string]*docker.Container{ "FABADA": &docker.Container{ ID: "FABADA", @@ -274,7 +275,7 @@ func TestDisableSource(t *testing.T) { }) assert.NoError(t, err) - p, err := buildDockerMetadataProcessor(testConfig, MockWatcherFactory( + p, err := buildDockerMetadataProcessor(logp.L(), testConfig, MockWatcherFactory( map[string]*docker.Container{ "FABADA": &docker.Container{ ID: "FABADA", @@ -299,7 +300,7 @@ func TestDisableSource(t *testing.T) { } func TestMatchPIDs(t *testing.T) { - p, err := buildDockerMetadataProcessor(common.NewConfig(), MockWatcherFactory( + p, err := buildDockerMetadataProcessor(logp.L(), common.NewConfig(), MockWatcherFactory( map[string]*docker.Container{ "FABADA": &docker.Container{ ID: "FABADA", @@ -399,7 +400,7 @@ func MockWatcherFactory(containers 
map[string]*docker.Container) docker.WatcherC if containers == nil { containers = make(map[string]*docker.Container) } - return func(host string, tls *docker.TLSConfig, shortID bool) (docker.Watcher, error) { + return func(_ *logp.Logger, host string, tls *docker.TLSConfig, shortID bool) (docker.Watcher, error) { return &mockWatcher{containers: containers}, nil } } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 5d02056871ea..2f6b2a71ae8c 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -41,6 +41,7 @@ const ( ) type kubernetesAnnotator struct { + log *logp.Logger watcher kubernetes.Watcher indexers *Indexers matchers *Matchers @@ -48,6 +49,8 @@ type kubernetesAnnotator struct { kubernetesAvailable bool } +const selector = "kubernetes" + func init() { processors.RegisterPlugin("add_kubernetes_metadata", New) jsprocessor.RegisterPlugin("AddKubernetesMetadata", New) @@ -74,6 +77,7 @@ func isKubernetesAvailable(client k8sclient.Interface) bool { // New constructs a new add_kubernetes_metadata processor. 
func New(cfg *common.Config) (processors.Processor, error) { config := defaultKubernetesAnnotatorConfig() + log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata") err := cfg.Unpack(&config) if err != nil { @@ -99,6 +103,7 @@ func New(cfg *common.Config) (processors.Processor, error) { } processor := &kubernetesAnnotator{ + log: log, cache: newCache(config.CleanupTimeout), kubernetesAvailable: false, } @@ -106,11 +111,11 @@ func New(cfg *common.Config) (processors.Processor, error) { client, err := kubernetes.GetKubernetesClient(config.KubeConfig) if err != nil { if kubernetes.IsInCluster(config.KubeConfig) { - logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config: %v", "add_kubernetes_metadata", err) + log.Debugf("Could not create kubernetes client using in_cluster config: %+v", err) } else if config.KubeConfig == "" { - logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG"), err) + log.Debugf("Could not create kubernetes client using config: %v: %+v", os.Getenv("KUBECONFIG"), err) } else { - logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", config.KubeConfig, err) + log.Debugf("Could not create kubernetes client using config: %v: %+v", config.KubeConfig, err) } return processor, nil } @@ -122,15 +127,15 @@ func New(cfg *common.Config) (processors.Processor, error) { matchers := NewMatchers(config.Matchers) if matchers.Empty() { - logp.Debug("kubernetes", "%v: could not initialize kubernetes plugin with zero matcher plugins", "add_kubernetes_metadata") + log.Debugf("Could not initialize kubernetes plugin with zero matcher plugins") return processor, nil } processor.matchers = matchers - config.Host = kubernetes.DiscoverKubernetesNode(config.Host, kubernetes.IsInCluster(config.KubeConfig), client) + config.Host = kubernetes.DiscoverKubernetesNode(log, 
config.Host, kubernetes.IsInCluster(config.KubeConfig), client) - logp.Debug("kubernetes", "Initializing a new Kubernetes watcher using host: %s", config.Host) + log.Debugf("Initializing a new Kubernetes watcher using host: %s", config.Host) watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, @@ -138,7 +143,7 @@ func New(cfg *common.Config) (processors.Processor, error) { Namespace: config.Namespace, }, nil) if err != nil { - logp.Err("kubernetes: Couldn't create watcher for %T", &kubernetes.Pod{}) + log.Errorf("Couldn't create kubernetes watcher for %T", &kubernetes.Pod{}) return nil, err } @@ -150,17 +155,17 @@ func New(cfg *common.Config) (processors.Processor, error) { watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*kubernetes.Pod) - logp.Debug("kubernetes", "%v: adding pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + log.Debugf("Adding kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName()) processor.addPod(pod) }, UpdateFunc: func(obj interface{}) { pod := obj.(*kubernetes.Pod) - logp.Debug("kubernetes", "%v: updating pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + log.Debugf("Updating kubernetes pod: %s/%s", pod.GetNamespace(), pod.GetName()) processor.updatePod(pod) }, DeleteFunc: func(obj interface{}) { pod := obj.(*kubernetes.Pod) - logp.Debug("kubernetes", "%v: removing pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + log.Debugf("Removing pod: %s/%s", pod.GetNamespace(), pod.GetName()) processor.removePod(pod) }, }) @@ -205,7 +210,7 @@ func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) { // Add it again only if it is not being deleted if pod.GetObjectMeta().GetDeletionTimestamp() != nil { - logp.Debug("kubernetes", "%v: removing pod being terminated: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) +
k.log.Debugf("Removing kubernetes pod being terminated: %s/%s", pod.GetNamespace(), pod.GetName()) return } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 82b936c194ce..df88096f810a 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -64,6 +64,8 @@ type enricher struct { isPod bool } +const selector = "kubernetes" + // GetWatcher initializes a kubernetes watcher with the given // scope (node or cluster), and resource type func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope bool) (kubernetes.Watcher, error) { @@ -89,12 +91,14 @@ func GetWatcher(base mb.BaseMetricSet, resource kubernetes.Resource, nodeScope b SyncTimeout: config.SyncPeriod, } + log := logp.NewLogger(selector) + // Watch objects in the node only if nodeScope { - options.Node = kubernetes.DiscoverKubernetesNode(config.Host, kubernetes.IsInCluster(config.KubeConfig), client) + options.Node = kubernetes.DiscoverKubernetesNode(log, config.Host, kubernetes.IsInCluster(config.KubeConfig), client) } - logp.Debug("kubernetes", "Initializing a new Kubernetes watcher using host: %v", config.Host) + log.Debugf("Initializing a new Kubernetes watcher using host: %v", config.Host) return kubernetes.NewWatcher(client, resource, options, nil) } diff --git a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go index 894d1188c634..0378bb7f93e6 100644 --- a/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go +++ b/x-pack/libbeat/autodiscover/providers/aws/elb/provider_test.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" ) type testEventAccumulator struct { @@ -60,10 +61,11 @@ func (tea *testEventAccumulator) waitForNumEvents(t *testing.T, targetLen int, t } 
func Test_internalBuilder(t *testing.T) { + log := logp.NewLogger("elb") lbl := fakeLbl() lbls := []*lbListener{lbl} fetcher := newMockFetcher(lbls, nil) - pBus := bus.New("test") + pBus := bus.New(log, "test") cfg := &Config{ Regions: []string{"us-east-1a", "us-west-1b"},