diff --git a/pkg/pillar/devicenetwork/pbr.go b/pkg/pillar/devicenetwork/pbr.go index 541d48dca7..50308c2158 100644 --- a/pkg/pillar/devicenetwork/pbr.go +++ b/pkg/pillar/devicenetwork/pbr.go @@ -4,6 +4,10 @@ package devicenetwork const ( + // KubeSvcRT : index of the routing table used for the Kubernetes service prefix. + // Only used in the Kubevirt mode. + KubeSvcRT = 400 + // DPCBaseRTIndex : base index for per-port routing tables used for device // connectivity (between EVE and remote endpoints, such as the controller), // i.e. used for DevicePortConfig (abbreviated to DPC). diff --git a/pkg/pillar/dpcreconciler/linux.go b/pkg/pillar/dpcreconciler/linux.go index f6c92c8261..a147ab5a2f 100644 --- a/pkg/pillar/dpcreconciler/linux.go +++ b/pkg/pillar/dpcreconciler/linux.go @@ -304,7 +304,8 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) { } } if ev.Deleted { - changed := r.updateCurrentRoutes(r.lastArgs.DPC) + changed := r.updateCurrentRoutes(r.lastArgs.DPC, + r.lastArgs.ClusterStatus) if changed { r.addPendingReconcile( L3SG, "interface delete triggered route change", true) @@ -315,13 +316,14 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) { if changed { r.addPendingReconcile(L3SG, "address change", true) } - changed = r.updateCurrentRoutes(r.lastArgs.DPC) + changed = r.updateCurrentRoutes(r.lastArgs.DPC, r.lastArgs.ClusterStatus) if changed { r.addPendingReconcile(L3SG, "address change triggered route change", true) } case netmonitor.DNSInfoChange: - newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC) + newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC, + r.lastArgs.ClusterStatus) prevGlobalCfg := r.intendedState.SubGraph(GlobalSG) if len(prevGlobalCfg.DiffItems(newGlobalCfg)) > 0 { r.addPendingReconcile(GlobalSG, "DNS info change", true) @@ -449,7 +451,7 @@ func (r *LinuxDpcReconciler) Reconcile(ctx context.Context, args Args) Reconcile var intSG dg.Graph switch reconcileSG { case GlobalSG: - intSG = r.getIntendedGlobalCfg(args.DPC) + intSG = r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus) case NetworkIoSG: intSG = r.getIntendedNetworkIO(args.DPC) case PhysicalIfsSG: @@ -703,7 +705,7 @@ func (r *LinuxDpcReconciler) updateCurrentState(args Args) (changed bool) { if addrsChanged := r.updateCurrentAdapterAddrs(args.DPC); addrsChanged { changed = true } - if routesChanged := r.updateCurrentRoutes(args.DPC); routesChanged { + if routesChanged := r.updateCurrentRoutes(args.DPC, args.ClusterStatus); routesChanged { changed = true } return changed @@ -792,7 +794,8 @@ func (r *LinuxDpcReconciler) updateCurrentAdapterAddrs( return false } -func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (changed bool) { +func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) (changed bool) { sgPath := dg.NewSubGraphPath(L3SG, RoutesSG) currentRoutes := dg.New(dg.InitArgs{Name: RoutesSG}) for _, port := range dpc.Ports { @@ -819,6 +822,20 @@ func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (ch r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d: %v", ifIndex, err) } + if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel { + k3sSvcRoutes, err := r.NetworkMonitor.ListRoutes(netmonitor.RouteFilters{ + FilterByTable: true, + Table: devicenetwork.KubeSvcRT, + FilterByIf: true, + IfIndex: ifIndex, + }) + if err == nil { + routes = append(routes, k3sSvcRoutes...) + } else { + r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d "+ + "and the KubeSvc table: %v", ifIndex, err) + } + } for _, rt := range routes { currentRoutes.PutItem(linux.Route{ Route: rt.Data.(netlink.Route), @@ -844,7 +861,7 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) { Description: "Device Connectivity provided using Linux network stack", } r.intendedState = dg.New(graphArgs) - r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC)) + r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus)) r.intendedState.PutSubGraph(r.getIntendedNetworkIO(args.DPC)) r.intendedState.PutSubGraph(r.getIntendedPhysicalIfs(args.DPC)) r.intendedState.PutSubGraph(r.getIntendedLogicalIO(args.DPC)) @@ -854,7 +871,8 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) { args.FlowlogEnabled)) } -func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: GlobalSG, Description: "Global configuration", @@ -871,10 +889,14 @@ func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg Priority: devicenetwork.PbrKubeNetworkPrio, Table: unix.RT_TABLE_MAIN, }, nil) + tableForKubeSvc := unix.RT_TABLE_MAIN + if clusterStatus.ClusterInterface != "" { + tableForKubeSvc = devicenetwork.KubeSvcRT + } intendedCfg.PutItem(linux.IPRule{ Dst: kubeSvcCIDR, Priority: devicenetwork.PbrKubeNetworkPrio, - Table: unix.RT_TABLE_MAIN, + Table: tableForKubeSvc, }, nil) } if len(dpc.Ports) == 0 { @@ -1076,7 +1098,7 @@ func (r *LinuxDpcReconciler) getIntendedL3Cfg(dpc types.DevicePortConfig, intendedL3 := dg.New(graphArgs) intendedL3.PutSubGraph(r.getIntendedAdapters(dpc, clusterStatus)) intendedL3.PutSubGraph(r.getIntendedSrcIPRules(dpc)) - intendedL3.PutSubGraph(r.getIntendedRoutes(dpc)) + intendedL3.PutSubGraph(r.getIntendedRoutes(dpc, clusterStatus)) intendedL3.PutSubGraph(r.getIntendedArps(dpc)) return intendedL3 } @@ -1176,7 +1198,8 @@ func (r *LinuxDpcReconciler) getIntendedSrcIPRules(dpc types.DevicePortConfig) d return intendedRules } -func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Graph { +func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig, + clusterStatus types.EdgeNodeClusterStatus) dg.Graph { graphArgs := dg.InitArgs{ Name: RoutesSG, Description: "IP routes", @@ -1218,6 +1241,29 @@ func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Gr AdapterLL: port.Logicallabel, }, nil) } + if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel && + clusterStatus.ClusterIPPrefix != nil { + // Ensure that packets destined for K3s services do not use the default route, + // but are instead routed through the cluster port. This guarantees that traffic + // handled by kube-proxy is properly SNATed to the cluster IP. That's the theory + // at least. We're not entirely certain. Without this route, however, + // some Longhorn pods fail to access K3s services when the cluster IP is configured + // on a non-default port. + intendedRoutes.PutItem(linux.Route{ + Route: netlink.Route{ + LinkIndex: ifIndex, + Family: netlink.FAMILY_V4, + Scope: netlink.SCOPE_UNIVERSE, + Protocol: unix.RTPROT_STATIC, + Type: unix.RTN_UNICAST, + Dst: kubeSvcCIDR, + Gw: clusterStatus.ClusterIPPrefix.IP, + Table: devicenetwork.KubeSvcRT, + }, + AdapterIfName: port.IfName, + AdapterLL: port.Logicallabel, + }, nil) + } } return intendedRoutes }