Skip to content

Commit

Permalink
fix: ensure all entries from bpf map is copied (#1477)
Browse files Browse the repository at this point in the history
Previously only a `MapSize` (lesser than size of processs) of entries
used to be copied from processes bpf map. The right use of a batch size
less than the size of map is as follows:

Ref: https://lwn.net/Articles/797808/
```
    p_key = NULL;
    p_next_key = &key;
    while (true) {
       err = bpf_map_lookup_batch(fd, p_key, &p_next_key, keys, values,
                                  &batch_size, elem_flags, flags);
       if (err) ...
       if (p_next_key) break; // done
       if (!p_key) p_key = p_next_key;
    }
```

This PR fixes the issue by creating a entries byte array with the same
size as the `processes` map.

Additionally, this commit makes use of `bytes.Reader` instead of
bytes.Buffer to avoid unnecessary copy of ephemeral byte array.

Signed-off-by: Sunil Thaha <[email protected]>
  • Loading branch information
sthaha authored May 30, 2024
1 parent b10f15e commit 747e7eb
Showing 1 changed file with 11 additions and 19 deletions.
30 changes: 11 additions & 19 deletions pkg/bpf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ const (
bpfPerfArraySuffix = "_event_reader"
TableProcessName = "processes"
TableCPUFreqName = "cpu_freq_array"
MapSize = 10240
CPUNumSize = 128
)

Expand Down Expand Up @@ -327,7 +326,7 @@ func (e *exporter) CollectCPUFreq() (cpuFreqData map[int32]uint64, err error) {
if err != nil {
return
}
//cpuFreqkeySize := int(unsafe.Sizeof(uint32Key))
// cpuFreqkeySize := int(unsafe.Sizeof(uint32Key))
iterator := cpuFreq.Iterator()
var freq uint32
// keySize := int(unsafe.Sizeof(freq))
Expand All @@ -346,7 +345,7 @@ func (e *exporter) CollectCPUFreq() (cpuFreqData map[int32]uint64, err error) {
}
continue
}
getErr = binary.Read(bytes.NewBuffer(data), e.byteOrder, &freq)
getErr = binary.Read(bytes.NewReader(data), e.byteOrder, &freq)
if getErr != nil {
klog.V(5).Infof("failed to decode received data: %v\n", getErr)
next = iterator.Next()
Expand Down Expand Up @@ -399,7 +398,6 @@ func getCPUCores() int {
cores = int(cpu.TotalThreads)
}
return cores

}

func resizeArrayEntries(module *bpf.Module, name string, size int) error {
Expand All @@ -423,33 +421,27 @@ func resizeArrayEntries(module *bpf.Module, name string, size int) error {
// but it is not a big problem since we request all possible keys that the map can store in a single request
func (e *exporter) libbpfCollectProcessBatchSingleHash(processes *bpf.BPFMap) ([]ProcessBPFMetrics, error) {
start := time.Now()
processesData := []ProcessBPFMetrics{}
var err error
keySize := 4 // the map key is uint32, has 4 bytes
entries := MapSize / keySize

entries := processes.MaxEntries()
keys := make([]uint32, entries)
nextKey := uint32(0)

val, err := processes.GetValueAndDeleteBatch(unsafe.Pointer(&keys[0]), nil, unsafe.Pointer(&nextKey), uint32(entries))
values, err := processes.GetValueAndDeleteBatch(unsafe.Pointer(&keys[0]), nil, unsafe.Pointer(&nextKey), uint32(entries))
if err != nil {
// os.IsNotExist means we reached the end of the table
if !os.IsNotExist(err) {
klog.V(5).Infof("GetValueAndDeleteBatch failed: %v. A partial value might have been collected.", err)
}
}
for _, value := range val {
buff := bytes.NewBuffer(value)
if buff == nil {
klog.V(4).Infof("failed to get data: buffer EOF\n")
continue
}

processesData := []ProcessBPFMetrics{}
for _, value := range values {
var ct ProcessBPFMetrics
getErr := binary.Read(buff, e.byteOrder, &ct)
if getErr != nil {
klog.V(1).Infof("failed to decode received data: %v\n", getErr)
if err := binary.Read(bytes.NewReader(value), e.byteOrder, &ct); err != nil {
klog.Warningf("failed to decode received data: %v\n", err)
continue
}

if ct != e.emptyct {
processesData = append(processesData, ct)
}
Expand Down Expand Up @@ -477,7 +469,7 @@ func (e *exporter) libbpfCollectProcessSingleHash(processes *bpf.BPFMap) (proces
}
continue
}
getErr = binary.Read(bytes.NewBuffer(data), e.byteOrder, &ct)
getErr = binary.Read(bytes.NewReader(data), e.byteOrder, &ct)
if getErr != nil {
klog.V(5).Infof("failed to decode received data: %v\n", getErr)
next = iterator.Next()
Expand Down

0 comments on commit 747e7eb

Please sign in to comment.