Skip to content

Commit

Permalink
sampling: require a default policy (#4729)
Browse files Browse the repository at this point in the history
* sampling: require a default policy

* beater/config: validate policies here too

* beater: don't hang if runServer returns error

If runServer returns an error without Stop being
called, signal the "done" channel.
  • Loading branch information
axw authored Feb 16, 2021
1 parent d65c946 commit 864e5bb
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 5 deletions.
14 changes: 12 additions & 2 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,17 @@ func (bt *beater) start(ctx context.Context, cancelContext context.CancelFunc, b
return nil, err
}
bt.stopServer = func() {
defer close(done)
defer closeTracer()
if bt.config.ShutdownTimeout > 0 {
time.AfterFunc(bt.config.ShutdownTimeout, cancelContext)
}
s.Stop()
}
s.Start()
go func() {
defer close(done)
defer closeTracer()
s.Wait()
}()
}
return done, nil
}
Expand Down Expand Up @@ -316,11 +319,18 @@ func (s *serverRunner) String() string {
return "APMServer"
}

// Stop stops the server.
func (s *serverRunner) Stop() {
s.stopOnce.Do(s.cancelRunServerContext)
s.Wait()
}

// Wait waits for the server to stop.
func (s *serverRunner) Wait() {
s.wg.Wait()
}

// Start starts the server.
func (s *serverRunner) Start() {
s.wg.Add(1)
go func() {
Expand Down
4 changes: 3 additions & 1 deletion beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func TestUnpackConfig(t *testing.T) {
"sampling.keep_unsampled": false,
"sampling.tail": map[string]interface{}{
"enabled": true,
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
"interval": "2m",
"ingest_rate_decay": 1.0,
},
Expand Down Expand Up @@ -371,6 +372,7 @@ func TestUnpackConfig(t *testing.T) {
KeepUnsampled: false,
Tail: &TailSamplingConfig{
Enabled: true,
Policies: []TailSamplingPolicy{{SampleRate: 0.5}},
ESConfig: elasticsearch.DefaultConfig(),
Interval: 2 * time.Minute,
IngestRateDecayFactor: 1.0,
Expand Down Expand Up @@ -564,7 +566,7 @@ func TestAgentConfig(t *testing.T) {
}

func TestNewConfig_ESConfig(t *testing.T) {
ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.enabled":true}`)
ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.policies":[{"sample_rate": 0.5}]}`)
require.NoError(t, err)

// no es config given
Expand Down
29 changes: 28 additions & 1 deletion beater/config/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ type SamplingConfig struct {
type TailSamplingConfig struct {
Enabled bool `config:"enabled"`

Policies []TailSamplingPolicy `config:"policies"`
// Policies holds tail-sampling policies.
//
// Policies must include at least one policy that matches all traces, to ensure
// that dropping non-matching traces is intentional.
Policies []TailSamplingPolicy `config:"policies"`

ESConfig *elasticsearch.Config `config:"elasticsearch"`
Interval time.Duration `config:"interval" validate:"min=1s"`
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
Expand Down Expand Up @@ -76,8 +81,30 @@ func (c *TailSamplingConfig) Unpack(in *common.Config) error {
if err := in.Unpack(&cfg); err != nil {
return errors.Wrap(err, "error unpacking tail sampling config")
}
cfg.Enabled = in.Enabled()
*c = TailSamplingConfig(cfg)
c.esConfigured = in.HasField("elasticsearch")
return errors.Wrap(c.Validate(), "invalid tail sampling config")
}

func (c *TailSamplingConfig) Validate() error {
if !c.Enabled {
return nil
}
if len(c.Policies) == 0 {
return errors.New("no policies specified")
}
var anyDefaultPolicy bool
for _, policy := range c.Policies {
if policy == (TailSamplingPolicy{SampleRate: policy.SampleRate}) {
// We have at least one default policy.
anyDefaultPolicy = true
break
}
}
if !anyDefaultPolicy {
return errors.New("no default (empty criteria) policy specified")
}
return nil
}

Expand Down
52 changes: 52 additions & 0 deletions beater/config/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestSamplingPoliciesValidation(t *testing.T) {
t.Run("MinimallyValid", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"sampling.tail.policies": []map[string]interface{}{{
"sample_rate": 0.5,
}},
}), nil)
assert.NoError(t, err)
})
t.Run("NoPolicies", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"sampling.tail.enabled": true,
}), nil)
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no policies specified accessing 'sampling.tail'")
})
t.Run("NoDefaultPolicies", func(t *testing.T) {
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
"sampling.tail.policies": []map[string]interface{}{{
"service.name": "foo",
"sample_rate": 0.5,
}},
}), nil)
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no default (empty criteria) policy specified accessing 'sampling.tail'")
})
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits]
* OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677] {pull}4722[4722]
* Add initial support for APM central config and sourcemaps when running under Fleet {pull}4670[4670]
* Data stream and ILM policy for tail-based sampling {pull}4707[4707]
* When tail-sampling is enabled, a default policy must be defined {pull}4729[4729]
* Support additional config options when running under Fleet {pull}4690[4690]

[float]
Expand Down
10 changes: 10 additions & 0 deletions x-pack/apm-server/sampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type LocalSamplingConfig struct {
// Policies holds local tail-sampling policies. Policies are matched in the
// order provided. Policies should therefore be ordered from most to least
// specific.
//
// Policies must include at least one policy that matches all traces, to ensure
// that dropping non-matching traces is intentional.
Policies []Policy

// IngestRateDecayFactor holds the ingest rate decay factor, used for calculating
Expand Down Expand Up @@ -173,10 +176,17 @@ func (config LocalSamplingConfig) validate() error {
if len(config.Policies) == 0 {
return errors.New("Policies unspecified")
}
var anyDefaultPolicy bool
for i, policy := range config.Policies {
if err := policy.validate(); err != nil {
return errors.Wrapf(err, "Policy %d invalid", i)
}
if policy.PolicyCriteria == (PolicyCriteria{}) {
anyDefaultPolicy = true
}
}
if !anyDefaultPolicy {
return errors.New("Policies does not contain a default (empty criteria) policy")
}
if config.IngestRateDecayFactor <= 0 || config.IngestRateDecayFactor > 1 {
return errors.New("IngestRateDecayFactor unspecified or out of range (0,1]")
Expand Down
6 changes: 5 additions & 1 deletion x-pack/apm-server/sampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
config.MaxDynamicServices = 1

assertInvalidConfigError("invalid local sampling config: Policies unspecified")
config.Policies = []sampling.Policy{{}}
config.Policies = []sampling.Policy{{
PolicyCriteria: sampling.PolicyCriteria{ServiceName: "foo"},
}}
assertInvalidConfigError("invalid local sampling config: Policies does not contain a default (empty criteria) policy")
config.Policies[0].PolicyCriteria = sampling.PolicyCriteria{}
for _, invalid := range []float64{-1, 1.0, 2.0} {
config.Policies[0].SampleRate = invalid
assertInvalidConfigError("invalid local sampling config: Policy 0 invalid: SampleRate unspecified or out of range [0,1)")
Expand Down
3 changes: 3 additions & 0 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) {
}, {
PolicyCriteria: sampling.PolicyCriteria{ServiceName: "service_name"},
SampleRate: 0.1,
}, {
PolicyCriteria: sampling.PolicyCriteria{},
SampleRate: 0,
}}
config.FlushInterval = 10 * time.Millisecond
published := make(chan string)
Expand Down

0 comments on commit 864e5bb

Please sign in to comment.