Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Adding Loadbalancer service code #126

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backends/ipvs-as-sink/clusteripsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *Backend) handleNewClusterIPService(key string, svc *localnetv1.Service)
s.addServiceIPToKubeIPVSIntf(nil, svc)

//Cluster service IP is stored in LB tree
s.storeLBSvc(svc.Ports, svc.IPs.All().All(), key, ClusterIPService)
s.storeLBSvc(svc.Ports, svc.IPs.ClusterIPs.All(), key, ClusterIPService)

//Cluster service IP needs to be programmed in ipset.
s.AddOrDelClusterIPInIPSet(svc, svc.Ports, AddService)
Expand Down
76 changes: 68 additions & 8 deletions backends/ipvs-as-sink/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
netutils "k8s.io/utils/net"

"sigs.k8s.io/kpng/api/localnetv1"
ipvs "sigs.k8s.io/kpng/backends/ipvs/util"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
)

type endPointInfo struct {
Expand Down Expand Up @@ -70,13 +70,13 @@ func getIPFamily(ipAddr string) v1.IPFamily {
return ipAddrFamily
}

func getEndPointEntry(endPointIP string, port *localnetv1.PortMapping) *ipvs.Entry {
return &ipvs.Entry{
func getEndPointEntry(endPointIP string, port *localnetv1.PortMapping) *ipsetutil.Entry {
return &ipsetutil.Entry{
IP: endPointIP,
Port: int(port.TargetPort),
Protocol: strings.ToLower(port.Protocol.String()),
IP2: endPointIP,
SetType: ipvs.HashIPPortIP,
SetType: ipsetutil.HashIPPortIP,
}
}

Expand All @@ -94,7 +94,7 @@ func (s *Backend) AddOrDelClusterIPInIPSet(svc *localnetv1.Service, portList []*
}
ipSetName := clusterIPSetMap[ipFamily]
// Capture the clusterIP.
entry := getIPSetEntry(clusterIP, port)
entry := getIPSetEntry(clusterIP, "", port)
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
if valid := s.ipsetList[ipSetName].validateEntry(entry); !valid {
klog.Errorf("error adding entry :%s, to ipset:%s", entry.String(), s.ipsetList[ipSetName].Name)
Expand Down Expand Up @@ -126,12 +126,21 @@ func getServiceIPFamily(svc *localnetv1.Service) []v1.IPFamily {
return svcIPFamily
}

func getIPSetEntry(svcIP string, port *localnetv1.PortMapping) *ipvs.Entry {
return &ipvs.Entry{
func getIPSetEntry(svcIP,srcAddr string, port *localnetv1.PortMapping) *ipsetutil.Entry {
if srcAddr != "" {
return &ipsetutil.Entry{
IP: svcIP,
Port: int(port.Port),
Protocol: strings.ToLower(port.Protocol.String()),
SetType: ipsetutil.HashIPPort,
Net: srcAddr,
}
}
return &ipsetutil.Entry{
IP: svcIP,
Port: int(port.Port),
Protocol: strings.ToLower(port.Protocol.String()),
SetType: ipvs.HashIPPort,
SetType: ipsetutil.HashIPPort,
}
}

Expand Down Expand Up @@ -201,3 +210,54 @@ func (s *Backend) deleteLBSvc(portList []*localnetv1.PortMapping, addrList []str
}
}
}

func (s *Backend) AddOrDelNodePortInIPSet(svc *localnetv1.Service, portList []*localnetv1.PortMapping, op Operation) {
svcIPFamily := getServiceIPFamily(svc)

for _, port := range portList {
var entries []*ipsetutil.Entry
for _, ipFamily := range svcIPFamily {
protocol := strings.ToLower(port.Protocol.String())
ipsetName := protocolIPSetMap[protocol][ipFamily]
nodePortSet := s.ipsetList[ipsetName]
switch protocol {
case ipsetutil.ProtocolTCP, ipsetutil.ProtocolUDP:
entries = []*ipsetutil.Entry{getNodePortIPSetEntry(int(port.NodePort), protocol, ipsetutil.BitmapPort)}

case ipsetutil.ProtocolSCTP:
// Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
entries = []*ipsetutil.Entry{}
for _, nodeIP := range s.nodeAddresses {
entry := getNodePortIPSetEntry(int(port.NodePort), protocol, ipsetutil.HashIPPort)
entry.IP = nodeIP
entries = append(entries, entry)
}
default:
// It should never hit
klog.Errorf( "Unsupported protocol type %v protocol ", protocol)
}
if nodePortSet != nil {
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf( "error adding entry (%v) to ipset (%v)", entry.String(), nodePortSet.Name)
}
if op == AddService {
nodePortSet.newEntries.Insert(entry.String())
}
if op == DeleteService {
nodePortSet.deleteEntries.Insert(entry.String())
}
}
}
}
}
}

func getNodePortIPSetEntry(port int, protocol string, ipSetType ipsetutil.Type) *ipsetutil.Entry {
return &ipsetutil.Entry{
// No need to provide ip info
Port: port,
Protocol: protocol,
SetType: ipSetType,
}
}
32 changes: 26 additions & 6 deletions backends/ipvs-as-sink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (

v1 "k8s.io/api/core/v1"

localnetv1 "sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/backends/ipvs/util"
"sigs.k8s.io/kpng/api/localnetv1"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
)

const (
ClusterIPService = "ClusterIP"
NodePortService = "NodePort"
LoadBalancerService = "LoadBalancer"
)

var loopBackIPSetMap = map[v1.IPFamily]string{
Expand All @@ -41,20 +42,40 @@ var clusterIPSetMap = map[v1.IPFamily]string{
}

var protocolIPSetMap = map[string]map[v1.IPFamily]string{
ipvs.ProtocolTCP: {
ipsetutil.ProtocolTCP: {
v1.IPv4Protocol: kubeNodePortIPv4SetTCP,
v1.IPv6Protocol: kubeNodePortIPv6SetTCP,
},
ipvs.ProtocolUDP: {
ipsetutil.ProtocolUDP: {
v1.IPv4Protocol: kubeNodePortIPv4SetUDP,
v1.IPv6Protocol: kubeNodePortIPv6SetUDP,
},
ipvs.ProtocolSCTP: {
ipsetutil.ProtocolSCTP: {
v1.IPv4Protocol: kubeNodePortIPv4SetSCTP,
v1.IPv6Protocol: kubeNodePortIPv6SetSCTP,
},
}

var loadbalancerIPSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadBalancerIPv4Set,
v1.IPv6Protocol: kubeLoadBalancerIPv6Set,
}

var loadbalancerLocalSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadBalancerLocalIPv4Set,
v1.IPv6Protocol: kubeLoadBalancerLocalIPv6Set,
}

var loadbalancerSourceCIDRSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadBalancerSourceCIDRIPv4Set,
v1.IPv6Protocol: kubeLoadBalancerSourceCIDRIPv6Set,
}

var loadbalancerFWSetMap = map[v1.IPFamily]string{
v1.IPv4Protocol: kubeLoadbalancerFWIPv4Set,
v1.IPv6Protocol: kubeLoadbalancerFWIPv6Set,
}

type Operation int32

const (
Expand All @@ -81,7 +102,6 @@ func epPortSuffix(port *localnetv1.PortMapping) string {
return port.Protocol.String() + ":" + strconv.Itoa(int(port.Port))
}

// diffInPortMapping TODO, we should support this logic in the diffstore, this is a temporary workaround.
func diffInPortMapping(previous, current *localnetv1.Service) (added, removed []*localnetv1.PortMapping) {
for _, p1 := range previous.Ports {
found := false
Expand Down
54 changes: 27 additions & 27 deletions backends/ipvs-as-sink/ipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

"sigs.k8s.io/kpng/backends/ipvs/util"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
)

const (
Expand Down Expand Up @@ -95,25 +95,25 @@ const (
// ipsetInfo is all ipset we needed in ipvs proxier
var ipsetInfo = []struct {
name string
setType ipvs.Type
setType ipsetutil.Type
comment string
}{
{kubeLoopBackIPv4Set, ipvs.HashIPPortIP, kubeLoopBackIPSetComment},
{kubeClusterIPv4Set, ipvs.HashIPPort, kubeClusterIPSetComment},
{kubeExternalIPv4Set, ipvs.HashIPPort, kubeExternalIPSetComment},
{kubeExternalIPv4LocalSet, ipvs.HashIPPort, kubeExternalIPLocalSetComment},
{kubeLoadBalancerIPv4Set, ipvs.HashIPPort, kubeLoadBalancerSetComment},
{kubeLoadbalancerFWIPv4Set, ipvs.HashIPPort, kubeLoadbalancerFWSetComment},
{kubeLoadBalancerLocalIPv4Set, ipvs.HashIPPort, kubeLoadBalancerLocalSetComment},
{kubeLoadBalancerSourceIPv4Set, ipvs.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
{kubeLoadBalancerSourceCIDRIPv4Set, ipvs.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
{kubeNodePortIPv4SetTCP, ipvs.BitmapPort, kubeNodePortSetTCPComment},
{kubeNodePortLocalIPv4SetTCP, ipvs.BitmapPort, kubeNodePortLocalSetTCPComment},
{kubeNodePortIPv4SetUDP, ipvs.BitmapPort, kubeNodePortSetUDPComment},
{kubeNodePortLocalIPv4SetUDP, ipvs.BitmapPort, kubeNodePortLocalSetUDPComment},
{kubeNodePortIPv4SetSCTP, ipvs.HashIPPort, kubeNodePortSetSCTPComment},
{kubeNodePortLocalIPv4SetSCTP, ipvs.HashIPPort, kubeNodePortLocalSetSCTPComment},
{kubeHealthCheckNodePortIPv4Set, ipvs.BitmapPort, kubeHealthCheckNodePortSetComment},
{kubeLoopBackIPv4Set, ipsetutil.HashIPPortIP, kubeLoopBackIPSetComment},
{kubeClusterIPv4Set, ipsetutil.HashIPPort, kubeClusterIPSetComment},
{kubeExternalIPv4Set, ipsetutil.HashIPPort, kubeExternalIPSetComment},
{kubeExternalIPv4LocalSet, ipsetutil.HashIPPort, kubeExternalIPLocalSetComment},
{kubeLoadBalancerIPv4Set, ipsetutil.HashIPPort, kubeLoadBalancerSetComment},
{kubeLoadbalancerFWIPv4Set, ipsetutil.HashIPPort, kubeLoadbalancerFWSetComment},
{kubeLoadBalancerLocalIPv4Set, ipsetutil.HashIPPort, kubeLoadBalancerLocalSetComment},
{kubeLoadBalancerSourceIPv4Set, ipsetutil.HashIPPortIP, kubeLoadBalancerSourceIPSetComment},
{kubeLoadBalancerSourceCIDRIPv4Set, ipsetutil.HashIPPortNet, kubeLoadBalancerSourceCIDRSetComment},
{kubeNodePortIPv4SetTCP, ipsetutil.BitmapPort, kubeNodePortSetTCPComment},
{kubeNodePortLocalIPv4SetTCP, ipsetutil.BitmapPort, kubeNodePortLocalSetTCPComment},
{kubeNodePortIPv4SetUDP, ipsetutil.BitmapPort, kubeNodePortSetUDPComment},
{kubeNodePortLocalIPv4SetUDP, ipsetutil.BitmapPort, kubeNodePortLocalSetUDPComment},
{kubeNodePortIPv4SetSCTP, ipsetutil.HashIPPort, kubeNodePortSetSCTPComment},
{kubeNodePortLocalIPv4SetSCTP, ipsetutil.HashIPPort, kubeNodePortLocalSetSCTPComment},
{kubeHealthCheckNodePortIPv4Set, ipsetutil.BitmapPort, kubeHealthCheckNodePortSetComment},
}

// IPSetVersioner can query the current ipset version.
Expand All @@ -123,20 +123,20 @@ type IPSetVersioner interface {
}

type IPSet struct {
ipvs.IPSet
ipsetutil.IPSet
// activeEntries is the current active entries of the ipset.
newEntries sets.String
// activeEntries is the current active entries of the ipset.
deleteEntries sets.String
// handle is the util ipset interface handle.
handle ipvs.Interface
handle ipsetutil.Interface
}

// NewIPSet initialize a new IPSet struct
func newIPv4Set(handle ipvs.Interface, name string, setType ipvs.Type, comment string) *IPSet {
hashFamily := ipvs.ProtocolFamilyIPV4
func newIPv4Set(handle ipsetutil.Interface, name string, setType ipsetutil.Type, comment string) *IPSet {
hashFamily := ipsetutil.ProtocolFamilyIPV4
set := &IPSet{
IPSet: ipvs.IPSet{
IPSet: ipsetutil.IPSet{
Name: name,
SetType: setType,
HashFamily: hashFamily,
Expand All @@ -150,8 +150,8 @@ func newIPv4Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s
}

// NewIPSet initialize a new IPSet struct
func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment string) *IPSet {
hashFamily := ipvs.ProtocolFamilyIPV6
func newIPv6Set(handle ipsetutil.Interface, name string, setType ipsetutil.Type, comment string) *IPSet {
hashFamily := ipsetutil.ProtocolFamilyIPV6
// In dual-stack both ipv4 and ipv6 ipset's can co-exist. To
// ensure unique names the prefix for ipv6 is changed from
// "KUBE-" to "KUBE-6-". The "KUBE-" prefix is kept for
Expand All @@ -167,7 +167,7 @@ func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s
}
}
set := &IPSet{
IPSet: ipvs.IPSet{
IPSet: ipsetutil.IPSet{
Name: name,
SetType: setType,
HashFamily: hashFamily,
Expand All @@ -180,7 +180,7 @@ func newIPv6Set(handle ipvs.Interface, name string, setType ipvs.Type, comment s
return set
}

func (set *IPSet) validateEntry(entry *ipvs.Entry) bool {
func (set *IPSet) validateEntry(entry *ipsetutil.Entry) bool {
return entry.Validate(&set.IPSet)
}

Expand Down
22 changes: 19 additions & 3 deletions backends/ipvs-as-sink/ipvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/utils/exec"

"sigs.k8s.io/kpng/api/localnetv1"
ipvsutil "sigs.k8s.io/kpng/backends/ipvs/util"
ipsetutil "sigs.k8s.io/kpng/backends/ipvs-as-sink/util"
"sigs.k8s.io/kpng/client/backendcmd"
"sigs.k8s.io/kpng/client/localsink"
"sigs.k8s.io/kpng/client/localsink/decoder"
Expand Down Expand Up @@ -149,12 +149,12 @@ func (s *Backend) createIPVSDummyInterface() {
}

func (s *Backend) initializeIPSets() {
var ipsetInterface ipvsutil.Interface
var ipsetInterface ipsetutil.Interface

// Create a iptables utils.
execer := exec.New()

ipsetInterface = ipvsutil.New(execer)
ipsetInterface = ipsetutil.New(execer)

// initialize ipsetList with all sets we needed
s.ipsetList = make(map[string]*IPSet)
Expand Down Expand Up @@ -270,6 +270,10 @@ func (s *Backend) SetService(svc *localnetv1.Service) {
if svc.Type == NodePortService {
s.handleNodePortService(svc, AddService)
}

if svc.Type == LoadBalancerService {
s.handleLBService(svc, AddService)
}
}

func (s *Backend) DeleteService(namespace, name string) {
Expand All @@ -287,6 +291,10 @@ func (s *Backend) DeleteService(namespace, name string) {
s.handleNodePortService(svc, DeleteService)
}

if svc.Type == LoadBalancerService {
s.handleLBService(svc, DeleteService)
}

for _, ip := range asDummyIPs(svc.IPs.All()) {
s.dummyIPsRefCounts[ip]--
}
Expand All @@ -308,6 +316,10 @@ func (s *Backend) SetEndpoint(namespace, serviceName, key string, endpoint *loca
if service.Type == NodePortService {
s.SetEndPointForNodePortSvc(svcKey, key, endpoint)
}

if service.Type == LoadBalancerService {
s.SetEndPointForLBSvc(svcKey, key, endpoint)
}
}

func (s *Backend) DeleteEndpoint(namespace, serviceName, key string) {
Expand All @@ -323,4 +335,8 @@ func (s *Backend) DeleteEndpoint(namespace, serviceName, key string) {
if service.Type == NodePortService {
s.DeleteEndPointForNodePortSvc(svcKey, key)
}

if service.Type == LoadBalancerService {
s.DeleteEndPointForLBSvc(svcKey, key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package ipvssink
import (
"encoding/json"
"net"
localnetv1 "sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/api/localnetv1"
"syscall"

"github.com/google/seesaw/ipvs"
Expand All @@ -46,6 +46,9 @@ func (lb ipvsLB) ToService() ipvs.Service {
if lb.ServiceType == NodePortService {
port = uint16(lb.Port.NodePort)
}
if lb.ServiceType == LoadBalancerService {
port = uint16(lb.Port.Port)
}
s := ipvs.Service{
Address: net.ParseIP(lb.IP),
Port: port,
Expand Down
Loading