Skip to content

Commit

Permalink
Fix that Service traffic cannot be allowed through L7 NetworkPolicy
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl committed Jan 20, 2025
1 parent 2a83607 commit ef99eab
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func run(o *Options) error {
}
var l7Reconciler *l7engine.Reconciler
if l7NetworkPolicyEnabled || l7FlowExporterEnabled {
l7Reconciler = l7engine.NewReconciler()
l7Reconciler = l7engine.NewReconciler(ofClient)
}
networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type L7FlowExporterController struct {

targetPort uint32

once sync.Once

queue workqueue.TypedRateLimitingInterface[string]
}

Expand Down Expand Up @@ -324,7 +326,9 @@ func (l7c *L7FlowExporterController) syncPod(podNN string) error {
sourceOfPort := []uint32{uint32(podInterfaces[0].OFPort)}

// Start Suricata before starting traffic control mark flows
l7c.l7Reconciler.StartSuricataOnce()
l7c.once.Do(func() {
l7c.l7Reconciler.StartSuricata()
})

oldDirection, exists := l7c.getMirroredDirection(podNN)
if exists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newFakeControllerAndWatcher(t *testing.T, objects []runtime.Object, interfa
ifaceStore.AddInterface(itf)
}

l7Reconciler := l7engine.NewReconciler()
l7Reconciler := l7engine.NewReconciler(nil)
l7w := NewL7FlowExporterController(mockOFClient, ifaceStore, localPodInformer, nsInformer, l7Reconciler)

return &fakeController{
Expand Down
20 changes: 15 additions & 5 deletions pkg/agent/controller/networkpolicy/l7engine/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow"
v1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/util/logdir"
)
Expand Down Expand Up @@ -153,10 +154,12 @@ type Reconciler struct {
suricataTenantCache *threadSafeSet[uint32]
suricataTenantHandlerCache *threadSafeSet[uint32]

ofClient openflow.Client

once sync.Once
}

func NewReconciler() *Reconciler {
func NewReconciler(ofClient openflow.Client) *Reconciler {
return &Reconciler{
suricataScFn: suricataSc,
startSuricataFn: startSuricata,
Expand All @@ -166,6 +169,7 @@ func NewReconciler() *Reconciler {
suricataTenantHandlerCache: &threadSafeSet[uint32]{
cached: sets.New[uint32](),
},
ofClient: ofClient,
}
}

Expand Down Expand Up @@ -260,9 +264,15 @@ func convertProtocolTLS(tls *v1beta.TLSProtocol) string {
return strings.Join(keywords, " ")
}

func (r *Reconciler) StartSuricataOnce() {
func (r *Reconciler) ensureL7FlowsAndStartSuricataOnce() {
r.once.Do(func() {
r.startSuricata()
if r.ofClient != nil {
if err := r.ofClient.InstallL7NetworkPolicyFlows(); err != nil {
klog.ErrorS(err, "Failed to install l7 NetworkPolicy flows")
}
}

r.StartSuricata()
})
}

Expand All @@ -272,7 +282,7 @@ func (r *Reconciler) AddRule(ruleID, policyName string, vlanID uint32, l7Protoco
klog.V(5).Infof("AddRule took %v", time.Since(start))
}()

r.StartSuricataOnce()
r.ensureL7FlowsAndStartSuricataOnce()

// Generate the keyword part used in Suricata rules.
protoKeywords := make(map[string]sets.Set[string])
Expand Down Expand Up @@ -461,7 +471,7 @@ func (r *Reconciler) unregisterSuricataTenantHandler(tenantID, vlanID uint32) (*
return r.suricataScFn(scCmd)
}

func (r *Reconciler) startSuricata() {
func (r *Reconciler) StartSuricata() {
f, err := defaultFS.Create(antreaSuricataConfigPath)
if err != nil {
klog.ErrorS(err, "Failed to create Suricata config file", "FilePath", antreaSuricataConfigPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ func TestStartSuricata(t *testing.T) {
_, err := defaultFS.Create(defaultSuricataConfigPath)
assert.NoError(t, err)

fe := NewReconciler()
fe := NewReconciler(nil)
fs := newFakeSuricata()
fe.suricataScFn = fs.suricataScFunc
fe.startSuricataFn = fs.startSuricataFn

fe.startSuricata()
fe.StartSuricata()

ok, err := afero.FileContainsBytes(defaultFS, antreaSuricataConfigPath, []byte(suricataAntreaConfigData))
assert.NoError(t, err)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestRuleLifecycle(t *testing.T) {
_, err := defaultFS.Create(defaultSuricataConfigPath)
assert.NoError(t, err)

fe := NewReconciler()
fe := NewReconciler(nil)
fs := newFakeSuricata()
fe.suricataScFn = fs.suricataScFunc
fe.startSuricataFn = fs.startSuricataFn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
groupIDAllocator := openflow.NewGroupAllocator()
groupCounters := []proxytypes.GroupCounter{proxytypes.NewGroupCounter(groupIDAllocator, ch2)}
fs := afero.NewMemMapFs()
l7reconciler := l7engine.NewReconciler()
l7reconciler := l7engine.NewReconciler(nil)
controller, _ := NewNetworkPolicyController(&antreaClientGetter{clientset},
nil,
nil,
Expand Down
13 changes: 13 additions & 0 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ type Client interface {

// SubscribeOFPortStatusMessage registers a channel to listen the OpenFlow PortStatus message.
SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus)

// InstallL7NetworkPolicyFlows will be only called when one L7 NetworkPolicy is provisioned.
InstallL7NetworkPolicyFlows() error
}

// GetFlowTableStatus returns an array of flow table status.
Expand Down Expand Up @@ -1704,3 +1707,13 @@ func (c *client) getMeterStats() {
func (c *client) SubscribeOFPortStatusMessage(statusCh chan *openflow15.PortStatus) {
c.bridge.SubscribePortStatusConsumer(statusCh)
}

// InstallL7NetworkPolicyFlows will be only called when one L7 NetworkPolicy is provisioned.
func (c *client) InstallL7NetworkPolicyFlows() error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

cacheKey := "l7_np_provision_flows"
flows := c.featureNetworkPolicy.l7NPTrafficControlFlows()
return c.addFlows(c.featureNetworkPolicy.cachedFlows, cacheKey, flows)
}
29 changes: 28 additions & 1 deletion pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2719,7 +2719,7 @@ func Test_client_ReplayFlows(t *testing.T) {

expectedFlows := append(pipelineDefaultFlows(true /* egressTrafficShapingEnabled */, false /* externalNodeEnabled */, true /* isEncap */, true /* isIPv4 */), egressInitFlows(true)...)
expectedFlows = append(expectedFlows, multicastInitFlows(true)...)
expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false, false)...)
expectedFlows = append(expectedFlows, networkPolicyInitFlows(true, false)...)
expectedFlows = append(expectedFlows, podConnectivityInitFlows(config.TrafficEncapModeEncap, config.TrafficEncryptionModeNone, false, true, true, true)...)
expectedFlows = append(expectedFlows, serviceInitFlows(true, true, false, false)...)

Expand Down Expand Up @@ -2917,3 +2917,30 @@ func TestSubscribeOFPortStatusMessage(t *testing.T) {
bridge.EXPECT().SubscribePortStatusConsumer(ch).Times(1)
c.SubscribeOFPortStatusMessage(ch)
}

func Test_client_InstallL7NetworkPolicyFlows(t *testing.T) {
ctrl := gomock.NewController(t)
m := opstest.NewMockOFEntryOperations(ctrl)

fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, enableL7NetworkPolicy)
defer resetPipelines()

expectedFlows := []string{
"cookie=0x1020000000000, table=Classifier, priority=200,in_port=11,vlan_tci=0x1000/0x1000 actions=pop_vlan,set_field:0x6/0xf->reg0,goto_table:UnSNAT",
"cookie=0x1020000000000, table=ConntrackZone, priority=212,ip,reg0=0x0/0x800000 actions=set_field:0x800000/0x800000->reg0,ct(table=ConntrackZone,zone=65520)",
"cookie=0x1020000000000, table=ConntrackZone, priority=210,ct_state=+rpl+trk,ct_mark=0x80/0x80,ip actions=goto_table:Output",
"cookie=0x1020000000000, table=ConntrackZone, priority=211,ct_state=+rpl+trk,ip,reg0=0x6/0xf actions=ct(table=L3Forwarding,zone=65520,nat)",
"cookie=0x1020000000000, table=ConntrackZone, priority=211,ct_state=-rpl+trk,ip,reg0=0x6/0xf actions=goto_table:L3Forwarding",
"cookie=0x1020000000000, table=ConntrackZone, priority=210,ct_state=-rpl+trk,ct_mark=0x80/0x80,ip actions=ct(table=ConntrackState,zone=65520,nat)",
"cookie=0x1020000000000, table=TrafficControl, priority=210,reg0=0x6/0xf actions=goto_table:Output",
"cookie=0x1020000000000, table=Output, priority=213,reg0=0x6/0xf actions=output:NXM_NX_REG1[]",
"cookie=0x1020000000000, table=Output, priority=212,ct_mark=0x80/0x80 actions=push_vlan:0x8100,move:NXM_NX_CT_LABEL[64..75]->OXM_OF_VLAN_VID[0..11],output:10",
}

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
cacheKey := "l7_np_provision_flows"
assert.NoError(t, fc.InstallL7NetworkPolicyFlows())
fCacheI, ok := fc.featureNetworkPolicy.cachedFlows.Load(cacheKey)
require.True(t, ok)
assert.ElementsMatch(t, expectedFlows, getFlowStrings(fCacheI))
}
3 changes: 3 additions & 0 deletions pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ var (
OutputRegField = binding.NewRegField(0, 21, 22)
OutputToOFPortRegMark = binding.NewRegMark(OutputRegField, outputToPortVal)
OutputToControllerRegMark = binding.NewRegMark(OutputRegField, outputToControllerVal)
// reg0[23]:
CtStateNotRestoredRegMark = binding.NewOneBitZeroRegMark(0, 23)
CtStateRestoredRegMark = binding.NewOneBitRegMark(0, 23)
// reg0[25..32]: Field to indicate Antrea-native policy packetIn operations
PacketInOperationField = binding.NewRegField(0, 25, 32)

Expand Down
43 changes: 0 additions & 43 deletions pkg/agent/openflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,49 +284,6 @@ func TestBuildPipeline(t *testing.T) {
},
},
},
{
name: "K8s Node, IPv4 only, with L7NetworkPolicy enabled",
ipStack: ipv6Only,
features: []feature{
newTestFeaturePodConnectivity(ipStackMap[ipv4Only]),
newTestFeatureNetworkPolicy(config.K8sNode, enableL7NetworkPolicy),
newTestFeatureService(),
newTestFeatureEgress(),
},
expectedTables: map[binding.PipelineID][]*Table{
pipelineRoot: {
PipelineRootClassifierTable,
},
pipelineIP: {
ClassifierTable,
SpoofGuardTable,
UnSNATTable,
ConntrackTable,
ConntrackStateTable,
PreRoutingClassifierTable,
SessionAffinityTable,
ServiceLBTable,
EndpointDNATTable,
AntreaPolicyEgressRuleTable,
EgressRuleTable,
EgressDefaultTable,
EgressMetricTable,
L3ForwardingTable,
EgressMarkTable,
L3DecTTLTable,
SNATMarkTable,
SNATTable,
L2ForwardingCalcTable,
TrafficControlTable,
AntreaPolicyIngressRuleTable,
IngressRuleTable,
IngressDefaultTable,
IngressMetricTable,
ConntrackCommitTable,
OutputTable,
},
},
},
{
name: "K8s Node, IPv4 only, with AntreaPolicy disabled",
ipStack: ipv6Only,
Expand Down
Loading

0 comments on commit ef99eab

Please sign in to comment.