Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libbeat: Add a new way to detect node for k8s which using machine-id #6146

Merged
merged 2 commits into from
Jan 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add autodiscover for kubernetes. {pull}6055[6055]
- Add the abilility for the add_docker_metadata process to enrich based on process ID. {pull}6100[6100]
- The `add_docker_metadata` and `add_kubernetes_metadata` processors are now GA, instead of Beta. {pull}6105[6105]
- The node name can be discovered automatically by machine-id matching when beat deployed outside kubernetes cluster. {pull}6146[6146]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,

metagen := kubernetes.NewMetaGenerator(config.IncludeAnnotations, config.IncludeLabels, config.ExcludeLabels)

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client)
watcher := kubernetes.NewWatcher(client.CoreV1(), config.SyncPeriod, config.CleanupTimeout, config.Host)

start := watcher.ListenStart()
Expand Down
95 changes: 75 additions & 20 deletions libbeat/common/kubernetes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/ericchiang/k8s"
"github.com/ghodss/yaml"

"github.com/elastic/beats/libbeat/logp"
)

const defaultNode = "localhost"

// GetKubernetesClient returns a kubernetes client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
Expand Down Expand Up @@ -41,28 +44,80 @@ func GetKubernetesClient(inCluster bool, kubeConfig string) (client *k8s.Client,
return client, nil
}

// DiscoverKubernetesNode figures out the Kubernetes host to use. If host is provided in the config
// use it directly. Else use hostname of the pod which is the Pod ID to query the Pod and get the Node
// name from the specification. Else, return localhost as a default.
func DiscoverKubernetesNode(host string, client *k8s.Client) string {
ctx := context.Background()
if host == "" {
podName := os.Getenv("HOSTNAME")
logp.Info("Using pod name %s and namespace %s", podName, client.Namespace)
if podName == "localhost" {
host = "localhost"
} else {
pod, err := client.CoreV1().GetPod(ctx, podName, client.Namespace)
if err != nil {
logp.Err("Querying for pod failed with error: ", err.Error())
logp.Info("Unable to find pod, setting host to localhost")
host = "localhost"
} else {
host = pod.Spec.GetNodeName()
}
// DiscoverKubernetesNode figures out the Kubernetes node to use.
// If host is provided in the config use it directly.
// If beat is deployed in k8s cluster, use hostname of pod which is pod name to query pod meta for node name.
// If beat is deployed outside k8s cluster, use machine-id to match against k8s nodes for node name.
func DiscoverKubernetesNode(host string, inCluster bool, client *k8s.Client) (node string) {
if host != "" {
logp.Info("kubernetes: Using node %s provided in the config", host)
return host
}

if inCluster {
ns, err := inClusterNamespace()
if err != nil {
logp.Err("kubernetes: Couldn't get namespace when beat is in cluster with error: ", err.Error())
return defaultNode
}
podName, err := os.Hostname()
if err != nil {
logp.Err("kubernetes: Couldn't get hostname as beat pod name in cluster with error: ", err.Error())
return defaultNode
}
logp.Info("kubernetes: Using pod name %s and namespace %s to discover kubernetes node", podName, ns)
pod, err := client.CoreV1().GetPod(context.TODO(), podName, ns)
if err != nil {
logp.Err("kubernetes: Querying for pod failed with error: ", err.Error())
return defaultNode
}
logp.Info("kubernetes: Using node %s discovered by in cluster pod node query", pod.Spec.GetNodeName())
return pod.Spec.GetNodeName()
}

mid := machineID()
if mid == "" {
logp.Err("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id")
return defaultNode
}

nodes, err := client.CoreV1().ListNodes(context.TODO())
if err != nil {
logp.Err("kubernetes: Querying for nodes failed with error: ", err.Error())
return defaultNode
}
for _, n := range nodes.Items {
if n.GetStatus().GetNodeInfo().GetMachineID() == mid {
logp.Info("kubernetes: Using node %s discovered by machine-id matching", n.GetMetadata().GetName())
return n.GetMetadata().GetName()
}
}

logp.Warn("kubernetes: Couldn't discover node, using localhost as default")
return defaultNode
}

// machineID borrowed from cadvisor.
func machineID() string {
for _, file := range []string{
"/etc/machine-id",
"/var/lib/dbus/machine-id",
} {
id, err := ioutil.ReadFile(file)
if err == nil {
return strings.TrimSpace(string(id))
}
}
return ""
}

return host
// inClusterNamespace gets namespace from serviceaccount when beat is in cluster.
// code borrowed from client-go with some changes.
func inClusterNamespace() (string, error) {
// get namespace associated with the service account token, if available
data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", err
}
return strings.TrimSpace(string(data)), nil
}
2 changes: 1 addition & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newKubernetesAnnotator(cfg *common.Config) (processors.Processor, error) {
return nil, err
}

config.Host = kubernetes.DiscoverKubernetesNode(config.Host, client)
config.Host = kubernetes.DiscoverKubernetesNode(config.Host, config.InCluster, client)

logp.Debug("kubernetes", "Using host ", config.Host)
logp.Debug("kubernetes", "Initializing watcher")
Expand Down