Cursor input skeleton #19378

Merged
merged 2 commits into from
Jun 29, 2020
45 changes: 45 additions & 0 deletions filebeat/input/v2/input-cursor/clean.go
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

// cleaner removes finished entries from the registry file.
@kvch (Contributor), Jun 25, 2020:

If I understand correctly, the finished states are not claimed by any input and there is no pending state update. So this means that as long as Filebeat is running, it must claim the resource to persist its state. Does this mean that Filebeat cannot close an input source temporarily, e.g. a journal, to wait until new entries show up, because otherwise its state is lost? Or is this going to implement functionality similar to those of the options clean_* for log input?

Author:

> finished states are not claimed by any input and there is no pending state update.

Correct.

> So this means that as long as Filebeat is running, it must claim the resource to persist its state.

The resource is claimed by an active input. If there is no active input (e.g. autodiscovery closed the input after the container has been deleted, or the configuration file has changed between restarts), it is 'free' and might be removed by the cleaning process.

> Does this mean that Filebeat cannot close an input source temporarily,

More or less correct. With the implementation of this input manager we add a TTL to each entry in the store. Unclaimed resources are removed if latest update + TTL < now.

This input manager requires sources to be configured statically and upfront, based on the configuration. Internally, resources can't be released and reclaimed without an external signal.
But:

  • The input.Run method is allowed to close connections, wait, and reopen. The resource is claimed for as long as the go-routine is alive (Run has not returned).
  • We might consider adding a 'flag' to a resource saying that it cannot be cleaned yet. This flag would be active in memory only and reset on Beats restart.

Filebeat can have different input manager instances and even different input manager implementations that can all coexist and provide different coordination behavior. For the log input we cannot configure []Source upfront, as we need a discovery mechanism (file watcher) that adds and removes sources dynamically. Here we have two options: enhance this input manager, or develop a second input manager that can handle dynamic sources. For dynamic sources we should also consider some signaling like 'source removed' or 'source renamed', so that we can implement close_removed and clean_removed on top of these signals.

Almost all stateful Filebeat inputs we have configure the source statically at init time, even the Windows event logs and the journald log (configure system_logs, path, or a filename). In the latter case the journald libraries do the heavy lifting for us (based on our configured source).
AFAICT the only special input is the log input.

> Or is this going to implement functionality similar to those of the options clean_* for log input?

Yes. The TTL and cleaner are similar to clean_inactive. This input manager does not have a clean_removed, as the sources are 'static' and we have no way to tell it that a resource is gone for good.

For an input manager that supports dynamic sources and has a file watcher, we could emit a remove immediately, even cancelling pending update operations (or set the TTL to 0 and wait for the resource to be 'finished').

For an example usage, check out the journald input implementation: https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/features/input/journald/input.go
The journald input creates its own InputManager, which coordinates journald input instances only.

type cleaner struct {
log *logp.Logger
}

// run starts a loop that tries to clean entries from the registry.
// The cleaner locks the store, such that no new states can be created
// during the cleanup phase. Only resources that are finished and whose TTL
// (clean_timeout setting) has expired will be removed.
//
// Resources are considered "Finished" if they do not have a current owner (active input), and
// if they have no pending updates that still need to be written to the registry file after associated
// events have been ACKed by the outputs.
// The event acquisition timestamp is used as reference to clean resources. If a resource was blocked
// for a long time, and the lifetime has been exhausted, then the resource will be removed immediately
// once the last event has been ACKed.
func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Duration) {
panic("TODO: implement me")
}
43 changes: 43 additions & 0 deletions filebeat/input/v2/input-cursor/cursor.go
@@ -0,0 +1,43 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

// Cursor allows the input to check if cursor status has been stored
// in the past and unpack the status into a custom structure.
type Cursor struct {
store *store
resource *resource
}

func makeCursor(store *store, res *resource) Cursor {
return Cursor{store: store, resource: res}
}

// IsNew returns true if no cursor information has been stored
// for the current Source.
func (c Cursor) IsNew() bool { return c.resource.IsNew() }

// Unpack deserializes the cursor state into to. Unpack fails if no pointer is
// given, or if the structure to points to is not compatible with the document
// stored. If no cursor state has been stored yet, to is left unchanged and
// nil is returned.
func (c Cursor) Unpack(to interface{}) error {
if c.IsNew() {
return nil
}
return c.resource.UnpackCursor(to)
}
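
A hedged usage sketch for the IsNew/Unpack contract: because Unpack returns nil without touching its argument when the cursor is new, an input can fill in defaults first and then unpack over them. `fakeCursor` and `position` are hypothetical, and JSON is used here only as a stand-in for whatever encoding the store actually uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fakeCursor mimics the IsNew/Unpack contract with a JSON document as a
// stand-in for the stored state; the real Cursor delegates to a resource.
type fakeCursor struct {
	stored []byte // nil means nothing has been stored yet
}

func (c fakeCursor) IsNew() bool { return c.stored == nil }

// Unpack leaves `to` untouched when the cursor is new.
func (c fakeCursor) Unpack(to interface{}) error {
	if c.IsNew() {
		return nil
	}
	return json.Unmarshal(c.stored, to)
}

// position is a hypothetical input-specific cursor structure.
type position struct {
	Offset int64 `json:"offset"`
}

// resume sets a default position and lets Unpack overwrite it if a
// cursor was stored by an earlier run.
func resume(c fakeCursor) (position, error) {
	pos := position{Offset: 0}
	err := c.Unpack(&pos)
	return pos, err
}

func main() {
	fresh, _ := resume(fakeCursor{})
	restored, _ := resume(fakeCursor{stored: []byte(`{"offset": 42}`)})
	fmt.Println(fresh.Offset, restored.Offset) // 0 42
}
```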
58 changes: 58 additions & 0 deletions filebeat/input/v2/input-cursor/doc.go
@@ -0,0 +1,58 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package cursor provides an InputManager for use with the v2 API, that is
// capable of storing an internal cursor state between restarts.
//
// The InputManager requires authors to implement a configuration function and
// the cursor.Input interface. The configuration function returns a slice of
// sources ([]Source) that it has read from the configuration object, and the
// actual Input that will be used to collect events from each configured
// source.
// When Run is called, a go-routine will be started per configured source. If two inputs have
Contributor: 🎉

// configured the same source, only one will be active, while the other waits
// for the resource to become free.
// The manager keeps track of the state per source. When publishing an event a
// new cursor value can be passed as well. Future instances of the input can
// read the last published cursor state.
//
// For each source an in-memory and a persistent state are tracked. Internal
// meta updates by the input manager can not be read by Inputs, and will be
// written to the persistent store immediately. Cursor state updates are read
// and updated by the input. Cursor updates are written to the persistent store
// only after the events have been ACKed by the output. Internally the input
// manager keeps track of already ACKed updates and pending ACKs.
// In order to guarantee progress even if publishing is slow or blocked, all cursor
// updates are written to the in-memory state immediately. Sources without any
// pending updates are in-sync (in-memory state == persistent state). All
// updates are ordered, but we allow the in-memory state to be ahead of the
// persistent state.
// When an input is started, the cursor state is read from the in-memory state.
// This way a new input instance can continue where other inputs have been
// stopped, even if we still have in-flight events from older input instances.
// The coordination between inputs guarantees that all updates are always in
// order.
//
// When a shutdown signal is received, the publisher is directly disconnected
// from the outputs. As all coordination is directly handled by the
// InputManager, shutdown will be immediate (once the input itself has
// returned), and can not be blocked by the outputs.
//
// An input that is about to collect a source that is already collected by
// another input will wait until the other input has returned or the current
// input has received a shutdown signal.
package cursor
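
The relationship between the in-memory and the persistent state described in this package comment can be modeled in a few lines. This is a toy model under stated assumptions, not the manager's data structure: `sourceState`, `publish`, and `ack` are hypothetical names, and cursors are plain integers.

```go
package main

import "fmt"

// sourceState is a hypothetical model of the two states tracked per source.
type sourceState struct {
	inMemory   int   // latest cursor value seen by the manager
	persistent int   // latest cursor value written to the store
	pending    []int // updates waiting for an output ACK, in publish order
}

// publish records a cursor update: visible in memory at once, persisted later.
func (s *sourceState) publish(cursor int) {
	s.inMemory = cursor
	s.pending = append(s.pending, cursor)
}

// ack marks the n oldest pending updates as ACKed and persists the newest
// of them, so the persistent state only ever moves forward in order.
func (s *sourceState) ack(n int) {
	if n > len(s.pending) {
		n = len(s.pending)
	}
	if n <= 0 {
		return
	}
	s.persistent = s.pending[n-1]
	s.pending = s.pending[n:]
}

// inSync reports whether no updates are pending.
func (s *sourceState) inSync() bool { return len(s.pending) == 0 }

func main() {
	s := &sourceState{}
	s.publish(10)
	s.publish(20)
	s.publish(30)

	s.ack(2)
	fmt.Println(s.inMemory, s.persistent, s.inSync()) // 30 20 false

	s.ack(1)
	fmt.Println(s.inMemory, s.persistent, s.inSync()) // 30 30 true
}
```

A new input instance starting between the two ack calls would read 30 from the in-memory state, which is how it can continue where the previous instance stopped while events are still in flight.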
84 changes: 84 additions & 0 deletions filebeat/input/v2/input-cursor/input.go
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"fmt"
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
)

// Input interface for cursor based inputs. This interface must be implemented
// by inputs that wish to use the InputManager in order to implement a stateful
// input that can store state between restarts.
type Input interface {
Name() string

// Test checks the configuration and runs additional checks if the Input can
// actually collect data for the given configuration (e.g. check if host/port or files are
// accessible).
// The input manager will call Test per configured source.
Test(Source, input.TestContext) error

// Run starts the data collection. Run must return an error only if the
// error is fatal making it impossible for the input to recover.
// The input manager starts a go-routine per configured Source and calls Run in it.
Run(input.Context, Source, Cursor, Publisher) error
}

// managedInput implements the v2.Input interface, integrating cursor Inputs
// with the v2 input API.
// The managedInput starts go-routines per configured source.
// If a Run call returns an error, the error is 'remembered', but active data
// collection continues. Only after all Run calls have returned will the
// managedInput be done.
type managedInput struct {
manager *InputManager
userID string
sources []Source
input Input
cleanTimeout time.Duration
}

// Name is required to implement the v2.Input interface
func (inp *managedInput) Name() string { return inp.input.Name() }

// Test runs the Test method for each configured source.
func (inp *managedInput) Test(ctx input.TestContext) error {
panic("TODO: implement me")
}

// Run creates a go-routine per source, waiting until all go-routines have
// returned, either by error, or by shutdown signal.
// If an input panics, we create an error value with stack trace to report the
// issue, but not crash the whole process.
func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
panic("TODO: implement me")
}

func (inp *managedInput) createSourceID(s Source) string {
if inp.userID != "" {
return fmt.Sprintf("%v::%v::%v", inp.manager.Type, inp.userID, s.Name())
}
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}
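
To make the resulting key format concrete, the following sketch mirrors the formatting in createSourceID using plain strings instead of the manager and Source types. `sourceKey` and the example values are hypothetical.

```go
package main

import "fmt"

// sourceKey mirrors the format strings used by createSourceID, with the
// input type, optional user ID, and source name passed in as plain strings.
func sourceKey(inputType, userID, sourceName string) string {
	if userID != "" {
		return fmt.Sprintf("%v::%v::%v", inputType, userID, sourceName)
	}
	return fmt.Sprintf("%v::%v", inputType, sourceName)
}

func main() {
	fmt.Println(sourceKey("journald", "", "/var/log/journal"))      // journald::/var/log/journal
	fmt.Println(sourceKey("journald", "audit", "/var/log/journal")) // journald::audit::/var/log/journal
}
```

Setting a user ID gives the same source a separate key, which is what allows it to be collected more than once with independent state.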
106 changes: 106 additions & 0 deletions filebeat/input/v2/input-cursor/manager.go
@@ -0,0 +1,106 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cursor

import (
"time"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"

"github.com/elastic/go-concert/unison"
)

// InputManager is used to create, manage, and coordinate stateful inputs and
// their persistent state.
// The InputManager ensures that only one input can be active for a unique source.
// If two inputs have overlapping sources, both can still collect data, but
// only one input will collect from the common source.
//
// The InputManager automatically cleans up old entries without an active
// input, and without any pending update operations for the persistent store.
//
// The Type field is used to create the key name in the persistent store. Users
// are allowed to add a custom per input configuration ID using the `id`
// setting, to collect the same source multiple times, but with different
// state. The key name in the persistent store becomes <Type>::[<ID>::]<Source Name>
type InputManager struct {
Logger *logp.Logger

// StateStore gives the InputManager access to the persistent key value store.
StateStore StateStore

// Type must contain the name of the input type. It is used to create the key name
// for all sources the inputs collect from.
Type string

// DefaultCleanTimeout configures the key/value garbage collection interval.
// The InputManager will only collect keys for the configured 'Type'
DefaultCleanTimeout time.Duration

// Configure returns a slice of Sources, and a configured Input instance
// that will be used to collect events from each source.
Configure func(cfg *common.Config) ([]Source, Input, error)

store *store
}

// Source describes a source the input can collect data from.
// The `Name` method must return a unique name, that will be used to identify
// the source in the persistent state store.
type Source interface {
Name() string
}

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
CleanupInterval() time.Duration
}

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
panic("TODO: implement me")
}

// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
panic("TODO: implement me")
}

func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
if !resource.lock.TryLock() {
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
err := resource.lock.LockContext(canceler)
if err != nil {
log.Infof("Input for resource '%v' has been stopped while waiting", resource.key)
return err
}
}
return nil
}

func releaseResource(resource *resource) {
resource.lock.Unlock()
resource.Release()
}