Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data streams improvements #4449

Merged
merged 13 commits into from
Dec 1, 2020
4 changes: 4 additions & 0 deletions approvaltest/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -128,6 +129,9 @@ func removeReceived(name string) {
}

func writeReceived(name string, received interface{}) {
if err := os.MkdirAll(filepath.Dir(name), 0755); err != nil {
panic(err)
}
f, err := os.Create(name + ReceivedSuffix)
if err != nil {
panic(err)
Expand Down
49 changes: 35 additions & 14 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,14 @@ func NewCreator(args CreatorParams) beat.Creator {
return nil, err
}

// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all,
// and apm-server is not running supervised by Elastic Agent
shouldSetupPipelines := bt.config.Register.Ingest.Pipeline.IsEnabled() ||
(b.InSetupCmd && bt.config.Register.Ingest.Pipeline.Enabled == nil)
runningUnderElasticAgent := b.Manager != nil && b.Manager.Enabled()

if esOutputCfg != nil && shouldSetupPipelines && !runningUnderElasticAgent {
bt.logger.Info("Registering pipeline callback")
err := bt.registerPipelineCallback(b)
if err != nil {
return nil, err
if !bt.config.DataStreams.Enabled {
if b.Manager != nil && b.Manager.Enabled() {
return nil, errors.New("data streams must be enabled when the server is managed")
}
} else {
bt.logger.Info("No pipeline callback registered")
}

if err := bt.registerPipelineCallback(b); err != nil {
return nil, err
}

return bt, nil
Expand Down Expand Up @@ -250,13 +244,40 @@ func checkConfig(logger *logp.Logger) error {

// elasticsearchOutputConfig returns the configuration of b's output when
// that output is named "elasticsearch". It returns nil otherwise, including
// when b.Config is nil.
func elasticsearchOutputConfig(b *beat.Beat) *common.Config {
if b.Config != nil && b.Config.Output.Name() == "elasticsearch" {
if hasElasticsearchOutput(b) {
return b.Config.Output.Config()
}
return nil
}

// hasElasticsearchOutput reports whether b carries a configuration whose
// output is named "elasticsearch". A nil b.Config yields false.
func hasElasticsearchOutput(b *beat.Beat) bool {
	if b.Config == nil {
		return false
	}
	return b.Config.Output.Name() == "elasticsearch"
}

// registerPipelineCallback registers an Elasticsearch connection callback
// that ensures the configured pipeline is installed, if configured to do
// so. If data streams are enabled, then pipeline registration is always
// disabled and `setup --pipelines` will return an error.
func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
if !hasElasticsearchOutput(b) {
bt.logger.Info("Output is not Elasticsearch: pipeline registration disabled")
return nil
}

if bt.config.DataStreams.Enabled {
bt.logger.Info("Data streams enabled: pipeline registration disabled")
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
return errors.New("index pipeline setup must be performed externally when using data streams")
axw marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

if !bt.config.Register.Ingest.Pipeline.IsEnabled() {
bt.logger.Info("Pipeline registration disabled")
return nil
}

bt.logger.Info("Registering pipeline callback")
overwrite := bt.config.Register.Ingest.Pipeline.ShouldOverwrite()
path := bt.config.Register.Ingest.Pipeline.Path

Expand Down
8 changes: 4 additions & 4 deletions beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
return nil, errors.New(msgInvalidConfigAgentCfg)
}

if outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines")) {
c.Pipeline = ""
}

if err := c.RumConfig.setup(logger, outputESCfg); err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,6 +152,10 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
"which will lead to incorrect metrics being reported in the APM UI",
)
}

if c.DataStreams.Enabled || (outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines"))) {
c.Pipeline = ""
}
return c, nil
}

Expand Down
16 changes: 8 additions & 8 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,33 +448,33 @@ func TestTLSSettings(t *testing.T) {
"ConfiguredToRequired": {
config: map[string]interface{}{"ssl": map[string]interface{}{
"client_authentication": "required",
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
}},
tls: &tlscommon.ServerConfig{ClientAuth: 4, Certificate: testdataCertificateConfig},
},
"ConfiguredToOptional": {
config: map[string]interface{}{"ssl": map[string]interface{}{
"client_authentication": "optional",
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
}},
tls: &tlscommon.ServerConfig{ClientAuth: 3, Certificate: testdataCertificateConfig},
},
"DefaultRequiredByCA": {
config: map[string]interface{}{"ssl": map[string]interface{}{
"certificate_authorities": []string{"../../testdata/tls/ca.crt.pem"},
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
}},
tls: &tlscommon.ServerConfig{ClientAuth: 4, Certificate: testdataCertificateConfig},
},
"ConfiguredWithCA": {
config: map[string]interface{}{"ssl": map[string]interface{}{
"client_authentication": "none",
"certificate_authorities": []string{"../../testdata/tls/ca.crt.pem"},
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
"key": "../../testdata/tls/key.pem",
"certificate": "../../testdata/tls/certificate.pem",
}},
tls: &tlscommon.ServerConfig{ClientAuth: 0, Certificate: testdataCertificateConfig},
},
Expand Down
33 changes: 33 additions & 0 deletions beater/config/data_streams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

// TestDataStreamsPipeline checks that a config built with
// data_streams.enabled set to true ends up with an empty Pipeline.
func TestDataStreamsPipeline(t *testing.T) {
	input := map[string]interface{}{"data_streams.enabled": true}

	cfg, err := NewConfig(common.MustNewConfigFrom(input), nil)
	require.NoError(t, err)

	// Enabling data streams disables use of the pipeline.
	assert.Equal(t, "", cfg.Pipeline)
}
71 changes: 71 additions & 0 deletions idxmgmt/datastreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package idxmgmt

import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outil"

"github.com/elastic/apm-server/datastreams"
)

// dataStreamsSupporter is an empty, stateless type providing the
// BuildSelector, Enabled and Manager methods used when events are routed
// to data streams.
type dataStreamsSupporter struct{}

// BuildSelector returns an outputs.IndexSelector which routes events through
// to data streams based on well-defined data_stream.* fields in events.
//
// The selector is built from datastreams.IndexFormat, compiled as an event
// format string. The *common.Config argument is ignored. Any error from
// compiling the format or building the selector expression is returned
// unchanged, with a nil selector.
func (dataStreamsSupporter) BuildSelector(*common.Config) (outputs.IndexSelector, error) {
	// Local names deliberately avoid shadowing the imported fmtstr package.
	indexFormat, err := fmtstr.CompileEvent(datastreams.IndexFormat)
	if err != nil {
		return nil, err
	}

	selectorExpr, err := outil.FmtSelectorExpr(indexFormat, "", outil.SelectorLowerCase)
	if err != nil {
		return nil, err
	}

	return outil.MakeSelector(selectorExpr), nil
}

// Enabled always returns false, indicating that this idxmgmt.Supporter does
// not set up templates or ILM policies.
func (dataStreamsSupporter) Enabled() bool {
return false
}

// Manager returns a dataStreamsManager, which performs no index setup.
// The client and assets arguments are not used.
func (dataStreamsSupporter) Manager(client idxmgmt.ClientHandler, assets idxmgmt.Asseter) idxmgmt.Manager {
return dataStreamsManager{}
}

// dataStreamsManager is the empty, stateless value returned by
// dataStreamsSupporter.Manager. Its VerifySetup always succeeds and its
// Setup always returns an error.
type dataStreamsManager struct{}

// VerifySetup always returns true and an empty string, to avoid logging
// duplicate warnings. The template and ilm load modes are ignored; the
// failure for manual setup is reported by Setup instead.
func (dataStreamsManager) VerifySetup(template, ilm idxmgmt.LoadMode) (bool, string) {
// Just return true to avoid logging warnings. We'll error out in Setup.
return true, ""
}

// Setup will always return an error, in response to manual setup (i.e. `apm-server setup`).
func (dataStreamsManager) Setup(template, ilm idxmgmt.LoadMode) error {
return errors.New("index setup must be performed externally when using data streams")
axw marked this conversation as resolved.
Show resolved Hide resolved
}
28 changes: 2 additions & 26 deletions idxmgmt/supporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
libidxmgmt "github.com/elastic/beats/v7/libbeat/idxmgmt"
libilm "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
"github.com/elastic/beats/v7/libbeat/template"

"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/idxmgmt/ilm"
"github.com/elastic/apm-server/idxmgmt/unmanaged"
)
Expand All @@ -51,7 +49,6 @@ const esKey = "elasticsearch"
type supporter struct {
log *logp.Logger
info beat.Info
dataStreams bool
templateConfig template.TemplateConfig
ilmConfig ilm.Config
unmanagedIdxConfig *unmanaged.Config
Expand All @@ -66,20 +63,6 @@ type indexState struct {
isSet atomic.Bool
}

// newDataStreamSelector returns an outil.Selector which routes events to
// a data stream based on well-defined data_stream.* fields in events.
func newDataStreamSelector() (outputs.IndexSelector, error) {
fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat)
if err != nil {
return nil, err
}
expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase)
if err != nil {
return nil, err
}
return outil.MakeSelector(expr), nil
}

type unmanagedIndexSelector outil.Selector

type ilmIndexSelector struct {
Expand Down Expand Up @@ -110,10 +93,8 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig)
if cfg.Output.Name() != esKey || cfg.ILM.Mode == libilm.ModeDisabled {
disableILM = true
} else if cfg.ILM.Mode == libilm.ModeAuto {
// ILM is set to "auto": disable if we're using data streams,
// or if we're not using data streams but we're using customised,
// unmanaged indices.
if cfg.DataStreams || unmanagedIdxCfg.Customized() {
// ILM is set to "auto": disable if we're using customised, unmanaged indices.
if unmanagedIdxCfg.Customized() {
disableILM = true
}
}
Expand All @@ -130,7 +111,6 @@ func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig)
return &supporter{
log: log,
info: info,
dataStreams: cfg.DataStreams,
templateConfig: cfg.Template,
ilmConfig: cfg.ILM,
unmanagedIdxConfig: &unmanagedIdxCfg,
Expand Down Expand Up @@ -165,10 +145,6 @@ func (s *supporter) Manager(
// depending on the supporter's config an ILM instance or an unmanaged index selector instance is returned.
// The ILM instance decides on every Select call whether or not to return ILM indices or regular ones.
func (s *supporter) BuildSelector(_ *common.Config) (outputs.IndexSelector, error) {
if s.dataStreams {
return newDataStreamSelector()
}

sel, err := s.buildSelector(s.unmanagedIdxConfig.SelectorConfig())
if err != nil {
return nil, err
Expand Down
Loading