Skip to content

Commit

Permalink
nflog: log packets by setting the loglevel to print them by stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
aojea committed Jan 6, 2025
1 parent c7d21ee commit 2f1eb78
Show file tree
Hide file tree
Showing 6 changed files with 662 additions and 0 deletions.
20 changes: 20 additions & 0 deletions cmd/kindnetd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/aojea/kindnet/pkg/fastpath"
"github.com/aojea/kindnet/pkg/masq"
kindnetnat64 "github.com/aojea/kindnet/pkg/nat64"
"github.com/aojea/kindnet/pkg/nflog"
kindnetnode "github.com/aojea/kindnet/pkg/node"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -79,6 +80,7 @@ var (
metricsBindAddress string
fastpathThreshold int
disableCNI bool
nflogLevel int
)

func init() {
Expand All @@ -95,6 +97,8 @@ func init() {
flag.StringVar(&metricsBindAddress, "metrics-bind-address", ":19080", "The IP address and port for the metrics server to serve on")
flag.IntVar(&fastpathThreshold, "fastpath-threshold", 20, "The number of packets after the traffic is offloaded to the fast path, zero disables it (default 20). Set to zero to disable it")

flag.IntVar(&nflogLevel, "nflog-level", 9, "The log level at which the TCP and UDP packets are logged to stdout (default 9)")

flag.Usage = func() {
fmt.Fprint(os.Stderr, "Usage: kindnet [options]\n\n")
flag.PrintDefaults()
Expand Down Expand Up @@ -293,6 +297,22 @@ func main() {
klog.Info("Skipping fastpathAgent")
}

if klog.V(klog.Level(nflogLevel)).Enabled() {
klog.Infof("Packet logging enabled")
nflogAgent, err := nflog.NewNFLogAgent(nflogLevel)
if err != nil {
klog.Fatalf("error creating nflog agent: %v", err)
}
go func() {
defer nflogAgent.CleanRules()
if err := nflogAgent.Run(ctx); err != nil {
klog.Infof("error running nflog: %v", err)
}
}()
} else {
klog.Info("Skipping nflog agent")
}

// network policies
if networkpolicies {
cfg := networkpolicy.Config{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22
github.com/aws/aws-sdk-go-v2/service/ec2 v1.198.1
github.com/containerd/nri v0.9.0
github.com/florianl/go-nflog/v2 v2.1.0
github.com/google/nftables v0.2.1-0.20241219092456-e99829fb4f26
github.com/mdlayher/netlink v1.7.2
github.com/prometheus/client_golang v1.20.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtz
github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/florianl/go-nflog/v2 v2.1.0 h1:yXvA/ZWMS2dXBBM364xOEaW4WX14RjvsGCVt+y9O0ZM=
github.com/florianl/go-nflog/v2 v2.1.0/go.mod h1:U8o3DfjAAIMuW3/IHS3KmTccSMLyRbr09dImALuwEI8=
github.com/florianl/go-nfqueue v1.3.2 h1:8DPzhKJHywpHJAE/4ktgcqveCL7qmMLsEsVD68C4x4I=
github.com/florianl/go-nfqueue v1.3.2/go.mod h1:eSnAor2YCfMCVYrVNEhkLGN/r1L+J4uDjc0EUy0tfq4=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
Expand Down
158 changes: 158 additions & 0 deletions pkg/nflog/nflog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// SPDX-License-Identifier: APACHE-2.0

package nflog

import (
"context"
"fmt"
"syscall"

"github.com/google/nftables"
"github.com/google/nftables/expr"

"github.com/florianl/go-nflog/v2"
"github.com/mdlayher/netlink"

"k8s.io/klog/v2"
)

// Experimental: allows logging packets for troubleshooting.

// tableName is the name of the nftables table that holds the logging rules.
const tableName = "kindnet-nflog"

// NFLogAgent logs the packets it receives through nflog. It only stores the
// verbosity level at which those packets are printed.
type NFLogAgent struct {
	logLevel int
}

// NewNFLogAgent returns an NFLogAgent that logs packets at the given
// verbosity level. The returned error is always nil.
func NewNFLogAgent(logLevel int) (*NFLogAgent, error) {
	agent := &NFLogAgent{logLevel: logLevel}
	return agent, nil
}

// Run installs the nftables logging rules, opens an nflog socket on group 100
// and logs every packet delivered to that group at the agent's verbosity
// level. It blocks until ctx is done and then returns nil; a non-nil error is
// returned only when the setup (rules, socket, options, hook registration)
// fails.
func (n *NFLogAgent) Run(ctx context.Context) error {
	logger := klog.FromContext(ctx)

	if err := n.syncRules(); err != nil {
		return err
	}

	// The group must match the one used by the log expressions installed by
	// syncRules.
	// NOTE(review): Bufsize presumably caps the number of bytes copied per
	// packet, so parsed packets may be truncated — confirm against the
	// go-nflog documentation.
	config := nflog.Config{
		Group:    100,
		Copymode: nflog.CopyPacket,
		Bufsize:  128,
	}

	nf, err := nflog.Open(&config)
	if err != nil {
		return fmt.Errorf("could not open nflog socket: %w", err)
	}
	defer nf.Close()

	// Avoid receiving ENOBUFS errors.
	if err := nf.SetOption(netlink.NoENOBUFS, true); err != nil {
		return fmt.Errorf("failed to set netlink option %v: %w",
			netlink.NoENOBUFS, err)
	}

	// hook is called for every packet received by the nflog group. It always
	// returns 0 so that packets keep being received.
	hook := func(attrs nflog.Attribute) int {
		// Payload is a pointer: guard against messages that carry no payload
		// instead of panicking on a nil dereference.
		if attrs.Payload == nil {
			logger.V(n.logLevel).Info("Skipping nflog message without payload")
			return 0
		}
		packet, err := parsePacket(*attrs.Payload)
		if err != nil {
			logger.Error(err, "Can not process packet")
			return 0
		}
		// Just print out the payload of the nflog packet
		logger.V(n.logLevel).Info("Evaluating packet", "packet", packet)
		return 0
	}

	// errFunc is called for every error on the registered hook.
	errFunc := func(e error) int {
		// Just log the error and return 0 to continue receiving packets
		klog.Infof("received error on hook: %v", e)
		return 0
	}

	// Register the hook to listen on the nflog group configured above.
	if err := nf.RegisterWithErrorFunc(ctx, hook, errFunc); err != nil {
		return fmt.Errorf("failed to register hook function: %w", err)
	}

	// Block till the context expires
	<-ctx.Done()
	return nil
}

// syncRules (re)creates the kindnet-nflog nftables table with a prerouting
// chain that sends every TCP and UDP packet to nflog group 100. Any previous
// content of the table is discarded. It returns an error if the nftables
// connection cannot be created or the batch cannot be flushed.
func (n *NFLogAgent) syncRules() error {
	klog.V(2).Info("Syncing kindnet-nflog nftables rules")
	nft, err := nftables.New()
	if err != nil {
		return fmt.Errorf("nflog failure, can not start nftables: %w", err)
	}

	// add + delete + add for flushing all the table
	table := &nftables.Table{
		Name:   tableName,
		Family: nftables.TableFamilyINet,
	}
	nft.AddTable(table)
	nft.DelTable(table)
	nft.AddTable(table)

	chain := nft.AddChain(&nftables.Chain{
		Name:     "prerouting",
		Table:    table,
		Type:     nftables.ChainTypeFilter,
		Hooknum:  nftables.ChainHookPrerouting,
		Priority: nftables.ChainPriorityMangle, // before DNAT
	})

	// One rule per L4 protocol: match on the protocol number and log the
	// packet to nflog group 100 (the group Run listens on).
	for _, proto := range []byte{syscall.IPPROTO_TCP, syscall.IPPROTO_UDP} {
		nft.AddRule(&nftables.Rule{
			Table: table,
			Chain: chain,
			Exprs: []expr.Any{
				&expr.Meta{Key: expr.MetaKeyL4PROTO, SourceRegister: false, Register: 0x1},
				&expr.Cmp{Op: expr.CmpOpEq, Register: 0x1, Data: []byte{proto}},
				// NOTE(review): Key 0x2 presumably marks the Group attribute
				// as set (1 << NFTA_LOG_GROUP) — confirm against the
				// nftables expr.Log documentation.
				&expr.Log{Level: 0x0, Flags: 0x0, Key: 0x2, Snaplen: 0x0, Group: 100, QThreshold: 0x0, Data: []uint8(nil)},
			},
		})
	}

	if err := nft.Flush(); err != nil {
		return fmt.Errorf("failed to create kindnet-nflog table: %w", err)
	}
	return nil
}

// CleanRules removes the kindnet-nflog nftables table. It is best-effort:
// failures are logged and not returned.
func (n *NFLogAgent) CleanRules() {
	nft, err := nftables.New()
	if err != nil {
		klog.Infof("nflog cleanup failure, can not start nftables:%v", err)
		return
	}
	// Add+Delete is idempotent and won't return an error if the table doesn't already
	// exist.
	table := nft.AddTable(&nftables.Table{
		Family: nftables.TableFamilyINet,
		Name:   tableName,
	})
	nft.DelTable(table)

	if err := nft.Flush(); err != nil {
		klog.Infof("error deleting nftables rules %v", err)
	}
}
168 changes: 168 additions & 0 deletions pkg/nflog/packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// SPDX-License-Identifier: APACHE-2.0

package nflog

import (
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"syscall"

v1 "k8s.io/api/core/v1"
)

// packet holds the fields that parsePacket extracts from a raw IP packet.
type packet struct {
// id is printed by String and MarshalLog; parsePacket does not set it.
id uint32
// family is the IP family (IPv4 or IPv6) taken from the IP version field.
family v1.IPFamily
// srcIP and dstIP are the addresses found in the IP header.
srcIP net.IP
dstIP net.IP
// proto is the L4 protocol (TCP, UDP or SCTP); it is left empty for any
// other protocol and for non-first IPv6 fragments.
proto v1.Protocol
// srcPort and dstPort are the L4 ports; they stay 0 when proto is empty.
srcPort int
dstPort int
// payload is the data that follows the L4 header.
payload []byte
}

// Errors returned while parsing a packet.
var (
	// ErrorTooShort reports that the buffer ends before the header being parsed.
	ErrorTooShort = fmt.Errorf("packet too short")
	// ErrorCorrupted reports a header length field with an impossible value.
	ErrorCorrupted = fmt.Errorf("packet corrupted")
)

// String renders the packet as a one-line summary (id, source and destination
// endpoints, protocol) followed by a hex dump of the payload.
func (p packet) String() string {
	src := p.srcIP.String()
	dst := p.dstIP.String()
	summary := fmt.Sprintf("[%d] %s:%d %s:%d %s", p.id, src, p.srcPort, dst, p.dstPort, p.proto)
	return summary + "\n" + hex.Dump(p.payload)
}

// MarshalLog implements logr.Marshaler so that structured (JSON) log output
// contains the packet metadata. The payload is not included.
func (p packet) MarshalLog() any {
	return &struct {
		ID      uint32
		Family  v1.IPFamily
		SrcIP   net.IP
		DstIP   net.IP
		Proto   v1.Protocol
		SrcPort int
		DstPort int
	}{
		ID:      p.id,
		Family:  p.family,
		SrcIP:   p.srcIP,
		DstIP:   p.dstIP,
		Proto:   p.proto,
		SrcPort: p.srcPort,
		DstPort: p.dstPort,
	}
}

// parsePacket decodes the IPv4 or IPv6 header at the start of b and, when the
// L4 protocol is TCP, UDP or SCTP, the ports and payload that follow it.
//
// It returns ErrorTooShort when b ends before the header being parsed,
// ErrorCorrupted when a header length field is smaller than the minimum, and
// a generic error for an unknown IP version. On error the returned packet may
// be partially populated. For any other L4 protocol, and for non-first IPv6
// fragments, it returns a packet with proto unset and ports 0 and a nil error.
// The returned payload shares its backing array with b.
//
// References:
// https://en.wikipedia.org/wiki/Internet_Protocol_version_4#Packet_structure
// https://en.wikipedia.org/wiki/IPv6_packet
// https://github.com/golang/net/blob/master/ipv4/header.go
func parsePacket(b []byte) (packet, error) {
	t := packet{}
	if len(b) < 20 {
		// 20 is the minimum length of an IPv4 header (IPv6 is 40)
		return t, ErrorTooShort
	}
	version := int(b[0] >> 4)
	// protocol is the L4 protocol number; l4offset is the index in b where
	// the L4 header starts.
	var protocol, l4offset int
	switch version {
	case 4:
		t.family = v1.IPv4Protocol
		hdrlen := int(b[0]&0x0f) * 4 // (header length in 32-bit words)
		if hdrlen < 20 {
			return t, ErrorCorrupted
		}
		l4offset = hdrlen
		if l4offset >= len(b) {
			return t, ErrorTooShort
		}
		t.srcIP = net.IPv4(b[12], b[13], b[14], b[15])
		t.dstIP = net.IPv4(b[16], b[17], b[18], b[19])
		protocol = int(b[9])
		// IPv4 fragments:
		// Since the conntracker is always used in K8s, IPv4 fragments
		// are assumed to never reach this code: packets are
		// re-assembled by the kernel. Please see:
		// https://unix.stackexchange.com/questions/650790/unwanted-defragmentation-of-forwarded-ipv4-packets
	case 6:
		t.family = v1.IPv6Protocol
		if len(b) < 48 {
			// 40 is the minimum length of an IPv6 header, and 8 is
			// the minimum length of an extension or L4 header
			return t, ErrorTooShort
		}
		t.srcIP = make(net.IP, net.IPv6len)
		copy(t.srcIP, b[8:24])
		t.dstIP = make(net.IP, net.IPv6len)
		copy(t.dstIP, b[24:40])
		// Handle extension headers.
		nxtHeader := int(b[6])
		l4offset = 40
		for nxtHeader == syscall.IPPROTO_DSTOPTS || nxtHeader == syscall.IPPROTO_HOPOPTS || nxtHeader == syscall.IPPROTO_ROUTING {
			// These headers have a length in 8-octet units, not
			// including the first 8 octets
			nxtHeader = int(b[l4offset])
			l4offset += (8 + int(b[l4offset+1])*8)
			// Now l4offset points to either another extension header,
			// or an L4 header. So we must have at least 8 byte data
			// after this (minimum extension header size). Exactly 8
			// remaining bytes are enough, e.g. a UDP header without
			// payload.
			if (l4offset + 8) > len(b) {
				return t, ErrorTooShort
			}
		}
		if nxtHeader == syscall.IPPROTO_FRAGMENT {
			// Only the first fragment has the L4 header
			fragOffset := int(binary.BigEndian.Uint16(b[l4offset+2 : l4offset+4]))
			if fragOffset&0xfff8 == 0 {
				nxtHeader = int(b[l4offset])
				l4offset += 8
				// Here it's assumed that the fragment is the last
				// extension header before the L4 header. But more
				// IPPROTO_DSTOPTS are allowed by the recommended order.
				// TODO: handle extra IPPROTO_DSTOPTS.
			} else {
				// If this is NOT the first fragment, we have no L4
				// header and the payload begins after this
				// header. Return a packet with t.proto unset
				return t, nil
			}
		}
		protocol = nxtHeader
	default:
		return t, fmt.Errorf("unknown version %d", version)
	}

	// The payload follows immediately after the L4 header, pointed
	// out by 'l4offset'. So payloadOffset will be (l4offset + the
	// L4header len). The code uses 8 bytes for udp and sctp, and the
	// dataOffset field for tcp.
	// NOTE(review): the SCTP common header is 12 bytes long, so the
	// SCTP payload offset below looks 4 bytes short — confirm before
	// relying on the payload of SCTP packets.
	var payloadOffset int
	switch protocol {
	case syscall.IPPROTO_TCP:
		t.proto = v1.ProtocolTCP
		// The data offset lives in byte 12 of the TCP header; make sure
		// that byte exists before reading it so a truncated packet is
		// reported as too short instead of panicking.
		if l4offset+12 >= len(b) {
			return t, ErrorTooShort
		}
		dataOffset := int(b[l4offset+12]>>4) * 4
		if dataOffset < 20 {
			return t, ErrorCorrupted
		}
		payloadOffset = l4offset + dataOffset
	case syscall.IPPROTO_UDP:
		t.proto = v1.ProtocolUDP
		payloadOffset = l4offset + 8
	case syscall.IPPROTO_SCTP:
		t.proto = v1.ProtocolSCTP
		payloadOffset = l4offset + 8
	default:
		// Return a packet with t.proto unset, and ports 0
		return t, nil
	}
	if payloadOffset > len(b) {
		// If the payloadOffset is beyond the packet size, we have an
		// incomplete L4 header
		return t, ErrorTooShort
	}
	// TCP, UDP and SCTP all start with the source and destination ports.
	t.srcPort = int(binary.BigEndian.Uint16(b[l4offset : l4offset+2]))
	t.dstPort = int(binary.BigEndian.Uint16(b[l4offset+2 : l4offset+4]))

	// TODO allow to filter by the payload
	t.payload = b[payloadOffset:]
	return t, nil
}
Loading

0 comments on commit 2f1eb78

Please sign in to comment.