From d6fcef4a762856a4228fd4cf9de76325e9dd16fc Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 27 Sep 2023 16:07:46 +0930 Subject: [PATCH 1/5] libbeat/processors/cache: add file-backed cache --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/processors/cache/cache.go | 33 +- libbeat/processors/cache/cache_test.go | 24 +- libbeat/processors/cache/file_store.go | 229 ++++++++++ libbeat/processors/cache/file_store_test.go | 455 ++++++++++++++++++++ libbeat/processors/cache/mem_store.go | 34 +- libbeat/processors/cache/mem_store_test.go | 58 +-- 7 files changed, 757 insertions(+), 77 deletions(-) create mode 100644 libbeat/processors/cache/file_store.go create mode 100644 libbeat/processors/cache/file_store_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index d0f4c40ef3ba..ca46d0182476 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] - Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] +- Add file-backed cache for cache enrichment processor. {pull}36686[36686] ==== Deprecated diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index eea3ed115eae..a7ce4876f505 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -29,6 +30,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) const name = "cache" @@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) { if err != nil { return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err) } - src, cancel, err := getStoreFor(config) - if err != nil { - return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) - } - // Logging (each processor instance has a unique ID). id := int(instanceID.Inc()) log := logp.NewLogger(name).With("instance_id", id) + src, cancel, err := getStoreFor(config, log) + if err != nil { + return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) + } + p := &cache{ config: config, store: src, @@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) { // and a context cancellation that releases the cache resource when it // is no longer required. The cancellation should be called when the // processor is closed. -func getStoreFor(cfg config) (Store, context.CancelFunc, error) { +func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) { switch { case cfg.Store.Memory != nil: s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) return s, cancel, nil case cfg.Store.File != nil: - logp.L().Warn("using memory store when file is configured") - // TODO: Replace place-holder code with a file-store. - s, cancel := fileStores.get(cfg.Store.File.ID, cfg) + err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700) + if err != nil { + return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err) + } + s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log) return s, cancel, nil default: @@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { } } -var ( - memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"} - fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock. -) - // noop is a no-op context.CancelFunc. func noop() {} @@ -126,9 +125,9 @@ type Store interface { } type CacheEntry struct { - key string - value any - expires time.Time + Key string `json:"key"` + Value any `json:"val"` + Expires time.Time `json:"expires"` index int } diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go index 6fe5847c01fa..8acd22d74d70 100644 --- a/libbeat/processors/cache/cache_test.go +++ b/libbeat/processors/cache/cache_test.go @@ -130,7 +130,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -191,7 +191,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -210,7 +210,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -271,7 +271,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -290,7 +290,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -351,7 +351,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -379,7 +379,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"), }, @@ -441,7 +441,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -465,7 +465,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -527,7 +527,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -547,7 +547,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: expected map but type is string"), }, @@ -613,7 +613,7 @@ func TestCache(t *testing.T) { switch got := p.(*cache).store.(type) { case *memStore: allow := cmp.AllowUnexported(CacheEntry{}) - ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index") + ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) { t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore)) } diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go new file mode 100644 index 000000000000..870d6e028ddb --- /dev/null +++ b/libbeat/processors/cache/file_store.go @@ -0,0 +1,229 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "container/heap" + "context" + "encoding/json" + "errors" + "io" + "io/fs" + "os" + "path/filepath" + "sync" + "time" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" +) + +// TODO: Consider having a periodic write-out (per time or per n puts) to +// reduce loss of state due to crashes. + +var fileStores = fileStoreSet{stores: map[string]*fileStore{}} + +// fileStoreSet is a collection of shared fileStore caches. +type fileStoreSet struct { + mu sync.Mutex + stores map[string]*fileStore +} + +// get returns a fileStore cache with the provided ID based on the config. +// If a fileStore with the ID already exist, its configuration is adjusted +// and its reference count is increased. The returned context.CancelFunc +// reduces the reference count and deletes the fileStore from the set if the +// count reaches zero. +func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) { + s.mu.Lock() + defer s.mu.Unlock() + store, ok := s.stores[id] + if !ok { + store = newFileStore(cfg, id, pathFromConfig(cfg, log), log) + s.stores[store.id] = store + } + store.add(cfg) + + return store, func() { + store.dropFrom(s) + } +} + +// pathFromConfig returns the mapping form a config to a file-system path. +func pathFromConfig(cfg config, log *logp.Logger) string { + path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cfg.Store.File.ID) + log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path) + return path +} + +// free removes the fileStore with the given ID from the set. free is safe +// for concurrent use. +func (s *fileStoreSet) free(id string) { + s.mu.Lock() + delete(s.stores, id) + s.mu.Unlock() +} + +// fileStore is a file-backed cache store. +type fileStore struct { + path string + memStore + log *logp.Logger +} + +// newFileStore returns a new fileStore configured to apply the give TTL duration. +// The fileStore is guaranteed not to grow larger than cap elements. id is the +// look-up into the global cache store the fileStore is held in. +func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore { + s := fileStore{ + path: path, + log: log, + memStore: memStore{ + id: id, + cache: make(map[string]*CacheEntry), + + // Mark the ttl as invalid until we have had a put + // operation configured. While the shared backing + // data store is incomplete, and has no put operation + // defined, the TTL will be invalid, but will never + // be accessed since all time operations outside put + // refer to absolute times, held by the CacheEntry. + ttl: -1, + cap: -1, + effort: -1, + }, + } + s.readState() + return &s +} + +func (c *fileStore) String() string { return "file:" + c.id } + +// dropFrom decreases the reference count for the fileStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *fileStore) dropFrom(stores *fileStoreSet) { + c.mu.Lock() + c.refs-- + if c.refs < 0 { + panic("invalid reference count") + } + if c.refs == 0 { + c.writeState() + stores.free(c.id) + // GC assists. + c.cache = nil + c.expiries = nil + } + c.mu.Unlock() +} + +func (c *fileStore) readState() { + f, err := os.Open(c.path) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + c.log.Debugw("no state on file system", "error", err) + } else { + c.log.Errorw("failed to open file to read state", "error", err) + } + return + } + defer f.Close() + + // It would be nice to be able at this stage to determine + // whether the file is stale past the TTL of the cache, but + // we do not have this information yet. So we must read + // through all the elements. If any survive the filter, we + // were alive, otherwise delete the file. + + dec := json.NewDecoder(f) + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + c.log.Errorw("failed to read state element", "error", err) + } + break + } + if e.Expires.Before(time.Now()) { + // Don't retain expired elements. + continue + } + c.cache[e.Key] = &e + heap.Push(&c.expiries, &e) + } + + if len(c.cache) != 0 { + return + } + // We had no live entries, so delete the file. + err = os.Remove(c.path) + if err != nil { + c.log.Errorw("failed to delete stale cache file", "error", err) + } +} + +func (c *fileStore) writeState() { + if len(c.cache) == 0 { + err := os.Remove(c.path) + if err != nil { + c.log.Errorw("failed to delete write state when empty", "error", err) + } + return + } + f, err := os.CreateTemp("", "") + if err != nil { + c.log.Errorw("failed to open file to write state", "error", err) + return + } + // We are writing into tmp, so make sure we are private. + err = os.Chmod(f.Name(), 0o600) + if err != nil { + c.log.Errorw("failed to set state file mode", "error", err) + return + } + tmp := f.Name() + defer func() { + err = f.Close() + if err != nil { + c.log.Errorw("failed to close file after writing state", "error", err) + return + } + // Try to be atomic. + err = os.Rename(tmp, c.path) + if err != nil { + c.log.Errorw("failed to finalize writing state", "error", err) + } + }() + + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + now := time.Now() + for c.expiries.Len() != 0 { + e := c.expiries.pop() + if e.Expires.Before(now) { + // Don't write expired elements. + continue + } + err = enc.Encode(e) + if err != nil { + c.log.Errorw("failed to write state element", "error", err) + return + } + } +} diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go new file mode 100644 index 000000000000..737a56a5841e --- /dev/null +++ b/libbeat/processors/cache/file_store_test.go @@ -0,0 +1,455 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/elastic-agent-libs/logp" +) + +var keep = flag.Bool("keep", false, "keep testdata after test complete") + +type fileStoreTestSteps struct { + doTo func(*fileStore) error + want *fileStore +} + +//nolint:errcheck // Paul Hogan was right. +var fileStoreTests = []struct { + name string + cfg config + want *fileStore + steps []fileStoreTestSteps + wantPersisted []*CacheEntry +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &fileStore{path: "testdata/new_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &fileStore{path: "testdata/new_delete", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 1: { + doTo: func(s *fileStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 2: { + doTo: func(s *fileStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 3: { + doTo: func(s *fileStore) error { + return s.Delete("two") + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 4: { + doTo: func(s *fileStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 5: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if !fileStores.has(s.id) { + return fmt.Errorf("%q fileStore not found after single close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 6: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if fileStores.has(s.id) { + return fmt.Errorf("%q fileStore still found after double close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + wantPersisted: []*CacheEntry{ + // Numeric values are float due to JSON round-trip. + {Key: "one", Value: 1.0}, + {Key: "three", Value: 3.0}, + }, + }, +} + +func TestFileStore(t *testing.T) { + err := os.RemoveAll("testdata") + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to clear testdata directory: %v", err) + } + err = os.Mkdir("testdata", 0o755) + if err != nil && !errors.Is(err, fs.ErrExist) { + t.Fatalf("failed to create testdata directory: %v", err) + } + if !*keep { + t.Cleanup(func() { os.RemoveAll("testdata") }) + } + + allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "log") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") + + for _, test := range fileStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. + path := filepath.Join("testdata", test.name) + store := newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + store.add(test.cfg) + fileStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInFileStore, ignoreInMemStore) { + t.Errorf("unexpected new fileStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInFileStore, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected fileStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry)) + } + } + if test.wantPersisted == nil { + return + } + store = nil + f, err := os.Open(path) + if err != nil { + t.Fatalf("failed to open persisted data: %v", err) + } + defer f.Close() + dec := json.NewDecoder(f) + var got []*CacheEntry + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + t.Fatalf("unexpected error reading persisted cache data: %v", err) + } + break + } + got = append(got, &e) + } + if !cmp.Equal(test.wantPersisted, got, allow, ignoreInCacheEntry) { + t.Errorf("unexpected persisted state:\n--- want\n+++ got\n%s", + cmp.Diff(test.wantPersisted, got, allow, ignoreInCacheEntry)) + } + wantCache := make(map[string]*CacheEntry) + for _, e := range got { + wantCache[e.Key] = e + } + store = newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + // Specialise the in cache entry ignore list to include index. + ignoreMoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") + if !cmp.Equal(wantCache, store.cache, allow, ignoreMoreInCacheEntry) { + t.Errorf("unexpected restored state:\n--- want\n+++ got\n%s", + cmp.Diff(wantCache, store.cache, allow, ignoreMoreInCacheEntry)) + } + for k, e := range store.cache { + if e.index < 0 || len(store.expiries) <= e.index { + t.Errorf("cache entry %s index out of bounds: got:%d [0,%d)", k, e.index, len(store.expiries)) + continue + } + if !cmp.Equal(e, store.expiries[e.index], allow, ignoreInCacheEntry) { + t.Errorf("unexpected mismatched cache/expiry state %s:\n--- want\n+++ got\n%s", + k, cmp.Diff(e, store.expiries[e.index], allow, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set. It is used only for testing. +func (s *fileStoreSet) add(store *fileStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set. It is used only for testing. +func (s *fileStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 900e91c8e600..774a019d783d 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -24,24 +24,25 @@ import ( "time" ) +var memStores = memStoreSet{stores: map[string]*memStore{}} + // memStoreSet is a collection of shared memStore caches. type memStoreSet struct { mu sync.Mutex stores map[string]*memStore - typ string // TODO: Remove when a file-backed store exists. } // get returns a memStore cache with the provided ID based on the config. // If a memStore with the ID already exist, its configuration is adjusted // and its reference count is increased. The returned context.CancelFunc -// reduces the reference count and deletes the memStore from set if the +// reduces the reference count and deletes the memStore from the set if the // count reaches zero. func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { s.mu.Lock() defer s.mu.Unlock() store, ok := s.stores[id] if !ok { - store = newMemStore(cfg, id, s.typ) + store = newMemStore(cfg, id) s.stores[store.id] = store } store.add(cfg) @@ -69,10 +70,6 @@ type memStore struct { // id is the index into global cache store for the cache. id string - // typ is a temporary tag to differentiate memory stores - // from the mocked file store. - // TODO: Remove when a file-backed store exists. - typ string // cap is the maximum number of elements the cache // will hold. If not positive, no limit. @@ -85,10 +82,9 @@ type memStore struct { // newMemStore returns a new memStore configured to apply the give TTL duration. // The memStore is guaranteed not to grow larger than cap elements. id is the // look-up into the global cache store the memStore is held in. -func newMemStore(cfg config, id, typ string) *memStore { +func newMemStore(cfg config, id string) *memStore { return &memStore{ id: id, - typ: typ, cache: make(map[string]*CacheEntry), // Mark the ttl as invalid until we have had a put @@ -103,7 +99,7 @@ func newMemStore(cfg config, id, typ string) *memStore { } } -func (c *memStore) String() string { return c.typ + ":" + c.id } +func (c *memStore) String() string { return "memory:" + c.id } // add updates the receiver for a new operation. It increases the reference // count for the receiver, and if the config is a put operation and has no @@ -158,11 +154,11 @@ func (c *memStore) Get(key string) (any, error) { if !ok { return nil, ErrNoData } - if time.Now().After(v.expires) { + if time.Now().After(v.Expires) { delete(c.cache, key) return nil, ErrNoData } - return v.value, nil + return v.Value, nil } // Put stores the provided value in the cache associated with the given key. @@ -174,9 +170,9 @@ func (c *memStore) Put(key string, val any) error { now := time.Now() c.evictExpired(now) e := &CacheEntry{ - key: key, - value: val, - expires: now.Add(c.ttl), + Key: key, + Value: val, + Expires: now.Add(c.ttl), } c.cache[key] = e heap.Push(&c.expiries, e) @@ -189,11 +185,11 @@ func (c *memStore) Put(key string, val any) error { // it under the capacity limit. func (c *memStore) evictExpired(now time.Time) { for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { - if c.expiries[0].expires.After(now) { + if c.expiries[0].Expires.After(now) { break } e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } if c.cap <= 0 { // No cap, so depend on effort. @@ -201,7 +197,7 @@ func (c *memStore) evictExpired(now time.Time) { } for len(c.cache) >= c.cap { e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } } @@ -237,7 +233,7 @@ func (h expiryHeap) Len() int { return len(h) } func (h expiryHeap) Less(i, j int) bool { - return h[i].expires.Before(h[j].expires) + return h[i].Expires.Before(h[j].Expires) } func (h expiryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 692931854b81..15be85c76785 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -199,14 +199,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -225,14 +225,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -247,12 +247,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -271,12 +271,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -295,12 +295,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 1, ttl: time.Second, @@ -332,14 +332,14 @@ var memStoreTests = []struct { func TestMemStore(t *testing.T) { allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) - ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") - ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") for _, test := range memStoreTests { t.Run(test.name, func(t *testing.T) { // Construct the store and put in into the stores map as // we would if we were calling Run. - store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID) store.add(test.cfg) memStores.add(store) @@ -361,14 +361,14 @@ func TestMemStore(t *testing.T) { } } -// add adds the store to the set, it is used only for testing. +// add adds the store to the set. It is used only for testing. func (s *memStoreSet) add(store *memStore) { s.mu.Lock() s.stores[store.id] = store s.mu.Unlock() } -// has returns whether the store exists in the set, it is used only for testing. +// has returns whether the store exists in the set. It is used only for testing. func (s *memStoreSet) has(id string) bool { s.mu.Lock() _, ok := s.stores[id] From e0327b637c74ae5c5a3ee22c37e100df53de08fe Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 28 Sep 2023 07:32:54 +0930 Subject: [PATCH 2/5] add periodic writes to file-backed cache --- libbeat/processors/cache/config.go | 11 +++-- libbeat/processors/cache/config_test.go | 14 ++++++ libbeat/processors/cache/docs/cache.asciidoc | 5 ++- libbeat/processors/cache/file_store.go | 47 +++++++++++++++++--- libbeat/processors/cache/file_store_test.go | 18 ++++---- libbeat/processors/cache/mem_store_test.go | 14 +++--- 6 files changed, 81 insertions(+), 28 deletions(-) diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go index 06dacc5c6ca4..019bd8d5840f 100644 --- a/libbeat/processors/cache/config.go +++ b/libbeat/processors/cache/config.go @@ -88,8 +88,8 @@ func defaultConfig() config { } type storeConfig struct { - Memory *id `config:"memory"` - File *id `config:"file"` + Memory *memConfig `config:"memory"` + File *fileConfig `config:"file"` // Capacity and Effort are currently experimental // and not in public-facing documentation. @@ -97,10 +97,15 @@ type storeConfig struct { Effort int `config:"eviction_effort"` } -type id struct { +type memConfig struct { ID string `config:"id"` } +type fileConfig struct { + ID string `config:"id"` + WriteOutEvery time.Duration `config:"write_frequency"` +} + func (cfg *storeConfig) Validate() error { switch { case cfg.Memory != nil && cfg.File != nil: diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go index 6e8b01d930f3..69d158eb8302 100644 --- a/libbeat/processors/cache/config_test.go +++ b/libbeat/processors/cache/config_test.go @@ -39,6 +39,20 @@ put: ttl: 168h key_field: crowdstrike.aid value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "put_file_with_periodic_write_out", + cfg: ` +backend: + file: + id: aidmaster + write_frequency: 15m +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata `, want: nil, }, diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc index 94f5d07cece1..3e17d37e2133 100644 --- a/libbeat/processors/cache/docs/cache.asciidoc +++ b/libbeat/processors/cache/docs/cache.asciidoc @@ -51,8 +51,9 @@ It has the following settings: One of `backend.memory.id` or `backend.file.id` must be provided. -`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache. -`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache. +`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instance to reference the same cache. +`backend.file.id`:: The ID of a file-based cache. Use the same ID across instance to reference the same cache. +`backend.file.write_frequency`:: The frequency the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_frequency` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes. One of `put`, `get` or `delete` must be provided. diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index 870d6e028ddb..604b7c01cde1 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -33,9 +33,6 @@ import ( "github.com/elastic/elastic-agent-libs/paths" ) -// TODO: Consider having a periodic write-out (per time or per n puts) to -// reduce loss of state due to crashes. - var fileStores = fileStoreSet{stores: map[string]*fileStore{}} // fileStoreSet is a collection of shared fileStore caches. @@ -81,8 +78,14 @@ func (s *fileStoreSet) free(id string) { // fileStore is a file-backed cache store. type fileStore struct { - path string memStore + + path string + // cancel stops periodic write out operations. + // Write out operations are protected by the + // memStore's mutex. + cancel context.CancelFunc + log *logp.Logger } @@ -108,6 +111,12 @@ func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore { effort: -1, }, } + s.cancel = noop + if cfg.Store.File.WriteOutEvery > 0 { + var ctx context.Context + ctx, s.cancel = context.WithCancel(context.Background()) + go s.periodicWriteOut(ctx, cfg.Store.File.WriteOutEvery) + } s.readState() return &s } @@ -123,7 +132,11 @@ func (c *fileStore) dropFrom(stores *fileStoreSet) { panic("invalid reference count") } if c.refs == 0 { - c.writeState() + // Stop periodic writes + c.cancel() + // and do a final write out. + c.writeState(true) + stores.free(c.id) // GC assists. c.cache = nil @@ -178,8 +191,28 @@ func (c *fileStore) readState() { } } -func (c *fileStore) writeState() { - if len(c.cache) == 0 { +// periodicWriteOut writes the cache contents to the backing file at the +// specified interval until the context is cancelled. periodicWriteOut is +// safe for concurrent use. +func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) { + tick := time.NewTicker(every) + defer tick.Stop() + for { + select { + case <-tick.C: + c.mu.Lock() + c.writeState(false) + c.mu.Unlock() + case <-ctx.Done(): + return + } + } +} + +// writeState writes the current cache state to the backing file. +// If final is true and the cache is empty, the file will be deleted. +func (c *fileStore) writeState(final bool) { + if len(c.cache) == 0 && final { err := os.Remove(c.path) if err != nil { c.log.Errorw("failed to delete write state when empty", "error", err) diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go index 737a56a5841e..557a7eca9a54 100644 --- a/libbeat/processors/cache/file_store_test.go +++ b/libbeat/processors/cache/file_store_test.go @@ -54,7 +54,7 @@ var fileStoreTests = []struct { name: "new_put", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -75,7 +75,7 @@ var fileStoreTests = []struct { name: "new_get", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -95,7 +95,7 @@ var fileStoreTests = []struct { name: "new_delete", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -115,7 +115,7 @@ var fileStoreTests = []struct { name: "new_get_add_put", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -135,7 +135,7 @@ var fileStoreTests = []struct { doTo: func(s *fileStore) error { putCfg := config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -161,7 +161,7 @@ var fileStoreTests = []struct { name: "ensemble", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -181,7 +181,7 @@ var fileStoreTests = []struct { doTo: func(s *fileStore) error { putCfg := config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -361,7 +361,7 @@ func TestFileStore(t *testing.T) { } allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) - ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "log") + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "cancel", "log") ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") @@ -391,7 +391,7 @@ func TestFileStore(t *testing.T) { if test.wantPersisted == nil { return } - store = nil + f, err := os.Open(path) if err != nil { t.Fatalf("failed to open persisted data: %v", err) diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 15be85c76785..6be5eadb3577 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -42,7 +42,7 @@ var memStoreTests = []struct { name: "new_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -63,7 +63,7 @@ var memStoreTests = []struct { name: "new_get", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -83,7 +83,7 @@ var memStoreTests = []struct { name: "new_delete", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -103,7 +103,7 @@ var memStoreTests = []struct { name: "new_get_add_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -123,7 +123,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -149,7 +149,7 @@ var memStoreTests = []struct { name: "ensemble", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -169,7 +169,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, From 5d574959b5492aca84981199e758d7722c668ccb Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 28 Sep 2023 11:41:20 +0930 Subject: [PATCH 3/5] address pr comments --- libbeat/processors/cache/config.go | 2 +- libbeat/processors/cache/config_test.go | 2 +- libbeat/processors/cache/file_store.go | 16 +++++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go index 019bd8d5840f..90e6fe2ef3b9 100644 --- a/libbeat/processors/cache/config.go +++ b/libbeat/processors/cache/config.go @@ -103,7 +103,7 @@ type memConfig struct { type fileConfig struct { ID string `config:"id"` - WriteOutEvery time.Duration `config:"write_frequency"` + WriteOutEvery time.Duration `config:"write_interval"` } func (cfg *storeConfig) Validate() error { diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go index 69d158eb8302..17040b42910b 100644 --- a/libbeat/processors/cache/config_test.go +++ b/libbeat/processors/cache/config_test.go @@ -48,7 +48,7 @@ put: backend: file: id: aidmaster - write_frequency: 15m + write_interval: 15m put: ttl: 168h key_field: crowdstrike.aid diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index 604b7c01cde1..866175283af5 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -169,7 +169,12 @@ func (c *fileStore) readState() { err = dec.Decode(&e) if err != nil { if err != io.EOF { - c.log.Errorw("failed to read state element", "error", err) + switch err := err.(type) { + case *json.SyntaxError: + c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset) + default: + c.log.Errorw("failed to read state element", "error", err, "path", c.path) + } } break } @@ -219,12 +224,12 @@ func (c *fileStore) writeState(final bool) { } return } - f, err := os.CreateTemp("", "") + f, err := os.CreateTemp(filepath.Dir(c.path), "") if err != nil { c.log.Errorw("failed to open file to write state", "error", err) return } - // We are writing into tmp, so make sure we are private. + // Try to make sure we are private. err = os.Chmod(f.Name(), 0o600) if err != nil { c.log.Errorw("failed to set state file mode", "error", err) @@ -232,6 +237,11 @@ func (c *fileStore) writeState(final bool) { } tmp := f.Name() defer func() { + err = f.Sync() + if err != nil { + c.log.Errorw("failed to sync file after writing state", "error", err) + return + } err = f.Close() if err != nil { c.log.Errorw("failed to close file after writing state", "error", err) From 8238e85dd5e24bb674532ea9d27f275359d47756 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 28 Sep 2023 12:24:13 +0930 Subject: [PATCH 4/5] address pr comment --- libbeat/processors/cache/file_store.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index 866175283af5..45ed9d840689 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -26,6 +26,7 @@ import ( "io/fs" "os" "path/filepath" + "strings" "sync" "time" @@ -63,11 +64,32 @@ func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, // pathFromConfig returns the mapping form a config to a file-system path. func pathFromConfig(cfg config, log *logp.Logger) string { - path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cfg.Store.File.ID) + path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID)) log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path) return path } +// cleanFilename replaces illegal printable characters (and space or dot) in +// filenames, with underscore. +func cleanFilename(s string) string { + return pathCleaner.Replace(s) +} + +var pathCleaner = strings.NewReplacer( + "/", "_", + "<", "_", + ">", "_", + ":", "_", + `"`, "_", + "/", "_", + `\`, "_", + "|", "_", + "?", "_", + "*", "_", + ".", "_", + " ", "_", +) + // free removes the fileStore with the given ID from the set. free is safe // for concurrent use. func (s *fileStoreSet) free(id string) { @@ -224,7 +246,7 @@ func (c *fileStore) writeState(final bool) { } return } - f, err := os.CreateTemp(filepath.Dir(c.path), "") + f, err := os.CreateTemp(filepath.Dir(c.path), filepath.Base(c.path)+"-*.tmp") if err != nil { c.log.Errorw("failed to open file to write state", "error", err) return From dc1eb11fffe7b18fbbe1d3dfec589ffac6e4eb45 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 28 Sep 2023 12:42:34 +0930 Subject: [PATCH 5/5] fix config validation bug --- libbeat/processors/cache/config.go | 4 ++-- libbeat/processors/cache/config_test.go | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go index 90e6fe2ef3b9..36eeb423fc27 100644 --- a/libbeat/processors/cache/config.go +++ b/libbeat/processors/cache/config.go @@ -98,11 +98,11 @@ type storeConfig struct { } type memConfig struct { - ID string `config:"id"` + ID string `config:"id" validate:"required"` } type fileConfig struct { - ID string `config:"id"` + ID string `config:"id" validate:"required"` WriteOutEvery time.Duration `config:"write_interval"` } diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go index 17040b42910b..4a956caef02b 100644 --- a/libbeat/processors/cache/config_test.go +++ b/libbeat/processors/cache/config_test.go @@ -92,6 +92,28 @@ delete: `, want: nil, }, + { + name: "memory_no_id", + cfg: ` +backend: + memory: + id: '' +delete: + key_field: crowdstrike.aid +`, + want: errors.New("string value is not set accessing 'backend.memory.id'"), + }, + { + name: "file_no_id", + cfg: ` +backend: + file: + id: '' +delete: + key_field: crowdstrike.aid +`, + want: errors.New("string value is not set accessing 'backend.file.id'"), + }, { name: "no_op", cfg: `