Skip to content

Commit

Permalink
Cursor input old state collection (elastic#19528)
Browse files Browse the repository at this point in the history
The change provides the collection and deletion support for old states.
All entries added to the store have a TTL configured. A key-value pair
will be removed from the store once the most recent update timestamp + TTL < now.
Pairs with pending updates will not be deleted yet.

The collector currently only implements the logic for the clean_inactive setting.

In contrast to the old registrar, the TTL is not reset on startup. Old
entries are still subject to collection, even if they've not been
claimed. The TTL will be updated by the InputManager if an input with a
different TTL is started for an already seen source.
  • Loading branch information
Steffen Siering committed Jul 8, 2020
1 parent 623201a commit 3d28fab
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 2 deletions.
82 changes: 80 additions & 2 deletions filebeat/input/v2/input-cursor/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package cursor
import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/timed"
"github.com/elastic/go-concert/unison"

"github.com/elastic/beats/v7/libbeat/logp"
)

// cleaner removes finished entries from the registry file.
Expand All @@ -41,5 +43,81 @@ type cleaner struct {
// for a long time, and the life time has been exhausted, then the resource will be removed immediately
// once the last event has been ACKed.
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
	// The stale `panic("TODO: implement me")` placeholder was removed: it made
	// the function panic on entry and left the periodic cleanup unreachable.

	// Capture the cleaner's start time once. Every gcStore pass receives the
	// same value as its reference timestamp.
	started := time.Now()

	// Hand one store cleanup pass per tick to timed.Periodic, together with
	// the canceler and the configured interval.
	// NOTE(review): presumably timed.Periodic blocks until canceler is done —
	// confirm against the go-concert documentation.
	timed.Periodic(canceler, interval, func() {
		gcStore(c.log, started, store)
	})
}

// gcStore runs a single cleanup pass over the store: it collects the keys of
// all removable resources and deletes them.
//
// `started` is the timestamp at which the cleaner was started. It is passed on
// to gcFind together with the current time, so that entries whose last update
// predates `started` are judged against `started + ttl` instead of
// `updated + ttl`. Old entries are therefore not dropped right after startup
// when the Beat has been down for a longer period of time.
//
// The in-memory table stays locked for the whole pass. Errors reported by
// gcClean are logged and not returned.
func gcStore(log *logp.Logger, started time.Time, store *store) {
	log.Debugf("Start store cleanup")
	defer log.Debugf("Done store cleanup")

	ephemeral := store.ephemeralStore
	ephemeral.mu.Lock()
	defer ephemeral.mu.Unlock()

	now := time.Now()
	expired := gcFind(ephemeral.table, started, now)
	if len(expired) == 0 {
		log.Debug("No entries to remove were found")
		return
	}

	err := gcClean(store, expired)
	if err != nil {
		log.Errorf("Failed to remove all entries from the registry: %+v", err)
	}
}

// gcFind scans the table for resources that can be removed and returns the
// set of their keys. The returned set is never nil; it is empty if nothing
// qualifies.
func gcFind(table map[string]*resource, started, now time.Time) map[string]struct{} {
	removable := make(map[string]struct{})
	for id, res := range table {
		// Resources that are still live, or that have not been serialized to
		// the persistent store yet, are skipped.
		if checkCleanResource(started, now, res) {
			removable[id] = struct{}{}
		}
	}
	return removable
}

// gcClean deletes every key in removeSet, first from the persistent store and
// then from the in-memory table.
//
// The first failing removal from the persistent store aborts the loop and its
// error is returned. The failing entry, and all entries not visited yet, stay
// in memory so that a later cleanup pass can retry them.
func gcClean(store *store, removeSet map[string]struct{}) error {
	for id := range removeSet {
		err := store.persistentStore.Remove(id)
		if err != nil {
			return err
		}

		// Drop the in-memory entry only after the persistent removal succeeded.
		delete(store.ephemeralStore.table, id)
	}
	return nil
}

// checkCleanResource reports whether the resource may be removed.
//
// A resource that is not Finished() is never removed. Otherwise the lifetime
// is counted from the later of the resource's last update and `started`. The
// resource is removable if that reference plus the TTL lies before `now` and
// the resource is marked as stored.
func checkCleanResource(started, now time.Time, resource *resource) bool {
	if !resource.Finished() {
		return false
	}

	resource.stateMutex.Lock()
	defer resource.stateMutex.Unlock()

	// Never start counting earlier than the cleaner's own start time.
	base := resource.internalState.Updated
	if base.Before(started) {
		base = started
	}

	expiry := base.Add(resource.internalState.TTL)
	return resource.stored && expiry.Before(now)
}
162 changes: 162 additions & 0 deletions filebeat/input/v2/input-cursor/clean_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
)

// TestGCStore exercises gcStore against a sample backend and compares the
// backend snapshot after each cleanup pass with the expected state.
func TestGCStore(t *testing.T) {
	t.Run("empty store", func(t *testing.T) {
		startedAt := time.Now()

		be := createSampleStore(t, nil)
		st := testOpenStore(t, "test", be)
		defer st.Release()

		gcStore(logp.NewLogger("test"), startedAt, st)

		checkEqualStoreState(t, map[string]state{}, be.snapshot())
	})

	t.Run("state is still alive", func(t *testing.T) {
		const ttl = 60 * time.Second
		startedAt := time.Now()

		// The last update is only half a TTL older than the cleaner start.
		initial := map[string]state{
			"test::key": {
				TTL:     ttl,
				Updated: startedAt.Add(-ttl / 2),
			},
		}

		be := createSampleStore(t, initial)
		st := testOpenStore(t, "test", be)
		defer st.Release()

		gcStore(logp.NewLogger("test"), startedAt, st)

		checkEqualStoreState(t, initial, be.snapshot())
	})

	t.Run("old state can be removed", func(t *testing.T) {
		const ttl = 60 * time.Second

		// The cleanup process has been running for a while already.
		startedAt := time.Now().Add(-5 * ttl)

		initial := map[string]state{
			"test::key": {
				TTL:     ttl,
				Updated: startedAt.Add(-ttl),
			},
		}

		be := createSampleStore(t, initial)
		st := testOpenStore(t, "test", be)
		defer st.Release()

		gcStore(logp.NewLogger("test"), startedAt, st)

		checkEqualStoreState(t, map[string]state{}, be.snapshot())
	})

	t.Run("old state is not removed if cleanup is not active long enough", func(t *testing.T) {
		const ttl = 60 * time.Minute
		startedAt := time.Now()

		// The entry is two TTLs old, but the cleaner has only just started.
		initial := map[string]state{
			"test::key": {
				TTL:     ttl,
				Updated: startedAt.Add(-2 * ttl),
			},
		}

		be := createSampleStore(t, initial)
		st := testOpenStore(t, "test", be)
		defer st.Release()

		gcStore(logp.NewLogger("test"), startedAt, st)

		checkEqualStoreState(t, initial, be.snapshot())
	})

	t.Run("old state but resource is accessed", func(t *testing.T) {
		const ttl = 60 * time.Second

		// The cleanup process has been running for a while already.
		startedAt := time.Now().Add(-5 * ttl)

		initial := map[string]state{
			"test::key": {
				TTL:     ttl,
				Updated: startedAt.Add(-ttl),
			},
		}

		be := createSampleStore(t, initial)
		st := testOpenStore(t, "test", be)
		defer st.Release()

		// While the resource is held, a cleanup pass must leave it alone.
		held := st.Get("test::key")
		gcStore(logp.NewLogger("test"), startedAt, st)
		checkEqualStoreState(t, initial, be.snapshot())

		// Once the resource is released, the next pass removes it.
		held.Release()
		empty := map[string]state{}
		gcStore(logp.NewLogger("test"), startedAt, st)
		checkEqualStoreState(t, empty, be.snapshot())
	})

	t.Run("old state but resource has pending updates", func(t *testing.T) {
		const ttl = 60 * time.Second

		// The cleanup process has been running for a while already.
		startedAt := time.Now().Add(-5 * ttl)

		initial := map[string]state{
			"test::key": {
				TTL:     ttl,
				Updated: startedAt.Add(-ttl),
			},
		}

		be := createSampleStore(t, initial)
		st := testOpenStore(t, "test", be)
		defer st.Release()

		// Create a pending update operation, then release the resource itself.
		held := st.Get("test::key")
		pending, err := createUpdateOp(st, held, "test-state-update")
		require.NoError(t, err)
		held.Release()

		// With the update still pending, the cleanup pass removes nothing.
		gcStore(logp.NewLogger("test"), startedAt, st)
		checkEqualStoreState(t, initial, be.snapshot())

		// Finish the operation (no more pending operations) and gc again.
		pending.done(1)
		gcStore(logp.NewLogger("test"), startedAt, st)
		empty := map[string]state{}
		checkEqualStoreState(t, empty, be.snapshot())
	})
}

0 comments on commit 3d28fab

Please sign in to comment.