From cd64a37924685eb63eb9ca5a713425fcf0150886 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Fri, 17 Jan 2025 14:41:08 +0800 Subject: [PATCH] Fix that Service traffic cannot be allowed through L7 NetworkPolicy Signed-off-by: Hongliang Liu --- cmd/antrea-agent/agent.go | 2 +- .../l7_flow_export_controller_test.go | 2 +- .../networkpolicy/l7engine/reconciler.go | 37 +++++- .../networkpolicy/l7engine/reconciler_test.go | 40 ++++++- .../networkpolicy_controller_test.go | 2 +- pkg/agent/openflow/client.go | 13 +++ pkg/agent/openflow/client_test.go | 29 ++++- pkg/agent/openflow/fields.go | 34 +++--- pkg/agent/openflow/framework_test.go | 43 ------- pkg/agent/openflow/network_policy.go | 109 ++++++++++++++++-- pkg/agent/openflow/network_policy_test.go | 19 +-- pkg/agent/openflow/testing/mock_openflow.go | 16 ++- test/e2e/l7networkpolicy_test.go | 38 ++++-- 13 files changed, 284 insertions(+), 100 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index dfd740ed2e0..4e34d57c341 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -498,7 +498,7 @@ func run(o *Options) error { } var l7Reconciler *l7engine.Reconciler if l7NetworkPolicyEnabled || l7FlowExporterEnabled { - l7Reconciler = l7engine.NewReconciler() + l7Reconciler = l7engine.NewReconciler(ofClient) } networkPolicyController, err := networkpolicy.NewNetworkPolicyController( antreaClientProvider, diff --git a/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go b/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go index ef88e7baadd..7cfc0501ab1 100644 --- a/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go +++ b/pkg/agent/controller/l7flowexporter/l7_flow_export_controller_test.go @@ -108,7 +108,7 @@ func newFakeControllerAndWatcher(t *testing.T, objects []runtime.Object, interfa ifaceStore.AddInterface(itf) } - l7Reconciler := l7engine.NewReconciler() + l7Reconciler := l7engine.NewReconciler(nil) l7w := NewL7FlowExporterController(mockOFClient, ifaceStore, localPodInformer, nsInformer, l7Reconciler) return &fakeController{ diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go index 40ff2ffa854..bf5be074d2c 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler.go @@ -32,6 +32,7 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" "antrea.io/antrea/pkg/util/logdir" ) @@ -153,10 +154,16 @@ type Reconciler struct { suricataTenantCache *threadSafeSet[uint32] suricataTenantHandlerCache *threadSafeSet[uint32] + ofClient openflow.Client + once sync.Once + + sync.RWMutex + + l7FlowsInitialized bool } -func NewReconciler() *Reconciler { +func NewReconciler(ofClient openflow.Client) *Reconciler { return &Reconciler{ suricataScFn: suricataSc, startSuricataFn: startSuricata, @@ -166,6 +173,7 @@ func NewReconciler() *Reconciler { suricataTenantHandlerCache: &threadSafeSet[uint32]{ cached: sets.New[uint32](), }, + ofClient: ofClient, } } @@ -266,6 +274,30 @@ func (r *Reconciler) StartSuricataOnce() { }) } +func (r *Reconciler) initializeL7Flows() error { + r.RLock() // Allow multiple readers once initialized. + if r.l7FlowsInitialized { + r.RUnlock() // Release read lock as initialization is complete. + return nil + } + r.RUnlock() + + // We acquire a write lock to ensure that only one thread does the initialization. + r.Lock() + defer r.Unlock() + + // Double check initialization (since another thread may have initialized while we released the read lock) + if r.l7FlowsInitialized { + return nil + } + + if err := r.ofClient.InstallL7NetworkPolicyFlows(); err != nil { + return fmt.Errorf("failed to install L7 NetworkPolicy flows: %w", err) + } + r.l7FlowsInitialized = true + return nil +} + func (r *Reconciler) AddRule(ruleID, policyName string, vlanID uint32, l7Protocols []v1beta.L7Protocol) error { start := time.Now() defer func() { @@ -273,6 +305,9 @@ func (r *Reconciler) AddRule(ruleID, policyName string, vlanID uint32, l7Protoco }() r.StartSuricataOnce() + if err := r.initializeL7Flows(); err != nil { + return err + } // Generate the keyword part used in Suricata rules. protoKeywords := make(map[string]sets.Set[string]) diff --git a/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go b/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go index 39a1745fd80..ef916016b61 100644 --- a/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/l7engine/reconciler_test.go @@ -15,12 +15,18 @@ package l7engine import ( + "fmt" + "sync" + "sync/atomic" "testing" "github.com/spf13/afero" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" "k8s.io/apimachinery/pkg/util/sets" + oftesting "antrea.io/antrea/pkg/agent/openflow/testing" v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2" ) @@ -124,7 +130,7 @@ func TestStartSuricata(t *testing.T) { _, err := defaultFS.Create(defaultSuricataConfigPath) assert.NoError(t, err) - fe := NewReconciler() + fe := NewReconciler(nil) fs := newFakeSuricata() fe.suricataScFn = fs.suricataScFunc fe.startSuricataFn = fs.startSuricataFn @@ -183,11 +189,15 @@ func TestRuleLifecycle(t *testing.T) { _, err := defaultFS.Create(defaultSuricataConfigPath) assert.NoError(t, err) - fe := NewReconciler() + ctrl := gomock.NewController(t) + mockOfClient := oftesting.NewMockClient(ctrl) + fe := NewReconciler(mockOfClient) fs := newFakeSuricata() fe.suricataScFn = fs.suricataScFunc fe.startSuricataFn = fs.startSuricataFn + mockOfClient.EXPECT().InstallL7NetworkPolicyFlows().Times(1) + // Test add a L7 NetworkPolicy. assert.NoError(t, fe.AddRule(ruleID, policyName, vlanID, tc.l7Protocols)) @@ -225,3 +235,29 @@ func TestRuleLifecycle(t *testing.T) { }) } } + +func TestInitializeL7Flows(t *testing.T) { + ctrl := gomock.NewController(t) + mockOfClient := oftesting.NewMockClient(ctrl) + fe := NewReconciler(mockOfClient) + var errOccurred int32 + + mockOfClient.EXPECT().InstallL7NetworkPolicyFlows().Return(fmt.Errorf("error")) + mockOfClient.EXPECT().InstallL7NetworkPolicyFlows().Return(fmt.Errorf("error")) + mockOfClient.EXPECT().InstallL7NetworkPolicyFlows().Return(fmt.Errorf("error")) + mockOfClient.EXPECT().InstallL7NetworkPolicyFlows().Return(nil) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + err := fe.initializeL7Flows() + if err != nil { + atomic.AddInt32(&errOccurred, 1) + } + }(i) + } + wg.Wait() + require.Equal(t, int32(3), errOccurred) +} diff --git a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go index 64eb5c97972..d5ed6ddca64 100644 --- a/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go +++ b/pkg/agent/controller/networkpolicy/networkpolicy_controller_test.go @@ -78,7 +78,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) { groupIDAllocator := openflow.NewGroupAllocator() groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)} fs := afero.NewMemMapFs() - l7reconciler := l7engine.NewReconciler() + l7reconciler := l7engine.NewReconciler(nil) controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset}, nil, nil, diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 4cd1134a457..45af962e270 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -411,6 +411,9 @@ type Client interface { // SubscribeOFPortStatusMessage registers a channel to listen the OpenFlow PortStatus message. SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) + + // InstallL7NetworkPolicyFlows will be called only when at least one L7 NetworkPolicy is applied locally. + InstallL7NetworkPolicyFlows() error } // GetFlowTableStatus returns an array of flow table status. @@ -1704,3 +1707,13 @@ func (c *client) getMeterStats() { func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) { c.bridge.SubscribePortStatusConsumer(statusCh) } + +// InstallL7NetworkPolicyFlows will be called only when at least one L7 NetworkPolicy is applied locally. +func (c *client) InstallL7NetworkPolicyFlows() error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + + cacheKey := "l7_np_flows" + flows := c.featureNetworkPolicy.l7NPTrafficControlFlows() + return c.addFlows(c.featureNetworkPolicy.cachedFlows, cacheKey, flows) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index ab5bd20dcca..f79f5439729 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2719,7 +2719,7 @@ func Test_client_ReplayFlows(t *testing.T) { expectedFlows := append(pipelineDefaultFlows(true /* egressTrafficShapingEnabled */, false /* externalNodeEnabled */, true /* isEncap */, true /* isIPv4 */), egressInitFlows(true)...) expectedFlows = append(expectedFlows, multicastInitFlows(true)...) - expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false, false)...) + expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false)...) expectedFlows = append(expectedFlows, podConnectivityInitFlows(config.TrafficEncapModeEncap, config.TrafficEncryptionModeNone, false, true, true, true)...) expectedFlows = append(expectedFlows, serviceInitFlows(true, true, false, false)...) @@ -2917,3 +2917,30 @@ func TestSubscribeOFPortStatusMessage(t *testing.T) { bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1) c.SubscribeOFPortStatusMessage(ch) } + +func Test_client_InstallL7NetworkPolicyFlows(t *testing.T) { + ctrl := gomock.NewController(t) + m := opstest.NewMockOFEntryOperations(ctrl) + + fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, enableL7NetworkPolicy) + defer resetPipelines() + + expectedFlows := []string{ + "cookie=0x1020000000000, table=Classifier, priority=200,in_port=11,vlan_tci=0x1000/0x1000 actions=pop_vlan,set_field:0x7/0xf->reg0,goto_table:UnSNAT", + "cookie=0x1020000000000, table=ConntrackZone, priority=212,ip,reg0=0x0/0x800000 actions=set_field:0x800000/0x800000->reg0,ct(table=ConntrackZone,zone=65520)", + "cookie=0x1020000000000, table=ConntrackZone, priority=210,ct_state=+rpl+trk,ct_mark=0x80/0x80,ip actions=goto_table:Output", + "cookie=0x1020000000000, table=ConntrackZone, priority=211,ct_state=+rpl+trk,ip,reg0=0x7/0xf actions=ct(table=L3Forwarding,zone=65520,nat)", + "cookie=0x1020000000000, table=ConntrackZone, priority=211,ct_state=-rpl+trk,ip,reg0=0x7/0xf actions=goto_table:L3Forwarding", + "cookie=0x1020000000000, table=ConntrackZone, priority=210,ct_state=-rpl+trk,ct_mark=0x80/0x80,ip actions=ct(table=ConntrackState,zone=65520,nat)", + "cookie=0x1020000000000, table=TrafficControl, priority=210,reg0=0x7/0xf actions=goto_table:Output", + "cookie=0x1020000000000, table=Output, priority=213,reg0=0x7/0xf actions=output:NXM_NX_REG1[]", + "cookie=0x1020000000000, table=Output, priority=212,ct_mark=0x80/0x80 actions=push_vlan:0x8100,move:NXM_NX_CT_LABEL[64..75]->OXM_OF_VLAN_VID[0..11],output:10", + } + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + cacheKey := "l7_np_flows" + require.NoError(t, fc.InstallL7NetworkPolicyFlows()) + fCacheI, ok := fc.featureNetworkPolicy.cachedFlows.Load(cacheKey) + require.True(t, ok) + assert.ElementsMatch(t, expectedFlows, getFlowStrings(fCacheI)) +} diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index 78f0845a143..9d779203f96 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -20,12 +20,13 @@ import ( // Fields using reg. var ( - tunnelVal = uint32(1) - gatewayVal = uint32(2) - localVal = uint32(3) - uplinkVal = uint32(4) - bridgeVal = uint32(5) - tcReturnVal = uint32(6) + tunnelVal = uint32(1) + gatewayVal = uint32(2) + localVal = uint32(3) + uplinkVal = uint32(4) + bridgeVal = uint32(5) + tcReturnVal = uint32(6) + l7NPReturnVal = uint32(7) outputToPortVal = uint32(1) outputToControllerVal = uint32(2) @@ -37,14 +38,16 @@ var ( // - 3: from local Pods. // - 4: from uplink port. // - 5: from bridge local port. - // - 6: from traffic control return port. - PktSourceField = binding.NewRegField(0, 0, 3) - FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal) - FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal) - FromPodRegMark = binding.NewRegMark(PktSourceField, localVal) - FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal) - FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal) - FromTCReturnRegMark = binding.NewRegMark(PktSourceField, tcReturnVal) + // - 6: from TrafficControl return port. + // - 7: from application-aware engine (L7 NetworkPolicy return port). + PktSourceField = binding.NewRegField(0, 0, 3) + FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal) + FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal) + FromPodRegMark = binding.NewRegMark(PktSourceField, localVal) + FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal) + FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal) + FromTCReturnRegMark = binding.NewRegMark(PktSourceField, tcReturnVal) + FromL7NPReturnRegMark = binding.NewRegMark(PktSourceField, l7NPReturnVal) // reg0[4..7]: Field to store the packet destination. Marks in this field include: // - 1: to tunnel port. // - 2: to Antrea gateway port. @@ -85,6 +88,9 @@ var ( OutputRegField = binding.NewRegField(0, 21, 22) OutputToOFPortRegMark = binding.NewRegMark(OutputRegField, outputToPortVal) OutputToControllerRegMark = binding.NewRegMark(OutputRegField, outputToControllerVal) + // reg0[23]: + CtStateNotRestoredRegMark = binding.NewOneBitZeroRegMark(0, 23) + CtStateRestoredRegMark = binding.NewOneBitRegMark(0, 23) // reg0[25..32]: Field to indicate Antrea-native policy packetIn operations PacketInOperationField = binding.NewRegField(0, 25, 32) diff --git a/pkg/agent/openflow/framework_test.go b/pkg/agent/openflow/framework_test.go index 2c69c85ffa5..25a075ff091 100644 --- a/pkg/agent/openflow/framework_test.go +++ b/pkg/agent/openflow/framework_test.go @@ -284,49 +284,6 @@ func TestBuildPipeline(t *testing.T) { }, }, }, - { - name: "K8s Node, IPv4 only, with L7NetworkPolicy enabled", - ipStack: ipv6Only, - features: []feature{ - newTestFeaturePodConnectivity(ipStackMap[ipv4Only]), - newTestFeatureNetworkPolicy(config.K8sNode, enableL7NetworkPolicy), - newTestFeatureService(), - newTestFeatureEgress(), - }, - expectedTables: map[binding.PipelineID][]*Table{ - pipelineRoot: { - PipelineRootClassifierTable, - }, - pipelineIP: { - ClassifierTable, - SpoofGuardTable, - UnSNATTable, - ConntrackTable, - ConntrackStateTable, - PreRoutingClassifierTable, - SessionAffinityTable, - ServiceLBTable, - EndpointDNATTable, - AntreaPolicyEgressRuleTable, - EgressRuleTable, - EgressDefaultTable, - EgressMetricTable, - L3ForwardingTable, - EgressMarkTable, - L3DecTTLTable, - SNATMarkTable, - SNATTable, - L2ForwardingCalcTable, - TrafficControlTable, - AntreaPolicyIngressRuleTable, - IngressRuleTable, - IngressDefaultTable, - IngressMetricTable, - ConntrackCommitTable, - OutputTable, - }, - }, - }, { name: "K8s Node, IPv4 only, with AntreaPolicy disabled", ipStack: ipv6Only, diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index babc9e83971..b42ab200935 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -1653,6 +1653,9 @@ func (f *featureNetworkPolicy) replayFlows() []*openflow15.FlowMod { for _, ctx := range f.globalConjMatchFlowCache { addMatchFlows(ctx) } + + flows = append(flows, getCachedFlowMessages(f.cachedFlows)...) + return flows } @@ -2081,8 +2084,10 @@ type featureNetworkPolicy struct { // egressTables map records all IDs of tables related to egress rules. egressTables map[uint8]struct{} + cachedFlows *flowCategoryCache + // loggingGroupCache is a storage for the logging groups, each one includes 2 buckets: one is to send the packet - // to antrea-agent using the packetIn mechanism, the other is to force the packet to continue forwardingin the + // to antrea-agent using the packetIn mechanism, the other is to force the packet to continue forwarding in the // OVS pipeline. The key is the next table used in the second bucket, and the value is the Openflow group. loggingGroupCache sync.Map groupAllocator GroupAllocator @@ -2137,6 +2142,7 @@ func newFeatureNetworkPolicy( category: cookie.NetworkPolicy, ctZoneSrcField: getZoneSrcField(connectUplinkToBridge), loggingGroupCache: sync.Map{}, + cachedFlows: newFlowCategoryCache(), groupAllocator: grpAllocator, } } @@ -2152,9 +2158,6 @@ func (f *featureNetworkPolicy) initFlows() []*openflow15.FlowMod { var flows []binding.Flow if f.nodeType == config.K8sNode { flows = append(flows, f.ingressClassifierFlows()...) - if f.enableL7NetworkPolicy { - flows = append(flows, f.l7NPTrafficControlFlows()...) - } } flows = append(flows, f.skipPolicyRuleCheckFlows()...) flows = append(flows, f.initLoggingFlows()...) @@ -2213,37 +2216,119 @@ func (f *featureNetworkPolicy) skipPolicyRuleCheckFlows() []binding.Flow { func (f *featureNetworkPolicy) l7NPTrafficControlFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() vlanMask := uint16(openflow15.OFPVID_PRESENT) - return []binding.Flow{ + flows := []binding.Flow{ + // This generates the flow to output the packets returned from an application-aware engine return port to their + // original target ofPort. It has the highest priority to prevent these packets from being matched by other + // flows in this table that redirect packets to an application-aware engine target ofPort again. + OutputTable.ofTable.BuildFlow(priorityHigh + 3). + Cookie(cookieID). + MatchRegMark(FromL7NPReturnRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), // This generates the flow to output the packets marked with L7NPRedirectCTMark to an application-aware engine // via the target ofPort. Note that, before outputting the packets, VLAN ID stored on field L7NPRuleVlanIDCTMarkField // will be copied to VLAN ID register (OXM_OF_VLAN_VID) to set VLAN ID of the packets. OutputTable.ofTable.BuildFlow(priorityHigh+2). Cookie(cookieID). - MatchRegMark(OutputToOFPortRegMark). MatchCTMark(L7NPRedirectCTMark). Action().PushVLAN(EtherTypeDot1q). Action().MoveRange(binding.NxmFieldCtLabel, binding.OxmFieldVLANVID, *L7NPRuleVlanIDCTLabel.GetRange(), *binding.VLANVIDRange). Action().Output(f.l7NetworkPolicyConfig.TargetOFPort). Done(), // This generates the flow to mark the packets from an application-aware engine via the return ofPort and forward - // the packets to stageRouting directly. Note that, for the packets which are originally to be output to a tunnel - // port, value of NXM_NX_TUN_IPV4_DST needs to be loaded in L3ForwardingTable of stageRouting. + // the packets to stageConntrackState directly. Note that, for the packets which are originally to be output to a + // tunnel port, value of NXM_NX_TUN_IPV4_DST needs to be loaded in L3ForwardingTable of stageRouting. ClassifierTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchInPort(f.l7NetworkPolicyConfig.ReturnOFPort). MatchVLAN(false, 0, &vlanMask). Action().PopVLAN(). - Action().LoadRegMark(FromTCReturnRegMark). - Action().GotoStage(stageRouting). + Action().LoadRegMark(FromL7NPReturnRegMark). + Action().GotoStage(stageConntrackState). Done(), - // This generates the flow to forward the returned packets (with FromTCReturnRegMark) to stageOutput directly + // This generates the flow to forward the returned packets (with FromL7NPReturnRegMark) to stageOutput directly // after loading output port number to reg1 in L2ForwardingCalcTable. TrafficControlTable.ofTable.BuildFlow(priorityHigh). Cookie(cookieID). - MatchRegMark(OutputToOFPortRegMark, FromTCReturnRegMark). + MatchRegMark(FromL7NPReturnRegMark). Action().GotoStage(stageOutput). Done(), } + for _, ipProtocol := range f.ipProtocols { + ctZone := CtZone + if ipProtocol == binding.ProtocolIPv6 { + ctZone = CtZoneV6 + } + flows = append(flows, + // This generates the flow to restore the corresponding connection tracking (CT) state, excluding NAT, of + // packets and resubmit them back to the ConntrackTable. CtStateNotRestoredRegMark and CtStateRestoredRegMark + // are used to prevent packets from being resubmitted in a cyclic manner, ensuring that the packets are + // resubmitted only once. + ConntrackTable.ofTable.BuildFlow(priorityHigh+2). + MatchProtocol(ipProtocol). + MatchRegMark(CtStateNotRestoredRegMark). + Action().LoadRegMark(CtStateRestoredRegMark). + Action().CT(false, ConntrackTable.GetID(), ctZone, f.ctZoneSrcField). + CTDone(). + Cookie(cookieID). + Done(), + // This generates the flow to match the reply packets returning from the application-aware engine. If the + // packets belong to a Service connection, DNAT is applied to restore the original source IP (Service IP) with + // a connection tracking (CT) action. If the packets don't belong to a Service connection, the CT action is + // harmless and will have no effect on those packets. + // The reason why the target table is L3ForwardingTable is that for the packets which are originally intended + // to be output to a tunnel port, the value of NXM_NX_TUN_IPV4_DST needs to be loaded in L3ForwardingTable + // in stageRouting. This ensures that the correct tunnel destination IP is used for these packets, enabling + // proper forwarding through the tunnel. + ConntrackTable.ofTable.BuildFlow(priorityHigh+1). + MatchProtocol(ipProtocol). + MatchRegMark(FromL7NPReturnRegMark). + MatchCTStateRpl(true). + MatchCTStateTrk(true). + Action().CT(false, L3ForwardingTable.GetID(), ctZone, f.ctZoneSrcField). + NAT(). + CTDone(). + Cookie(cookieID). + Done(), + // This generates the flow to match the request packets returning from the application-aware engine. A + // connection tracking (CT) action with NAT is not needed because the DNAT has been done before they are redirected + // to the application-aware engine. The reason why the target table is L3ForwardingTable is the same as above. + ConntrackTable.ofTable.BuildFlow(priorityHigh+1). + MatchProtocol(ipProtocol). + MatchRegMark(FromL7NPReturnRegMark). + MatchCTStateRpl(false). + MatchCTStateTrk(true). + Action().GotoStage(stageRouting). + Cookie(cookieID). + Done(), + // This generates the flow to match the reply packets that should be redirected to the application-aware engine. + // A connection tracking (CT) action with NAT is not needed because the DNAT will be done after they are returned + // from the application-aware engine. + ConntrackTable.ofTable.BuildFlow(priorityHigh). + MatchProtocol(ipProtocol). + MatchCTStateRpl(true). + MatchCTStateTrk(true). + MatchCTMark(L7NPRedirectCTMark). + Action().GotoStage(stageOutput). + Cookie(cookieID). + Done(), + // This generates the flow to match request packets that are going to be redirected to the application-aware engine. + // A connection tracking (CT) action with NAT is needed because the DNAT should be done before they are redirected + // to the application-aware engine. + ConntrackTable.ofTable.BuildFlow(priorityHigh). + MatchProtocol(ipProtocol). + MatchCTStateRpl(false). + MatchCTStateTrk(true). + MatchCTMark(L7NPRedirectCTMark). + Action().CT(false, ConntrackTable.GetNext(), ctZone, f.ctZoneSrcField). + NAT(). + CTDone(). + Cookie(cookieID). + Done(), + ) + } + + return flows } func (f *featureNetworkPolicy) loggingNPPacketFlowWithOperations(cookieID uint64, operations uint8) binding.Flow { diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 3951e1b0afd..d3a77026876 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -1360,7 +1360,7 @@ func TestClient_GetPolicyInfoFromConjunction(t *testing.T) { } } -func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPolicyEnabled bool) []string { +func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled bool) []string { loggingFlows := []string{ "cookie=0x1020000000000, table=Output, priority=200,reg0=0x2400000/0xfe600000 actions=controller(id=32776,reason=no_match,userdata=01.01,max_len=65535)", "cookie=0x1020000000000, table=Output, priority=200,reg0=0x4400000/0xfe600000 actions=controller(id=32776,reason=no_match,userdata=01.02,max_len=65535)", @@ -1399,13 +1399,6 @@ func networkPolicyInitFlows(ovsMeterSupported, externalNodeEnabled, l7NetworkPol "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64990,ct_state=-new+est,ip actions=goto_table:IngressMetric", "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64990,ct_state=-new+rel,ip actions=goto_table:IngressMetric", ) - if l7NetworkPolicyEnabled { - initFlows = append(initFlows, - "cookie=0x1020000000000, table=Classifier, priority=200,in_port=11,vlan_tci=0x1000/0x1000 actions=pop_vlan,set_field:0x6/0xf->reg0,goto_table:L3Forwarding", - "cookie=0x1020000000000, table=TrafficControl, priority=210,reg0=0x200006/0x60000f actions=goto_table:Output", - "cookie=0x1020000000000, table=Output, priority=212,ct_mark=0x80/0x80,reg0=0x200000/0x600000 actions=push_vlan:0x8100,move:NXM_NX_CT_LABEL[64..75]->OXM_OF_VLAN_VID[0..11],output:10", - ) - } return initFlows } @@ -1417,22 +1410,16 @@ func Test_featureNetworkPolicy_initFlows(t *testing.T) { clientOptions []clientOptionsFn expectedFlows []string }{ - { - name: "K8s Node with Multicast and L7NetworkPolicy", - nodeType: config.K8sNode, - clientOptions: []clientOptionsFn{enableMulticast, enableL7NetworkPolicy}, - expectedFlows: networkPolicyInitFlows(ovsMetersSupported, false, true), - }, { name: "K8s Node with Multicast", nodeType: config.K8sNode, clientOptions: []clientOptionsFn{enableMulticast}, - expectedFlows: networkPolicyInitFlows(ovsMetersSupported, false, false), + expectedFlows: networkPolicyInitFlows(ovsMetersSupported, false), }, { name: "External Node", nodeType: config.ExternalNode, - expectedFlows: networkPolicyInitFlows(ovsMetersSupported, true, false), + expectedFlows: networkPolicyInitFlows(ovsMetersSupported, true), }, } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index eafaa90f344..4053f569285 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2024 Antrea Authors +// Copyright 2025 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -296,6 +296,20 @@ func (mr *MockClientMockRecorder) InstallEndpointFlows(protocol, endpoints any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), protocol, endpoints) } +// InstallL7NetworkPolicyFlows mocks base method. +func (m *MockClient) InstallL7NetworkPolicyFlows() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallL7NetworkPolicyFlows") + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallL7NetworkPolicyFlows indicates an expected call of InstallL7NetworkPolicyFlows. +func (mr *MockClientMockRecorder) InstallL7NetworkPolicyFlows() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallL7NetworkPolicyFlows", reflect.TypeOf((*MockClient)(nil).InstallL7NetworkPolicyFlows)) +} + // InstallMulticastFlexibleIPAMFlows mocks base method. func (m *MockClient) InstallMulticastFlexibleIPAMFlows() error { m.ctrl.T.Helper() diff --git a/test/e2e/l7networkpolicy_test.go b/test/e2e/l7networkpolicy_test.go index be0cf549c10..9ae8dacadca 100644 --- a/test/e2e/l7networkpolicy_test.go +++ b/test/e2e/l7networkpolicy_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -132,8 +133,8 @@ func createL7NetworkPolicy(t *testing.T, assert.NoError(t, err) } -func probeL7NetworkPolicyHTTP(t *testing.T, data *TestData, serverPodName, clientPodName string, serverIPs []*net.IP, allowHTTPPathHostname, allowHTTPPathClientIP bool) { - for _, ip := range serverIPs { +func probeL7NetworkPolicyHTTP(t *testing.T, data *TestData, serverPodName, clientPodName string, targetIPs []*net.IP, allowHTTPPathHostname, allowHTTPPathClientIP bool) { + for _, ip := range targetIPs { baseURL := net.JoinHostPort(ip.String(), "8080") // Verify that access to path /clientip is as expected. @@ -216,7 +217,26 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { require.NoError(t, NewPodBuilder(serverPodName, data.testNamespace, agnhostImage).OnNode(nodeName(0)).WithCommand(cmd).WithLabels(serverPodLabels).Create(data)) podIPs, err := data.podWaitForIPs(defaultTimeout, serverPodName, data.testNamespace) require.NoError(t, err, "Expected IP for Pod '%s'", serverPodName) - serverIPs := podIPs.AsSlice() + dstPodIPs := podIPs.AsSlice() + + // Create a Service whose backend is the above backend Pod. + var ipFamilies []corev1.IPFamily + if len(clusterInfo.podV4NetworkCIDR) != 0 { + ipFamilies = append(ipFamilies, corev1.IPv4Protocol) + } + if len(clusterInfo.podV6NetworkCIDR) != 0 { + ipFamilies = append(ipFamilies, corev1.IPv6Protocol) + } + mutator := func(service *corev1.Service) { + service.Spec.IPFamilies = ipFamilies + } + svc, err := data.CreateServiceWithAnnotations("svc-agnhost", data.testNamespace, p8080, p8080, corev1.ProtocolTCP, serverPodLabels, false, false, corev1.ServiceTypeClusterIP, nil, nil, mutator) + require.NoError(t, err) + var serviceIPs []*net.IP + for _, clusterIP := range svc.Spec.ClusterIPs { + serviceIP := net.ParseIP(clusterIP) + serviceIPs = append(serviceIPs, &serviceIP) + } l7ProtocolAllowsPathHostname := []crdv1beta1.L7Protocol{ { @@ -250,7 +270,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // the first L7 NetworkPolicy has higher priority, matched packets will be only matched by the first L7 NetworkPolicy. // As a result, only HTTP path 'hostname' is allowed by the first L7 NetworkPolicy, other HTTP path like 'clientip' // will be rejected. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, dstPodIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serviceIPs, true, false) // Delete the first L7 NetworkPolicy that only allows HTTP path 'hostname'. data.crdClient.CrdV1beta1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), policyAllowPathHostname, metav1.DeleteOptions{}) @@ -258,7 +279,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // Since the fist L7 NetworkPolicy has been deleted, corresponding packets will be matched by the second L7 NetworkPolicy, // and the second L7 NetworkPolicy allows any HTTP path, then both path 'hostname' and 'clientip' are allowed. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, dstPodIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serviceIPs, true, true) data.crdClient.CrdV1beta1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), policyAllowAnyPath, metav1.DeleteOptions{}) }) @@ -277,7 +299,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // the first L7 NetworkPolicy has higher priority, matched packets will be only matched by the first L7 NetworkPolicy. // As a result, only HTTP path 'hostname' is allowed by the first L7 NetworkPolicy, other HTTP path like 'clientip' // will be rejected. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, dstPodIPs, true, false) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serviceIPs, true, false) // Delete the first L7 NetworkPolicy that only allows HTTP path 'hostname'. data.crdClient.CrdV1beta1().NetworkPolicies(data.testNamespace).Delete(context.TODO(), policyAllowPathHostname, metav1.DeleteOptions{}) @@ -285,7 +308,8 @@ func testL7NetworkPolicyHTTP(t *testing.T, data *TestData) { // Since the fist L7 NetworkPolicy has been deleted, corresponding packets will be matched by the second L7 NetworkPolicy, // and the second L7 NetworkPolicy allows any HTTP path, then both path 'hostname' and 'clientip' are allowed. - probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serverIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, dstPodIPs, true, true) + probeL7NetworkPolicyHTTP(t, data, serverPodName, clientPodName, serviceIPs, true, true) }) }