diff --git a/approvaltest/approvals.go b/approvaltest/approvals.go index 2333c58085a..485a87bd73b 100644 --- a/approvaltest/approvals.go +++ b/approvaltest/approvals.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "testing" "github.com/google/go-cmp/cmp" @@ -128,6 +129,9 @@ func removeReceived(name string) { } func writeReceived(name string, received interface{}) { + if err := os.MkdirAll(filepath.Dir(name), 0755); err != nil { + panic(err) + } f, err := os.Create(name + ReceivedSuffix) if err != nil { panic(err) diff --git a/beater/beater.go b/beater/beater.go index 3bb2fd6d4eb..fd413382396 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -83,20 +83,14 @@ func NewCreator(args CreatorParams) beat.Creator { return nil, err } - // setup pipelines if explicitly directed to or setup --pipelines and config is not set at all, - // and apm-server is not running supervised by Elastic Agent - shouldSetupPipelines := bt.config.Register.Ingest.Pipeline.IsEnabled() || - (b.InSetupCmd && bt.config.Register.Ingest.Pipeline.Enabled == nil) - runningUnderElasticAgent := b.Manager != nil && b.Manager.Enabled() - - if esOutputCfg != nil && shouldSetupPipelines && !runningUnderElasticAgent { - bt.logger.Info("Registering pipeline callback") - err := bt.registerPipelineCallback(b) - if err != nil { - return nil, err + if !bt.config.DataStreams.Enabled { + if b.Manager != nil && b.Manager.Enabled() { + return nil, errors.New("data streams must be enabled when the server is managed") } - } else { - bt.logger.Info("No pipeline callback registered") + } + + if err := bt.registerPipelineCallback(b); err != nil { + return nil, err } return bt, nil @@ -248,13 +242,40 @@ func checkConfig(logger *logp.Logger) error { // elasticsearchOutputConfig returns nil if the output is not elasticsearch func elasticsearchOutputConfig(b *beat.Beat) *common.Config { - if b.Config != nil && b.Config.Output.Name() == "elasticsearch" { + if hasElasticsearchOutput(b) { return b.Config.Output.Config() } return nil } +func hasElasticsearchOutput(b *beat.Beat) bool { + return b.Config != nil && b.Config.Output.Name() == "elasticsearch" +} + +// registerPipelineCallback registers an Elasticsearch connection callback +// that ensures the configured pipeline is installed, if configured to do +// so. If data streams are enabled, then pipeline registration is always +// disabled and `setup --pipelines` will return an error. func (bt *beater) registerPipelineCallback(b *beat.Beat) error { + if !hasElasticsearchOutput(b) { + bt.logger.Info("Output is not Elasticsearch: pipeline registration disabled") + return nil + } + + if bt.config.DataStreams.Enabled { + bt.logger.Info("Data streams enabled: pipeline registration disabled") + b.OverwritePipelinesCallback = func(esConfig *common.Config) error { + return errors.New("index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package") + } + return nil + } + + if !bt.config.Register.Ingest.Pipeline.IsEnabled() { + bt.logger.Info("Pipeline registration disabled") + return nil + } + + bt.logger.Info("Registering pipeline callback") overwrite := bt.config.Register.Ingest.Pipeline.ShouldOverwrite() path := bt.config.Register.Ingest.Pipeline.Path diff --git a/beater/config/config.go b/beater/config/config.go index 0a086bf4c69..b1b82070ea5 100644 --- a/beater/config/config.go +++ b/beater/config/config.go @@ -118,10 +118,6 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error) return nil, errors.New(msgInvalidConfigAgentCfg) } - if outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines")) { - c.Pipeline = "" - } - if err := c.RumConfig.setup(logger, outputESCfg); err != nil { return nil, err } @@ -156,6 +152,10 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error) "which will lead to incorrect metrics being reported in the APM UI", ) } + + if c.DataStreams.Enabled || (outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines"))) { + c.Pipeline = "" + } return c, nil } diff --git a/beater/config/data_streams_test.go b/beater/config/data_streams_test.go new file mode 100644 index 00000000000..ec2cca967ae --- /dev/null +++ b/beater/config/data_streams_test.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestDataStreamsPipeline(t *testing.T) { + cfg, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{"data_streams.enabled": true}), nil) + require.NoError(t, err) + assert.Equal(t, "", cfg.Pipeline) // enabling data streams disables use of the pipeline +} diff --git a/idxmgmt/datastreams.go b/idxmgmt/datastreams.go new file mode 100644 index 00000000000..15210ccb21a --- /dev/null +++ b/idxmgmt/datastreams.go @@ -0,0 +1,71 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package idxmgmt + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/idxmgmt" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/outil" + + "github.com/elastic/apm-server/datastreams" +) + +type dataStreamsSupporter struct{} + +// BuildSelector returns an outputs.IndexSelector which routes events through +// to data streams based on well-defined data_stream.* fields in events. +func (dataStreamsSupporter) BuildSelector(*common.Config) (outputs.IndexSelector, error) { + fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat) + if err != nil { + return nil, err + } + expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase) + if err != nil { + return nil, err + } + return outil.MakeSelector(expr), nil +} + +// Enabled always returns false, indicating that this idxmgmt.Supporter does +// not setting up templates or ILM policies. +func (dataStreamsSupporter) Enabled() bool { + return false +} + +// Manager returns a no-op idxmgmt.Manager. +func (dataStreamsSupporter) Manager(client idxmgmt.ClientHandler, assets idxmgmt.Asseter) idxmgmt.Manager { + return dataStreamsManager{} +} + +type dataStreamsManager struct{} + +// VerifySetup always returns true and an empty string, to avoid logging +// duplicate warnings. +func (dataStreamsManager) VerifySetup(template, ilm idxmgmt.LoadMode) (bool, string) { + // Just return true to avoid logging warnings. We'll error out in Setup. + return true, "" +} + +// Setup will always return an error, in response to manual setup (i.e. `apm-server setup`). +func (dataStreamsManager) Setup(template, ilm idxmgmt.LoadMode) error { + return errors.New("index setup must be performed externally when using data streams, by installing the 'apm' integration package") +} diff --git a/idxmgmt/supporter.go b/idxmgmt/supporter.go index eeffdcee412..9a46b742d0a 100644 --- a/idxmgmt/supporter.go +++ b/idxmgmt/supporter.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/fmtstr" libidxmgmt "github.com/elastic/beats/v7/libbeat/idxmgmt" libilm "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" "github.com/elastic/beats/v7/libbeat/logp" @@ -34,7 +33,6 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/beats/v7/libbeat/template" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/idxmgmt/ilm" "github.com/elastic/apm-server/idxmgmt/unmanaged" ) @@ -51,7 +49,6 @@ const esKey = "elasticsearch" type supporter struct { log *logp.Logger info beat.Info - dataStreams bool templateConfig template.TemplateConfig ilmConfig ilm.Config unmanagedIdxConfig unmanaged.Config @@ -66,20 +63,6 @@ type indexState struct { isSet atomic.Bool } -// newDataStreamSelector returns an outil.Selector which routes events to -// a data stream based on well-defined data_stream.* fields in events. -func newDataStreamSelector() (outputs.IndexSelector, error) { - fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat) - if err != nil { - return nil, err - } - expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase) - if err != nil { - return nil, err - } - return outil.MakeSelector(expr), nil -} - type unmanagedIndexSelector outil.Selector type ilmIndexSelector struct { @@ -99,10 +82,8 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig) if cfg.Output.Name() != esKey || cfg.ILM.Mode == libilm.ModeDisabled { disableILM = true } else if cfg.ILM.Mode == libilm.ModeAuto { - // ILM is set to "auto": disable if we're using data streams, - // or if we're not using data streams but we're using customised, - // unmanaged indices. - if cfg.DataStreams || cfg.unmanagedIdxCfg.Customized() { + // ILM is set to "auto": disable if we're using customised, unmanaged indices. + if cfg.unmanagedIdxCfg.Customized() { disableILM = true } } @@ -119,7 +100,6 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig) return &supporter{ log: log, info: info, - dataStreams: cfg.DataStreams, templateConfig: cfg.Template, ilmConfig: cfg.ILM, unmanagedIdxConfig: cfg.unmanagedIdxCfg, @@ -154,10 +134,6 @@ func (s *supporter) Manager( // depending on the supporter's config an ILM instance or an unmanaged index selector instance is returned. // The ILM instance decides on every Select call whether or not to return ILM indices or regular ones. func (s *supporter) BuildSelector(_ *common.Config) (outputs.IndexSelector, error) { - if s.dataStreams { - return newDataStreamSelector() - } - sel, err := s.buildSelector(s.unmanagedIdxConfig.SelectorConfig()) if err != nil { return nil, err diff --git a/idxmgmt/supporter_factory.go b/idxmgmt/supporter_factory.go index 0c826286b3e..560f67313e8 100644 --- a/idxmgmt/supporter_factory.go +++ b/idxmgmt/supporter_factory.go @@ -37,7 +37,10 @@ type IndexManagementConfig struct { ILM ilm.Config Output common.ConfigNamespace - unmanagedIdxCfg unmanaged.Config + unmanagedIdxCfg unmanaged.Config + registerIngestPipelineSpecified bool + setupTemplateSpecified bool + ilmSpecified bool } // MakeDefaultSupporter creates a new idxmgmt.Supporter, using the given root config. @@ -46,9 +49,10 @@ type IndexManagementConfig struct { // managed, and legacy unmanaged. The legacy modes exist purely to run // apm-server without data streams or Fleet integration. // -// If (Fleet) management is enabled, then no index, template, or ILM config -// should be set. Index (data stream) names will be well defined, based on -// the data type, service name, and user-defined namespace. +// If (Fleet) management is enabled, then any index, template, and ILM config +// defined will be ignored and warnings logged. Index (data stream) names will +// be well defined, based on the data type, service name, and user-defined +// namespace. // // If management is disabled, then the Supporter will operate in one of the // legacy modes based on configuration. @@ -58,6 +62,10 @@ func MakeDefaultSupporter(log *logp.Logger, info beat.Info, configRoot *common.C return nil, err } log = namedLogger(log) + cfg.logWarnings(log) + if cfg.DataStreams { + return dataStreamsSupporter{}, nil + } return newSupporter(log, info, cfg) } @@ -104,11 +112,15 @@ func NewIndexManagementConfig(info beat.Info, configRoot *common.Config) (*Index } return &IndexManagementConfig{ - Output: cfg.Output, - Template: templateConfig, - ILM: ilmConfig, - - unmanagedIdxCfg: unmanagedIdxCfg, + DataStreams: cfg.DataStreams.Enabled(), + Output: cfg.Output, + Template: templateConfig, + ILM: ilmConfig, + + unmanagedIdxCfg: unmanagedIdxCfg, + registerIngestPipelineSpecified: cfg.RegisterIngestPipeline != nil, + setupTemplateSpecified: cfg.Template != nil, + ilmSpecified: cfg.ILM != nil, }, nil } @@ -122,6 +134,25 @@ func checkTemplateESSettings(tmplCfg template.TemplateConfig, indexCfg *unmanage return nil } +func (cfg *IndexManagementConfig) logWarnings(log *logp.Logger) { + if !cfg.DataStreams { + return + } + const format = "`%s` specified, but will be ignored as data streams are enabled" + if cfg.setupTemplateSpecified { + log.Warnf(format, "setup.template") + } + if cfg.ilmSpecified { + log.Warnf(format, "apm-server.ilm") + } + if cfg.registerIngestPipelineSpecified { + log.Warnf(format, "apm-server.register.ingest.pipeline") + } + if cfg.unmanagedIdxCfg.Customized() { + log.Warnf(format, "output.elasticsearch.{index,indices}") + } +} + // unpackTemplateConfig merges APM-specific template settings with (possibly nil) // user-defined config, unpacks it over template.DefaultConfig(), returning the result. func unpackTemplateConfig(userTemplateConfig *common.Config) (template.TemplateConfig, error) { diff --git a/idxmgmt/supporter_factory_test.go b/idxmgmt/supporter_factory_test.go index e8f0a4841ce..cab90a2b9c3 100644 --- a/idxmgmt/supporter_factory_test.go +++ b/idxmgmt/supporter_factory_test.go @@ -21,13 +21,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/idxmgmt" libilm "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/idxmgmt/unmanaged" ) @@ -85,3 +91,69 @@ func TestMakeDefaultSupporter(t *testing.T) { }) } + +func TestMakeDefaultSupporterDataStreams(t *testing.T) { + supporter, err := MakeDefaultSupporter(nil, beat.Info{}, common.MustNewConfigFrom(map[string]interface{}{ + "apm-server.data_streams.enabled": "true", + })) + require.NoError(t, err) + + // The data streams supporter does not set up templates or ILM. These + // are expected to be set up externally, typically by Fleet. + assert.False(t, supporter.Enabled()) + + // Manager will fail when invoked; it should never be invoked automatically + // as supporter.Enabled() returns false. It will be invoked when running the + // "setup" command. + manager := supporter.Manager(nil, nil) + assert.NotNil(t, manager) + ok, warnings := manager.VerifySetup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled) + assert.True(t, ok) + assert.Zero(t, warnings) + err = manager.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled) + assert.EqualError(t, err, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") + + selector, err := supporter.BuildSelector(nil) + require.NoError(t, err) + index, err := selector.Select(&beat.Event{ + Fields: common.MapStr{ + datastreams.TypeField: datastreams.TracesType, + datastreams.DatasetField: "apm.apm_server", + datastreams.NamespaceField: "production", + }, + }) + require.NoError(t, err) + assert.Equal(t, "traces-apm.apm_server-production", index) +} + +func TestMakeDefaultSupporterDataStreamsWarnings(t *testing.T) { + core, observed := observer.New(zapcore.DebugLevel) + logger := logp.NewLogger("", zap.WrapCore(func(in zapcore.Core) zapcore.Core { + return zapcore.NewTee(in, core) + })) + + attrs := map[string]interface{}{ + "apm-server.data_streams.enabled": "true", + "apm-server.ilm.enabled": "auto", + "apm-server.register.ingest.pipeline.enabled": "true", + "output.elasticsearch.indices": map[string]interface{}{}, + "setup.template.name": "custom", + "setup.template.pattern": "custom", + } + + s, err := MakeDefaultSupporter(logger, beat.Info{}, common.MustNewConfigFrom(attrs)) + assert.NoError(t, err) + assert.NotNil(t, s) + + var warnings []string + for _, record := range observed.All() { + assert.Equal(t, zapcore.WarnLevel, record.Level, record.Message) + warnings = append(warnings, record.Message) + } + assert.Equal(t, []string{ + "`setup.template` specified, but will be ignored as data streams are enabled", + "`apm-server.ilm` specified, but will be ignored as data streams are enabled", + "`apm-server.register.ingest.pipeline` specified, but will be ignored as data streams are enabled", + "`output.elasticsearch.{index,indices}` specified, but will be ignored as data streams are enabled", + }, warnings) +} diff --git a/systemtest/apmservertest/config.go b/systemtest/apmservertest/config.go index f86bfa9c0d6..42c05e34f2e 100644 --- a/systemtest/apmservertest/config.go +++ b/systemtest/apmservertest/config.go @@ -47,6 +47,7 @@ type Config struct { Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"` Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"` RUM *RUMConfig `json:"apm-server.rum,omitempty"` + DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"` // Instrumentation holds configuration for libbeat and apm-server instrumentation. Instrumentation *InstrumentationConfig `json:"instrumentation,omitempty"` @@ -63,7 +64,7 @@ type Config struct { Output OutputConfig `json:"output"` // Setup holds configuration for libbeat setup. - Setup SetupConfig `json:"setup"` + Setup *SetupConfig `json:"setup,omitempty"` // Queue holds configuration for the libbeat event queue. Queue QueueConfig `json:"queue"` @@ -144,6 +145,11 @@ type RUMConfig struct { Enabled bool `json:"enabled"` } +// DataStreamsConfig holds APM Server data streams configuration. +type DataStreamsConfig struct { + Enabled bool `json:"enabled"` +} + // InstrumentationConfig holds APM Server instrumentation configuration. type InstrumentationConfig struct { Enabled bool `json:"enabled"` @@ -159,6 +165,7 @@ type ElasticsearchOutputConfig struct { Hosts []string `json:"hosts,omitempty"` Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` + APIKey string `json:"api_key,omitempty"` } // SetupConfig holds APM Server libbeat setup configuration. @@ -348,7 +355,7 @@ func DefaultConfig() Config { Password: getenvDefault("ES_PASS", defaultElasticsearchPass), }, }, - Setup: SetupConfig{ + Setup: &SetupConfig{ IndexTemplate: IndexTemplateConfig{ Shards: 1, RefreshInterval: "250ms", diff --git a/systemtest/apmservertest/filter.go b/systemtest/apmservertest/filter.go index 079976cf627..e8a18e0a25f 100644 --- a/systemtest/apmservertest/filter.go +++ b/systemtest/apmservertest/filter.go @@ -106,6 +106,7 @@ func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { m.System.Container = nil m.System.Kubernetes = nil m.System.Hostname = "beowulf" + m.Process.Argv = nil m.Process.Pid = 1 m.Process.Ppid = nil m.Service.Agent.Version = "0.0.0" diff --git a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json b/systemtest/approvals/TestDataStreamsEnabled/false.approved.json new file mode 100644 index 00000000000..5b4a28ca9c8 --- /dev/null +++ b/systemtest/approvals/TestDataStreamsEnabled/false.approved.json @@ -0,0 +1,76 @@ +{ + "events": [ + { + "@timestamp": "dynamic", + "agent": { + "name": "go", + "version": "0.0.0" + }, + "ecs": { + "version": "dynamic" + }, + "event": { + "ingested": "dynamic", + "outcome": "unknown" + }, + "host": { + "architecture": "i386", + "hostname": "beowulf", + "ip": "127.0.0.1", + "name": "beowulf", + "os": { + "platform": "minix" + } + }, + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "process": { + "pid": 1, + "title": "systemtest.test" + }, + "processor": { + "event": "transaction", + "name": "transaction" + }, + "service": { + "language": { + "name": "go", + "version": "2.0" + }, + "name": "systemtest", + "node": { + "name": "beowulf" + }, + "runtime": { + "name": "gc", + "version": "2.0" + } + }, + "timestamp": { + "us": "dynamic" + }, + "trace": { + "id": "dynamic" + }, + "transaction": { + "duration": { + "us": 1000000 + }, + "id": "dynamic", + "name": "name", + "sampled": true, + "span_count": { + "dropped": 0, + "started": 0 + }, + "type": "type" + } + } + ] +} diff --git a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json b/systemtest/approvals/TestDataStreamsEnabled/true.approved.json new file mode 100644 index 00000000000..c9a6191a011 --- /dev/null +++ b/systemtest/approvals/TestDataStreamsEnabled/true.approved.json @@ -0,0 +1,78 @@ +{ + "events": [ + { + "@timestamp": "dynamic", + "agent": { + "name": "go", + "version": "0.0.0" + }, + "data_stream.dataset": "apm.systemtest", + "data_stream.namespace": "default", + "data_stream.type": "traces", + "ecs": { + "version": "dynamic" + }, + "event": { + "outcome": "unknown" + }, + "host": { + "architecture": "i386", + "hostname": "beowulf", + "ip": "127.0.0.1", + "name": "beowulf", + "os": { + "platform": "minix" + } + }, + "observer": { + "ephemeral_id": "dynamic", + "hostname": "dynamic", + "id": "dynamic", + "type": "apm-server", + "version": "dynamic", + "version_major": "dynamic" + }, + "process": { + "pid": 1, + "title": "systemtest.test" + }, + "processor": { + "event": "transaction", + "name": "transaction" + }, + "service": { + "language": { + "name": "go", + "version": "2.0" + }, + "name": "systemtest", + "node": { + "name": "beowulf" + }, + "runtime": { + "name": "gc", + "version": "2.0" + } + }, + "timestamp": { + "us": "dynamic" + }, + "trace": { + "id": "dynamic" + }, + "transaction": { + "duration": { + "us": 1000000 + }, + "id": "dynamic", + "name": "name", + "sampled": true, + "span_count": { + "dropped": 0, + "started": 0 + }, + "type": "type" + } + } + ] +} diff --git a/systemtest/datastreams_test.go b/systemtest/datastreams_test.go new file mode 100644 index 00000000000..5f7bfa058a2 --- /dev/null +++ b/systemtest/datastreams_test.go @@ -0,0 +1,135 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package systemtest_test + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + "github.com/elastic/apm-server/systemtest" + "github.com/elastic/apm-server/systemtest/apmservertest" + "github.com/elastic/apm-server/systemtest/estest" +) + +func TestDataStreamsEnabled(t *testing.T) { + for _, enabled := range []bool{false, true} { + t.Run(fmt.Sprint(enabled), func(t *testing.T) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + if enabled { + // Enable data streams. + srv.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} + srv.Config.Setup = nil + + // Create a data stream index template. + resp, err := systemtest.Elasticsearch.Indices.PutIndexTemplate("apm-data-streams", strings.NewReader(fmt.Sprintf(`{ + "index_patterns": ["traces-*", "logs-*", "metrics-*"], + "data_stream": {}, + "priority": 200, + "template": {"settings": {"number_of_shards": 1, "refresh_interval": "250ms"}} + }`))) + require.NoError(t, err) + body, _ := ioutil.ReadAll(resp.Body) + require.False(t, resp.IsError(), string(body)) + + // Create an API Key which can write to traces-* etc. + // The default APM Server user can only write to apm-*. + // + // NOTE(axw) importantly, this API key lacks privileges + // to manage templates, pipelines, ILM, etc. Enabling + // data streams should disable all automatic setup. + resp, err = systemtest.Elasticsearch.Security.CreateAPIKey(strings.NewReader(fmt.Sprintf(`{ + "name": "%s", + "expiration": "1h", + "role_descriptors": { + "write-apm-data": { + "cluster": ["monitor"], + "index": [ + { + "names": ["traces-*", "metrics-*", "logs-*"], + "privileges": ["write", "create_index"] + } + ] + } + } + }`, t.Name()))) + require.NoError(t, err) + + var apiKeyResponse struct { + ID string + Name string + APIKey string `json:"api_key"` + } + require.NoError(t, json.NewDecoder(resp.Body).Decode(&apiKeyResponse)) + + // Use an API Key to mimic running under Fleet, with limited permissions. + srv.Config.Output.Elasticsearch.Username = "" + srv.Config.Output.Elasticsearch.Password = "" + srv.Config.Output.Elasticsearch.APIKey = fmt.Sprintf("%s:%s", apiKeyResponse.ID, apiKeyResponse.APIKey) + } + require.NoError(t, srv.Start()) + + tracer := srv.Tracer() + tx := tracer.StartTransaction("name", "type") + tx.Duration = time.Second + tx.End() + tracer.Flush(nil) + + result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*,traces-apm.*", estest.TermQuery{ + Field: "processor.event", Value: "transaction", + }) + systemtest.ApproveEvents( + t, t.Name(), result.Hits.Hits, + "@timestamp", "timestamp.us", + "trace.id", "transaction.id", + ) + + // There should be no warnings or errors logged. + for _, record := range srv.Logs.All() { + assert.Condition(t, func() bool { return record.Level < zapcore.WarnLevel }, record.Level) + } + }) + } +} + +func TestDataStreamsSetupErrors(t *testing.T) { + cfg := apmservertest.DefaultConfig() + cfg.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} + cfgargs, err := cfg.Args() + require.NoError(t, err) + + test := func(args []string, expected string) { + args = append(args, cfgargs...) + cmd := apmservertest.ServerCommand("setup", args...) + out, err := cmd.CombinedOutput() + require.Error(t, err) + assert.Equal(t, "Exiting: "+expected+"\n", string(out)) + } + + test([]string{}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") + test([]string{"--index-management"}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") + test([]string{"--pipelines"}, "index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package") +} diff --git a/systemtest/elasticsearch.go b/systemtest/elasticsearch.go index e7c146bebbe..357e783a2c3 100644 --- a/systemtest/elasticsearch.go +++ b/systemtest/elasticsearch.go @@ -90,11 +90,19 @@ func newElasticsearchConfig() elasticsearch.Config { // and ingest node pipelines whose names start with "apm", // and deletes the default ILM policy "apm-rollover-30-days". func CleanupElasticsearch(t testing.TB) { - const prefix = "apm*" + const ( + legacyPrefix = "apm*" + apmTracesPrefix = "traces-apm.*" + apmMetricsPrefix = "metrics-apm.*" + apmLogsPrefix = "logs-apm.*" + ) requests := []estest.Request{ - esapi.IndicesDeleteRequest{Index: []string{prefix}}, - esapi.IngestDeletePipelineRequest{PipelineID: prefix}, - esapi.IndicesDeleteTemplateRequest{Name: prefix}, + esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}, + esapi.IndicesDeleteDataStreamRequest{Name: apmTracesPrefix}, + esapi.IndicesDeleteDataStreamRequest{Name: apmMetricsPrefix}, + esapi.IndicesDeleteDataStreamRequest{Name: apmLogsPrefix}, + esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix}, + esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix}, } doReq := func(req estest.Request) error { diff --git a/systemtest/estest/search.go b/systemtest/estest/search.go index 2f76b9e4a8a..3cd02a52cf7 100644 --- a/systemtest/estest/search.go +++ b/systemtest/estest/search.go @@ -41,7 +41,11 @@ func (es *Client) ExpectMinDocs(t testing.TB, min int, index string, query inter t.Helper() var result SearchResult opts = append(opts, WithCondition(result.Hits.MinHitsCondition(min))) - if _, err := es.Search(index).WithQuery(query).Do(context.Background(), &result, opts...); err != nil { + req := es.Search(index) + if query != nil { + req = req.WithQuery(query) + } + if _, err := req.Do(context.Background(), &result, opts...); err != nil { t.Error(err) } return result diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index c641ed5d8cf..bf091794fa8 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -268,7 +268,7 @@ def wait_until_ilm_logged(self): def wait_until_pipeline_logged(self): registration_enabled = self.config().get("register_pipeline_enabled") - msg = "Registered Ingest Pipelines successfully" if registration_enabled != "false" else "No pipeline callback registered" + msg = "Registered Ingest Pipelines successfully" if registration_enabled != "false" else "Pipeline registration disabled" wait_until(lambda: self.log_contains(msg), name="pipelines registration") def load_docs_with_template(self, data_path, url, endpoint, expected_events_count, diff --git a/tests/system/test_pipelines.py b/tests/system/test_pipelines.py index 6d9e5d9c9a4..d4ed6b2aa30 100644 --- a/tests/system/test_pipelines.py +++ b/tests/system/test_pipelines.py @@ -48,7 +48,7 @@ def config(self): return cfg def test_setup_pipelines(self): - assert self.log_contains("No pipeline callback registered") + assert self.log_contains("Pipeline registration disabled") wait_until_pipelines(self.es, [])