Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement storage handling for cursor inputs #19518

Merged
merged 1 commit into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions filebeat/input/v2/input-cursor/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
)

// Publisher is used to publish an event and update the cursor in a single call to Publish.
Expand Down Expand Up @@ -70,3 +71,38 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er
func (op *updateOp) Execute(numEvents uint) {
panic("TODO: implement me")
}

func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) {
ts := time.Now()

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

cursor := resource.pendingCursor
if resource.activeCursorOperations == 0 {
var tmp interface{}
typeconv.Convert(&tmp, cursor)
resource.pendingCursor = tmp
cursor = tmp
}
if err := typeconv.Convert(&cursor, updates); err != nil {
return nil, err
}
resource.pendingCursor = cursor

resource.Retain()
resource.activeCursorOperations++
return &updateOp{
resource: resource,
store: store,
timestamp: ts,
delta: updates,
}, nil
}

// done releases resources held by the last N updateOps.
func (op *updateOp) done(n uint) {
op.resource.UpdatesReleaseN(n)
op.resource = nil
*op = updateOp{}
}
131 changes: 126 additions & 5 deletions filebeat/input/v2/input-cursor/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package cursor

import (
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cleanup"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
Expand Down Expand Up @@ -126,7 +128,25 @@ type (
var closeStore = (*store).close

func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
panic("TODO: implement me")
ok := false

persistentStore, err := statestore.Access()
if err != nil {
return nil, err
}
defer cleanup.IfNot(&ok, func() { persistentStore.Close() })

states, err := readStates(log, persistentStore, prefix)
if err != nil {
return nil, err
}

ok = true
return &store{
log: log,
persistentStore: persistentStore,
ephemeralStore: states,
}, nil
}

func (s *store) Retain() { s.refCount.Retain() }
Expand All @@ -137,7 +157,9 @@ func (s *store) Release() {
}

func (s *store) close() {
panic("TODO: implement me")
if err := s.persistentStore.Close(); err != nil {
s.log.Errorf("Closing registry store did report an error: %+v", err)
kvch marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Get returns the resource for the key.
Expand All @@ -152,18 +174,62 @@ func (s *store) Get(key string) *resource {
// On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known
// on disk store state.
func (s *store) UpdateTTL(resource *resource, ttl time.Duration) {
panic("TODO: implement me")
resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()
if resource.stored && resource.internalState.TTL == ttl {
return
}

resource.internalState.TTL = ttl
if resource.internalState.Updated.IsZero() {
resource.internalState.Updated = time.Now()
}

err := s.persistentStore.Set(resource.key, state{
TTL: resource.internalState.TTL,
Updated: resource.internalState.Updated,
Cursor: resource.cursor,
})
if err != nil {
s.log.Errorf("Failed to update resource management fields for '%v'", resource.key)
resource.internalInSync = false
} else {
resource.stored = true
resource.internalInSync = true
}
}

// Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned.
// The resource returned by Find is marked as active. (*resource).Release must be called to mark the resource as inactive again.
func (s *states) Find(key string, create bool) *resource {
panic("TODO: implement me")
s.mu.Lock()
defer s.mu.Unlock()

if resource := s.table[key]; resource != nil {
resource.Retain()
return resource
}

if !create {
return nil
}

// resource is owned by table(session) and input that uses the resource.
resource := &resource{
stored: false,
key: key,
lock: unison.MakeMutex(),
}
s.table[key] = resource
resource.Retain()
return resource
}

// IsNew returns true if we have no state recorded for the current resource.
func (r *resource) IsNew() bool {
panic("TODO: implement me")
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
return r.pendingCursor == nil && r.cursor == nil
}

// Retain is used to indicate that 'resource' gets an additional 'owner'.
Expand Down Expand Up @@ -201,3 +267,58 @@ func (r *resource) inSyncStateSnapshot() state {
Cursor: r.cursor,
}
}

// stateSnapshot returns the current in memory state, that already contains state updates
// not yet ACKed.
func (r *resource) stateSnapshot() state {
cursor := r.pendingCursor
if r.activeCursorOperations == 0 {
cursor = r.cursor
}

return state{
TTL: r.internalState.TTL,
Updated: r.internalState.Updated,
Cursor: cursor,
}
}

func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) {
keyPrefix := prefix + "::"
states := &states{
table: map[string]*resource{},
}

err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(string(key), keyPrefix) {
return true, nil
}

var st state
if err := dec.Decode(&st); err != nil {
log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v",
key, err)
return true, nil
}

resource := &resource{
key: key,
stored: true,
lock: unison.MakeMutex(),
internalInSync: true,
internalState: stateInternal{
TTL: st.TTL,
Updated: st.Updated,
},
cursor: st.Cursor,
}
states.table[resource.key] = resource

return true, nil
})

if err != nil {
return nil, err
}
return states, nil
}
Loading