Skip to content

Commit

Permalink
Data streams improvements (#4449)
Browse files Browse the repository at this point in the history
* idxmgmt: create independent data streams Supporter

Create a completely separate Supporter when data
streams are enabled. If data streams are enabled,
ensure that no setup (template, ILM, pipeline)
related config is specified.

* beater: enabling data streams disables pipeline

If data streams are enabled, pipeline registration
is disabled and attempting to setup pipelines will
fail. No pipeline will be specified when indexing;
the pipeline must be specified in the index template.

Also, if the server is running managed (by Fleet),
the server will fail to start unless data streams
are enabled.

* systemtest: add tests for data streams

* idxmgmt: log warnings upon finding legacy config

If legacy config is defined along with data streams
config, log warnings rather than returning errors.
axw authored Dec 1, 2020
1 parent b0e6d02 commit 49d55cc
Showing 17 changed files with 579 additions and 62 deletions.
4 changes: 4 additions & 0 deletions approvaltest/approvals.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/google/go-cmp/cmp"
@@ -128,6 +129,9 @@ func removeReceived(name string) {
}

func writeReceived(name string, received interface{}) {
if err := os.MkdirAll(filepath.Dir(name), 0755); err != nil {
panic(err)
}
f, err := os.Create(name + ReceivedSuffix)
if err != nil {
panic(err)
49 changes: 35 additions & 14 deletions beater/beater.go
Original file line number Diff line number Diff line change
@@ -83,20 +83,14 @@ func NewCreator(args CreatorParams) beat.Creator {
return nil, err
}

// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all,
// and apm-server is not running supervised by Elastic Agent
shouldSetupPipelines := bt.config.Register.Ingest.Pipeline.IsEnabled() ||
(b.InSetupCmd && bt.config.Register.Ingest.Pipeline.Enabled == nil)
runningUnderElasticAgent := b.Manager != nil && b.Manager.Enabled()

if esOutputCfg != nil && shouldSetupPipelines && !runningUnderElasticAgent {
bt.logger.Info("Registering pipeline callback")
err := bt.registerPipelineCallback(b)
if err != nil {
return nil, err
if !bt.config.DataStreams.Enabled {
if b.Manager != nil && b.Manager.Enabled() {
return nil, errors.New("data streams must be enabled when the server is managed")
}
} else {
bt.logger.Info("No pipeline callback registered")
}

if err := bt.registerPipelineCallback(b); err != nil {
return nil, err
}

return bt, nil
@@ -248,13 +242,40 @@ func checkConfig(logger *logp.Logger) error {

// elasticsearchOutputConfig returns nil if the output is not elasticsearch
func elasticsearchOutputConfig(b *beat.Beat) *common.Config {
if b.Config != nil && b.Config.Output.Name() == "elasticsearch" {
if hasElasticsearchOutput(b) {
return b.Config.Output.Config()
}
return nil
}

func hasElasticsearchOutput(b *beat.Beat) bool {
return b.Config != nil && b.Config.Output.Name() == "elasticsearch"
}

// registerPipelineCallback registers an Elasticsearch connection callback
// that ensures the configured pipeline is installed, if configured to do
// so. If data streams are enabled, then pipeline registration is always
// disabled and `setup --pipelines` will return an error.
func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
if !hasElasticsearchOutput(b) {
bt.logger.Info("Output is not Elasticsearch: pipeline registration disabled")
return nil
}

if bt.config.DataStreams.Enabled {
bt.logger.Info("Data streams enabled: pipeline registration disabled")
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
return errors.New("index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package")
}
return nil
}

if !bt.config.Register.Ingest.Pipeline.IsEnabled() {
bt.logger.Info("Pipeline registration disabled")
return nil
}

bt.logger.Info("Registering pipeline callback")
overwrite := bt.config.Register.Ingest.Pipeline.ShouldOverwrite()
path := bt.config.Register.Ingest.Pipeline.Path

8 changes: 4 additions & 4 deletions beater/config/config.go
Original file line number Diff line number Diff line change
@@ -118,10 +118,6 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
return nil, errors.New(msgInvalidConfigAgentCfg)
}

if outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines")) {
c.Pipeline = ""
}

if err := c.RumConfig.setup(logger, outputESCfg); err != nil {
return nil, err
}
@@ -156,6 +152,10 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
"which will lead to incorrect metrics being reported in the APM UI",
)
}

if c.DataStreams.Enabled || (outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines"))) {
c.Pipeline = ""
}
return c, nil
}

33 changes: 33 additions & 0 deletions beater/config/data_streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestDataStreamsPipeline(t *testing.T) {
cfg, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{"data_streams.enabled": true}), nil)
require.NoError(t, err)
assert.Equal(t, "", cfg.Pipeline) // enabling data streams disables use of the pipeline
}
71 changes: 71 additions & 0 deletions idxmgmt/datastreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package idxmgmt

import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outil"

"github.com/elastic/apm-server/datastreams"
)

type dataStreamsSupporter struct{}

// BuildSelector returns an outputs.IndexSelector which routes events through
// to data streams based on well-defined data_stream.* fields in events.
func (dataStreamsSupporter) BuildSelector(*common.Config) (outputs.IndexSelector, error) {
fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat)
if err != nil {
return nil, err
}
expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase)
if err != nil {
return nil, err
}
return outil.MakeSelector(expr), nil
}

// Enabled always returns false, indicating that this idxmgmt.Supporter does
// not setting up templates or ILM policies.
func (dataStreamsSupporter) Enabled() bool {
return false
}

// Manager returns a no-op idxmgmt.Manager.
func (dataStreamsSupporter) Manager(client idxmgmt.ClientHandler, assets idxmgmt.Asseter) idxmgmt.Manager {
return dataStreamsManager{}
}

type dataStreamsManager struct{}

// VerifySetup always returns true and an empty string, to avoid logging
// duplicate warnings.
func (dataStreamsManager) VerifySetup(template, ilm idxmgmt.LoadMode) (bool, string) {
// Just return true to avoid logging warnings. We'll error out in Setup.
return true, ""
}

// Setup will always return an error, in response to manual setup (i.e. `apm-server setup`).
func (dataStreamsManager) Setup(template, ilm idxmgmt.LoadMode) error {
return errors.New("index setup must be performed externally when using data streams, by installing the 'apm' integration package")
}
28 changes: 2 additions & 26 deletions idxmgmt/supporter.go
Original file line number Diff line number Diff line change
@@ -26,15 +26,13 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
libidxmgmt "github.com/elastic/beats/v7/libbeat/idxmgmt"
libilm "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/beats/v7/libbeat/template"

"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/idxmgmt/ilm"
"github.com/elastic/apm-server/idxmgmt/unmanaged"
)
@@ -51,7 +49,6 @@ const esKey = "elasticsearch"
type supporter struct {
log *logp.Logger
info beat.Info
dataStreams bool
templateConfig template.TemplateConfig
ilmConfig ilm.Config
unmanagedIdxConfig unmanaged.Config
@@ -66,20 +63,6 @@ type indexState struct {
isSet atomic.Bool
}

// newDataStreamSelector returns an outil.Selector which routes events to
// a data stream based on well-defined data_stream.* fields in events.
func newDataStreamSelector() (outputs.IndexSelector, error) {
fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat)
if err != nil {
return nil, err
}
expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase)
if err != nil {
return nil, err
}
return outil.MakeSelector(expr), nil
}

type unmanagedIndexSelector outil.Selector

type ilmIndexSelector struct {
@@ -99,10 +82,8 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig)
if cfg.Output.Name() != esKey || cfg.ILM.Mode == libilm.ModeDisabled {
disableILM = true
} else if cfg.ILM.Mode == libilm.ModeAuto {
// ILM is set to "auto": disable if we're using data streams,
// or if we're not using data streams but we're using customised,
// unmanaged indices.
if cfg.DataStreams || cfg.unmanagedIdxCfg.Customized() {
// ILM is set to "auto": disable if we're using customised, unmanaged indices.
if cfg.unmanagedIdxCfg.Customized() {
disableILM = true
}
}
@@ -119,7 +100,6 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig)
return &supporter{
log: log,
info: info,
dataStreams: cfg.DataStreams,
templateConfig: cfg.Template,
ilmConfig: cfg.ILM,
unmanagedIdxConfig: cfg.unmanagedIdxCfg,
@@ -154,10 +134,6 @@ func (s *supporter) Manager(
// depending on the supporter's config an ILM instance or an unmanaged index selector instance is returned.
// The ILM instance decides on every Select call whether or not to return ILM indices or regular ones.
func (s *supporter) BuildSelector(_ *common.Config) (outputs.IndexSelector, error) {
if s.dataStreams {
return newDataStreamSelector()
}

sel, err := s.buildSelector(s.unmanagedIdxConfig.SelectorConfig())
if err != nil {
return nil, err
49 changes: 40 additions & 9 deletions idxmgmt/supporter_factory.go
Original file line number Diff line number Diff line change
@@ -37,7 +37,10 @@ type IndexManagementConfig struct {
ILM ilm.Config
Output common.ConfigNamespace

unmanagedIdxCfg unmanaged.Config
unmanagedIdxCfg unmanaged.Config
registerIngestPipelineSpecified bool
setupTemplateSpecified bool
ilmSpecified bool
}

// MakeDefaultSupporter creates a new idxmgmt.Supporter, using the given root config.
@@ -46,9 +49,10 @@ type IndexManagementConfig struct {
// managed, and legacy unmanaged. The legacy modes exist purely to run
// apm-server without data streams or Fleet integration.
//
// If (Fleet) management is enabled, then no index, template, or ILM config
// should be set. Index (data stream) names will be well defined, based on
// the data type, service name, and user-defined namespace.
// If (Fleet) management is enabled, then any index, template, and ILM config
// defined will be ignored and warnings logged. Index (data stream) names will
// be well defined, based on the data type, service name, and user-defined
// namespace.
//
// If management is disabled, then the Supporter will operate in one of the
// legacy modes based on configuration.
@@ -58,6 +62,10 @@ func MakeDefaultSupporter(log *logp.Logger, info beat.Info, configRoot *common.C
return nil, err
}
log = namedLogger(log)
cfg.logWarnings(log)
if cfg.DataStreams {
return dataStreamsSupporter{}, nil
}
return newSupporter(log, info, cfg)
}

@@ -104,11 +112,15 @@ func NewIndexManagementConfig(info beat.Info, configRoot *common.Config) (*Index
}

return &IndexManagementConfig{
Output: cfg.Output,
Template: templateConfig,
ILM: ilmConfig,

unmanagedIdxCfg: unmanagedIdxCfg,
DataStreams: cfg.DataStreams.Enabled(),
Output: cfg.Output,
Template: templateConfig,
ILM: ilmConfig,

unmanagedIdxCfg: unmanagedIdxCfg,
registerIngestPipelineSpecified: cfg.RegisterIngestPipeline != nil,
setupTemplateSpecified: cfg.Template != nil,
ilmSpecified: cfg.ILM != nil,
}, nil
}

@@ -122,6 +134,25 @@ func checkTemplateESSettings(tmplCfg template.TemplateConfig, indexCfg *unmanage
return nil
}

func (cfg *IndexManagementConfig) logWarnings(log *logp.Logger) {
if !cfg.DataStreams {
return
}
const format = "`%s` specified, but will be ignored as data streams are enabled"
if cfg.setupTemplateSpecified {
log.Warnf(format, "setup.template")
}
if cfg.ilmSpecified {
log.Warnf(format, "apm-server.ilm")
}
if cfg.registerIngestPipelineSpecified {
log.Warnf(format, "apm-server.register.ingest.pipeline")
}
if cfg.unmanagedIdxCfg.Customized() {
log.Warnf(format, "output.elasticsearch.{index,indices}")
}
}

// unpackTemplateConfig merges APM-specific template settings with (possibly nil)
// user-defined config, unpacks it over template.DefaultConfig(), returning the result.
func unpackTemplateConfig(userTemplateConfig *common.Config) (template.TemplateConfig, error) {
Loading

0 comments on commit 49d55cc

Please sign in to comment.