Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Select output index based on the source input #14010

Merged
merged 21 commits into from
Nov 7, 2019
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
@@ -18,8 +18,11 @@
package channel

import (
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
)

@@ -51,22 +54,28 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
return nil, err
}

var err error
var userProcessors beat.ProcessorList
procs := processors.NewList(nil)
faec marked this conversation as resolved.
Show resolved Hide resolved

userProcessors, err = processors.New(config.Processors)
userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
procs.List = append(procs.List, userProcessors)

if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(
c.parent.beatInfo.Beat, c.parent.beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor := &addFormattedIndex{timestampFormat}
procs.List = append(procs.List, indexProcessor)
}

if lst := clientCfg.Processing.Processor; lst != nil {
if len(userProcessors.All()) == 0 {
userProcessors = lst
} else if orig := lst.All(); len(orig) > 0 {
newLst := processors.NewList(nil)
newLst.List = append(newLst.List, lst, userProcessors)
userProcessors = newLst
}
procs.List = append(procs.List, lst)
faec marked this conversation as resolved.
Show resolved Hide resolved
}

setOptional := func(to common.MapStr, key string, value string) {
@@ -105,7 +114,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
clientCfg.Processing.EventMetadata = config.EventMetadata
clientCfg.Processing.Meta = meta
clientCfg.Processing.Fields = fields
clientCfg.Processing.Processor = userProcessors
clientCfg.Processing.Processor = procs
client, err := c.pipeline.ConnectWith(clientCfg)
if err != nil {
return nil, err
@@ -117,3 +126,30 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien
}
return outlet, nil
}

//////////////////////////////
// addFormattedIndex is a Processor to set an event's "raw-index" metadata field
// with a given TimestampFormatString. The elasticsearch output interprets
// that field as specifying the (raw string) index the event should be sent to;
// in other outputs it is just included in the metadata.

type addFormattedIndex struct {
faec marked this conversation as resolved.
Show resolved Hide resolved
formatString *fmtstr.TimestampFormatString
}

func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) {
index, err := p.formatString.Run(event.Timestamp)
if err != nil {
return nil, err
}

if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw-index"] = index
return event, nil
}

func (p *addFormattedIndex) String() string {
return fmt.Sprintf("add_index_pattern=%v", p.formatString)
}
93 changes: 93 additions & 0 deletions filebeat/channel/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"testing"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestThing(t *testing.T) {
done := make(chan struct{})
beatInfo := beat.Info{Beat: "TestBeat", Version: "3.9.27"}
outletFactory := NewOutletFactory(done, emptyCounter{}, beatInfo)
pipeline := newChannelPipeline()
connector := outletFactory.Create(pipeline)
config, err := common.NewConfigFrom("index: 'test'")
if err != nil {
t.Error(err)
}
//field, _ := config.String("index", -1)
//fmt.Printf("config index: %v\n", field)
//config := common.NewConfig()
outleter, err := connector.ConnectWith(
config,
beat.ClientConfig{},
)
outleter.OnEvent(beat.Event{})
if err != nil {
t.Error(err)
}
processedEvent := <-pipeline.events
if processedEvent.Meta == nil {
//t.Error("Event Meta shouldn't be empty")
}
}

type emptyCounter struct{}

func (c emptyCounter) Add(n int) {}
func (c emptyCounter) Done() {}

// channelPipeline is a Pipeline (and Client) whose connections just echo their
// events to a shared events channel for testing.
type channelPipeline struct {
events chan beat.Event
}

func newChannelPipeline() *channelPipeline {
return &channelPipeline{make(chan beat.Event, 100)}
}

func (cp *channelPipeline) SetACKHandler(h beat.PipelineACKHandler) error {
return nil
}

func (cp *channelPipeline) ConnectWith(conf beat.ClientConfig) (beat.Client, error) {
return cp, nil
}

func (cp *channelPipeline) Connect() (beat.Client, error) {
return cp, nil
}

func (cp *channelPipeline) Publish(event beat.Event) {
cp.events <- event
}

func (cp *channelPipeline) PublishAll(events []beat.Event) {
for _, event := range events {
cp.Publish(event)
}
}

func (cp *channelPipeline) Close() error {
return nil
}
8 changes: 6 additions & 2 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ package channel
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/processors"
)

@@ -28,6 +29,7 @@ type OutletFactory struct {

eventer beat.ClientEventer
wgEvents eventCounter
beatInfo beat.Info
}

type eventCounter interface {
@@ -56,19 +58,21 @@ type inputOutletConfig struct {
Fileset string `config:"_fileset_name"` // hidden setting

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name

Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
}

// NewOutletFactory creates a new outlet factory for
// connecting an input to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
wgEvents eventCounter,
beatInfo beat.Info,
) *OutletFactory {
o := &OutletFactory{
done: done,
wgEvents: wgEvents,
beatInfo: beatInfo,
}

if wgEvents != nil {
4 changes: 2 additions & 2 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ type Runner struct {
// New instantiates a new Runner
func New(
conf *common.Config,
outlet channel.Connector,
connector channel.Connector,
beatDone chan struct{},
states []file.State,
dynFields *common.MapStrPointer,
@@ -99,7 +99,7 @@ func New(
Meta: nil,
}
var ipt Input
ipt, err = f(conf, outlet, context)
ipt, err = f(conf, connector, context)
if err != nil {
return input, err
}
8 changes: 8 additions & 0 deletions filebeat/kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: '2.3'
services:
kafka:
build: ${ES_BEATS}/testing/environments/docker/kafka
expose:
- 9092
environment:
- ADVERTISED_HOST=kafka
80 changes: 80 additions & 0 deletions libbeat/common/fmtstr/formattimestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fmtstr

import (
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

// TimestampFormatString is a wrapper around EventFormatString for the
// common special case where the format expression should only have access to
// shared static fields (typically agent / version) and the event timestamp.
type TimestampFormatString struct {
eventFormatString *EventFormatString
fields common.MapStr
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


// NewTimestampFormatString creates from the given event format string a
// TimestampFormatString that includes only the given static fields and
// a timestamp.
func NewTimestampFormatString(
eventFormatString *EventFormatString, staticFields common.MapStr,
) (*TimestampFormatString, error) {
return &TimestampFormatString{
eventFormatString: eventFormatString,
fields: staticFields.Clone(),
}, nil
}

// FieldsForBeat returns a common.MapStr with the given beat name and
// version assigned to their standard field names.
func FieldsForBeat(beat string, version string) common.MapStr {
return common.MapStr{
// beat object was left in for backward compatibility reason for older configs.
"beat": common.MapStr{
"name": beat,
"version": version,
},
"agent": common.MapStr{
"name": beat,
"version": version,
},
// For the Beats that have an observer role
"observer": common.MapStr{
"name": beat,
"version": version,
},
}
}

// Run executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *TimestampFormatString) Run(timestamp time.Time) (string, error) {
placeholderEvent := &beat.Event{
Fields: fs.fields,
Timestamp: timestamp,
}
return fs.eventFormatString.Run(placeholderEvent)
}

func (fs *TimestampFormatString) String() string {
return fs.eventFormatString.expression
}
Loading