Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data streams improvements #4449

Merged
merged 13 commits into from
Dec 1, 2020
4 changes: 4 additions & 0 deletions approvaltest/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -128,6 +129,9 @@ func removeReceived(name string) {
}

func writeReceived(name string, received interface{}) {
if err := os.MkdirAll(filepath.Dir(name), 0755); err != nil {
panic(err)
}
f, err := os.Create(name + ReceivedSuffix)
if err != nil {
panic(err)
Expand Down
49 changes: 35 additions & 14 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,14 @@ func NewCreator(args CreatorParams) beat.Creator {
return nil, err
}

// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all,
// and apm-server is not running supervised by Elastic Agent
shouldSetupPipelines := bt.config.Register.Ingest.Pipeline.IsEnabled() ||
(b.InSetupCmd && bt.config.Register.Ingest.Pipeline.Enabled == nil)
runningUnderElasticAgent := b.Manager != nil && b.Manager.Enabled()

if esOutputCfg != nil && shouldSetupPipelines && !runningUnderElasticAgent {
bt.logger.Info("Registering pipeline callback")
err := bt.registerPipelineCallback(b)
if err != nil {
return nil, err
if !bt.config.DataStreams.Enabled {
if b.Manager != nil && b.Manager.Enabled() {
return nil, errors.New("data streams must be enabled when the server is managed")
}
} else {
bt.logger.Info("No pipeline callback registered")
}

if err := bt.registerPipelineCallback(b); err != nil {
return nil, err
}

return bt, nil
Expand Down Expand Up @@ -248,13 +242,40 @@ func checkConfig(logger *logp.Logger) error {

// elasticsearchOutputConfig returns nil if the output is not elasticsearch
func elasticsearchOutputConfig(b *beat.Beat) *common.Config {
if b.Config != nil && b.Config.Output.Name() == "elasticsearch" {
if hasElasticsearchOutput(b) {
return b.Config.Output.Config()
}
return nil
}

func hasElasticsearchOutput(b *beat.Beat) bool {
return b.Config != nil && b.Config.Output.Name() == "elasticsearch"
}

// registerPipelineCallback registers an Elasticsearch connection callback
// that ensures the configured pipeline is installed, if configured to do
// so. If data streams are enabled, then pipeline registration is always
// disabled and `setup --pipelines` will return an error.
func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
if !hasElasticsearchOutput(b) {
bt.logger.Info("Output is not Elasticsearch: pipeline registration disabled")
return nil
}

if bt.config.DataStreams.Enabled {
bt.logger.Info("Data streams enabled: pipeline registration disabled")
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
return errors.New("index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package")
}
return nil
}

if !bt.config.Register.Ingest.Pipeline.IsEnabled() {
bt.logger.Info("Pipeline registration disabled")
return nil
}

bt.logger.Info("Registering pipeline callback")
overwrite := bt.config.Register.Ingest.Pipeline.ShouldOverwrite()
path := bt.config.Register.Ingest.Pipeline.Path

Expand Down
8 changes: 4 additions & 4 deletions beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
return nil, errors.New(msgInvalidConfigAgentCfg)
}

if outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines")) {
c.Pipeline = ""
}

if err := c.RumConfig.setup(logger, outputESCfg); err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,6 +152,10 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
"which will lead to incorrect metrics being reported in the APM UI",
)
}

if c.DataStreams.Enabled || (outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines"))) {
c.Pipeline = ""
}
return c, nil
}

Expand Down
33 changes: 33 additions & 0 deletions beater/config/data_streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestDataStreamsPipeline(t *testing.T) {
cfg, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{"data_streams.enabled": true}), nil)
require.NoError(t, err)
assert.Equal(t, "", cfg.Pipeline) // enabling data streams disables use of the pipeline
}
71 changes: 71 additions & 0 deletions idxmgmt/datastreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package idxmgmt

import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outil"

"github.com/elastic/apm-server/datastreams"
)

type dataStreamsSupporter struct{}

// BuildSelector returns an outputs.IndexSelector which routes events through
// to data streams based on well-defined data_stream.* fields in events.
func (dataStreamsSupporter) BuildSelector(*common.Config) (outputs.IndexSelector, error) {
fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat)
if err != nil {
return nil, err
}
expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase)
if err != nil {
return nil, err
}
return outil.MakeSelector(expr), nil
}

// Enabled always returns false, indicating that this idxmgmt.Supporter does
// not setting up templates or ILM policies.
func (dataStreamsSupporter) Enabled() bool {
return false
}

// Manager returns a no-op idxmgmt.Manager.
func (dataStreamsSupporter) Manager(client idxmgmt.ClientHandler, assets idxmgmt.Asseter) idxmgmt.Manager {
return dataStreamsManager{}
}

type dataStreamsManager struct{}

// VerifySetup always returns true and an empty string, to avoid logging
// duplicate warnings.
func (dataStreamsManager) VerifySetup(template, ilm idxmgmt.LoadMode) (bool, string) {
// Just return true to avoid logging warnings. We'll error out in Setup.
return true, ""
}

// Setup will always return an error, in response to manual setup (i.e. `apm-server setup`).
func (dataStreamsManager) Setup(template, ilm idxmgmt.LoadMode) error {
return errors.New("index setup must be performed externally when using data streams, by installing the 'apm' integration package")
}
28 changes: 2 additions & 26 deletions idxmgmt/supporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
libidxmgmt "github.com/elastic/beats/v7/libbeat/idxmgmt"
libilm "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/beats/v7/libbeat/template"

"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/idxmgmt/ilm"
"github.com/elastic/apm-server/idxmgmt/unmanaged"
)
Expand All @@ -51,7 +49,6 @@ const esKey = "elasticsearch"
type supporter struct {
log *logp.Logger
info beat.Info
dataStreams bool
templateConfig template.TemplateConfig
ilmConfig ilm.Config
unmanagedIdxConfig unmanaged.Config
Expand All @@ -66,20 +63,6 @@ type indexState struct {
isSet atomic.Bool
}

// newDataStreamSelector returns an outil.Selector which routes events to
// a data stream based on well-defined data_stream.* fields in events.
func newDataStreamSelector() (outputs.IndexSelector, error) {
fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat)
if err != nil {
return nil, err
}
expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase)
if err != nil {
return nil, err
}
return outil.MakeSelector(expr), nil
}

type unmanagedIndexSelector outil.Selector

type ilmIndexSelector struct {
Expand All @@ -99,10 +82,8 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig)
if cfg.Output.Name() != esKey || cfg.ILM.Mode == libilm.ModeDisabled {
disableILM = true
} else if cfg.ILM.Mode == libilm.ModeAuto {
// ILM is set to "auto": disable if we're using data streams,
// or if we're not using data streams but we're using customised,
// unmanaged indices.
if cfg.DataStreams || cfg.unmanagedIdxCfg.Customized() {
// ILM is set to "auto": disable if we're using customised, unmanaged indices.
if cfg.unmanagedIdxCfg.Customized() {
disableILM = true
}
}
Expand All @@ -119,7 +100,6 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig)
return &supporter{
log: log,
info: info,
dataStreams: cfg.DataStreams,
templateConfig: cfg.Template,
ilmConfig: cfg.ILM,
unmanagedIdxConfig: cfg.unmanagedIdxCfg,
Expand Down Expand Up @@ -154,10 +134,6 @@ func (s *supporter) Manager(
// depending on the supporter's config an ILM instance or an unmanaged index selector instance is returned.
// The ILM instance decides on every Select call whether or not to return ILM indices or regular ones.
func (s *supporter) BuildSelector(_ *common.Config) (outputs.IndexSelector, error) {
if s.dataStreams {
return newDataStreamSelector()
}

sel, err := s.buildSelector(s.unmanagedIdxConfig.SelectorConfig())
if err != nil {
return nil, err
Expand Down
49 changes: 40 additions & 9 deletions idxmgmt/supporter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ type IndexManagementConfig struct {
ILM ilm.Config
Output common.ConfigNamespace

unmanagedIdxCfg unmanaged.Config
unmanagedIdxCfg unmanaged.Config
registerIngestPipelineSpecified bool
setupTemplateSpecified bool
ilmSpecified bool
}

// MakeDefaultSupporter creates a new idxmgmt.Supporter, using the given root config.
Expand All @@ -46,9 +49,10 @@ type IndexManagementConfig struct {
// managed, and legacy unmanaged. The legacy modes exist purely to run
// apm-server without data streams or Fleet integration.
//
// If (Fleet) management is enabled, then no index, template, or ILM config
// should be set. Index (data stream) names will be well defined, based on
// the data type, service name, and user-defined namespace.
// If (Fleet) management is enabled, then any index, template, and ILM config
// defined will be ignored and warnings logged. Index (data stream) names will
// be well defined, based on the data type, service name, and user-defined
// namespace.
//
// If management is disabled, then the Supporter will operate in one of the
// legacy modes based on configuration.
Expand All @@ -58,6 +62,10 @@ func MakeDefaultSupporter(log *logp.Logger, info beat.Info, configRoot *common.C
return nil, err
}
log = namedLogger(log)
cfg.logWarnings(log)
if cfg.DataStreams {
return dataStreamsSupporter{}, nil
}
return newSupporter(log, info, cfg)
}

Expand Down Expand Up @@ -104,11 +112,15 @@ func NewIndexManagementConfig(info beat.Info, configRoot *common.Config) (*Index
}

return &IndexManagementConfig{
Output: cfg.Output,
Template: templateConfig,
ILM: ilmConfig,

unmanagedIdxCfg: unmanagedIdxCfg,
DataStreams: cfg.DataStreams.Enabled(),
Output: cfg.Output,
Template: templateConfig,
ILM: ilmConfig,

unmanagedIdxCfg: unmanagedIdxCfg,
registerIngestPipelineSpecified: cfg.RegisterIngestPipeline != nil,
setupTemplateSpecified: cfg.Template != nil,
ilmSpecified: cfg.ILM != nil,
}, nil
}

Expand All @@ -122,6 +134,25 @@ func checkTemplateESSettings(tmplCfg template.TemplateConfig, indexCfg *unmanage
return nil
}

func (cfg *IndexManagementConfig) logWarnings(log *logp.Logger) {
if !cfg.DataStreams {
return
}
const format = "`%s` specified, but will be ignored as data streams are enabled"
if cfg.setupTemplateSpecified {
log.Warnf(format, "setup.template")
}
if cfg.ilmSpecified {
log.Warnf(format, "apm-server.ilm")
}
if cfg.registerIngestPipelineSpecified {
log.Warnf(format, "apm-server.register.ingest.pipeline")
}
if cfg.unmanagedIdxCfg.Customized() {
log.Warnf(format, "output.elasticsearch.{index,indices}")
}
}

// unpackTemplateConfig merges APM-specific template settings with (possibly nil)
// user-defined config, unpacks it over template.DefaultConfig(), returning the result.
func unpackTemplateConfig(userTemplateConfig *common.Config) (template.TemplateConfig, error) {
Expand Down
Loading