Skip to content

Commit

Permalink
Injector can define kubernetes clusterDomain (dapr#3164)
Browse files Browse the repository at this point in the history
* injector support define k8s clusterDomain

* use define ContentTypeJSON const

* format address all in a single Sprintf statement

* auto-detect the k8s cluster domain by read resolv.conf file

* make some func private

* wait timeout output podStatus info for debug

* output pod resources and check string is nil

* injector charts add KUBE_CLUSTER_DOMAIN for e2e tests

* add injector config test file

* adjust test for azure env because pod search domain is not cluster.local

* resolve golangci-lint check error

Co-authored-by: Artur Souza <[email protected]>
  • Loading branch information
marviniter and artursouza authored Jun 23, 2021
1 parent a16db69 commit 34bbc96
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ spec:
value: /dapr/cert/tls.crt
- name: TLS_KEY_FILE
value: /dapr/cert/tls.key
{{- if .Values.kubeClusterDomain }}
- name: KUBE_CLUSTER_DOMAIN
value: "{{ .Values.kubeClusterDomain }}"
{{- end }}
- name: SIDECAR_IMAGE
{{- if contains "/" .Values.image.name }}
value: "{{ .Values.image.name }}"
Expand Down
1 change: 1 addition & 0 deletions charts/dapr/charts/dapr_sidecar_injector/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ webhookFailurePolicy: Ignore
sidecarImagePullPolicy: IfNotPresent
runAsNonRoot: true
resources: {}
kubeClusterDomain: cluster.local

debug:
enabled: false
Expand Down
2 changes: 1 addition & 1 deletion cmd/injector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())

ctx := signals.Context()
cfg, err := injector.GetConfigFromEnvironment()
cfg, err := injector.GetConfig()
if err != nil {
log.Fatalf("error getting config: %s", err)
}
Expand Down
28 changes: 24 additions & 4 deletions pkg/injector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

package injector

import "github.com/kelseyhightower/envconfig"
import (
"github.com/kelseyhightower/envconfig"

"github.com/dapr/dapr/utils"
)

// Config represents configuration options for the Dapr Sidecar Injector webhook server.
type Config struct {
Expand All @@ -14,6 +18,7 @@ type Config struct {
SidecarImage string `envconfig:"SIDECAR_IMAGE" required:"true"`
SidecarImagePullPolicy string `envconfig:"SIDECAR_IMAGE_PULL_POLICY"`
Namespace string `envconfig:"NAMESPACE" required:"true"`
KubeClusterDomain string `envconfig:"KUBE_CLUSTER_DOMAIN"`
}

// NewConfigWithDefaults returns a Config object with default values already
Expand All @@ -25,9 +30,24 @@ func NewConfigWithDefaults() Config {
}
}

// GetConfigFromEnvironment returns configuration derived from environment variables.
func GetConfigFromEnvironment() (Config, error) {
// GetConfig returns configuration derived from environment variables.
func GetConfig() (Config, error) {
// get config from environment variables
c := NewConfigWithDefaults()
err := envconfig.Process("", &c)
return c, err
if err != nil {
return c, err
}

if c.KubeClusterDomain == "" {
// auto-detect KubeClusterDomain from resolv.conf file
clusterDomain, err := utils.GetKubeClusterDomain()
if err != nil {
log.Errorf("failed to get clusterDomain err:%s, set default:%s", err, utils.DefaultKubeClusterDomain)
c.KubeClusterDomain = utils.DefaultKubeClusterDomain
} else {
c.KubeClusterDomain = clusterDomain
}
}
return c, nil
}
48 changes: 48 additions & 0 deletions pkg/injector/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package injector

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetInectorConfig(t *testing.T) {
t.Run("with kube cluster domain env", func(t *testing.T) {
os.Setenv("TLS_CERT_FILE", "test-cert-file")
os.Setenv("TLS_KEY_FILE", "test-key-file")
os.Setenv("SIDECAR_IMAGE", "daprd-test-image")
os.Setenv("SIDECAR_IMAGE_PULL_POLICY", "Always")
os.Setenv("NAMESPACE", "test-namespace")
os.Setenv("KUBE_CLUSTER_DOMAIN", "cluster.local")
defer os.Clearenv()

cfg, err := GetConfig()
assert.Nil(t, err)
assert.Equal(t, "test-cert-file", cfg.TLSCertFile)
assert.Equal(t, "test-key-file", cfg.TLSKeyFile)
assert.Equal(t, "daprd-test-image", cfg.SidecarImage)
assert.Equal(t, "Always", cfg.SidecarImagePullPolicy)
assert.Equal(t, "test-namespace", cfg.Namespace)
assert.Equal(t, "cluster.local", cfg.KubeClusterDomain)
})

t.Run("not set kube cluster domain env", func(t *testing.T) {
os.Setenv("TLS_CERT_FILE", "test-cert-file")
os.Setenv("TLS_KEY_FILE", "test-key-file")
os.Setenv("SIDECAR_IMAGE", "daprd-test-image")
os.Setenv("SIDECAR_IMAGE_PULL_POLICY", "IfNotPresent")
os.Setenv("NAMESPACE", "test-namespace")
os.Setenv("KUBE_CLUSTER_DOMAIN", "")
defer os.Clearenv()

cfg, err := GetConfig()
assert.Nil(t, err)
assert.Equal(t, "test-cert-file", cfg.TLSCertFile)
assert.Equal(t, "test-key-file", cfg.TLSKeyFile)
assert.Equal(t, "daprd-test-image", cfg.SidecarImage)
assert.Equal(t, "IfNotPresent", cfg.SidecarImagePullPolicy)
assert.Equal(t, "test-namespace", cfg.Namespace)
assert.NotEqual(t, "", cfg.KubeClusterDomain)
})
}
15 changes: 8 additions & 7 deletions pkg/injector/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ func AllowedControllersServiceAccountUID(ctx context.Context, kubeClient *kubern
for i, allowedControllersServiceAccount := range allowedControllersServiceAccounts {
saUUID, err := getServiceAccount(ctx, kubeClient, allowedControllersServiceAccount)
// i == 0 => "replicaset-controller" is the only one mandatory
if err != nil && i == 0 {
return nil, err
} else if err != nil {
if err != nil {
if i == 0 {
return nil, err
}
log.Warnf("Unable to get SA %s UID (%s)", allowedControllersServiceAccount, err)
continue
}
Expand Down Expand Up @@ -181,11 +182,11 @@ func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
}

contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
log.Errorf("Content-Type=%s, expect application/json", contentType)
if contentType != runtime.ContentTypeJSON {
log.Errorf("Content-Type=%s, expect %s", contentType, runtime.ContentTypeJSON)
http.Error(
w,
"invalid Content-Type, expect `application/json`",
fmt.Sprintf("invalid Content-Type, expect `%s`", runtime.ContentTypeJSON),
http.StatusUnsupportedMediaType,
)

Expand Down Expand Up @@ -260,7 +261,7 @@ func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
log.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
monitoring.RecordFailedSidecarInjectionCount(diagAppID, "response")
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
if _, err := w.Write(respBytes); err != nil {
log.Error(err)
} else {
Expand Down
30 changes: 27 additions & 3 deletions pkg/injector/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,33 @@ func TestMaxConcurrency(t *testing.T) {
})
}

func TestKubernetesDNS(t *testing.T) {
dns := getKubernetesDNS("a", "b")
assert.Equal(t, "a.b.svc.cluster.local", dns)
func TestGetServiceAddress(t *testing.T) {
testCases := []struct {
name string
namespace string
clusterDomain string
port int
expect string
}{
{
port: 80,
name: "a",
namespace: "b",
clusterDomain: "cluster.local",
expect: "a.b.svc.cluster.local:80",
},
{
port: 50001,
name: "app",
namespace: "default",
clusterDomain: "selfdefine.domain",
expect: "app.default.svc.selfdefine.domain:50001",
},
}
for _, tc := range testCases {
dns := getServiceAddress(tc.name, tc.namespace, tc.clusterDomain, tc.port)
assert.Equal(t, tc.expect, dns)
}
}

func TestGetMetricsPort(t *testing.T) {
Expand Down
15 changes: 9 additions & 6 deletions pkg/injector/pod_patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const (
apiAddress = "dapr-api"
placementService = "dapr-placement-server"
sentryService = "dapr-sentry"
apiPort = 80
placementServicePort = 50005
sentryServicePort = 80
sidecarHTTPPortName = "dapr-http"
sidecarGRPCPortName = "dapr-grpc"
sidecarInternalGRPCPortName = "dapr-internal"
Expand Down Expand Up @@ -126,9 +129,9 @@ func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
}

// Keep DNS resolution outside of getSidecarContainer for unit testing.
placementAddress := fmt.Sprintf("%s:50005", getKubernetesDNS(placementService, namespace))
sentryAddress := fmt.Sprintf("%s:80", getKubernetesDNS(sentryService, namespace))
apiSrvAddress := fmt.Sprintf("%s:80", getKubernetesDNS(apiAddress, namespace))
placementAddress := getServiceAddress(placementService, namespace, i.config.KubeClusterDomain, placementServicePort)
sentryAddress := getServiceAddress(sentryService, namespace, i.config.KubeClusterDomain, sentryServicePort)
apiSvcAddress := getServiceAddress(apiAddress, namespace, i.config.KubeClusterDomain, apiPort)

var trustAnchors string
var certChain string
Expand All @@ -142,7 +145,7 @@ func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
}

tokenMount := getTokenVolumeMount(pod)
sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSrvAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)
sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSvcAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -459,8 +462,8 @@ func isResourceDaprEnabled(annotations map[string]string) bool {
return getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
}

func getKubernetesDNS(name, namespace string) string {
return fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace)
func getServiceAddress(name, namespace, clusterDomain string, port int) string {
return fmt.Sprintf("%s.%s.svc.%s:%d", name, namespace, clusterDomain, port)
}

func getPullPolicy(pullPolicy string) corev1.PullPolicy {
Expand Down
18 changes: 17 additions & 1 deletion tests/platforms/kubernetes/appmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,23 @@ func (m *AppManager) WaitUntilDeploymentState(isState func(*appsv1.Deployment, e
})

if waitErr != nil {
return nil, fmt.Errorf("deployment %q is not in desired state, received: %+v: %s", m.app.AppName, lastDeployment, waitErr)
// get deployment's Pods detail status info
podClient := m.client.Pods(m.namespace)
// Filter only 'testapp=appName' labeled Pods
podList, err := podClient.List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", TestAppLabelKey, m.app.AppName),
})
podStatus := map[string][]apiv1.ContainerStatus{}
if err == nil {
for _, pod := range podList.Items {
podStatus[pod.Name] = pod.Status.ContainerStatuses
}
log.Printf("deployment %s relate pods: %+v", m.app.AppName, podList)
} else {
log.Printf("Error list pod for deployment %s. Error was %s", m.app.AppName, err)
}

return nil, fmt.Errorf("deployment %q is not in desired state, received: %+v pod status: %+v error: %s", m.app.AppName, lastDeployment, podStatus, waitErr)
}

return lastDeployment, nil
Expand Down
72 changes: 72 additions & 0 deletions utils/resolvconf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package utils

import (
"bufio"
"bytes"
"io/ioutil"
"regexp"
"sort"
"strings"
)

const (
// DefaultKubeClusterDomain is the default value of KubeClusterDomain.
DefaultKubeClusterDomain = "cluster.local"
defaultResolvPath = "/etc/resolv.conf"
commentMarker = "#"
)

var searchRegexp = regexp.MustCompile(`^\s*search\s*(([^\s]+\s*)*)$`)

// GetKubeClusterDomain search KubeClusterDomain value from /etc/resolv.conf file.
func GetKubeClusterDomain() (string, error) {
resolvContent, err := getResolvContent(defaultResolvPath)
if err != nil {
return "", err
}
return getClusterDomain(resolvContent)
}

func getClusterDomain(resolvConf []byte) (string, error) {
var kubeClusterDomian string
searchDomains := getResolvSearchDomains(resolvConf)
sort.Strings(searchDomains)
if len(searchDomains) == 0 || searchDomains[0] == "" {
kubeClusterDomian = DefaultKubeClusterDomain
} else {
kubeClusterDomian = searchDomains[0]
}
return kubeClusterDomian, nil
}

func getResolvContent(resolvPath string) ([]byte, error) {
return ioutil.ReadFile(resolvPath)
}

func getResolvSearchDomains(resolvConf []byte) []string {
var (
domains []string
lines [][]byte
)

scanner := bufio.NewScanner(bytes.NewReader(resolvConf))
for scanner.Scan() {
line := scanner.Bytes()
commentIndex := bytes.Index(line, []byte(commentMarker))
if commentIndex == -1 {
lines = append(lines, line)
} else {
lines = append(lines, line[:commentIndex])
}
}

for _, line := range lines {
match := searchRegexp.FindSubmatch(line)
if match == nil {
continue
}
domains = strings.Fields(string(match[1]))
}

return domains
}
Loading

0 comments on commit 34bbc96

Please sign in to comment.