-
Notifications
You must be signed in to change notification settings - Fork 123
/
Copy pathk8s_discover.go
212 lines (181 loc) · 6.57 KB
/
k8s_discover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Package k8s provides pod discovery for Kubernetes.
package k8s
import (
"context"
"fmt"
"log"
"path/filepath"
"strconv"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/go-homedir"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
// Register all known auth mechanisms since we might be authenticating
// from anywhere.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
const (
// AnnotationKeyPort is the annotation name of the field that specifies
// the port name or number to append to the address.
AnnotationKeyPort = "consul.hashicorp.com/auto-join-port"
)
type Provider struct{}
func (p *Provider) Help() string {
return `Kubernetes (K8S):
provider: "k8s"
kubeconfig: Path to the kubeconfig file.
namespace: Namespace to search for pods (defaults to "default").
label_selector: Label selector value to filter pods.
field_selector: Field selector value to filter pods.
host_network: "true" if pod host IP and ports should be used.
The kubeconfig file value will be searched in the following locations:
1. Use path from "kubeconfig" option if provided.
2. Use path from KUBECONFIG environment variable.
3. Use default path of $HOME/.kube/config
By default, the Pod IP is used to join. The "host_network" option may
be set to use the Host IP. No port is used by default. Pods may set
an annotation 'hashicorp/consul-auto-join-port' to a named port or
an integer value. If the value matches a named port, that port will
be used to join.
Note that if "host_network" is set to true, then only pods that have
a HostIP available will be selected. If a port annotation exists, then
the port must be exposed via a HostPort as well, otherwise the pod will
be ignored.
`
}
func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error) {
if args["provider"] != "k8s" {
return nil, fmt.Errorf("discover-k8s: invalid provider " + args["provider"])
}
// Get the configuration. This can come from multiple sources. We first
// try kubeconfig it is set directly, then we fall back to in-cluster
// auth. Finally, we try the default kubeconfig path.
kubeconfig := args["kubeconfig"]
if kubeconfig == "" {
// If kubeconfig is empty, let's first try the default directory.
// This is must faster than trying in-cluster auth so we try this
// first.
dir, err := homedir.Dir()
if err != nil {
return nil, fmt.Errorf("discover-k8s: error retrieving home directory: %s", err)
}
kubeconfig = filepath.Join(dir, ".kube", "config")
}
// First try to get the configuration from the kubeconfig value
config, configErr := clientcmd.BuildConfigFromFlags("", kubeconfig)
if configErr != nil {
configErr = fmt.Errorf("discover-k8s: error loading kubeconfig: %s", configErr)
// kubeconfig failed, fall back and try in-cluster config. We do
// this as the fallback since this makes network connections and
// is much slower to fail.
var err error
config, err = rest.InClusterConfig()
if err != nil {
return nil, multierror.Append(configErr, fmt.Errorf(
"discover-k8s: error loading in-cluster config: %s", err))
}
}
// Initialize the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("discover-k8s: error initializing k8s client: %s", err)
}
namespace := args["namespace"]
if namespace == "" {
namespace = "default"
}
// List all the pods based on the filters we requested
pods, err := clientset.CoreV1().Pods(namespace).List(
context.Background(),
metav1.ListOptions{
LabelSelector: args["label_selector"],
FieldSelector: args["field_selector"],
})
if err != nil {
return nil, fmt.Errorf("discover-k8s: error listing pods: %s", err)
}
return PodAddrs(pods, args, l)
}
// PodAddrs extracts the addresses from a list of pods.
//
// This is a separate method so that we can unit test this without having
// to setup complicated K8S cluster scenarios. It shouldn't generally be
// called externally.
func PodAddrs(pods *corev1.PodList, args map[string]string, l *log.Logger) ([]string, error) {
hostNetwork := false
if v := args["host_network"]; v != "" {
var err error
hostNetwork, err = strconv.ParseBool(v)
if err != nil {
return nil, fmt.Errorf("discover-k8s: host_network must be boolean value: %s", err)
}
}
var addrs []string
PodLoop:
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning {
l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not running: %q",
pod.Name, pod.Status.Phase)
continue
}
// If there is a Ready condition available, we need that to be true.
// If no ready condition is set, then we accept this pod regardless.
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status != corev1.ConditionTrue {
l.Printf("[DEBUG] discover-k8s: ignoring pod %q, not ready state", pod.Name)
continue PodLoop
}
}
// Get the IP address that we will join.
addr := pod.Status.PodIP
if hostNetwork {
addr = pod.Status.HostIP
}
if addr == "" {
// This can be empty according to the API docs, so we protect that.
l.Printf("[DEBUG] discover-k8s: ignoring pod %q, requested IP is empty", pod.Name)
continue
}
// We only use the port if it is specified as an annotation. The
// annotation value can be a name or a number.
if v := pod.Annotations[AnnotationKeyPort]; v != "" {
port, err := podPort(&pod, v, hostNetwork)
if err != nil {
l.Printf("[DEBUG] discover-k8s: ignoring pod %q, error retrieving port: %s",
pod.Name, err)
continue
}
addr = fmt.Sprintf("%s:%d", addr, port)
}
addrs = append(addrs, addr)
}
return addrs, nil
}
// podPort extracts the proper port for the address from the given pod
// for a non-empty annotation.
//
// Pre-condition: annotation is non-empty
func podPort(pod *corev1.Pod, annotation string, host bool) (int32, error) {
// First look for a matching port matching the value of the annotation.
for _, container := range pod.Spec.Containers {
for _, portDef := range container.Ports {
if portDef.Name == annotation {
if host {
// It is possible for HostPort to be zero, if that is the
// case then we ignore this port.
if portDef.HostPort == 0 {
continue
}
return portDef.HostPort, nil
}
return portDef.ContainerPort, nil
}
}
}
// Otherwise assume that the port is a numeric value.
v, err := strconv.ParseInt(annotation, 0, 32)
return int32(v), err
}