Skip to content

Commit

Permalink
Make sure that K3s service prefix is routed via cluster interface
Browse files Browse the repository at this point in the history
Ensure that packets destined for K3s services do not use the default route,
but are instead routed through the cluster port. This guarantees that traffic
handled by kube-proxy is properly SNATed to the cluster IP. That's the theory
at least. We're not entirely certain. Without this route, however,
some Longhorn pods fail to access K3s services when the cluster IP is configured
on a non-default port.

Signed-off-by: Milan Lenco <[email protected]>
  • Loading branch information
milan-zededa authored and eriknordmark committed Dec 3, 2024
1 parent f79132c commit a85bf88
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/pillar/devicenetwork/pbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
package devicenetwork

const (
// KubeSvcRT : index of the routing table used for the Kubernetes service prefix.
// Only used in the Kubevirt mode.
KubeSvcRT = 400

// DPCBaseRTIndex : base index for per-port routing tables used for device
// connectivity (between EVE and remote endpoints, such as the controller),
// i.e. used for DevicePortConfig (abbreviated to DPC).
Expand Down
68 changes: 57 additions & 11 deletions pkg/pillar/dpcreconciler/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) {
}
}
if ev.Deleted {
changed := r.updateCurrentRoutes(r.lastArgs.DPC)
changed := r.updateCurrentRoutes(r.lastArgs.DPC,
r.lastArgs.ClusterStatus)
if changed {
r.addPendingReconcile(
L3SG, "interface delete triggered route change", true)
Expand All @@ -315,13 +316,14 @@ func (r *LinuxDpcReconciler) watcher(netEvents <-chan netmonitor.Event) {
if changed {
r.addPendingReconcile(L3SG, "address change", true)
}
changed = r.updateCurrentRoutes(r.lastArgs.DPC)
changed = r.updateCurrentRoutes(r.lastArgs.DPC, r.lastArgs.ClusterStatus)
if changed {
r.addPendingReconcile(L3SG, "address change triggered route change", true)
}

case netmonitor.DNSInfoChange:
newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC)
newGlobalCfg := r.getIntendedGlobalCfg(r.lastArgs.DPC,
r.lastArgs.ClusterStatus)
prevGlobalCfg := r.intendedState.SubGraph(GlobalSG)
if len(prevGlobalCfg.DiffItems(newGlobalCfg)) > 0 {
r.addPendingReconcile(GlobalSG, "DNS info change", true)
Expand Down Expand Up @@ -449,7 +451,7 @@ func (r *LinuxDpcReconciler) Reconcile(ctx context.Context, args Args) Reconcile
var intSG dg.Graph
switch reconcileSG {
case GlobalSG:
intSG = r.getIntendedGlobalCfg(args.DPC)
intSG = r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus)
case NetworkIoSG:
intSG = r.getIntendedNetworkIO(args.DPC)
case PhysicalIfsSG:
Expand Down Expand Up @@ -703,7 +705,7 @@ func (r *LinuxDpcReconciler) updateCurrentState(args Args) (changed bool) {
if addrsChanged := r.updateCurrentAdapterAddrs(args.DPC); addrsChanged {
changed = true
}
if routesChanged := r.updateCurrentRoutes(args.DPC); routesChanged {
if routesChanged := r.updateCurrentRoutes(args.DPC, args.ClusterStatus); routesChanged {
changed = true
}
return changed
Expand Down Expand Up @@ -792,7 +794,8 @@ func (r *LinuxDpcReconciler) updateCurrentAdapterAddrs(
return false
}

func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (changed bool) {
func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig,
clusterStatus types.EdgeNodeClusterStatus) (changed bool) {
sgPath := dg.NewSubGraphPath(L3SG, RoutesSG)
currentRoutes := dg.New(dg.InitArgs{Name: RoutesSG})
for _, port := range dpc.Ports {
Expand All @@ -819,6 +822,20 @@ func (r *LinuxDpcReconciler) updateCurrentRoutes(dpc types.DevicePortConfig) (ch
r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d: %v",
ifIndex, err)
}
if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel {
k3sSvcRoutes, err := r.NetworkMonitor.ListRoutes(netmonitor.RouteFilters{
FilterByTable: true,
Table: devicenetwork.KubeSvcRT,
FilterByIf: true,
IfIndex: ifIndex,
})
if err == nil {
routes = append(routes, k3sSvcRoutes...)
} else {
r.Log.Errorf("updateCurrentRoutes: ListRoutes failed for ifIndex %d "+
"and the KubeSvc table: %v", ifIndex, err)
}
}
for _, rt := range routes {
currentRoutes.PutItem(linux.Route{
Route: rt.Data.(netlink.Route),
Expand All @@ -844,7 +861,7 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) {
Description: "Device Connectivity provided using Linux network stack",
}
r.intendedState = dg.New(graphArgs)
r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC))
r.intendedState.PutSubGraph(r.getIntendedGlobalCfg(args.DPC, args.ClusterStatus))
r.intendedState.PutSubGraph(r.getIntendedNetworkIO(args.DPC))
r.intendedState.PutSubGraph(r.getIntendedPhysicalIfs(args.DPC))
r.intendedState.PutSubGraph(r.getIntendedLogicalIO(args.DPC))
Expand All @@ -854,7 +871,8 @@ func (r *LinuxDpcReconciler) updateIntendedState(args Args) {
args.FlowlogEnabled))
}

func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg.Graph {
func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig,
clusterStatus types.EdgeNodeClusterStatus) dg.Graph {
graphArgs := dg.InitArgs{
Name: GlobalSG,
Description: "Global configuration",
Expand All @@ -871,10 +889,14 @@ func (r *LinuxDpcReconciler) getIntendedGlobalCfg(dpc types.DevicePortConfig) dg
Priority: devicenetwork.PbrKubeNetworkPrio,
Table: unix.RT_TABLE_MAIN,
}, nil)
tableForKubeSvc := unix.RT_TABLE_MAIN
if clusterStatus.ClusterInterface != "" {
tableForKubeSvc = devicenetwork.KubeSvcRT
}
intendedCfg.PutItem(linux.IPRule{
Dst: kubeSvcCIDR,
Priority: devicenetwork.PbrKubeNetworkPrio,
Table: unix.RT_TABLE_MAIN,
Table: tableForKubeSvc,
}, nil)
}
if len(dpc.Ports) == 0 {
Expand Down Expand Up @@ -1076,7 +1098,7 @@ func (r *LinuxDpcReconciler) getIntendedL3Cfg(dpc types.DevicePortConfig,
intendedL3 := dg.New(graphArgs)
intendedL3.PutSubGraph(r.getIntendedAdapters(dpc, clusterStatus))
intendedL3.PutSubGraph(r.getIntendedSrcIPRules(dpc))
intendedL3.PutSubGraph(r.getIntendedRoutes(dpc))
intendedL3.PutSubGraph(r.getIntendedRoutes(dpc, clusterStatus))
intendedL3.PutSubGraph(r.getIntendedArps(dpc))
return intendedL3
}
Expand Down Expand Up @@ -1176,7 +1198,8 @@ func (r *LinuxDpcReconciler) getIntendedSrcIPRules(dpc types.DevicePortConfig) d
return intendedRules
}

func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Graph {
func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig,
clusterStatus types.EdgeNodeClusterStatus) dg.Graph {
graphArgs := dg.InitArgs{
Name: RoutesSG,
Description: "IP routes",
Expand Down Expand Up @@ -1218,6 +1241,29 @@ func (r *LinuxDpcReconciler) getIntendedRoutes(dpc types.DevicePortConfig) dg.Gr
AdapterLL: port.Logicallabel,
}, nil)
}
if r.HVTypeKube && clusterStatus.ClusterInterface == port.Logicallabel &&
clusterStatus.ClusterIPPrefix != nil {
// Ensure that packets destined for K3s services do not use the default route,
// but are instead routed through the cluster port. This guarantees that traffic
// handled by kube-proxy is properly SNATed to the cluster IP. That's the theory
// at least. We're not entirely certain. Without this route, however,
// some Longhorn pods fail to access K3s services when the cluster IP is configured
// on a non-default port.
intendedRoutes.PutItem(linux.Route{
Route: netlink.Route{
LinkIndex: ifIndex,
Family: netlink.FAMILY_V4,
Scope: netlink.SCOPE_UNIVERSE,
Protocol: unix.RTPROT_STATIC,
Type: unix.RTN_UNICAST,
Dst: kubeSvcCIDR,
Gw: clusterStatus.ClusterIPPrefix.IP,
Table: devicenetwork.KubeSvcRT,
},
AdapterIfName: port.IfName,
AdapterLL: port.Logicallabel,
}, nil)
}
}
return intendedRoutes
}
Expand Down

0 comments on commit a85bf88

Please sign in to comment.