From 0800ab1cd35b95ae55934b611628176c0c739fa2 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Wed, 1 Jul 2020 15:05:00 +0200 Subject: [PATCH] Implement storage handling for cursor inputs (#19518) This change provide the store implementation that is used by the cursor input to track ephemeral and persistent state. The full list of changes will include: - Introduce v2 API interfaces - Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality - Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs. - Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts. - Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat. --- filebeat/input/v2/input-cursor/publish.go | 36 ++ filebeat/input/v2/input-cursor/store.go | 131 ++++++- filebeat/input/v2/input-cursor/store_test.go | 351 +++++++++++++++++++ 3 files changed, 513 insertions(+), 5 deletions(-) create mode 100644 filebeat/input/v2/input-cursor/store_test.go diff --git a/filebeat/input/v2/input-cursor/publish.go b/filebeat/input/v2/input-cursor/publish.go index 437c4e07373f..b5d33a7fce4d 100644 --- a/filebeat/input/v2/input-cursor/publish.go +++ b/filebeat/input/v2/input-cursor/publish.go @@ -22,6 +22,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" ) // Publisher is used to publish an event and update the cursor in a single call to Publish. @@ -70,3 +71,38 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er func (op *updateOp) Execute(numEvents uint) { panic("TODO: implement me") } + +func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) { + ts := time.Now() + + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + + cursor := resource.pendingCursor + if resource.activeCursorOperations == 0 { + var tmp interface{} + typeconv.Convert(&tmp, cursor) + resource.pendingCursor = tmp + cursor = tmp + } + if err := typeconv.Convert(&cursor, updates); err != nil { + return nil, err + } + resource.pendingCursor = cursor + + resource.Retain() + resource.activeCursorOperations++ + return &updateOp{ + resource: resource, + store: store, + timestamp: ts, + delta: updates, + }, nil +} + +// done releases resources held by the last N updateOps. +func (op *updateOp) done(n uint) { + op.resource.UpdatesReleaseN(n) + op.resource = nil + *op = updateOp{} +} diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go index f44cf6760dca..66d5b4509363 100644 --- a/filebeat/input/v2/input-cursor/store.go +++ b/filebeat/input/v2/input-cursor/store.go @@ -18,10 +18,12 @@ package cursor import ( + "strings" "sync" "time" "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/common/cleanup" "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" @@ -126,7 +128,25 @@ type ( var closeStore = (*store).close func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { - panic("TODO: implement me") + ok := false + + persistentStore, err := statestore.Access() + if err != nil { + return nil, err + } + defer cleanup.IfNot(&ok, func() { persistentStore.Close() }) + + states, err := readStates(log, persistentStore, prefix) + if err != nil { + return nil, err + } + + ok = true + return &store{ + log: log, + persistentStore: persistentStore, + ephemeralStore: states, + }, nil } func (s *store) Retain() { s.refCount.Retain() } @@ -137,7 +157,9 @@ func (s *store) Release() { } func (s *store) close() { - panic("TODO: implement me") + if err := s.persistentStore.Close(); err != nil { + s.log.Errorf("Closing registry store did report an error: %+v", err) + } } // Get returns the resource for the key. @@ -152,18 +174,62 @@ func (s *store) Get(key string) *resource { // On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known // on disk store state. func (s *store) UpdateTTL(resource *resource, ttl time.Duration) { - panic("TODO: implement me") + resource.stateMutex.Lock() + defer resource.stateMutex.Unlock() + if resource.stored && resource.internalState.TTL == ttl { + return + } + + resource.internalState.TTL = ttl + if resource.internalState.Updated.IsZero() { + resource.internalState.Updated = time.Now() + } + + err := s.persistentStore.Set(resource.key, state{ + TTL: resource.internalState.TTL, + Updated: resource.internalState.Updated, + Cursor: resource.cursor, + }) + if err != nil { + s.log.Errorf("Failed to update resource management fields for '%v'", resource.key) + resource.internalInSync = false + } else { + resource.stored = true + resource.internalInSync = true + } } // Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned. // The resource returned by Find is marked as active. (*resource).Release must be called to mark the resource as inactive again. func (s *states) Find(key string, create bool) *resource { - panic("TODO: implement me") + s.mu.Lock() + defer s.mu.Unlock() + + if resource := s.table[key]; resource != nil { + resource.Retain() + return resource + } + + if !create { + return nil + } + + // resource is owned by table(session) and input that uses the resource. + resource := &resource{ + stored: false, + key: key, + lock: unison.MakeMutex(), + } + s.table[key] = resource + resource.Retain() + return resource } // IsNew returns true if we have no state recorded for the current resource. func (r *resource) IsNew() bool { - panic("TODO: implement me") + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + return r.pendingCursor == nil && r.cursor == nil } // Retain is used to indicate that 'resource' gets an additional 'owner'. @@ -201,3 +267,58 @@ func (r *resource) inSyncStateSnapshot() state { Cursor: r.cursor, } } + +// stateSnapshot returns the current in memory state, that already contains state updates +// not yet ACKed. +func (r *resource) stateSnapshot() state { + cursor := r.pendingCursor + if r.activeCursorOperations == 0 { + cursor = r.cursor + } + + return state{ + TTL: r.internalState.TTL, + Updated: r.internalState.Updated, + Cursor: cursor, + } +} + +func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) { + keyPrefix := prefix + "::" + states := &states{ + table: map[string]*resource{}, + } + + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(string(key), keyPrefix) { + return true, nil + } + + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", + key, err) + return true, nil + } + + resource := &resource{ + key: key, + stored: true, + lock: unison.MakeMutex(), + internalInSync: true, + internalState: stateInternal{ + TTL: st.TTL, + Updated: st.Updated, + }, + cursor: st.Cursor, + } + states.table[resource.key] = resource + + return true, nil + }) + + if err != nil { + return nil, err + } + return states, nil +} diff --git a/filebeat/input/v2/input-cursor/store_test.go b/filebeat/input/v2/input-cursor/store_test.go new file mode 100644 index 000000000000..2fa3f6eaa3e8 --- /dev/null +++ b/filebeat/input/v2/input-cursor/store_test.go @@ -0,0 +1,351 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cursor + +import ( + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" +) + +type testStateStore struct { + Store *statestore.Store + GCPeriod time.Duration +} + +func TestStore_OpenClose(t *testing.T) { + t.Run("releasing store closes", func(t *testing.T) { + var closed bool + cleanup := closeStoreWith(func(s *store) { + closed = true + s.close() + }) + defer cleanup() + + store := testOpenStore(t, "test", nil) + store.Release() + + require.True(t, closed) + }) + + t.Run("fail if persistent store can not be accessed", func(t *testing.T) { + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test") + require.Error(t, err) + }) + + t.Run("load from empty", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + require.Equal(t, 0, len(storeMemorySnapshot(store))) + require.Equal(t, 0, len(storeInSyncSnapshot(store))) + }) + + t.Run("already available state is loaded", func(t *testing.T) { + states := map[string]state{ + "test::key0": {Cursor: "1"}, + "test::key1": {Cursor: "2"}, + } + + store := testOpenStore(t, "test", createSampleStore(t, states)) + defer store.Release() + + checkEqualStoreState(t, states, storeMemorySnapshot(store)) + checkEqualStoreState(t, states, storeInSyncSnapshot(store)) + }) + + t.Run("ignore entries with wrong index on open", func(t *testing.T) { + states := map[string]state{ + "test::key0": {Cursor: "1"}, + "other::key": {Cursor: "2"}, + } + + store := testOpenStore(t, "test", createSampleStore(t, states)) + defer store.Release() + + want := map[string]state{ + "test::key0": {Cursor: "1"}, + } + checkEqualStoreState(t, want, storeMemorySnapshot(store)) + checkEqualStoreState(t, want, storeInSyncSnapshot(store)) + }) +} + +func TestStore_Get(t *testing.T) { + t.Run("find existing resource", func(t *testing.T) { + cursorState := state{Cursor: "1"} + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key0": cursorState, + })) + defer store.Release() + + res := store.Get("test::key0") + require.NotNil(t, res) + defer res.Release() + + // check in memory state matches matches original persistent state + require.Equal(t, cursorState, res.stateSnapshot()) + // check assumed in-sync state matches matches original persistent state + require.Equal(t, cursorState, res.inSyncStateSnapshot()) + }) + + t.Run("access unknown resource", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + res := store.Get("test::key") + require.NotNil(t, res) + defer res.Release() + + // new resource has empty state + require.Equal(t, state{}, res.stateSnapshot()) + }) + + t.Run("same resource is returned", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + res1 := store.Get("test::key") + require.NotNil(t, res1) + defer res1.Release() + + res2 := store.Get("test::key") + require.NotNil(t, res2) + defer res2.Release() + + assert.Equal(t, res1, res2) + }) +} + +func TestStore_UpdateTTL(t *testing.T) { + t.Run("add TTL for new entry to store", func(t *testing.T) { + // when creating a resource we set the TTL and insert a new key value pair without cursor value into the store: + store := testOpenStore(t, "test", createSampleStore(t, nil)) + defer store.Release() + + res := store.Get("test::key") + store.UpdateTTL(res, 60*time.Second) + + want := map[string]state{ + "test::key": { + TTL: 60 * time.Second, + Updated: res.internalState.Updated, + Cursor: nil, + }, + } + + checkEqualStoreState(t, want, storeMemorySnapshot(store)) + checkEqualStoreState(t, want, storeInSyncSnapshot(store)) + }) + + t.Run("update TTL for in-sync resource does not overwrite state", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": { + TTL: 1 * time.Second, + Cursor: "test", + }, + })) + defer store.Release() + + res := store.Get("test::key") + store.UpdateTTL(res, 60*time.Second) + want := map[string]state{ + "test::key": { + Updated: res.internalState.Updated, + TTL: 60 * time.Second, + Cursor: "test", + }, + } + + checkEqualStoreState(t, want, storeMemorySnapshot(store)) + checkEqualStoreState(t, want, storeInSyncSnapshot(store)) + }) + + t.Run("update TTL for resource with pending updates", func(t *testing.T) { + // This test updates the resource TTL while update operations are still + // pending, but not synced to the persistent store yet. + // UpdateTTL changes the state in the persistent store immediately, and must therefore + // serialize the old in-sync state with update meta-data. + + // create store + backend := createSampleStore(t, map[string]state{ + "test::key": { + TTL: 1 * time.Second, + Cursor: "test", + }, + }) + store := testOpenStore(t, "test", backend) + defer store.Release() + + // create pending update operation + res := store.Get("test::key") + op, err := createUpdateOp(store, res, "test-state-update") + require.NoError(t, err) + defer op.done(1) + + // Update key/value pair TTL. This will update the internal state in the + // persistent store only, not modifying the old cursor state yet. + store.UpdateTTL(res, 60*time.Second) + + // validate + wantMemoryState := state{ + Updated: res.internalState.Updated, + TTL: 60 * time.Second, + Cursor: "test-state-update", + } + wantInSyncState := state{ + Updated: res.internalState.Updated, + TTL: 60 * time.Second, + Cursor: "test", + } + + checkEqualStoreState(t, map[string]state{"test::key": wantMemoryState}, storeMemorySnapshot(store)) + checkEqualStoreState(t, map[string]state{"test::key": wantInSyncState}, storeInSyncSnapshot(store)) + checkEqualStoreState(t, map[string]state{"test::key": wantInSyncState}, backend.snapshot()) + }) +} + +func closeStoreWith(fn func(s *store)) func() { + old := closeStore + closeStore = fn + return func() { + closeStore = old + } +} + +func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store { + if persistentStore == nil { + persistentStore = createSampleStore(t, nil) + } + + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix) + if err != nil { + t.Fatalf("failed to open the store") + } + return store +} + +func createSampleStore(t *testing.T, data map[string]state) testStateStore { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + store, err := storeReg.Get("test") + if err != nil { + t.Fatalf("Failed to access store: %v", err) + } + + for k, v := range data { + if err := store.Set(k, v); err != nil { + t.Fatalf("Error when populating the sample store: %v", err) + } + } + + return testStateStore{ + Store: store, + } +} + +func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } +func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } +func (ts testStateStore) Access() (*statestore.Store, error) { + if ts.Store == nil { + return nil, errors.New("no store configured") + } + return ts.Store, nil +} + +// snapshot copies all key/value pairs from the persistent store into a table for inspection. +func (ts testStateStore) snapshot() map[string]state { + states := map[string]state{} + err := ts.Store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + var st state + if err := dec.Decode(&st); err != nil { + return false, err + } + states[key] = st + return true, nil + }) + + if err != nil { + panic("unexpected decode error from persistent test store") + } + return states +} + +// storeMemorySnapshot copies all key/value pairs into a table for inspection. +// The state returned reflects the in memory state, which can be ahead of the +// persistent state. +// +// Note: The state returned by storeMemorySnapshot is always ahead of the state returned by storeInSyncSnapshot. +// All key value pairs are fully in-sync, if both snapshot functions return the same state. +func storeMemorySnapshot(store *store) map[string]state { + store.ephemeralStore.mu.Lock() + defer store.ephemeralStore.mu.Unlock() + + states := map[string]state{} + for k, res := range store.ephemeralStore.table { + states[k] = res.stateSnapshot() + } + return states +} + +// storeInSyncSnapshot copies all key/value pairs into the table for inspection. +// The state returned reflects the current state that the in-memory tables assumed to be +// written to the persistent store already. + +// Note: The state returned by storeMemorySnapshot is always ahead of the state returned by storeInSyncSnapshot. +// All key value pairs are fully in-sync, if both snapshot functions return the same state. +func storeInSyncSnapshot(store *store) map[string]state { + store.ephemeralStore.mu.Lock() + defer store.ephemeralStore.mu.Unlock() + + states := map[string]state{} + for k, res := range store.ephemeralStore.table { + states[k] = res.inSyncStateSnapshot() + } + return states +} + +// checkEqualStoreState compares 2 store snapshot tables for equality. The test +// fails with Errorf if the state differ. +// +// Note: testify is too strict when comparing timestamp, better use checkEqualStoreState. +func checkEqualStoreState(t *testing.T, want, got map[string]state) bool { + if d := cmp.Diff(want, got); d != "" { + t.Errorf("store state mismatch (-want +got):\n%s", d) + return false + } + return true +} + +// requireEqualStoreState compares 2 store snapshot tables for equality. The test +// fails with Fatalf if the state differ. +// +// Note: testify is too strict when comparing timestamp, better use checkEqualStoreState. +func requireEqualStoreState(t *testing.T, want, got map[string]state) bool { + if d := cmp.Diff(want, got); d != "" { + t.Fatalf("store state mismatch (-want +got):\n%s", d) + return false + } + return true +}