Skip to content

Commit

Permalink
Use target port of Service in NodePortLocal to configure Pod reachability
Browse files Browse the repository at this point in the history

Currently we use the container ports of a Pod to program the iptables rules that make the Pod
reachable through a port on the Node. But container ports are not mandatory, and multiple
Services can use different target ports for the same Pod. Hence, we are changing the
NodePortLocal implementation so that the target ports of all Services selecting a Pod are used to
program iptables rules and annotate Pods.

To implement this, target ports are obtained from all the Services selecting a Pod,
in the function handleAddUpdatePod().

Necessary changes in the tests have been added.

Fixes antrea-io#1912

Signed-off-by: Monotosh Das <[email protected]>
  • Loading branch information
monotosh-avi committed Jun 22, 2021
1 parent ef15423 commit 2b07d74
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 64 deletions.
79 changes: 48 additions & 31 deletions pkg/agent/nodeportlocal/k8s/npl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,17 @@ func (c *NPLController) enqueueSvcUpdate(oldObj, newObj interface{}) {
} else if newNPLEnabled {
podKeys = sets.NewString(c.getPodsFromService(newSvc)...)
}
} else if oldNPLEnabled && newNPLEnabled && !reflect.DeepEqual(oldSvc.Spec.Selector, newSvc.Spec.Selector) {
// Disjunctive union of Pods from both Service sets.
oldPodSet := sets.NewString(c.getPodsFromService(oldSvc)...)
} else if oldNPLEnabled && newNPLEnabled {
newPodSet := sets.NewString(c.getPodsFromService(newSvc)...)
podKeys = utilsets.SymmetricDifference(oldPodSet, newPodSet)
if !reflect.DeepEqual(oldSvc.Spec.Selector, newSvc.Spec.Selector) {
// Disjunctive union of Pods from both Service sets.
oldPodSet := sets.NewString(c.getPodsFromService(oldSvc)...)
podKeys = utilsets.SymmetricDifference(oldPodSet, newPodSet)
}
if !reflect.DeepEqual(oldSvc.Spec.Ports, newSvc.Spec.Ports) {
// If ports in a Service are changed, all the Pods selected by the Service have to be processed.
podKeys = podKeys.Union(newPodSet)
}
}

for podKey := range podKeys {
Expand Down Expand Up @@ -268,12 +274,13 @@ func (c *NPLController) getPodsFromService(svc *corev1.Service) []string {
return pods
}

func (c *NPLController) isNPLEnabledForServiceOfPod(obj interface{}) bool {
func (c *NPLController) getTargetPortsForServicesOfPod(obj interface{}) []int {
targetPorts := sets.NewInt()
pod := obj.(*corev1.Pod)
services, err := c.svcInformer.GetIndexer().ByIndex(NPLEnabledAnnotationIndex, "true")
if err != nil {
klog.Errorf("Got error while listing Services with annotation %s: %v", NPLEnabledAnnotationKey, err)
return false
return targetPorts.List()
}

for _, service := range services {
Expand All @@ -282,11 +289,13 @@ func (c *NPLController) isNPLEnabledForServiceOfPod(obj interface{}) bool {
if isSvc && svc.Spec.Type != corev1.ServiceTypeNodePort {
if pod.Namespace == svc.Namespace &&
matchSvcSelectorPodLabels(svc.Spec.Selector, pod.GetLabels()) {
return true
for _, port := range svc.Spec.Ports {
targetPorts.Insert(int(port.TargetPort.IntVal))
}
}
}
}
return false
return targetPorts.List()
}

// matchSvcSelectorPodLabels verifies that all key/value pairs present in Service's selector
Expand Down Expand Up @@ -393,7 +402,8 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {
}
c.addPodIPToCache(key, podIP)

if !c.isNPLEnabledForServiceOfPod(obj) {
targetPorts := c.getTargetPortsForServicesOfPod(obj)
if len(targetPorts) == 0 {
if err := c.deleteAllPortRulesIfAny(podIP); err != nil {
return err
}
Expand All @@ -420,33 +430,40 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error {

nplAnnotationsRequired := []NPLAnnotation{}

// first, check which rules are needed based on the Pod specification (ignoring NPL
// annotations) and make sure they are present. As we do so, we build the expected list of
// NPL annotations for the Pod.
hostPorts := make(map[int]int)
for _, container := range podContainers {
for _, cport := range container.Ports {
port := int(cport.ContainerPort)
podPorts[port] = struct{}{}
portData := c.portTable.GetEntryByPodIPPort(podIP, int(cport.ContainerPort))
if portData == nil { // rule does not exist
if int(cport.HostPort) > 0 {
klog.V(4).Infof("Host Port is defined for Container %s in Pod %s, thus extra NPL port is not allocated", container.Name, key)
nodePort = int(cport.HostPort)
} else {
nodePort, err = c.portTable.AddRule(podIP, port)
if err != nil {
return fmt.Errorf("failed to add rule for Pod %s: %v", key, err)
}
}
if int(cport.HostPort) > 0 {
klog.V(4).Infof("Host Port is defined for Container %s in Pod %s, thus extra NPL port is not allocated", container.Name, key)
hostPorts[int(cport.ContainerPort)] = int(cport.HostPort)
}
}
}

// first, check which rules are needed based on the target ports of the Services selecting the Pod
// (ignoring NPL annotations) and make sure they are present. As we do so, we build the expected list of
// NPL annotations for the Pod.
for _, targetPort := range targetPorts {
port := int(targetPort)
podPorts[port] = struct{}{}
portData := c.portTable.GetEntryByPodIPPort(podIP, port)
if portData == nil {
if hport, ok := hostPorts[port]; ok {
nodePort = hport
} else {
nodePort = portData.NodePort
nodePort, err = c.portTable.AddRule(podIP, port)
if err != nil {
return fmt.Errorf("failed to add rule for Pod %s: %v", key, err)
}
}
nplAnnotationsRequired = append(nplAnnotationsRequired, NPLAnnotation{
PodPort: port,
NodeIP: pod.Status.HostIP,
NodePort: nodePort,
})
} else {
nodePort = portData.NodePort
}
nplAnnotationsRequired = append(nplAnnotationsRequired, NPLAnnotation{
PodPort: port,
NodeIP: pod.Status.HostIP,
NodePort: nodePort,
})
}

// second, delete any existing rule that is not needed based on the current Pod
Expand Down
52 changes: 38 additions & 14 deletions pkg/agent/nodeportlocal/npl_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,7 @@ func getTestPod() *corev1.Pod {
NodeName: defaultNodeName,
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
ContainerPort: int32(defaultPort),
},
},
Ports: []corev1.ContainerPort{},
},
},
},
Expand All @@ -91,7 +87,12 @@ func getTestPod() *corev1.Pod {
}
}

func getTestSvc() *corev1.Service {
func getTestSvc(targetPorts ...int32) *corev1.Service {
var targetPort int32
targetPort = defaultPort
if len(targetPorts) > 0 {
targetPort = targetPorts[0]
}
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: defaultSvcName,
Expand All @@ -106,7 +107,7 @@ func getTestSvc() *corev1.Service {
Protocol: "TCP",
TargetPort: intstr.IntOrString{
Type: intstr.Int,
IntVal: defaultPort,
IntVal: targetPort,
},
}},
},
Expand Down Expand Up @@ -389,24 +390,28 @@ func TestPodDelete(t *testing.T) {
}

// TestPodAddMultiPort creates a Pod with multiple ports and verifies that the Pod's NPL annotation
// and the local port table are updated correctly.
// and the local port table are updated with only one port used as target port of the Service.
func TestPodAddMultiPort(t *testing.T) {
testSvc := getTestSvc()
testPod := getTestPod()
newPort := 90
newPort1 := 91
newPort2 := 92
testPod.Spec.Containers[0].Ports = append(
testPod.Spec.Containers[0].Ports,
corev1.ContainerPort{ContainerPort: int32(newPort)},
corev1.ContainerPort{ContainerPort: int32(newPort1)},
)
testPod.Spec.Containers[0].Ports = append(
testPod.Spec.Containers[0].Ports,
corev1.ContainerPort{ContainerPort: int32(newPort2)},
)
testData := setUp(t, testSvc, testPod)
defer testData.tearDown()

value, err := testData.pollForPodAnnotation(testPod.Name, true)
require.NoError(t, err, "Poll for annotation check failed")
nplData := testData.checkAnnotationValue(value, defaultPort, newPort)
assert.NotEqual(t, nplData[0].NodePort, nplData[1].NodePort)
nplData := testData.checkAnnotationValue(value, defaultPort)
assert.Len(t, nplData, 1)
assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort))
assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort))
}

// TestPodAddHostPort creates a Pod with host ports and verifies that the Pod's NPL annotation
Expand All @@ -416,7 +421,10 @@ func TestPodAddHostPort(t *testing.T) {
testSvc := getTestSvc()
testPod := getTestPod()
hostPort := 4001
testPod.Spec.Containers[0].Ports[0].HostPort = int32(hostPort)
testPod.Spec.Containers[0].Ports = append(
testPod.Spec.Containers[0].Ports,
corev1.ContainerPort{ContainerPort: int32(defaultPort), HostPort: int32(hostPort)},
)
testData := setUp(t, testSvc, testPod)
defer testData.tearDown()

Expand Down Expand Up @@ -453,6 +461,22 @@ func TestMultiplePods(t *testing.T) {
assert.NotEqual(t, nplData1[0].NodePort, nplData2[0].NodePort)
}

// TestMultipleService verifies that when two Services with different target
// ports select the same Pod, NPL rules and Pod annotations are created for
// both target ports, and each target port is assigned a distinct Node port.
func TestMultipleService(t *testing.T) {
	svc1 := getTestSvc()
	svc2 := getTestSvc(9090)
	svc2.Name = "svc2"
	pod := getTestPod()
	testData := setUp(t, svc1, svc2, pod)
	defer testData.tearDown()

	annotation, err := testData.pollForPodAnnotation(pod.Name, true)
	require.NoError(t, err, "Poll for annotation check failed")
	nplData := testData.checkAnnotationValue(annotation, defaultPort, 9090)
	assert.True(t, testData.portTable.RuleExists(pod.Status.PodIP, defaultPort))
	assert.True(t, testData.portTable.RuleExists(pod.Status.PodIP, 9090))
	assert.NotEqual(t, nplData[0].NodePort, nplData[1].NodePort)
}

// TestInitInvalidPod simulates an agent reboot case. A Pod with an invalid NPL annotation is
// added, this invalid annotation should get cleaned up. And a proper NPL annotation should get
// added.
Expand Down
11 changes: 11 additions & 0 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,17 @@ func (data *TestData) createNginxPodOnNode(name string, nodeName string) error {
}, false, nil)
}

// createNginxPodWithPort creates an nginx Pod in the test namespace on the
// given Node, exposing a single configurable TCP container port.
func (data *TestData) createNginxPodWithPort(name, nodeName string, port int32) error {
	containerPorts := []corev1.ContainerPort{
		{
			Name:          "http",
			ContainerPort: port,
			Protocol:      corev1.ProtocolTCP,
		},
	}
	return data.createPodOnNode(name, nodeName, nginxImage, []string{}, nil, nil, containerPorts, false, nil)
}

// createNginxPod creates a Pod in the test namespace with a single nginx container.
func (data *TestData) createNginxPod(name, nodeName string) error {
return data.createNginxPodOnNode(name, nodeName)
Expand Down
Loading

0 comments on commit 2b07d74

Please sign in to comment.