Skip to content

Commit

Permalink
update cniManager grpc to listen on port instead of domain socket
Browse files Browse the repository at this point in the history
  • Loading branch information
jwtty committed Dec 13, 2023
1 parent 9069d81 commit d78ee90
Show file tree
Hide file tree
Showing 16 changed files with 104 additions and 164 deletions.
4 changes: 2 additions & 2 deletions cmd/kube-egress-cni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func cmdAdd(args *skel.CmdArgs) error {
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "unix", addr)
return d.DialContext(ctx, "tcp", addr)
}),
)
if err != nil {
Expand Down Expand Up @@ -211,7 +211,7 @@ func cmdDel(args *skel.CmdArgs) error {
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "unix", addr)
return d.DialContext(ctx, "tcp", addr)
}))
if err != nil {
return err
Expand Down
135 changes: 58 additions & 77 deletions cmd/kube-egress-cni/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
)

const (
ifName = "eth0"
ifName = "eth0"
testAddr = ":50051"
)

func createCmdArgs(targetNS ns.NetNS) *skel.CmdArgs {
conf := `{"cniVersion":"1.0.0","excludedCIDRs":["1.2.3.4/32","10.1.0.0/16"],"socketPath":"/tmp/cni_grpc_test.sock","gatewayName":"test","ipam":{"type":"static","addresses":[{"address":"fe80::5/64"},{"address":"10.4.0.5/24"}]},"name":"mynet","type":"kube-egress-cni","prevResult":{"cniVersion":"1.0.0","interfaces":[{"name":"eth0","sandbox":"somepath"}],"ips":[{"interface":0,"address":"10.2.0.1/24"}],"dns":{}}}`
conf := `{"cniVersion":"1.0.0","excludedCIDRs":["1.2.3.4/32","10.1.0.0/16"],"socketPath":"localhost:50051","gatewayName":"test","ipam":{"type":"static","addresses":[{"address":"fe80::5/64"},{"address":"10.4.0.5/24"}]},"name":"mynet","type":"kube-egress-cni","prevResult":{"cniVersion":"1.0.0","interfaces":[{"name":"eth0","sandbox":"somepath"}],"ips":[{"interface":0,"address":"10.2.0.1/24"}],"dns":{}}}`
return &skel.CmdArgs{
Args: `IgnoreUnknown=true;K8S_POD_NAMESPACE=testns;K8S_POD_NAME=testpod`,
ContainerID: "test-container",
Expand All @@ -34,15 +35,13 @@ func createCmdArgs(targetNS ns.NetNS) *skel.CmdArgs {
}
}

var _ = Describe("Test kube-egress-cni-ipam operations", func() {
var originalNS, targetNS ns.NetNS
var _ = Describe("Test kube-egress-cni operations", func() {
var targetNS ns.NetNS
var args *skel.CmdArgs
var ipv4Net, ipv6Net *net.IPNet

BeforeEach(func() {
var err error
originalNS, err = testutils.NewNS()
Expect(err).NotTo(HaveOccurred())
targetNS, err = testutils.NewNS()
Expect(err).NotTo(HaveOccurred())
args = createCmdArgs(targetNS)
Expand All @@ -55,35 +54,29 @@ var _ = Describe("Test kube-egress-cni-ipam operations", func() {

AfterEach(func() {
os.Setenv("IS_UNIT_TEST_ENV", "")
Expect(originalNS.Close()).To(Succeed())
Expect(testutils.UnmountNS(originalNS)).To(Succeed())
Expect(targetNS.Close()).To(Succeed())
Expect(testutils.UnmountNS(targetNS)).To(Succeed())
})

It("should do nothing and print prevResult when pod does not use any staticGatewayConfiguration", func() {
grpcTestServer, err := cniprotocol.StartTestServer("/tmp/cni_grpc_test.sock", nil, nil)
grpcTestServer, err := cniprotocol.StartTestServer(testAddr, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(grpcTestServer).NotTo(BeNil())
defer grpcTestServer.GracefulStop()
err = originalNS.Do(func(ns.NetNS) error {
defer GinkgoRecover()
r, _, err := testutils.CmdAddWithArgs(args, func() error {
return cmdAdd(args)
})
Expect(err).NotTo(HaveOccurred())
msg := <-grpcTestServer.Received
req, ok := msg.(*cniprotocol.PodRetrieveRequest)
Expect(ok).To(BeTrue())
Expect(req.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req.GetPodConfig().GetPodName()).To(Equal("testpod"))
resultType, err := r.GetAsVersion(type100.ImplementedSpecVersion)
Expect(err).NotTo(HaveOccurred())
result := resultType.(*type100.Result)
Expect(len(result.Interfaces)).To(Equal(1))
return nil

r, _, err := testutils.CmdAddWithArgs(args, func() error {
return cmdAdd(args)
})
Expect(err).NotTo(HaveOccurred())
msg := <-grpcTestServer.Received
req, ok := msg.(*cniprotocol.PodRetrieveRequest)
Expect(ok).To(BeTrue())
Expect(req.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req.GetPodConfig().GetPodName()).To(Equal("testpod"))
resultType, err := r.GetAsVersion(type100.ImplementedSpecVersion)
Expect(err).NotTo(HaveOccurred())
result := resultType.(*type100.Result)
Expect(len(result.Interfaces)).To(Equal(1))
})

It("should configure pod namespace as expected in cmdAdd", func() {
Expand All @@ -98,75 +91,63 @@ var _ = Describe("Test kube-egress-cni-ipam operations", func() {
})
Expect(err).NotTo(HaveOccurred())

grpcTestServer, err := cniprotocol.StartTestServer("/tmp/cni_grpc_test.sock", nil, map[string]string{consts.CNIGatewayAnnotationKey: "test-sgw"})
grpcTestServer, err := cniprotocol.StartTestServer(testAddr, nil, map[string]string{consts.CNIGatewayAnnotationKey: "test-sgw"})
Expect(err).NotTo(HaveOccurred())
Expect(grpcTestServer).NotTo(BeNil())
defer grpcTestServer.GracefulStop()
err = originalNS.Do(func(ns.NetNS) error {
defer GinkgoRecover()
r, _, err := testutils.CmdAddWithArgs(args, func() error {
origCNIPath := os.Getenv("CNI_PATH")
os.Setenv("CNI_PATH", "./testdata") // contains static ipam plugin
defer os.Setenv("CNI_PATH", origCNIPath)
return cmdAdd(args)
})
Expect(err).NotTo(HaveOccurred())
resultType, err := r.GetAsVersion(type100.ImplementedSpecVersion)
Expect(err).NotTo(HaveOccurred())
result := resultType.(*type100.Result)
Expect(len(result.Interfaces)).To(Equal(2))
Expect(len(result.IPs)).To(Equal(2))

msg := <-grpcTestServer.Received
req1, ok := msg.(*cniprotocol.PodRetrieveRequest)
Expect(ok).To(BeTrue())
Expect(req1.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req1.GetPodConfig().GetPodName()).To(Equal("testpod"))
msg = <-grpcTestServer.Received
req2, ok := msg.(*cniprotocol.NicAddRequest)
Expect(ok).To(BeTrue())
Expect(req2.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req2.GetPodConfig().GetPodName()).To(Equal("testpod"))
Expect(req2.GetGatewayName()).To(Equal("test-sgw"))
Expect(req2.GetAllowedIp()).To(Equal("10.4.0.5/32"))
return nil

r, _, err := testutils.CmdAddWithArgs(args, func() error {
origCNIPath := os.Getenv("CNI_PATH")
os.Setenv("CNI_PATH", "./testdata") // contains static ipam plugin
defer os.Setenv("CNI_PATH", origCNIPath)
return cmdAdd(args)
})
Expect(err).NotTo(HaveOccurred())
resultType, err := r.GetAsVersion(type100.ImplementedSpecVersion)
Expect(err).NotTo(HaveOccurred())
result := resultType.(*type100.Result)
Expect(len(result.Interfaces)).To(Equal(2))
Expect(len(result.IPs)).To(Equal(2))

msg := <-grpcTestServer.Received
req1, ok := msg.(*cniprotocol.PodRetrieveRequest)
Expect(ok).To(BeTrue())
Expect(req1.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req1.GetPodConfig().GetPodName()).To(Equal("testpod"))
msg = <-grpcTestServer.Received
req2, ok := msg.(*cniprotocol.NicAddRequest)
Expect(ok).To(BeTrue())
Expect(req2.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req2.GetPodConfig().GetPodName()).To(Equal("testpod"))
Expect(req2.GetGatewayName()).To(Equal("test-sgw"))
Expect(req2.GetAllowedIp()).To(Equal("10.4.0.5/32"))

})

It("should not report error in cmdDel", func() {
grpcTestServer, err := cniprotocol.StartTestServer("/tmp/cni_grpc_test.sock", nil, map[string]string{consts.CNIGatewayAnnotationKey: "test-sgw"})
grpcTestServer, err := cniprotocol.StartTestServer(testAddr, nil, map[string]string{consts.CNIGatewayAnnotationKey: "test-sgw"})
Expect(err).NotTo(HaveOccurred())
Expect(grpcTestServer).NotTo(BeNil())
defer grpcTestServer.GracefulStop()
err = originalNS.Do(func(ns.NetNS) error {
defer GinkgoRecover()
err := testutils.CmdDelWithArgs(args, func() error {
origCNIPath := os.Getenv("CNI_PATH")
os.Setenv("CNI_PATH", "./testdata") // contains static ipam plugin
defer os.Setenv("CNI_PATH", origCNIPath)
return cmdDel(args)
})
Expect(err).NotTo(HaveOccurred())

msg := <-grpcTestServer.Received
req, ok := msg.(*cniprotocol.NicDelRequest)
Expect(ok).To(BeTrue())
Expect(req.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req.GetPodConfig().GetPodName()).To(Equal("testpod"))
return nil
err = testutils.CmdDelWithArgs(args, func() error {
origCNIPath := os.Getenv("CNI_PATH")
os.Setenv("CNI_PATH", "./testdata") // contains static ipam plugin
defer os.Setenv("CNI_PATH", origCNIPath)
return cmdDel(args)
})
Expect(err).NotTo(HaveOccurred())

msg := <-grpcTestServer.Received
req, ok := msg.(*cniprotocol.NicDelRequest)
Expect(ok).To(BeTrue())
Expect(req.GetPodConfig().GetPodNamespace()).To(Equal("testns"))
Expect(req.GetPodConfig().GetPodName()).To(Equal("testpod"))
})

It("should not report error in cmdCheck", func() {
err := originalNS.Do(func(ns.NetNS) error {
defer GinkgoRecover()
err := testutils.CmdCheckWithArgs(args, func() error {
return cmdCheck(args)
})
Expect(err).NotTo(HaveOccurred())
return nil
err := testutils.CmdCheckWithArgs(args, func() error {
return cmdCheck(args)
})
Expect(err).NotTo(HaveOccurred())
})
Expand Down
11 changes: 4 additions & 7 deletions cmd/kube-egress-gateway-cnimanager/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ import (
"github.com/Azure/kube-egress-gateway/pkg/logger"
)

const (
protocol = "unix"
sockAddr = consts.CNISocketPath
)

// serveCmd represents the serve command
var serveCmd = &cobra.Command{
Use: "serve",
Expand All @@ -57,6 +52,7 @@ var (
confFileName string
exceptionCidrs string
cniUninstallConfigMapName string
grpcPort int
)

func init() {
Expand All @@ -71,6 +67,7 @@ func init() {
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// serveCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
serveCmd.Flags().IntVar(&grpcPort, "grpc-server-port", 50051, "The port the grpc server listens on.")
serveCmd.Flags().StringVar(&exceptionCidrs, "exception-cidrs", "", "Cidrs that should bypass egress gateway separated with ',', e.g. intra-cluster traffic")
serveCmd.Flags().StringVar(&confFileName, "cni-conf-file", "01-egressgateway.conflist", "Name of the new cni configuration file")
serveCmd.Flags().StringVar(&cniUninstallConfigMapName, "cni-uninstall-configmap-name", "cni-uninstall", "Name of the configmap that indicates whether to uninstall cni plugin or not, the configMap should be in the same namespace as the cniManager pod")
Expand All @@ -87,7 +84,7 @@ func ServiceLauncher(cmd *cobra.Command, args []string) {

k8sClient := startKubeClient(ctx, logger)

cniConfMgr, err := cniconf.NewCNIConfManager(consts.CNIConfDir, confFileName, exceptionCidrs, cniUninstallConfigMapName, k8sClient)
cniConfMgr, err := cniconf.NewCNIConfManager(consts.CNIConfDir, confFileName, exceptionCidrs, cniUninstallConfigMapName, k8sClient, grpcPort)
if err != nil {
logger.Error(err, "failed to create cni config manager")
os.Exit(1)
Expand Down Expand Up @@ -122,7 +119,7 @@ func ServiceLauncher(cmd *cobra.Command, args []string) {

cniprotocol.RegisterNicServiceServer(server, nicSvc)
var listener net.Listener
listener, err = net.Listen(protocol, sockAddr)
listener, err = net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
if err != nil {
logger.Error(err, "failed to listen")
os.Exit(1)
Expand Down
34 changes: 8 additions & 26 deletions config/cnimanager/daemon/cnimanager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ spec:
- /kube-egress-gateway-cnimanager
args:
- serve
- --grpc-server-port=50051
- --exception-cidrs=$(EXCEPTION_CIDRS)
- --cni-conf-file=01-egressgateway.conflist
- --cni-uninstall-configmap-name=cni-uninstall
Expand All @@ -51,27 +52,17 @@ spec:
volumeMounts:
- mountPath: /etc/cni/net.d
name: cni-conf
- mountPath: /proc
name: hostpath-proc
mountPropagation: Bidirectional
- mountPath: /run
name: hostpath-run
mountPropagation: Bidirectional
- mountPath: /var
name: hostpath-var
mountPropagation: Bidirectional
ports:
- containerPort: 50051
name: grpc
livenessProbe:
exec:
command:
- /usr/local/bin/grpc_health_probe
- -addr=unix:///var/run/egressgateway.sock
grpc:
port: 50051
initialDelaySeconds: 20
periodSeconds: 5
readinessProbe:
exec:
command:
- /usr/local/bin/grpc_health_probe
- -addr=unix:///var/run/egressgateway.sock
grpc:
port: 50051
initialDelaySeconds: 20
periodSeconds: 5
# TODO(user): Configure the resources accordingly based on the project requirements.
Expand All @@ -89,15 +80,6 @@ spec:
serviceAccountName: cni-manager
terminationGracePeriodSeconds: 60 # update to 60 seconds for cni uninstall retry on error
volumes:
- name: hostpath-proc
hostPath:
path: /proc
- name: hostpath-run
hostPath:
path: /run
- name: hostpath-var
hostPath:
path: /var
- name: cni-bin
hostPath:
path: /opt/cni/bin/
Expand Down
7 changes: 0 additions & 7 deletions docker/cnimanager.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
# syntax=docker/dockerfile:1
FROM --platform=$BUILDPLATFORM golang:1.21 as builder
ARG GRPC_HEALTH_PROBE_VERSION=v0.4.23
ARG TARGETARCH
WORKDIR /workspace
RUN wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-${TARGETARCH} && chmod +x /bin/grpc_health_probe

FROM gcr.io/distroless/static:latest
ARG MAIN_ENTRY
COPY --from=baseimg /${MAIN_ENTRY} /
COPY --from=builder /bin/grpc_health_probe /usr/local/bin/grpc_health_probe
ENTRYPOINT [/${MAIN_ENTRY}]
1 change: 0 additions & 1 deletion docker/docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ target "cnimanager" {
platforms = [PLATFORMS]
args = {
MAIN_ENTRY = "kube-egress-gateway-cnimanager",
GRPC_HEALTH_PROBE_VERSION = "v0.4.14"
}
}

Expand Down
2 changes: 1 addition & 1 deletion hack/test_linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ PKG=${PKG:-$(go list ./... | xargs echo)}
for t in ${PKG}; do
if [[ "${pkg_need_root[*]}" == *"${t}"* ]];
then
bash -c "export XDG_RUNTIME_DIR=/tmp/cni-rootless; unshare -rmn go test -cover ${t} -args -test.gocoverdir=${PWD}/testcoverage"
bash -c "export XDG_RUNTIME_DIR=/tmp/cni-rootless; unshare -rmn bash -c 'ip link set lo up; go test -cover ${t} -args -test.gocoverdir=${PWD}/testcoverage'"
elif [[ "${t}" != *"e2e"* ]];
then
go test -cover ${t} -args -test.gocoverdir="${PWD}/testcoverage"
Expand Down
1 change: 1 addition & 0 deletions helm/kube-egress-gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Additionally, `common.gatewayLbProbePort` defines the gateway LoadBalancer probe
| `gatewayCNIManager.imageName` | `kube-egress-gateway-cni` | Name of gatewayCNIManager image. |
| `gatewayCNIManager.imageTag` | | Tag of gatewayCNIManager image. |
| `gatewayCNIManager.imagePullPolicy` | `IfNotPresent` | Image pull policy for gatewayCNIManager's image. |
| `gatewayCNIManager.grpcServerPort` | `50051` | Port which cniManager grpc server listens on. Also used for cniManager pod liveness and readiness probes. |
| `gatewayCNIManager.exceptionCidrs` | `[""]` | A list of cidrs that should be exempted from all egress gateways, e.g. intra-cluster traffic. |
| `gatewayCNIManager.cniConfigFileName` | `01-egressgateway.conflist` | Name of the newly generated cni configuration list file. |
| `gatewayCNIManager.cniUninstallConfigMapName` | `cni-uninstall` | Name of the configMap indicating whether cni plugin needs to be uninstalled upon gatewayCNIManager pod shutdown. |
Expand Down
Loading

0 comments on commit d78ee90

Please sign in to comment.