Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #126 from Nordix/lb-svc
Browse files Browse the repository at this point in the history
Adding Loadbalancer service code
  • Loading branch information
k8s-ci-robot authored Dec 1, 2021
2 parents 66d7741 + 098faa0 commit d80ff13
Show file tree
Hide file tree
Showing 10 changed files with 1,019 additions and 105 deletions.
2 changes: 1 addition & 1 deletion backends/ipvs-as-sink/clusteripsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *Backend) handleNewClusterIPService(key string, svc *localnetv1.Service)
s.addServiceIPToKubeIPVSIntf(nil, svc)

//Cluster service IP is stored in LB tree
s.storeLBSvc(svc.Ports, svc.IPs.All().All(), key, ClusterIPService)
s.storeLBSvc(svc.Ports, svc.IPs.ClusterIPs.All(), key, ClusterIPService)

//Cluster service IP needs to be programmed in ipset.
s.AddOrDelClusterIPInIPSet(svc, svc.Ports, AddService)
Expand Down
76 changes: 68 additions & 8 deletions backends/ipvs-as-sink/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
netutils "k8s.io/utils/net"

"sigs.k8s.io/kpng/api/localnetv1"
ipvs "sigs.k8s.io/kpng/backends/ipvs/util"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
)

type endPointInfo struct {
Expand Down Expand Up @@ -70,13 +70,13 @@ func getIPFamily(ipAddr string) v1.IPFamily {
return ipAddrFamily
}

func getEndPointEntry(endPointIP string, port *localnetv1.PortMapping) *ipvs.Entry {
return &ipvs.Entry{
func getEndPointEntry(endPointIP string, port *localnetv1.PortMapping) *ipsetutil.Entry {
return &ipsetutil.Entry{
IP: endPointIP,
Port: int(port.TargetPort),
Protocol: strings.ToLower(port.Protocol.String()),
IP2: endPointIP,
SetType: ipvs.HashIPPortIP,
SetType: ipsetutil.HashIPPortIP,
}
}

Expand All @@ -94,7 +94,7 @@ func (s *Backend) AddOrDelClusterIPInIPSet(svc *localnetv1.Service, portList []*
}
ipSetName := clusterIPSetMap[ipFamily]
// Capture the clusterIP.
entry := getIPSetEntry(clusterIP, port)
entry := getIPSetEntry(clusterIP, "", port)
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
if valid := s.ipsetList[ipSetName].validateEntry(entry); !valid {
klog.Errorf("error adding entry :%s, to ipset:%s", entry.String(), s.ipsetList[ipSetName].Name)
Expand Down Expand Up @@ -126,12 +126,21 @@ func getServiceIPFamily(svc *localnetv1.Service) []v1.IPFamily {
return svcIPFamily
}

func getIPSetEntry(svcIP string, port *localnetv1.PortMapping) *ipvs.Entry {
return &ipvs.Entry{
func getIPSetEntry(svcIP,srcAddr string, port *localnetv1.PortMapping) *ipsetutil.Entry {
if srcAddr != "" {
return &ipsetutil.Entry{
IP: svcIP,
Port: int(port.Port),
Protocol: strings.ToLower(port.Protocol.String()),
SetType: ipsetutil.HashIPPort,
Net: srcAddr,
}
}
return &ipsetutil.Entry{
IP: svcIP,
Port: int(port.Port),
Protocol: strings.ToLower(port.Protocol.String()),
SetType: ipvs.HashIPPort,
SetType: ipsetutil.HashIPPort,
}
}

Expand Down Expand Up @@ -201,3 +210,54 @@ func (s *Backend) deleteLBSvc(portList []*localnetv1.PortMapping, addrList []str
}
}
}

func (s *Backend) AddOrDelNodePortInIPSet(svc *localnetv1.Service, portList []*localnetv1.PortMapping, op Operation) {
svcIPFamily := getServiceIPFamily(svc)

for _, port := range portList {
var entries []*ipsetutil.Entry
for _, ipFamily := range svcIPFamily {
protocol := strings.ToLower(port.Protocol.String())
ipsetName := protocolIPSetMap[protocol][ipFamily]
nodePortSet := s.ipsetList[ipsetName]
switch protocol {
case ipsetutil.ProtocolTCP, ipsetutil.ProtocolUDP:
entries = []*ipsetutil.Entry{getNodePortIPSetEntry(int(port.NodePort), protocol, ipsetutil.BitmapPort)}

case ipsetutil.ProtocolSCTP:
// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
entries = []*ipsetutil.Entry{}
for _, nodeIP := range s.nodeAddresses {
entry := getNodePortIPSetEntry(int(port.NodePort), protocol, ipsetutil.HashIPPort)
entry.IP = nodeIP
entries = append(entries, entry)
}
default:
// It should never hit
klog.Errorf( "Unsupported protocol type %v protocol ", protocol)
}
if nodePortSet != nil {
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf( "error adding entry (%v) to ipset (%v)", entry.String(), nodePortSet.Name)
}
if op == AddService {
nodePortSet.newEntries.Insert(entry.String())
}
if op == DeleteService {
nodePortSet.deleteEntries.Insert(entry.String())
}
}
}
}
}
}

func getNodePortIPSetEntry(port int, protocol string, ipSetType ipsetutil.Type) *ipsetutil.Entry {
return &ipsetutil.Entry{
// No need to provide ip info
Port: port,
Protocol: protocol,
SetType: ipSetType,
}
}
32 changes: 26 additions & 6 deletions backends/ipvs-as-sink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (

v1 "k8s.io/api/core/v1"

localnetv1 "sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/backends/ipvs/util"
"sigs.k8s.io/kpng/api/localnetv1"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
)

const (
ClusterIPService = "ClusterIP"
NodePortService = "NodePort"
LoadBalancerService = "LoadBalancer"
)

var loopBackIPSetMap = map[v1.IPFamily]string{
Expand All @@ -41,20 +42,40 @@ var clusterIPSetMap = map[v1.IPFamily]string{
}

var protocolIPSetMap = map[string]map[v1.IPFamily]string{
ipvs.ProtocolTCP: {
ipsetutil.ProtocolTCP: {
v1.IPv4Protocol: kubeNodePortIPv4SetTCP,
v1.IPv6Protocol: kubeNodePortIPv6SetTCP,
},
ipvs.ProtocolUDP: {
ipsetutil.ProtocolUDP: {
v1.IPv4Protocol: kubeNodePortIPv4SetUDP,
v1.IPv6Protocol: kubeNodePortIPv6SetUDP,
},
ipvs.ProtocolSCTP: {
ipsetutil.ProtocolSCTP: {
v1.IPv4Protocol: kubeNodePortIPv4SetSCTP,
v1.IPv6Protocol: kubeNodePortIPv6SetSCTP,
},
}

var loadbalancerIPSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadBalancerIPv4Set,
v1.IPv6Protocol: kubeLoadBalancerIPv6Set,
}

var loadbalancerLocalSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadBalancerLocalIPv4Set,
v1.IPv6Protocol: kubeLoadBalancerLocalIPv6Set,
}

var loadbalancerSourceCIDRSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadBalancerSourceCIDRIPv4Set,
v1.IPv6Protocol: kubeLoadBalancerSourceCIDRIPv6Set,
}

var loadbalancerFWSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadbalancerFWIPv4Set,
v1.IPv6Protocol: kubeLoadbalancerFWIPv6Set,
}

type Operation int32

const (
Expand All @@ -81,7 +102,6 @@ func epPortSuffix(port *localnetv1.PortMapping) string {
return port.Protocol.String() + ":" + strconv.Itoa(int(port.Port))
}

// diffInPortMapping TODO, we should support this logic in the diffstore, this is a temporary workaround.
func diffInPortMapping(previous, current *localnetv1.Service) (added, removed []*localnetv1.PortMapping) {
for _, p1 := range previous.Ports {
found := false
Expand Down
54 changes: 27 additions & 27 deletions backends/ipvs-as-sink/ipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

"sigs.k8s.io/kpng/backends/ipvs/util"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
)

const (
Expand Down Expand Up @@ -95,25 +95,25 @@ const (
// ipsetInfo is all ipset we needed in ipvs proxier
var ipsetInfo = []struct {
name string
setType ipvs.Type
setType ipsetutil.Type
comment string
}{
{kubeLoopBackIPv4Set, ipvs.HashIPPortIP, kubeLoopBackIPSetComment},
{kubeClusterIPv4Set, ipvs.HashIPPort, kubeClusterIPSetComment},
{kubeExternalIPv4Set, ipvs.HashIPPort, kubeExternalIPSetComment},
{kubeExternalIPv4LocalSet, ipvs.HashIPPort, kubeExternalIPLocalSetComment},
{kubeLoadBalancerIPv4Set, ipvs.HashIPPort, kubeLoadBalancerSetComment},
{kubeLoadbalancerFWIPv4Set, ipvs.HashIPPort, kubeLoadbalancerFWSetComment},
{kubeLoadBalancerLocalIPv4Set, ipvs.HashIPPort, kubeLoadBalancerLocalSetComment},
{kubeLoadBalancerSourceIPv4Set, ipvs.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
{kubeLoadBalancerSourceCIDRIPv4Set, ipvs.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
{kubeNodePortIPv4SetTCP, ipvs.BitmapPort, kubeNodePortSetTCPComment},
{kubeNodePortLocalIPv4SetTCP, ipvs.BitmapPort, kubeNodePortLocalSetTCPComment},
{kubeNodePortIPv4SetUDP, ipvs.BitmapPort, kubeNodePortSetUDPComment},
{kubeNodePortLocalIPv4SetUDP, ipvs.BitmapPort, kubeNodePortLocalSetUDPComment},
{kubeNodePortIPv4SetSCTP, ipvs.HashIPPort, kubeNodePortSetSCTPComment},
{kubeNodePortLocalIPv4SetSCTP, ipvs.HashIPPort, kubeNodePortLocalSetSCTPComment},
{kubeHealthCheckNodePortIPv4Set, ipvs.BitmapPort, kubeHealthCheckNodePortSetComment},
{kubeLoopBackIPv4Set, ipsetutil.HashIPPortIP, kubeLoopBackIPSetComment},
{kubeClusterIPv4Set, ipsetutil.HashIPPort, kubeClusterIPSetComment},
{kubeExternalIPv4Set, ipsetutil.HashIPPort, kubeExternalIPSetComment},
{kubeExternalIPv4LocalSet, ipsetutil.HashIPPort, kubeExternalIPLocalSetComment},
{kubeLoadBalancerIPv4Set, ipsetutil.HashIPPort, kubeLoadBalancerSetComment},
{kubeLoadbalancerFWIPv4Set, ipsetutil.HashIPPort, kubeLoadbalancerFWSetComment},
{kubeLoadBalancerLocalIPv4Set, ipsetutil.HashIPPort, kubeLoadBalancerLocalSetComment},
{kubeLoadBalancerSourceIPv4Set, ipsetutil.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
{kubeLoadBalancerSourceCIDRIPv4Set, ipsetutil.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
{kubeNodePortIPv4SetTCP, ipsetutil.BitmapPort, kubeNodePortSetTCPComment},
{kubeNodePortLocalIPv4SetTCP, ipsetutil.BitmapPort, kubeNodePortLocalSetTCPComment},
{kubeNodePortIPv4SetUDP, ipsetutil.BitmapPort, kubeNodePortSetUDPComment},
{kubeNodePortLocalIPv4SetUDP, ipsetutil.BitmapPort, kubeNodePortLocalSetUDPComment},
{kubeNodePortIPv4SetSCTP, ipsetutil.HashIPPort, kubeNodePortSetSCTPComment},
{kubeNodePortLocalIPv4SetSCTP, ipsetutil.HashIPPort, kubeNodePortLocalSetSCTPComment},
{kubeHealthCheckNodePortIPv4Set, ipsetutil.BitmapPort, kubeHealthCheckNodePortSetComment},
}

// IPSetVersioner can query the current ipset version.
Expand All @@ -123,20 +123,20 @@ type IPSetVersioner interface {
}

type IPSet struct {
ipvs.IPSet
ipsetutil.IPSet
// activeEntries is the current active entries of the ipset.
newEntries sets.String
// activeEntries is the current active entries of the ipset.
deleteEntries sets.String
// handle is the util ipset interface handle.
handle ipvs.Interface
handle ipsetutil.Interface
}

// NewIPSet initialize a new IPSet struct
func newIPv4Set(handle ipvs.Interface, name string, setType ipvs.Type, comment string) *IPSet {
hashFamily := ipvs.ProtocolFamilyIPV4
func newIPv4Set(handle ipsetutil.Interface, name string, setType ipsetutil.Type, comment string) *IPSet {
hashFamily := ipsetutil.ProtocolFamilyIPV4
set := &IPSet{
IPSet: ipvs.IPSet{
IPSet: ipsetutil.IPSet{
Name: name,
SetType: setType,
HashFamily: hashFamily,
Expand All @@ -150,8 +150,8 @@ func newIPv4Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s
}

// NewIPSet initialize a new IPSet struct
func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment string) *IPSet {
hashFamily := ipvs.ProtocolFamilyIPV6
func newIPv6Set(handle ipsetutil.Interface, name string, setType ipsetutil.Type, comment string) *IPSet {
hashFamily := ipsetutil.ProtocolFamilyIPV6
// In dual-stack both ipv4 and ipv6 ipset's can co-exist. To
// ensure unique names the prefix for ipv6 is changed from
// "KUBE-" to "KUBE-6-". The "KUBE-" prefix is kept for
Expand All @@ -167,7 +167,7 @@ func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s
}
}
set := &IPSet{
IPSet: ipvs.IPSet{
IPSet: ipsetutil.IPSet{
Name: name,
SetType: setType,
HashFamily: hashFamily,
Expand All @@ -180,7 +180,7 @@ func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s
return set
}

func (set *IPSet) validateEntry(entry *ipvs.Entry) bool {
func (set *IPSet) validateEntry(entry *ipsetutil.Entry) bool {
return entry.Validate(&set.IPSet)
}

Expand Down
22 changes: 19 additions & 3 deletions backends/ipvs-as-sink/ipvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/utils/exec"

"sigs.k8s.io/kpng/api/localnetv1"
ipvsutil "sigs.k8s.io/kpng/backends/ipvs/util"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
"sigs.k8s.io/kpng/client/backendcmd"
"sigs.k8s.io/kpng/client/localsink"
"sigs.k8s.io/kpng/client/localsink/decoder"
Expand Down Expand Up @@ -149,12 +149,12 @@ func (s *Backend) createIPVSDummyInterface() {
}

func (s *Backend) initializeIPSets() {
var ipsetInterface ipvsutil.Interface
var ipsetInterface ipsetutil.Interface

// Create a iptables utils.
execer := exec.New()

ipsetInterface = ipvsutil.New(execer)
ipsetInterface = ipsetutil.New(execer)

// initialize ipsetList with all sets we needed
s.ipsetList = make(map[string]*IPSet)
Expand Down Expand Up @@ -270,6 +270,10 @@ func (s *Backend) SetService(svc *localnetv1.Service) {
if svc.Type == NodePortService {
s.handleNodePortService(svc, AddService)
}

if svc.Type == LoadBalancerService {
s.handleLBService(svc, AddService)
}
}

func (s *Backend) DeleteService(namespace, name string) {
Expand All @@ -287,6 +291,10 @@ func (s *Backend) DeleteService(namespace, name string) {
s.handleNodePortService(svc, DeleteService)
}

if svc.Type == LoadBalancerService {
s.handleLBService(svc, DeleteService)
}

for _, ip := range asDummyIPs(svc.IPs.All()) {
s.dummyIPsRefCounts[ip]--
}
Expand All @@ -308,6 +316,10 @@ func (s *Backend) SetEndpoint(namespace, serviceName, key string, endpoint *loca
if service.Type == NodePortService {
s.SetEndPointForNodePortSvc(svcKey, key, endpoint)
}

if service.Type == LoadBalancerService {
s.SetEndPointForLBSvc(svcKey, key, endpoint)
}
}

func (s *Backend) DeleteEndpoint(namespace, serviceName, key string) {
Expand All @@ -323,4 +335,8 @@ func (s *Backend) DeleteEndpoint(namespace, serviceName, key string) {
if service.Type == NodePortService {
s.DeleteEndPointForNodePortSvc(svcKey, key)
}

if service.Type == LoadBalancerService {
s.DeleteEndPointForLBSvc(svcKey, key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package ipvssink
import (
"encoding/json"
"net"
localnetv1 "sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/api/localnetv1"
"syscall"

"github.com/google/seesaw/ipvs"
Expand All @@ -46,6 +46,9 @@ func (lb ipvsLB) ToService() ipvs.Service {
if lb.ServiceType == NodePortService {
port = uint16(lb.Port.NodePort)
}
if lb.ServiceType == LoadBalancerService {
port = uint16(lb.Port.Port)
}
s := ipvs.Service{
Address: net.ParseIP(lb.IP),
Port: port,
Expand Down
Loading

0 comments on commit d80ff13

Please sign in to comment.