bundle: Preallocate buffers for file contents. (#6818)
This commit adds logic to preallocate buffers when loading files from
both tarballs and on-disk bundle directories. The change results in
lower max RSS memory usage at runtime, and better garbage collector
performance, especially at lower values of GOMAXPROCS.
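
Concretely, instead of letting a `bytes.Buffer` grow as a file is copied in,
the loaders now size the allocation up front (from an fstat call, or from the
tar header). A minimal sketch of the two access patterns, simplified from the
committed code below (the names are illustrative, not part of this commit):

```go
package example

import (
	"bytes"
	"io"
	"os"
)

// readGrow copies into a zero-value bytes.Buffer, which may reallocate and
// copy its backing array several times as the file streams in.
func readGrow(path string) (*bytes.Buffer, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, f); err != nil {
		return nil, err
	}
	return &buf, nil
}

// readPrealloc stats the file first and allocates a right-sized slice once,
// so io.ReadFull fills it with no intermediate growth or copying.
func readPrealloc(path string) (*bytes.Buffer, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}
	b := make([]byte, fi.Size())
	if _, err := io.ReadFull(f, b); err != nil {
		return nil, err
	}
	return bytes.NewBuffer(b), nil
}
```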

For very large bundles (>1 GB in size), this change can lower startup
times for OPA by as much as a full second.

The performance analysis was different from that of most changes: heap
usage increased by about 10% during bundle loading, which made the
change look bad at first. Some of the effect appears to be from the
Go compiler no longer inlining as far up the call chain during bundle
loading (visible in the `pprof` graphs).
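
For anyone reproducing that analysis: comparable heap profiles can be captured
from the benchmarks below with `go test -bench TarballLoader -memprofile mem.out ./bundle/`
and inspected with `go tool pprof mem.out`. A hypothetical standalone harness
(the function name and structure are illustrative, not part of this commit)
might look like:

```go
package profiling

import (
	"os"
	"runtime"
	"runtime/pprof"

	"github.com/open-policy-agent/opa/bundle"
)

// heapProfileAfterLoad loads a bundle tarball the same way the benchmarks in
// this commit do, then writes a heap profile for `go tool pprof`.
func heapProfileAfterLoad(tarballPath, profilePath string) error {
	f, err := os.Open(tarballPath)
	if err != nil {
		return err
	}
	defer f.Close()

	loader := bundle.NewTarballLoaderWithBaseURL(f, tarballPath)
	if _, err := bundle.NewCustomReader(loader).WithLazyLoadingMode(true).Read(); err != nil {
		return err
	}

	out, err := os.Create(profilePath)
	if err != nil {
		return err
	}
	defer out.Close()

	runtime.GC() // collect first so the profile reflects live heap, not garbage
	return pprof.WriteHeapProfile(out)
}
```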

Running with `GODEBUG=gctrace=1` and varying GOMAXPROCS gave a fuller picture
of how preallocation changes performance: it produces much less garbage for
the collector, and noticeably reduces the wall-clock time the GC burns during
bundle loading.
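
At small scale, the same effect can be observed by benchmarking the two sketch
functions above with `go test -bench ReadFile -benchmem`, optionally under
`GODEBUG=gctrace=1` and different GOMAXPROCS values. A hypothetical pair of
micro-benchmarks, assuming `readGrow`/`readPrealloc` from the earlier sketch:

```go
package example

import (
	"os"
	"path/filepath"
	"testing"
)

// Hypothetical micro-benchmarks over the readGrow/readPrealloc sketches above;
// expect allocs/op and B/op to drop sharply for the preallocated variant.
func BenchmarkReadFileGrow(b *testing.B) {
	path := benchTempFile(b, 1<<20) // 1 MiB of zero bytes
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := readGrow(path); err != nil {
			b.Fatal(err)
		}
	}
}

func BenchmarkReadFilePrealloc(b *testing.B) {
	path := benchTempFile(b, 1<<20)
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := readPrealloc(path); err != nil {
			b.Fatal(err)
		}
	}
}

// benchTempFile writes an n-byte file into a per-benchmark temp dir.
func benchTempFile(b *testing.B, n int) string {
	b.Helper()
	path := filepath.Join(b.TempDir(), "data.bin")
	if err := os.WriteFile(path, make([]byte, n), 0o600); err != nil {
		b.Fatal(err)
	}
	return path
}
```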

Signed-off-by: Philip Conrad <[email protected]>
philipaconrad authored Jun 19, 2024
1 parent 3b06458 commit 8e7172c
Showing 3 changed files with 182 additions and 3 deletions.
44 changes: 44 additions & 0 deletions bundle/bundle.go
@@ -15,6 +15,7 @@ import (
"fmt"
"io"
"net/url"
"os"
"path"
"path/filepath"
"reflect"
@@ -1667,6 +1668,7 @@ func preProcessBundle(loader DirectoryLoader, skipVerify bool, sizeLimitBytes in
}

func readFile(f *Descriptor, sizeLimitBytes int64) (bytes.Buffer, error) {
// Case for pre-loaded byte buffers, like those from the tarballLoader.
if bb, ok := f.reader.(*bytes.Buffer); ok {
_ = f.Close() // always close, even on error

@@ -1678,6 +1680,37 @@ func readFile(f *Descriptor, sizeLimitBytes int64) (bytes.Buffer, error) {
return *bb, nil
}

// Case for *lazyFile readers:
if lf, ok := f.reader.(*lazyFile); ok {
var buf bytes.Buffer
if lf.file == nil {
var err error
if lf.file, err = os.Open(lf.path); err != nil {
return buf, fmt.Errorf("failed to open file %s: %w", f.path, err)
}
}
// Bail out if we can't read the whole file-- there's nothing useful we can do at that point!
fileSize, _ := fstatFileSize(lf.file)
if fileSize > sizeLimitBytes {
return buf, fmt.Errorf(maxSizeLimitBytesErrMsg, strings.TrimPrefix(f.Path(), "/"), fileSize, sizeLimitBytes-1)
}
// Prealloc the buffer for the file read.
buffer := make([]byte, fileSize)
_, err := io.ReadFull(lf.file, buffer)
if err != nil {
return buf, err
}
_ = lf.file.Close() // always close, even on error

// Note(philipc): Replace the lazyFile reader in the *Descriptor with a
// pointer to the wrapping bytes.Buffer, so that we don't re-read the
// file on disk again by accident.
buf = *bytes.NewBuffer(buffer)
f.reader = &buf
return buf, nil
}

// Fallback case:
var buf bytes.Buffer
n, err := f.Read(&buf, sizeLimitBytes)
_ = f.Close() // always close, even on error
@@ -1691,6 +1724,17 @@ func readFile(f *Descriptor, sizeLimitBytes int64) (bytes.Buffer, error) {
return buf, nil
}

// Takes an already open file handle and invokes the os.Stat system call on it
// to determine the file's size. Passes any errors from *File.Stat on up to the
// caller.
func fstatFileSize(f *os.File) (int64, error) {
fileInfo, err := f.Stat()
if err != nil {
return 0, err
}
return fileInfo.Size(), nil
}

func normalizePath(p string) string {
return filepath.ToSlash(p)
}
7 changes: 4 additions & 3 deletions bundle/file.go
@@ -370,12 +370,13 @@ func (t *tarballLoader) NextFile() (*Descriptor, error) {

f := file{name: header.Name}

- var buf bytes.Buffer
- if _, err := io.Copy(&buf, t.tr); err != nil {
+ // Note(philipc): We rely on the previous size check in this loop for safety.
+ buf := bytes.NewBuffer(make([]byte, 0, header.Size))
+ if _, err := io.Copy(buf, t.tr); err != nil {
return nil, fmt.Errorf("failed to copy file %s: %w", header.Name, err)
}

- f.reader = &buf
+ f.reader = buf

t.files = append(t.files, f)
} else if header.Typeflag == tar.TypeDir {
134 changes: 134 additions & 0 deletions bundle/file_bench_test.go
@@ -0,0 +1,134 @@
package bundle

import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"

"github.com/open-policy-agent/opa/internal/file/archive"
"github.com/open-policy-agent/opa/util"

"github.com/open-policy-agent/opa/util/test"
)

var benchTestArchiveFiles = map[string]string{
"/a.json": `"a"`,
"/a/b.json": `"b"`,
"/a/b/c.json": `"c"`,
"/a/b/d/data.json": `"hello"`,
"/a/c/data.yaml": "12",
"/some.txt": "text",
"/policy.rego": "package foo\n p = 1",
"/roles/policy.rego": "package bar\n p = 1",
"/deeper/dir/path/than/others/foo": "bar",
}

func BenchmarkTarballLoader(b *testing.B) {
files := map[string]string{
"/archive.tar.gz": "",
}
sizes := []int{1000, 10000, 100000, 250000}

for _, n := range sizes {
expectedFiles := make(map[string]string, len(benchTestArchiveFiles)+1)
for k, v := range benchTestArchiveFiles {
expectedFiles[k] = v
}
expectedFiles["/x/data.json"] = benchTestGetFlatDataJSON(n)

// We generate the tarball once in the tempfs, and then reuse it many
// times in the benchmark.
test.WithTempFS(files, func(rootDir string) {
tarballFile := filepath.Join(rootDir, "archive.tar.gz")
benchTestCreateTarballFile(b, rootDir, expectedFiles)

b.ResetTimer()

f, err := os.Open(tarballFile)
if err != nil {
b.Fatalf("Unexpected error: %s", err)
}
defer f.Close()

b.Run(fmt.Sprint(n), func(b *testing.B) {
// Reset the file reader.
if _, err := f.Seek(0, 0); err != nil {
b.Fatalf("Unexpected error: %s", err)
}
loader := NewTarballLoaderWithBaseURL(f, tarballFile)
benchTestLoader(b, loader)
})
})
}
}

func BenchmarkDirectoryLoader(b *testing.B) {
sizes := []int{10000, 100000, 250000, 500000}

for _, n := range sizes {
expectedFiles := make(map[string]string, len(benchTestArchiveFiles)+1)
for k, v := range benchTestArchiveFiles {
expectedFiles[k] = v
}
expectedFiles["/x/data.json"] = benchTestGetFlatDataJSON(n)

test.WithTempFS(expectedFiles, func(rootDir string) {
b.ResetTimer()

b.Run(fmt.Sprint(n), func(b *testing.B) {
loader := NewDirectoryLoader(rootDir)
benchTestLoader(b, loader)
})
})
}
}

// Creates a flat JSON object of configurable size.
func benchTestGetFlatDataJSON(numKeys int) string {
largeFile := make(map[string]string, numKeys)
for i := 0; i < numKeys; i++ {
largeFile[strconv.FormatInt(int64(i), 10)] = strings.Repeat("A", 1024)
}
return string(util.MustMarshalJSON(largeFile))
}

// Generates a tarball with a data.json of variable size.
func benchTestCreateTarballFile(b *testing.B, root string, filesToWrite map[string]string) {
b.Helper()

tarballFile := filepath.Join(root, "archive.tar.gz")
f, err := os.Create(tarballFile)
if err != nil {
b.Fatalf("Unexpected error: %s", err)
}

gzFiles := make([][2]string, 0, len(filesToWrite))
for name, content := range filesToWrite {
gzFiles = append(gzFiles, [2]string{name, content})
}

_, err = f.Write(archive.MustWriteTarGz(gzFiles).Bytes())
if err != nil {
b.Fatalf("Unexpected error: %s", err)
}
f.Close()
}

// We specifically invoke the loader through the bundle reader to mimic
// real-world usage.
func benchTestLoader(b *testing.B, loader DirectoryLoader) {
b.Helper()

br := NewCustomReader(loader).WithLazyLoadingMode(true)
bundle, err := br.Read()
if err != nil {
b.Fatal(err)
}

if len(bundle.Raw) == 0 {
b.Fatal("bundle.Raw is unexpectedly empty")
}
}
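
Assuming a standard checkout of the OPA repository, the benchmarks above can
be run with, e.g., `go test ./bundle/ -run '^$' -bench 'TarballLoader|DirectoryLoader' -benchmem`;
combining that with `GODEBUG=gctrace=1` reproduces the GC analysis described
in the commit message.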
