From 098faa0aaf45461b61dc38758532a2f2b3f67eac Mon Sep 17 00:00:00 2001 From: Hanamantagoud Date: Fri, 26 Nov 2021 11:33:10 +0530 Subject: [PATCH] Adding Loadbalancer service code --- backends/ipvs-as-sink/clusteripsvc.go | 2 +- backends/ipvs-as-sink/common.go | 76 ++- backends/ipvs-as-sink/helpers.go | 32 +- backends/ipvs-as-sink/ipset.go | 54 +-- backends/ipvs-as-sink/ipvs.go | 22 +- backends/ipvs-as-sink/{lb.go => ipvslb.go} | 5 +- backends/ipvs-as-sink/loadbalancersvc.go | 285 +++++++++++ backends/ipvs-as-sink/nodeportsvc.go | 61 +-- backends/ipvs-as-sink/util/ipset.go | 528 +++++++++++++++++++++ backends/ipvs-as-sink/util/types.go | 59 +++ 10 files changed, 1019 insertions(+), 105 deletions(-) rename backends/ipvs-as-sink/{lb.go => ipvslb.go} (94%) create mode 100644 backends/ipvs-as-sink/loadbalancersvc.go create mode 100644 backends/ipvs-as-sink/util/ipset.go create mode 100644 backends/ipvs-as-sink/util/types.go diff --git a/backends/ipvs-as-sink/clusteripsvc.go b/backends/ipvs-as-sink/clusteripsvc.go index 0c5b2a7cd..24f5441b4 100644 --- a/backends/ipvs-as-sink/clusteripsvc.go +++ b/backends/ipvs-as-sink/clusteripsvc.go @@ -49,7 +49,7 @@ func (s *Backend) handleNewClusterIPService(key string, svc *localnetv1.Service) s.addServiceIPToKubeIPVSIntf(nil, svc) //Cluster service IP is stored in LB tree - s.storeLBSvc(svc.Ports, svc.IPs.All().All(), key, ClusterIPService) + s.storeLBSvc(svc.Ports, svc.IPs.ClusterIPs.All(), key, ClusterIPService) //Cluster service IP needs to be programmed in ipset. s.AddOrDelClusterIPInIPSet(svc, svc.Ports, AddService) diff --git a/backends/ipvs-as-sink/common.go b/backends/ipvs-as-sink/common.go index a62ebbc93..3496cddb0 100644 --- a/backends/ipvs-as-sink/common.go +++ b/backends/ipvs-as-sink/common.go @@ -27,7 +27,7 @@ import ( netutils "k8s.io/utils/net" "sigs.k8s.io/kpng/api/localnetv1" - ipvs "sigs.k8s.io/kpng/backends/ipvs/util" + ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util" ) type endPointInfo struct { @@ -70,13 +70,13 @@ func getIPFamily(ipAddr string) v1.IPFamily { return ipAddrFamily } -func getEndPointEntry(endPointIP string, port *localnetv1.PortMapping) *ipvs.Entry { - return &ipvs.Entry{ +func getEndPointEntry(endPointIP string, port *localnetv1.PortMapping) *ipsetutil.Entry { + return &ipsetutil.Entry{ IP: endPointIP, Port: int(port.TargetPort), Protocol: strings.ToLower(port.Protocol.String()), IP2: endPointIP, - SetType: ipvs.HashIPPortIP, + SetType: ipsetutil.HashIPPortIP, } } @@ -94,7 +94,7 @@ func (s *Backend) AddOrDelClusterIPInIPSet(svc *localnetv1.Service, portList []* } ipSetName := clusterIPSetMap[ipFamily] // Capture the clusterIP. - entry := getIPSetEntry(clusterIP, port) + entry := getIPSetEntry(clusterIP, "", port) // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. if valid := s.ipsetList[ipSetName].validateEntry(entry); !valid { klog.Errorf("error adding entry :%s, to ipset:%s", entry.String(), s.ipsetList[ipSetName].Name) @@ -126,12 +126,21 @@ func getServiceIPFamily(svc *localnetv1.Service) []v1.IPFamily { return svcIPFamily } -func getIPSetEntry(svcIP string, port *localnetv1.PortMapping) *ipvs.Entry { - return &ipvs.Entry{ +func getIPSetEntry(svcIP,srcAddr string, port *localnetv1.PortMapping) *ipsetutil.Entry { + if srcAddr != "" { + return &ipsetutil.Entry{ + IP: svcIP, + Port: int(port.Port), + Protocol: strings.ToLower(port.Protocol.String()), + SetType: ipsetutil.HashIPPort, + Net: srcAddr, + } + } + return &ipsetutil.Entry{ IP: svcIP, Port: int(port.Port), Protocol: strings.ToLower(port.Protocol.String()), - SetType: ipvs.HashIPPort, + SetType: ipsetutil.HashIPPort, } } @@ -201,3 +210,54 @@ func (s *Backend) deleteLBSvc(portList []*localnetv1.PortMapping, addrList []str } } } + +func (s *Backend) AddOrDelNodePortInIPSet(svc *localnetv1.Service, portList []*localnetv1.PortMapping, op Operation) { + svcIPFamily := getServiceIPFamily(svc) + + for _, port := range portList { + var entries []*ipsetutil.Entry + for _, ipFamily := range svcIPFamily { + protocol := strings.ToLower(port.Protocol.String()) + ipsetName := protocolIPSetMap[protocol][ipFamily] + nodePortSet := s.ipsetList[ipsetName] + switch protocol { + case ipsetutil.ProtocolTCP, ipsetutil.ProtocolUDP: + entries = []*ipsetutil.Entry{getNodePortIPSetEntry(int(port.NodePort), protocol, ipsetutil.BitmapPort)} + + case ipsetutil.ProtocolSCTP: + // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries. + entries = []*ipsetutil.Entry{} + for _, nodeIP := range s.nodeAddresses { + entry := getNodePortIPSetEntry(int(port.NodePort), protocol, ipsetutil.HashIPPort) + entry.IP = nodeIP + entries = append(entries, entry) + } + default: + // It should never hit + klog.Errorf( "Unsupported protocol type %v protocol ", protocol) + } + if nodePortSet != nil { + for _, entry := range entries { + if valid := nodePortSet.validateEntry(entry); !valid { + klog.Errorf( "error adding entry (%v) to ipset (%v)", entry.String(), nodePortSet.Name) + } + if op == AddService { + nodePortSet.newEntries.Insert(entry.String()) + } + if op == DeleteService { + nodePortSet.deleteEntries.Insert(entry.String()) + } + } + } + } + } +} + +func getNodePortIPSetEntry(port int, protocol string, ipSetType ipsetutil.Type) *ipsetutil.Entry { + return &ipsetutil.Entry{ + // No need to provide ip info + Port: port, + Protocol: protocol, + SetType: ipSetType, + } +} \ No newline at end of file diff --git a/backends/ipvs-as-sink/helpers.go b/backends/ipvs-as-sink/helpers.go index 5b17a4611..4cba6dc89 100644 --- a/backends/ipvs-as-sink/helpers.go +++ b/backends/ipvs-as-sink/helpers.go @@ -21,13 +21,14 @@ import ( v1 "k8s.io/api/core/v1" - localnetv1 "sigs.k8s.io/kpng/api/localnetv1" - "sigs.k8s.io/kpng/backends/ipvs/util" + "sigs.k8s.io/kpng/api/localnetv1" + ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util" ) const ( ClusterIPService = "ClusterIP" NodePortService = "NodePort" + LoadBalancerService = "LoadBalancer" ) var loopBackIPSetMap = map[v1.IPFamily]string{ @@ -41,20 +42,40 @@ var clusterIPSetMap = map[v1.IPFamily]string{ } var protocolIPSetMap = map[string]map[v1.IPFamily]string{ - ipvs.ProtocolTCP: { + ipsetutil.ProtocolTCP: { v1.IPv4Protocol: kubeNodePortIPv4SetTCP, v1.IPv6Protocol: kubeNodePortIPv6SetTCP, }, - ipvs.ProtocolUDP: { + ipsetutil.ProtocolUDP: { v1.IPv4Protocol: kubeNodePortIPv4SetUDP, v1.IPv6Protocol: kubeNodePortIPv6SetUDP, }, - ipvs.ProtocolSCTP: { + ipsetutil.ProtocolSCTP: { v1.IPv4Protocol: kubeNodePortIPv4SetSCTP, v1.IPv6Protocol: kubeNodePortIPv6SetSCTP, }, } +var loadbalancerIPSetMap = map[v1.IPFamily]string{ + v1.IPv4Protocol: kubeLoadBalancerIPv4Set, + v1.IPv6Protocol: kubeLoadBalancerIPv6Set, +} + +var loadbalancerLocalSetMap = map[v1.IPFamily]string{ + v1.IPv4Protocol: kubeLoadBalancerLocalIPv4Set, + v1.IPv6Protocol: kubeLoadBalancerLocalIPv6Set, +} + +var loadbalancerSourceCIDRSetMap = map[v1.IPFamily]string{ + v1.IPv4Protocol: kubeLoadBalancerSourceCIDRIPv4Set, + v1.IPv6Protocol: kubeLoadBalancerSourceCIDRIPv6Set, +} + +var loadbalancerFWSetMap = map[v1.IPFamily]string{ + v1.IPv4Protocol: kubeLoadbalancerFWIPv4Set, + v1.IPv6Protocol: kubeLoadbalancerFWIPv6Set, +} + type Operation int32 const ( @@ -81,7 +102,6 @@ func epPortSuffix(port *localnetv1.PortMapping) string { return port.Protocol.String() + ":" + strconv.Itoa(int(port.Port)) } -// diffInPortMapping TODO, we should support this logic in the diffstore, this is a temporary workaround. func diffInPortMapping(previous, current *localnetv1.Service) (added, removed []*localnetv1.PortMapping) { for _, p1 := range previous.Ports { found := false diff --git a/backends/ipvs-as-sink/ipset.go b/backends/ipvs-as-sink/ipset.go index 20f4a0903..89849bef0 100644 --- a/backends/ipvs-as-sink/ipset.go +++ b/backends/ipvs-as-sink/ipset.go @@ -23,7 +23,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" - "sigs.k8s.io/kpng/backends/ipvs/util" + ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util" ) const ( @@ -95,25 +95,25 @@ const ( // ipsetInfo is all ipset we needed in ipvs proxier var ipsetInfo = []struct { name string - setType ipvs.Type + setType ipsetutil.Type comment string }{ - {kubeLoopBackIPv4Set, ipvs.HashIPPortIP, kubeLoopBackIPSetComment}, - {kubeClusterIPv4Set, ipvs.HashIPPort, kubeClusterIPSetComment}, - {kubeExternalIPv4Set, ipvs.HashIPPort, kubeExternalIPSetComment}, - {kubeExternalIPv4LocalSet, ipvs.HashIPPort, kubeExternalIPLocalSetComment}, - {kubeLoadBalancerIPv4Set, ipvs.HashIPPort, kubeLoadBalancerSetComment}, - {kubeLoadbalancerFWIPv4Set, ipvs.HashIPPort, kubeLoadbalancerFWSetComment}, - {kubeLoadBalancerLocalIPv4Set, ipvs.HashIPPort, kubeLoadBalancerLocalSetComment}, - {kubeLoadBalancerSourceIPv4Set, ipvs.HashIPPortIP, kubeLoadBalancerSourceIPSetComment}, - {kubeLoadBalancerSourceCIDRIPv4Set, ipvs.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment}, - {kubeNodePortIPv4SetTCP, ipvs.BitmapPort, kubeNodePortSetTCPComment}, - {kubeNodePortLocalIPv4SetTCP, ipvs.BitmapPort, kubeNodePortLocalSetTCPComment}, - {kubeNodePortIPv4SetUDP, ipvs.BitmapPort, kubeNodePortSetUDPComment}, - {kubeNodePortLocalIPv4SetUDP, ipvs.BitmapPort, kubeNodePortLocalSetUDPComment}, - {kubeNodePortIPv4SetSCTP, ipvs.HashIPPort, kubeNodePortSetSCTPComment}, - {kubeNodePortLocalIPv4SetSCTP, ipvs.HashIPPort, kubeNodePortLocalSetSCTPComment}, - {kubeHealthCheckNodePortIPv4Set, ipvs.BitmapPort, kubeHealthCheckNodePortSetComment}, + {kubeLoopBackIPv4Set, ipsetutil.HashIPPortIP, kubeLoopBackIPSetComment}, + {kubeClusterIPv4Set, ipsetutil.HashIPPort, kubeClusterIPSetComment}, + {kubeExternalIPv4Set, ipsetutil.HashIPPort, kubeExternalIPSetComment}, + {kubeExternalIPv4LocalSet, ipsetutil.HashIPPort, kubeExternalIPLocalSetComment}, + {kubeLoadBalancerIPv4Set, ipsetutil.HashIPPort, kubeLoadBalancerSetComment}, + {kubeLoadbalancerFWIPv4Set, ipsetutil.HashIPPort, kubeLoadbalancerFWSetComment}, + {kubeLoadBalancerLocalIPv4Set, ipsetutil.HashIPPort, kubeLoadBalancerLocalSetComment}, + {kubeLoadBalancerSourceIPv4Set, ipsetutil.HashIPPortIP, kubeLoadBalancerSourceIPSetComment}, + {kubeLoadBalancerSourceCIDRIPv4Set, ipsetutil.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment}, + {kubeNodePortIPv4SetTCP, ipsetutil.BitmapPort, kubeNodePortSetTCPComment}, + {kubeNodePortLocalIPv4SetTCP, ipsetutil.BitmapPort, kubeNodePortLocalSetTCPComment}, + {kubeNodePortIPv4SetUDP, ipsetutil.BitmapPort, kubeNodePortSetUDPComment}, + {kubeNodePortLocalIPv4SetUDP, ipsetutil.BitmapPort, kubeNodePortLocalSetUDPComment}, + {kubeNodePortIPv4SetSCTP, ipsetutil.HashIPPort, kubeNodePortSetSCTPComment}, + {kubeNodePortLocalIPv4SetSCTP, ipsetutil.HashIPPort, kubeNodePortLocalSetSCTPComment}, + {kubeHealthCheckNodePortIPv4Set, ipsetutil.BitmapPort, kubeHealthCheckNodePortSetComment}, } // IPSetVersioner can query the current ipset version. @@ -123,20 +123,20 @@ type IPSetVersioner interface { } type IPSet struct { - ipvs.IPSet + ipsetutil.IPSet // activeEntries is the current active entries of the ipset. newEntries sets.String // activeEntries is the current active entries of the ipset. deleteEntries sets.String // handle is the util ipset interface handle. - handle ipvs.Interface + handle ipsetutil.Interface } // NewIPSet initialize a new IPSet struct -func newIPv4Set(handle ipvs.Interface, name string, setType ipvs.Type, comment string) *IPSet { - hashFamily := ipvs.ProtocolFamilyIPV4 +func newIPv4Set(handle ipsetutil.Interface, name string, setType ipsetutil.Type, comment string) *IPSet { + hashFamily := ipsetutil.ProtocolFamilyIPV4 set := &IPSet{ - IPSet: ipvs.IPSet{ + IPSet: ipsetutil.IPSet{ Name: name, SetType: setType, HashFamily: hashFamily, @@ -150,8 +150,8 @@ func newIPv4Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s } // NewIPSet initialize a new IPSet struct -func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment string) *IPSet { - hashFamily := ipvs.ProtocolFamilyIPV6 +func newIPv6Set(handle ipsetutil.Interface, name string, setType ipsetutil.Type, comment string) *IPSet { + hashFamily := ipsetutil.ProtocolFamilyIPV6 // In dual-stack both ipv4 and ipv6 ipset's can co-exist. To // ensure unique names the prefix for ipv6 is changed from // "KUBE-" to "KUBE-6-". The "KUBE-" prefix is kept for @@ -167,7 +167,7 @@ func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s } } set := &IPSet{ - IPSet: ipvs.IPSet{ + IPSet: ipsetutil.IPSet{ Name: name, SetType: setType, HashFamily: hashFamily, @@ -180,7 +180,7 @@ func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s return set } -func (set *IPSet) validateEntry(entry *ipvs.Entry) bool { +func (set *IPSet) validateEntry(entry *ipsetutil.Entry) bool { return entry.Validate(&set.IPSet) } diff --git a/backends/ipvs-as-sink/ipvs.go b/backends/ipvs-as-sink/ipvs.go index 51cc0613f..dfc9c8652 100644 --- a/backends/ipvs-as-sink/ipvs.go +++ b/backends/ipvs-as-sink/ipvs.go @@ -27,7 +27,7 @@ import ( "k8s.io/utils/exec" "sigs.k8s.io/kpng/api/localnetv1" - ipvsutil "sigs.k8s.io/kpng/backends/ipvs/util" + ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util" "sigs.k8s.io/kpng/client/backendcmd" "sigs.k8s.io/kpng/client/localsink" "sigs.k8s.io/kpng/client/localsink/decoder" @@ -149,12 +149,12 @@ func (s *Backend) createIPVSDummyInterface() { } func (s *Backend) initializeIPSets() { - var ipsetInterface ipvsutil.Interface + var ipsetInterface ipsetutil.Interface // Create a iptables utils. execer := exec.New() - ipsetInterface = ipvsutil.New(execer) + ipsetInterface = ipsetutil.New(execer) // initialize ipsetList with all sets we needed s.ipsetList = make(map[string]*IPSet) @@ -270,6 +270,10 @@ func (s *Backend) SetService(svc *localnetv1.Service) { if svc.Type == NodePortService { s.handleNodePortService(svc, AddService) } + + if svc.Type == LoadBalancerService { + s.handleLBService(svc, AddService) + } } func (s *Backend) DeleteService(namespace, name string) { @@ -287,6 +291,10 @@ func (s *Backend) DeleteService(namespace, name string) { s.handleNodePortService(svc, DeleteService) } + if svc.Type == LoadBalancerService { + s.handleLBService(svc, DeleteService) + } + for _, ip := range asDummyIPs(svc.IPs.All()) { s.dummyIPsRefCounts[ip]-- } @@ -308,6 +316,10 @@ func (s *Backend) SetEndpoint(namespace, serviceName, key string, endpoint *loca if service.Type == NodePortService { s.SetEndPointForNodePortSvc(svcKey, key, endpoint) } + + if service.Type == LoadBalancerService { + s.SetEndPointForLBSvc(svcKey, key, endpoint) + } } func (s *Backend) DeleteEndpoint(namespace, serviceName, key string) { @@ -323,4 +335,8 @@ func (s *Backend) DeleteEndpoint(namespace, serviceName, key string) { if service.Type == NodePortService { s.DeleteEndPointForNodePortSvc(svcKey, key) } + + if service.Type == LoadBalancerService { + s.DeleteEndPointForLBSvc(svcKey, key) + } } diff --git a/backends/ipvs-as-sink/lb.go b/backends/ipvs-as-sink/ipvslb.go similarity index 94% rename from backends/ipvs-as-sink/lb.go rename to backends/ipvs-as-sink/ipvslb.go index 81d52422e..43a588a49 100644 --- a/backends/ipvs-as-sink/lb.go +++ b/backends/ipvs-as-sink/ipvslb.go @@ -19,7 +19,7 @@ package ipvssink import ( "encoding/json" "net" - localnetv1 "sigs.k8s.io/kpng/api/localnetv1" + "sigs.k8s.io/kpng/api/localnetv1" "syscall" "github.com/google/seesaw/ipvs" @@ -46,6 +46,9 @@ func (lb ipvsLB) ToService() ipvs.Service { if lb.ServiceType == NodePortService { port = uint16(lb.Port.NodePort) } + if lb.ServiceType == LoadBalancerService { + port = uint16(lb.Port.Port) + } s := ipvs.Service{ Address: net.ParseIP(lb.IP), Port: port, diff --git a/backends/ipvs-as-sink/loadbalancersvc.go b/backends/ipvs-as-sink/loadbalancersvc.go new file mode 100644 index 000000000..aa2725310 --- /dev/null +++ b/backends/ipvs-as-sink/loadbalancersvc.go @@ -0,0 +1,285 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvssink + +import ( + "bytes" + v1 "k8s.io/api/core/v1" + "k8s.io/klog" + "sigs.k8s.io/kpng/api/localnetv1" + ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util" +) + +func (s *Backend) handleLBService(svc *localnetv1.Service, op Operation) { + key := svc.Namespace + "/" + svc.Name + + if op == AddService { + isNewService := true + if _, ok := s.svcs[key]; ok { + isNewService = false + } + + if isNewService { + s.handleNewLBService(key, svc) + } else { + s.handleUpdatedLBService(key, svc) + } + } + + if op == DeleteService { + s.AddOrDelClusterIPInIPSet(svc, svc.Ports, DeleteService) + s.AddOrDelLbIPInIPSet(svc, svc.Ports, DeleteService) + s.AddOrDelNodePortInIPSet(svc, svc.Ports, DeleteService) + } +} + +func (s *Backend) handleNewLBService(key string, svc *localnetv1.Service) { + s.svcs[key] = svc + + //ClusterIP and Loadbalancer IP is added to kube-ipvs0 interface + s.addServiceIPToKubeIPVSIntf(nil, svc) + + //For LB service, clusterIP , nodeIPs , loadbalancerIPs need to be programmed + // in IPVS table + s.storeLBSvc(svc.Ports, svc.IPs.ClusterIPs.All(), key, ClusterIPService) + s.storeLBSvc(svc.Ports, svc.IPs.LoadBalancerIPs.All(), key, LoadBalancerService) + s.storeLBSvc(svc.Ports, s.nodeAddresses, key, NodePortService) + + + //For LB service, clusterIP , nodeIPs , loadbalancerIPs need to be programmed + // in ipset. + s.AddOrDelClusterIPInIPSet(svc, svc.Ports, AddService) + s.AddOrDelLbIPInIPSet(svc, svc.Ports, AddService) + s.AddOrDelNodePortInIPSet(svc, svc.Ports, AddService) +} + +func (s *Backend) handleUpdatedLBService(key string, svc *localnetv1.Service) { + // update the svc + prevSvc := s.svcs[key] + s.svcs[key] = svc + + //Updated Cluster IP is added/removed from kube-ipvs0 interface + s.addServiceIPToKubeIPVSIntf(prevSvc, svc) + + // When existing service gets updated with new port/protocol, endpoints + // behind it also needs to be updated into tree, so that they are handled in sync(). + addedPorts, removedPorts := diffInPortMapping(prevSvc, svc) + + if len(addedPorts) > 0 { + //Update the service with added ports into LB tree + s.storeLBSvc(addedPorts, svc.IPs.ClusterIPs.All(), key, ClusterIPService) + s.storeLBSvc(addedPorts, svc.IPs.LoadBalancerIPs.All(), key, LoadBalancerService) + s.storeLBSvc(addedPorts, s.nodeAddresses, key, NodePortService) + + var endPointList []string + var isLocalEndPoint bool + + for _, epKV := range s.endpoints.GetByPrefix([]byte(key + "/")) { + epInfo := epKV.Value.(endPointInfo) + endPointList = append(endPointList, epInfo.endPointIP) + svcIP := getServiceIP(epInfo.endPointIP, svc) + isLocalEndPoint = epInfo.isLocalEndPoint + + for _, port := range addedPorts { + lbKey := key + "/" + svcIP + "/" + epPortSuffix(port) + ipvslb := s.lbs.GetByPrefix([]byte(lbKey)) + + s.dests.Set([]byte(lbKey+"/"+epInfo.endPointIP), 0, ipvsSvcDst{ + Svc: ipvslb[0].Value.(ipvsLB).ToService(), + Dst: ipvsDestination(epInfo.endPointIP, port, s.weight), + }) + } + } + + //Added port info need to updated in clusterIP, NodePort , LB service + // related ipsets , along with endpoint ipset. + s.AddOrDelClusterIPInIPSet(svc, addedPorts, AddService) + s.AddOrDelLbIPInIPSet(svc, addedPorts, AddService) + s.AddOrDelNodePortInIPSet(svc, addedPorts, AddService) + s.AddOrDelEndPointInIPSet(endPointList, addedPorts, isLocalEndPoint, AddEndPoint) + } + + // When existing service gets updated with deletion of port/protocol, + // endpoint behind it needs to be removed from tree. + if len(removedPorts) > 0 { + s.deleteLBSvc(removedPorts, svc.IPs.ClusterIPs.All(), key) + s.deleteLBSvc(removedPorts, svc.IPs.LoadBalancerIPs.All(), key) + s.deleteLBSvc(removedPorts, s.nodeAddresses, key) + + var endPointList []string + var isLocalEndPoint bool + + for _, epKV := range s.endpoints.GetByPrefix([]byte(key + "/")) { + epInfo := epKV.Value.(endPointInfo) + endPointList = append(endPointList, epInfo.endPointIP) + svcIP := getServiceIP(epInfo.endPointIP, svc) + isLocalEndPoint = epInfo.isLocalEndPoint + + for _, port := range removedPorts { + lbKey := key + "/" + svcIP + "/" + epPortSuffix(port) + s.dests.Delete([]byte(lbKey + "/" + epInfo.endPointIP)) + } + } + + //Removed port info need to removed in clusterIP, NodePort , LB service + // related ipsets , along with endpoint ipset. + s.AddOrDelClusterIPInIPSet(svc, removedPorts, DeleteService) + s.AddOrDelLbIPInIPSet(svc, removedPorts, DeleteService) + s.AddOrDelNodePortInIPSet(svc, removedPorts, DeleteService) + s.AddOrDelEndPointInIPSet(endPointList, removedPorts, isLocalEndPoint, DeleteEndPoint) + } +} + +func (s *Backend) SetEndPointForLBSvc(svcKey, key string, endpoint *localnetv1.Endpoint) { + prefix := svcKey + "/" + key + "/" + service := s.svcs[svcKey] + portList := service.Ports + + for _, endPointIP := range endpoint.IPs.All() { + epInfo := endPointInfo{ + endPointIP: endPointIP, + isLocalEndPoint: endpoint.Local, + } + s.endpoints.Set([]byte(prefix+endPointIP), 0, epInfo) + svcIP := getServiceIP(endPointIP, service) + + // add a destination for every LB of this service + for _, lbKV := range s.lbs.GetByPrefix([]byte(svcKey + "/" + svcIP)) { + lb := lbKV.Value.(ipvsLB) + destination := ipvsSvcDst{ + Svc: lb.ToService(), + Dst: ipvsDestination(endPointIP, lb.Port, s.weight), + } + s.dests.Set([]byte(string(lbKV.Key)+"/"+endPointIP), 0, destination) + } + } + + s.AddOrDelEndPointInIPSet(endpoint.IPs.All(), portList, endpoint.Local, AddEndPoint) +} + +func (s *Backend) DeleteEndPointForLBSvc(svcKey, key string) { + prefix := []byte(svcKey + "/" + key + "/") + service := s.svcs[svcKey] + portList := service.Ports + var endPointList []string + var isLocalEndPoint bool + + for _, kv := range s.endpoints.GetByPrefix(prefix) { + epInfo := kv.Value.(endPointInfo) + suffix := []byte("/" + epInfo.endPointIP) + + for _, destKV := range s.dests.GetByPrefix([]byte(svcKey)) { + if bytes.HasSuffix(destKV.Key, suffix) { + s.dests.Delete(destKV.Key) + } + } + endPointList = append(endPointList, epInfo.endPointIP) + isLocalEndPoint = epInfo.isLocalEndPoint + } + + // remove this endpoint from the endpoints + s.endpoints.DeleteByPrefix(prefix) + + s.AddOrDelEndPointInIPSet(endPointList, portList, isLocalEndPoint, DeleteEndPoint) +} + +func (s *Backend) AddOrDelLbIPInIPSet(svc *localnetv1.Service, portList []*localnetv1.PortMapping, op Operation) { + svcIPFamily := getLBServiceIPFamily(svc) + var ipSetName string + var entry *ipsetutil.Entry + + for _, port := range portList { + for _, ipFamily := range svcIPFamily { + var lbIP string + if ipFamily == v1.IPv4Protocol { + lbIP = svc.IPs.LoadBalancerIPs.V4[0] + } + if ipFamily == v1.IPv6Protocol { + lbIP = svc.IPs.LoadBalancerIPs.V6[0] + } + + entry = getIPSetEntry(lbIP, "",port) + // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. + // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) + // If we are proxying globally, we need to masquerade in case we cross nodes. + // If we are proxying only locally, we can retain the source IP. + ipSetName = loadbalancerIPSetMap[ipFamily] + s.setKubeLBIPSet(ipSetName, entry, op) + + if svc.ExternalTrafficToLocal { + //insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local + ipSetName = loadbalancerLocalSetMap[ipFamily] + s.setKubeLBIPSet(ipSetName, entry, op) + } + + var isSourceRangeConfigured bool = false + if len(svc.IPFilters) > 0 { + for _, ip := range svc.IPFilters { + if len (ip.SourceRanges) > 0 { + isSourceRangeConfigured = true + } + for _, srcIP := range ip.SourceRanges { + srcRangeEntry := getIPSetEntry(lbIP, srcIP, port) + ipSetName = loadbalancerSourceCIDRSetMap[ipFamily] + s.setKubeLBIPSet(ipSetName, srcRangeEntry, op) + } + } + + // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. + // This currently works for loadbalancers that preserves source ips. + // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. + if isSourceRangeConfigured { + ipSetName = loadbalancerFWSetMap[ipFamily] + s.setKubeLBIPSet(ipSetName, entry, op) + } + } + + } + } +} + +func (s *Backend) setKubeLBIPSet(ipSetName string, entry *ipsetutil.Entry, op Operation) { + if valid := s.ipsetList[ipSetName].validateEntry(entry); !valid { + klog.Errorf("error adding entry :%s, to ipset:%s", entry.String(), s.ipsetList[ipSetName].Name) + return + } + if op == AddService { + s.ipsetList[ipSetName].newEntries.Insert(entry.String()) + } + if op == DeleteService { + s.ipsetList[ipSetName].deleteEntries.Insert(entry.String()) + } +} + +func getLBServiceIPFamily(svc *localnetv1.Service) []v1.IPFamily { + var svcIPFamily []v1.IPFamily + if svc.IPs.LoadBalancerIPs == nil { + return svcIPFamily + } + if len(svc.IPs.LoadBalancerIPs.V4) > 0 && len(svc.IPs.LoadBalancerIPs.V6) == 0 { + svcIPFamily = append(svcIPFamily, v1.IPv4Protocol) + } + + if len(svc.IPs.LoadBalancerIPs.V6) > 0 && len(svc.IPs.LoadBalancerIPs.V4) == 0 { + svcIPFamily = append(svcIPFamily, v1.IPv6Protocol) + } + + if len(svc.IPs.LoadBalancerIPs.V4) > 0 && len(svc.IPs.LoadBalancerIPs.V6) > 0 { + svcIPFamily = append(svcIPFamily, v1.IPv4Protocol, v1.IPv6Protocol) + } + return svcIPFamily +} \ No newline at end of file diff --git a/backends/ipvs-as-sink/nodeportsvc.go b/backends/ipvs-as-sink/nodeportsvc.go index c4a4326c8..ce71f0369 100644 --- a/backends/ipvs-as-sink/nodeportsvc.go +++ b/backends/ipvs-as-sink/nodeportsvc.go @@ -18,12 +18,7 @@ package ipvssink import ( "bytes" - "strings" - - "k8s.io/klog/v2" - - localnetv1 "sigs.k8s.io/kpng/api/localnetv1" - "sigs.k8s.io/kpng/backends/ipvs/util" + "sigs.k8s.io/kpng/api/localnetv1" ) func (s *Backend) handleNodePortService(svc *localnetv1.Service, op Operation) { @@ -46,7 +41,6 @@ func (s *Backend) handleNodePortService(svc *localnetv1.Service, op Operation) { portList := svc.Ports s.AddOrDelNodePortInIPSet(svc, portList, DeleteService) - s.AddOrDelClusterIPInIPSet(svc, svc.Ports, DeleteService) } } @@ -62,7 +56,7 @@ func (s *Backend) handleNewNodePortService(key string, svc *localnetv1.Service) //NodePort svc clusterIPs need to be added as ClusterIPService //so that in sync(), port is attached to clusterIP. - s.storeLBSvc(svc.Ports, svc.IPs.All().All(), key, ClusterIPService) + s.storeLBSvc(svc.Ports, svc.IPs.ClusterIPs.All(), key, ClusterIPService) portList := svc.Ports s.AddOrDelNodePortInIPSet(svc, portList, AddService) @@ -132,57 +126,6 @@ func (s *Backend) handleUpdatedNodePortService(svckey string, svc *localnetv1.Se } } -func (s *Backend) AddOrDelNodePortInIPSet(svc *localnetv1.Service, portList []*localnetv1.PortMapping, op Operation) { - svcIPFamily := getServiceIPFamily(svc) - - for _, port := range portList { - var entries []*ipvs.Entry - for _, ipFamily := range svcIPFamily { - protocol := strings.ToLower(port.Protocol.String()) - ipsetName := protocolIPSetMap[protocol][ipFamily] - nodePortSet := s.ipsetList[ipsetName] - switch protocol { - case ipvs.ProtocolTCP, ipvs.ProtocolUDP: - entries = []*ipvs.Entry{getNodePortIPSetEntry(int(port.NodePort), protocol, ipvs.BitmapPort)} - - case ipvs.ProtocolSCTP: - // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries. - entries = []*ipvs.Entry{} - for _, nodeIP := range s.nodeAddresses { - entry := getNodePortIPSetEntry(int(port.NodePort), protocol, ipvs.HashIPPort) - entry.IP = nodeIP - entries = append(entries, entry) - } - default: - // It should never hit - klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol) - } - if nodePortSet != nil { - for _, entry := range entries { - if valid := nodePortSet.validateEntry(entry); !valid { - klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name) - } - if op == AddService { - nodePortSet.newEntries.Insert(entry.String()) - } - if op == DeleteService { - nodePortSet.deleteEntries.Insert(entry.String()) - } - } - } - } - } -} - -func getNodePortIPSetEntry(port int, protocol string, ipSetType ipvs.Type) *ipvs.Entry { - return &ipvs.Entry{ - // No need to provide ip info - Port: port, - Protocol: protocol, - SetType: ipSetType, - } -} - func (s *Backend) SetEndPointForNodePortSvc(svcKey, key string, endpoint *localnetv1.Endpoint) { prefix := svcKey + "/" + key + "/" service := s.svcs[svcKey] diff --git a/backends/ipvs-as-sink/util/ipset.go b/backends/ipvs-as-sink/util/ipset.go new file mode 100644 index 000000000..552f0651e --- /dev/null +++ b/backends/ipvs-as-sink/util/ipset.go @@ -0,0 +1,528 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "bytes" + "fmt" + "net" + "regexp" + "strconv" + "strings" + + "k8s.io/klog/v2" + utilexec "k8s.io/utils/exec" +) + +// Interface is an injectable interface for running ipset commands. Implementations must be goroutine-safe. +type Interface interface { + // FlushSet deletes all entries from a named set. + FlushSet(set string) error + // DestroySet deletes a named set. + DestroySet(set string) error + // DestroyAllSets deletes all sets. + DestroyAllSets() error + // CreateSet creates a new set. It will ignore error when the set already exists if ignoreExistErr=true. + CreateSet(set *IPSet, ignoreExistErr bool) error + // AddEntry adds a new entry to the named set. It will ignore error when the entry already exists if ignoreExistErr=true. + AddEntry(entry string, set *IPSet, ignoreExistErr bool) error + // DelEntry deletes one entry from the named set + DelEntry(entry string, set string) error + // Test test if an entry exists in the named set + TestEntry(entry string, set string) (bool, error) + // ListEntries lists all the entries from a named set + ListEntries(set string) ([]string, error) + // ListSets list all set names from kernel + ListSets() ([]string, error) + // GetVersion returns the "X.Y" version string for ipset. + GetVersion() (string, error) +} + +// IPSetCmd represents the ipset util. We use ipset command for ipset execute. +const IPSetCmd = "ipset" + +// EntryMemberPattern is the regular expression pattern of ipset member list. +// The raw output of ipset command `ipset list {set}` is similar to, +//Name: foobar +//Type: hash:ip,port +//Revision: 2 +//Header: family inet hashsize 1024 maxelem 65536 +//Size in memory: 16592 +//References: 0 +//Members: +//192.168.1.2,tcp:8080 +//192.168.1.1,udp:53 +var EntryMemberPattern = "(?m)^(.*\n)*Members:\n" + +// VersionPattern is the regular expression pattern of ipset version string. +// ipset version output is similar to "v6.10". +var VersionPattern = "v[0-9]+\\.[0-9]+" + +// IPSet implements an Interface to a set. +type IPSet struct { + // Name is the set name. + Name string + // SetType specifies the ipset type. + SetType Type + // HashFamily specifies the protocol family of the IP addresses to be stored in the set. + // The default is inet, i.e IPv4. If users want to use IPv6, they should specify inet6. + HashFamily string + // HashSize specifies the hash table size of ipset. + HashSize int + // MaxElem specifies the max element number of ipset. + MaxElem int + // PortRange specifies the port range of bitmap:port type ipset. + PortRange string + // comment message for ipset + Comment string +} + +// Validate checks if a given ipset is valid or not. +func (set *IPSet) Validate() bool { + // Check if protocol is valid for `HashIPPort`, `HashIPPortIP` and `HashIPPortNet` type set. + if set.SetType == HashIPPort || set.SetType == HashIPPortIP || set.SetType == HashIPPortNet { + if valid := validateHashFamily(set.HashFamily); !valid { + return false + } + } + // check set type + if valid := validateIPSetType(set.SetType); !valid { + return false + } + // check port range for bitmap type set + if set.SetType == BitmapPort { + if valid := validatePortRange(set.PortRange); !valid { + return false + } + } + // check hash size value of ipset + if set.HashSize <= 0 { + klog.Errorf("Invalid hashsize value %d, should be >0", set.HashSize) + return false + } + // check max elem value of ipset + if set.MaxElem <= 0 { + klog.Errorf("Invalid maxelem value %d, should be >0", set.MaxElem) + return false + } + + return true +} + +//setIPSetDefaults sets some IPSet fields if not present to their default values. +func (set *IPSet) setIPSetDefaults() { + // Setting default values if not present + if set.HashSize == 0 { + set.HashSize = 1024 + } + if set.MaxElem == 0 { + set.MaxElem = 65536 + } + // Default protocol is IPv4 + if set.HashFamily == "" { + set.HashFamily = ProtocolFamilyIPV4 + } + // Default ipset type is "hash:ip,port" + if len(set.SetType) == 0 { + set.SetType = HashIPPort + } + if len(set.PortRange) == 0 { + set.PortRange = DefaultPortRange + } +} + +// Entry represents a ipset entry. +type Entry struct { + // IP is the entry's IP. The IP address protocol corresponds to the HashFamily of IPSet. + // All entries' IP addresses in the same ip set has same the protocol, IPv4 or IPv6. + IP string + // Port is the entry's Port. + Port int + // Protocol is the entry's Protocol. The protocols of entries in the same ip set are all + // the same. The accepted protocols are TCP, UDP and SCTP. + Protocol string + // Net is the entry's IP network address. Network address with zero prefix size can NOT + // be stored. + Net string + // IP2 is the entry's second IP. IP2 may not be empty for `hash:ip,port,ip` type ip set. + IP2 string + // SetType is the type of ipset where the entry exists. + SetType Type +} + +// Validate checks if a given ipset entry is valid or not. The set parameter is the ipset that entry belongs to. +func (e *Entry) Validate(set *IPSet) bool { + if e.Port < 0 { + klog.Errorf("Entry %v port number %d should be >=0 for ipset %v", e, e.Port, set) + return false + } + switch e.SetType { + case HashIPPort: + //check if IP and Protocol of Entry is valid. + if valid := e.checkIPandProtocol(set); !valid { + return false + } + case HashIPPortIP: + //check if IP and Protocol of Entry is valid. + if valid := e.checkIPandProtocol(set); !valid { + return false + } + + // IP2 can not be empty for `hash:ip,port,ip` type ip set + if net.ParseIP(e.IP2) == nil { + klog.Errorf("Error parsing entry %v second ip address %v for ipset %v", e, e.IP2, set) + return false + } + case HashIPPortNet: + //check if IP and Protocol of Entry is valid. + if valid := e.checkIPandProtocol(set); !valid { + return false + } + + // Net can not be empty for `hash:ip,port,net` type ip set + if _, ipNet, err := net.ParseCIDR(e.Net); ipNet == nil { + klog.Errorf("Error parsing entry %v ip net %v for ipset %v, error: %v", e, e.Net, set, err) + return false + } + case BitmapPort: + // check if port number satisfies its ipset's requirement of port range + if set == nil { + klog.Errorf("Unable to reference ip set where the entry %v exists", e) + return false + } + begin, end, err := parsePortRange(set.PortRange) + if err != nil { + klog.Errorf("Failed to parse set %v port range %s for ipset %v, error: %v", set, set.PortRange, set, err) + return false + } + if e.Port < begin || e.Port > end { + klog.Errorf("Entry %v port number %d is not in the port range %s of its ipset %v", e, e.Port, set.PortRange, set) + return false + } + } + + return true +} + +// String returns the string format for ipset entry. +func (e *Entry) String() string { + switch e.SetType { + case HashIPPort: + // Entry{192.168.1.1, udp, 53} -> 192.168.1.1,udp:53 + // Entry{192.168.1.2, tcp, 8080} -> 192.168.1.2,tcp:8080 + return fmt.Sprintf("%s,%s:%s", e.IP, e.Protocol, strconv.Itoa(e.Port)) + case HashIPPortIP: + // Entry{192.168.1.1, udp, 53, 10.0.0.1} -> 192.168.1.1,udp:53,10.0.0.1 + // Entry{192.168.1.2, tcp, 8080, 192.168.1.2} -> 192.168.1.2,tcp:8080,192.168.1.2 + return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.IP2) + case HashIPPortNet: + // Entry{192.168.1.2, udp, 80, 10.0.1.0/24} -> 192.168.1.2,udp:80,10.0.1.0/24 + // Entry{192.168.2,25, tcp, 8080, 10.1.0.0/16} -> 192.168.2,25,tcp:8080,10.1.0.0/16 + return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.Net) + case BitmapPort: + // Entry{53} -> 53 + // Entry{8080} -> 8080 + return strconv.Itoa(e.Port) + } + return "" +} + +// checkIPandProtocol checks if IP and Protocol of Entry is valid. +func (e *Entry) checkIPandProtocol(set *IPSet) bool { + // set default protocol to tcp if empty + if len(e.Protocol) == 0 { + e.Protocol = ProtocolTCP + } else if !validateProtocol(e.Protocol) { + return false + } + + if net.ParseIP(e.IP) == nil { + klog.Errorf("Error parsing entry %v ip address %v for ipset %v", e, e.IP, set) + return false + } + + return true +} + +type runner struct { + exec utilexec.Interface +} + +// New returns a new Interface which will exec ipset. +func New(exec utilexec.Interface) Interface { + return &runner{ + exec: exec, + } +} + +// CreateSet creates a new set, it will ignore error when the set already exists if ignoreExistErr=true. +func (runner *runner) CreateSet(set *IPSet, ignoreExistErr bool) error { + // sets some IPSet fields if not present to their default values. + set.setIPSetDefaults() + + // Validate ipset before creating + valid := set.Validate() + if !valid { + return fmt.Errorf("error creating ipset since it's invalid") + } + return runner.createSet(set, ignoreExistErr) +} + +// If ignoreExistErr is set to true, then the -exist option of ipset will be specified, ipset ignores the error +// otherwise raised when the same set (setname and create parameters are identical) already exists. +func (runner *runner) createSet(set *IPSet, ignoreExistErr bool) error { + args := []string{"create", set.Name, string(set.SetType)} + if set.SetType == HashIPPortIP || set.SetType == HashIPPort || set.SetType == HashIPPortNet { + args = append(args, + "family", set.HashFamily, + "hashsize", strconv.Itoa(set.HashSize), + "maxelem", strconv.Itoa(set.MaxElem), + ) + } + if set.SetType == BitmapPort { + args = append(args, "range", set.PortRange) + } + if ignoreExistErr { + args = append(args, "-exist") + } + if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil { + return fmt.Errorf("error creating ipset %s, error: %v", set.Name, err) + } + return nil +} + +// AddEntry adds a new entry to the named set. +// If the -exist option is specified, ipset ignores the error otherwise raised when +// the same set (setname and create parameters are identical) already exists. +func (runner *runner) AddEntry(entry string, set *IPSet, ignoreExistErr bool) error { + args := []string{"add", set.Name, entry} + if ignoreExistErr { + args = append(args, "-exist") + } + if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil { + return fmt.Errorf("error adding entry %s, error: %v", entry, err) + } + return nil +} + +// DelEntry is used to delete the specified entry from the set. +func (runner *runner) DelEntry(entry string, set string) error { + if _, err := runner.exec.Command(IPSetCmd, "del", set, entry).CombinedOutput(); err != nil { + return fmt.Errorf("error deleting entry %s: from set: %s, error: %v", entry, set, err) + } + return nil +} + +// TestEntry is used to check whether the specified entry is in the set or not. +func (runner *runner) TestEntry(entry string, set string) (bool, error) { + if out, err := runner.exec.Command(IPSetCmd, "test", set, entry).CombinedOutput(); err == nil { + reg, e := regexp.Compile("is NOT in set " + set) + if e == nil && reg.MatchString(string(out)) { + return false, nil + } else if e == nil { + return true, nil + } else { + return false, fmt.Errorf("error testing entry: %s, error: %v", entry, e) + } + } else { + return false, fmt.Errorf("error testing entry %s: %v (%s)", entry, err, out) + } +} + +// FlushSet deletes all entries from a named set. +func (runner *runner) FlushSet(set string) error { + if _, err := runner.exec.Command(IPSetCmd, "flush", set).CombinedOutput(); err != nil { + return fmt.Errorf("error flushing set: %s, error: %v", set, err) + } + return nil +} + +// DestroySet is used to destroy a named set. +func (runner *runner) DestroySet(set string) error { + if out, err := runner.exec.Command(IPSetCmd, "destroy", set).CombinedOutput(); err != nil { + return fmt.Errorf("error destroying set %s, error: %v(%s)", set, err, out) + } + return nil +} + +// DestroyAllSets is used to destroy all sets. +func (runner *runner) DestroyAllSets() error { + if _, err := runner.exec.Command(IPSetCmd, "destroy").CombinedOutput(); err != nil { + return fmt.Errorf("error destroying all sets, error: %v", err) + } + return nil +} + +// ListSets list all set names from kernel +func (runner *runner) ListSets() ([]string, error) { + out, err := runner.exec.Command(IPSetCmd, "list", "-n").CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error listing all sets, error: %v", err) + } + return strings.Split(string(out), "\n"), nil +} + +// ListEntries lists all the entries from a named set. +func (runner *runner) ListEntries(set string) ([]string, error) { + if len(set) == 0 { + return nil, fmt.Errorf("set name can't be nil") + } + out, err := runner.exec.Command(IPSetCmd, "list", set).CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error listing set: %s, error: %v", set, err) + } + memberMatcher := regexp.MustCompile(EntryMemberPattern) + list := memberMatcher.ReplaceAllString(string(out[:]), "") + strs := strings.Split(list, "\n") + results := make([]string, 0) + for i := range strs { + if len(strs[i]) > 0 { + results = append(results, strs[i]) + } + } + return results, nil +} + +// GetVersion returns the version string. +func (runner *runner) GetVersion() (string, error) { + return getIPSetVersionString(runner.exec) +} + +// getIPSetVersionString runs "ipset --version" to get the version string +// in the form of "X.Y", i.e "6.19" +func getIPSetVersionString(exec utilexec.Interface) (string, error) { + cmd := exec.Command(IPSetCmd, "--version") + cmd.SetStdin(bytes.NewReader([]byte{})) + bytes, err := cmd.CombinedOutput() + if err != nil { + return "", err + } + versionMatcher := regexp.MustCompile(VersionPattern) + match := versionMatcher.FindStringSubmatch(string(bytes)) + if match == nil { + return "", fmt.Errorf("no ipset version found in string: %s", bytes) + } + return match[0], nil +} + +// checks if port range is valid. The begin port number is not necessarily less than +// end port number - ipset util can accept it. It means both 1-100 and 100-1 are valid. +func validatePortRange(portRange string) bool { + strs := strings.Split(portRange, "-") + if len(strs) != 2 { + klog.Errorf("port range should be in the format of `a-b`") + return false + } + for i := range strs { + num, err := strconv.Atoi(strs[i]) + if err != nil { + klog.Errorf("Failed to parse %s, error: %v", strs[i], err) + return false + } + if num < 0 { + klog.Errorf("port number %d should be >=0", num) + return false + } + } + return true +} + +// checks if the given ipset type is valid. +func validateIPSetType(set Type) bool { + for _, valid := range ValidIPSetTypes { + if set == valid { + return true + } + } + klog.Errorf("Currently supported ipset types are: %v, %s is not supported", ValidIPSetTypes, set) + return false +} + +// checks if given hash family is supported in ipset +func validateHashFamily(family string) bool { + if family == ProtocolFamilyIPV4 || family == ProtocolFamilyIPV6 { + return true + } + klog.Errorf("Currently supported ip set hash families are: [%s, %s], %s is not supported", ProtocolFamilyIPV4, ProtocolFamilyIPV6, family) + return false +} + +// IsNotFoundError returns true if the error indicates "not found". It parses +// the error string looking for known values, which is imperfect but works in +// practice. +func IsNotFoundError(err error) bool { + es := err.Error() + if strings.Contains(es, "does not exist") { + // set with the same name already exists + // xref: https://github.com/Olipro/ipset/blob/master/lib/errcode.c#L32-L33 + return true + } + if strings.Contains(es, "element is missing") { + // entry is missing from the set + // xref: https://github.com/Olipro/ipset/blob/master/lib/parse.c#L1904 + // https://github.com/Olipro/ipset/blob/master/lib/parse.c#L1925 + return true + } + return false +} + +// checks if given protocol is supported in entry +func validateProtocol(protocol string) bool { + if protocol == ProtocolTCP || protocol == ProtocolUDP || protocol == ProtocolSCTP { + return true + } + klog.Errorf("Invalid entry's protocol: %s, supported protocols are [%s, %s, %s]", protocol, ProtocolTCP, ProtocolUDP, ProtocolSCTP) + return false +} + +// parsePortRange parse the begin and end port from a raw string(format: a-b). beginPort <= endPort +// in the return value. +func parsePortRange(portRange string) (beginPort int, endPort int, err error) { + if len(portRange) == 0 { + portRange = DefaultPortRange + } + + strs := strings.Split(portRange, "-") + if len(strs) != 2 { + // port number -1 indicates invalid + return -1, -1, fmt.Errorf("port range should be in the format of `a-b`") + } + for i := range strs { + num, err := strconv.Atoi(strs[i]) + if err != nil { + // port number -1 indicates invalid + return -1, -1, err + } + if num < 0 { + // port number -1 indicates invalid + return -1, -1, fmt.Errorf("port number %d should be >=0", num) + } + if i == 0 { + beginPort = num + continue + } + endPort = num + // switch when first port number > second port number + if beginPort > endPort { + endPort = beginPort + beginPort = num + } + } + return beginPort, endPort, nil +} + +var _ = Interface(&runner{}) diff --git a/backends/ipvs-as-sink/util/types.go b/backends/ipvs-as-sink/util/types.go new file mode 100644 index 000000000..a8d6ca8ee --- /dev/null +++ b/backends/ipvs-as-sink/util/types.go @@ -0,0 +1,59 @@ +/* +Copyright 2021 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +// Type represents the ipset type +type Type string + +const ( + // HashIPPort represents the `hash:ip,port` type ipset. The hash:ip,port is similar to hash:ip but + // you can store IP address and protocol-port pairs in it. TCP, SCTP, UDP, UDPLITE, ICMP and ICMPv6 are supported + // with port numbers/ICMP(v6) types and other protocol numbers without port information. + HashIPPort Type = "hash:ip,port" + // HashIPPortIP represents the `hash:ip,port,ip` type ipset. The hash:ip,port,ip set type uses a hash to store + // IP address, port number and a second IP address triples. The port number is interpreted together with a + // protocol (default TCP) and zero protocol number cannot be used. + HashIPPortIP Type = "hash:ip,port,ip" + // HashIPPortNet represents the `hash:ip,port,net` type ipset. The hash:ip,port,net set type uses a hash to store IP address, port number and IP network address triples. The port + // number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address + // with zero prefix size cannot be stored either. + HashIPPortNet Type = "hash:ip,port,net" + // BitmapPort represents the `bitmap:port` type ipset. The bitmap:port set type uses a memory range, where each bit + // represents one TCP/UDP port. A bitmap:port type of set can store up to 65535 ports. + BitmapPort Type = "bitmap:port" +) + +// DefaultPortRange defines the default bitmap:port valid port range. +const DefaultPortRange string = "0-65535" + +const ( + // ProtocolFamilyIPV4 represents IPv4 protocol. + ProtocolFamilyIPV4 = "inet" + // ProtocolFamilyIPV6 represents IPv6 protocol. + ProtocolFamilyIPV6 = "inet6" + // ProtocolTCP represents TCP protocol. + ProtocolTCP = "tcp" + // ProtocolUDP represents UDP protocol. + ProtocolUDP = "udp" + // ProtocolSCTP represents SCTP protocol. + ProtocolSCTP = "sctp" +) + +// ValidIPSetTypes defines the supported ip set type. +var ValidIPSetTypes = []Type{ + HashIPPort, + HashIPPortIP, + BitmapPort, + HashIPPortNet, +} \ No newline at end of file