sampling: use a data stream for sampled trace docs (#4707)
* beater: add Managed and Namespace to ServerParams

This enables x-pack/apm-server code to alter behaviour
based on whether APM Server is managed or not, and to
create data streams with the configured namespace.

* sampling: use a data stream for sampled trace docs

Update tail-based sampling to index into and search a
data stream. The data stream will be associated with an
ILM policy that takes care of rollover and deletion.

When running in Fleet-managed mode, apm-server expects the
data stream `traces-sampled-<namespace>` and its ILM policy
to already exist. Servers participating in tail-based
sampling must be configured with the same namespace.

When running in standalone mode, apm-server will attempt
to create an index template and ILM policy for a data
stream called `apm-sampled-traces`. This is added for
minimal support while we transition things to Fleet, and
is intended to be removed in a future release. The data
stream is not intended to adhere to the standard indexing
strategy.
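
For reference, the backing data stream name in both modes follows the
`<type>-<dataset>-<namespace>` convention. A minimal, illustrative sketch
(not part of this change; the String helper is hypothetical) showing how
the two modes resolve to concrete names:

package main

import "fmt"

// DataStreamConfig mirrors the fields introduced by this change;
// the String helper below is purely illustrative.
type DataStreamConfig struct {
    Type      string // e.g. "traces"
    Dataset   string // e.g. "sampled"
    Namespace string // e.g. "default"
}

func (c DataStreamConfig) String() string {
    return fmt.Sprintf("%s-%s-%s", c.Type, c.Dataset, c.Namespace)
}

func main() {
    // Fleet-managed mode: traces-sampled-<namespace>.
    fmt.Println(DataStreamConfig{Type: "traces", Dataset: "sampled", Namespace: "default"})
    // Standalone mode: the fixed name apm-sampled-traces.
    fmt.Println(DataStreamConfig{Type: "apm", Dataset: "sampled", Namespace: "traces"})
}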

* apmpackage: add traces-sampled-* data stream

Add a data stream for sampled trace documents, along with
an ILM policy that rolls the data stream over after 1h and
deletes indices 1h after rollover.

* systemtest: fetch most recent beats monitoring doc

When searching for beats monitoring docs, make sure
we get the most recent one by sorting on 'timestamp'.

* systemtest: update tail-based sampling test

Update test to rely on apm-server to create its
own data stream index template.

* Cross-reference sampling/pubsub and apmpackage
axw authored Feb 11, 2021
1 parent d9f589a commit 0c7bd22
Showing 26 changed files with 490 additions and 97 deletions.
@@ -0,0 +1,19 @@
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "1h"
          }
        }
      },
      "delete": {
        "min_age": "1h",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
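
In standalone mode the server installs an equivalent policy itself via
pubsub.SetupDataStream (see x-pack/apm-server/main.go below). A hedged sketch
of putting the same hot-rollover/delete policy by hand with the
go-elasticsearch client; the client construction and the policy name used here
are assumptions for illustration:

package main

import (
    "context"
    "log"
    "strings"

    "github.com/elastic/go-elasticsearch/v7"
    "github.com/elastic/go-elasticsearch/v7/esapi"
)

func main() {
    // Assumed: a default client pointing at a local cluster.
    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        log.Fatal(err)
    }
    // Same policy body as above: roll over after 1h, delete 1h after rollover.
    policy := `{"policy":{"phases":{
      "hot":{"actions":{"rollover":{"max_age":"1h"}}},
      "delete":{"min_age":"1h","actions":{"delete":{}}}
    }}}`
    res, err := esapi.ILMPutLifecycleRequest{
        Policy: "apm-sampled-traces", // illustrative policy name
        Body:   strings.NewReader(policy),
    }.Do(context.Background(), es)
    if err != nil {
        log.Fatal(err)
    }
    defer res.Body.Close()
    log.Println(res.Status())
}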
@@ -0,0 +1,11 @@
{
  "description": "Ingest pipeline for sampled trace documents",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
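
The single `set` processor stamps `event.ingested` with the ingest-time
timestamp. A hedged sketch of checking that behaviour with the ingest simulate
API through the go-elasticsearch client; the client construction and the
sample document are assumptions for illustration:

package main

import (
    "context"
    "io"
    "log"
    "os"
    "strings"

    "github.com/elastic/go-elasticsearch/v7"
    "github.com/elastic/go-elasticsearch/v7/esapi"
)

func main() {
    // Assumed: a default client pointing at a local cluster.
    es, err := elasticsearch.NewDefaultClient()
    if err != nil {
        log.Fatal(err)
    }
    // Simulate the pipeline inline against one sampled-trace-like document.
    body := `{
      "pipeline": {
        "processors": [
          {"set": {"field": "event.ingested", "value": "{{_ingest.timestamp}}"}}
        ]
      },
      "docs": [
        {"_source": {"trace": {"id": "abc123"}, "observer": {"id": "server-1"}}}
      ]
    }`
    res, err := esapi.IngestSimulateRequest{Body: strings.NewReader(body)}.Do(context.Background(), es)
    if err != nil {
        log.Fatal(err)
    }
    defer res.Body.Close()
    // The simulated document in the response carries event.ingested.
    io.Copy(os.Stdout, res.Body)
}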
@@ -0,0 +1,12 @@
- name: '@timestamp'
  type: date
  description: Event timestamp.
- name: data_stream.type
  type: constant_keyword
  description: Data stream type.
- name: data_stream.dataset
  type: constant_keyword
  description: Data stream dataset.
- name: data_stream.namespace
  type: constant_keyword
  description: Data stream namespace.
@@ -0,0 +1,8 @@
- name: event.ingested
  type: date
  description: |
    Timestamp when an event arrived in the central data store.
- name: trace.id
  type: keyword
  description: |
    The ID of the sampled trace.
@@ -0,0 +1,6 @@
# When changing fields or ILM policy, make sure to update
# x-pack/apm-server/sampling/pubsub/datastream.go.
- name: observer.id
  type: keyword
  description: |
    The ID of the APM Server that indexed the sampled trace ID.
4 changes: 4 additions & 0 deletions apmpackage/apm/0.1.0/data_stream/sampled_traces/manifest.yml
@@ -0,0 +1,4 @@
title: APM tail-sampled traces
type: traces
dataset: sampled
ilm_policy: traces-apm.sampled-default_policy
2 changes: 2 additions & 0 deletions apmpackage/cmd/gen-package/genfields.go
@@ -19,6 +19,7 @@ package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "path/filepath"
    "sort"
@@ -42,6 +43,7 @@ func generateFields(version string) map[string][]field {
    inputFieldsFiles["app_metrics"] = filterInternalMetrics(inputFieldsFiles["internal_metrics"])

    for streamType, inputFields := range inputFieldsFiles {
        log.Printf("%s", streamType)
        var ecsFields []field
        var nonECSFields []field
        for _, fields := range populateECSInfo(ecsFlatFields, inputFields) {
32 changes: 28 additions & 4 deletions apmpackage/cmd/gen-package/main.go
@@ -33,6 +33,13 @@ var versionMapping = map[string]string{
    "8.0": "0.1.0",
}

// Some data streams may not have a counterpart template
// in standalone apm-server, and so it does not make sense
// to maintain a separate fields.yml.
var handwritten = map[string]bool{
    "sampled_traces": true,
}

func main() {
    stackVersion := common.MustNewVersion(cmd.DefaultSettings().Version)
    shortVersion := fmt.Sprintf("%d.%d", stackVersion.Major, stackVersion.Minor)
@@ -57,11 +64,28 @@ func clear(version string) {
        panic(err)
    }
    for _, f := range fileInfo {
        if f.IsDir() {
            os.Remove(ecsFilePath(version, f.Name()))
            os.Remove(fieldsFilePath(version, f.Name()))
            os.RemoveAll(pipelinesPath(version, f.Name()))
        if !f.IsDir() {
            continue
        }
        name := f.Name()
        if handwritten[name] {
            continue
        }
        removeFile(ecsFilePath(version, name))
        removeFile(fieldsFilePath(version, name))
        removeDir(pipelinesPath(version, name))
    }
    ioutil.WriteFile(docsFilePath(version), nil, 0644)
}

func removeFile(path string) {
    if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
        log.Fatal(err)
    }
}

func removeDir(path string) {
    if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) {
        log.Fatal(err)
    }
}
12 changes: 7 additions & 5 deletions beater/beater.go
@@ -366,11 +366,13 @@ func (s *serverRunner) run() error {
    }

    if err := runServer(s.runServerContext, ServerParams{
        Info: s.beat.Info,
        Config: s.config,
        Logger: s.logger,
        Tracer: s.tracer,
        Reporter: reporter,
        Info: s.beat.Info,
        Config: s.config,
        Managed: s.beat.Manager != nil && s.beat.Manager.Enabled(),
        Namespace: s.namespace,
        Logger: s.logger,
        Tracer: s.tracer,
        Reporter: reporter,
    }); err != nil {
        return err
    }
6 changes: 6 additions & 0 deletions beater/server.go
@@ -52,6 +52,12 @@ type ServerParams struct {
    // Config is the configuration used for running the APM Server.
    Config *config.Config

    // Managed indicates that the server is managed by Fleet.
    Managed bool

    // Namespace holds the data stream namespace for the server.
    Namespace string

    // Logger is the logger for the beater component.
    Logger *logp.Logger

1 change: 1 addition & 0 deletions changelogs/head.asciidoc
@@ -25,6 +25,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits]
* Add a `_doc_count` field to transaction histogram docs {pull}4647[4647]
* Upgrade Go to 1.15.7 {pull}4663[4663]
* OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677]
* Data stream and ILM policy for tail-based sampling {pull}4707[4707]

[float]
==== Deprecated
11 changes: 7 additions & 4 deletions systemtest/elasticsearch.go
@@ -91,7 +91,7 @@ func newElasticsearchConfig() elasticsearch.Config {
// and deletes the default ILM policy "apm-rollover-30-days".
func CleanupElasticsearch(t testing.TB) {
    const (
        legacyPrefix = "apm*"
        legacyPrefix = "apm-*"
        apmTracesPrefix = "traces-apm*"
        apmMetricsPrefix = "metrics-apm*"
        apmLogsPrefix = "logs-apm*"
@@ -106,6 +106,7 @@ func CleanupElasticsearch(t testing.TB) {
    }

    doParallel := func(requests ...estest.Request) {
        t.Helper()
        var g errgroup.Group
        for _, req := range requests {
            req := req // copy for closure
@@ -115,18 +116,20 @@
            t.Fatal(err)
        }
    }

    // Delete indices, data streams, and ingest pipelines.
    doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}})
    doParallel(
        esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}},
        esapi.IndicesDeleteDataStreamRequest{Name: legacyPrefix},
        esapi.IndicesDeleteDataStreamRequest{Name: apmTracesPrefix},
        esapi.IndicesDeleteDataStreamRequest{Name: apmMetricsPrefix},
        esapi.IndicesDeleteDataStreamRequest{Name: apmLogsPrefix},
        esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix},
        esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix},
    )

    // Delete index templates after deleting data streams.
    doParallel(
        esapi.IndicesDeleteIndexTemplateRequest{Name: legacyPrefix}, // for index template created by tests
        esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix},
        esapi.IndicesDeleteIndexTemplateRequest{Name: apmTracesPrefix},
        esapi.IndicesDeleteIndexTemplateRequest{Name: apmMetricsPrefix},
        esapi.IndicesDeleteIndexTemplateRequest{Name: apmLogsPrefix},
5 changes: 5 additions & 0 deletions systemtest/estest/search.go
@@ -71,6 +71,11 @@ func (r *SearchRequest) WithQuery(q interface{}) *SearchRequest {
    return r
}

func (r *SearchRequest) WithSort(fieldDirection ...string) *SearchRequest {
    r.Sort = fieldDirection
    return r
}

func (r *SearchRequest) WithSize(size int) *SearchRequest {
    r.Size = &size
    return r
9 changes: 7 additions & 2 deletions systemtest/monitoring_test.go
@@ -18,6 +18,7 @@
package systemtest_test

import (
    "context"
    "encoding/json"
    "testing"
    "time"
@@ -83,9 +84,13 @@ func getBeatsMonitoringStats(t testing.TB, srv *apmservertest.Server, out interf
}

func getBeatsMonitoring(t testing.TB, srv *apmservertest.Server, type_ string, out interface{}) *beatsMonitoringDoc {
    result := systemtest.Elasticsearch.ExpectDocs(t, ".monitoring-beats-*",
    var result estest.SearchResult
    req := systemtest.Elasticsearch.Search(".monitoring-beats-*").WithQuery(
        estest.TermQuery{Field: type_ + ".beat.uuid", Value: srv.BeatUUID},
    )
    ).WithSort("timestamp:desc")
    if _, err := req.Do(context.Background(), &result, estest.WithCondition(result.Hits.MinHitsCondition(1))); err != nil {
        t.Error(err)
    }

    var doc beatsMonitoringDoc
    doc.RawSource = []byte(result.Hits.Hits[0].RawSource)
58 changes: 35 additions & 23 deletions systemtest/sampling_test.go
@@ -28,6 +28,7 @@ import (
    "github.com/stretchr/testify/require"
    "github.com/tidwall/gjson"
    "go.elastic.co/apm"
    "golang.org/x/sync/errgroup"

    "github.com/elastic/apm-server/systemtest"
    "github.com/elastic/apm-server/systemtest/apmservertest"
@@ -88,29 +89,6 @@ func TestKeepUnsampledWarning(t *testing.T) {
func TestTailSampling(t *testing.T) {
    systemtest.CleanupElasticsearch(t)

    // Create the apm-sampled-traces index for the two servers to coordinate.
    _, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.IndicesCreateRequest{
        Index: "apm-sampled-traces",
        Body: strings.NewReader(`{
  "mappings": {
    "properties": {
      "event.ingested": {"type": "date"},
      "observer": {
        "properties": {
          "id": {"type": "keyword"}
        }
      },
      "trace": {
        "properties": {
          "id": {"type": "keyword"}
        }
      }
    }
  }
}`),
    }, nil)
    require.NoError(t, err)

    srv1 := apmservertest.NewUnstartedServer(t)
    srv1.Config.Sampling = &apmservertest.SamplingConfig{
        Tail: &apmservertest.TailSamplingConfig{
@@ -148,6 +126,10 @@
    tracer1.Flush(nil)
    tracer2.Flush(nil)

    // Flush the data stream while the test is running, as we have no
    // control over the settings for the sampled traces index template.
    refreshPeriodically(t, 250*time.Millisecond, "apm-sampled-traces")

    for _, transactionType := range []string{"parent", "child"} {
        var result estest.SearchResult
        t.Logf("waiting for %d %q transactions", expected, transactionType)
@@ -220,3 +202,33 @@ func TestTailSamplingUnlicensed(t *testing.T) {
    assert.NoError(t, err)
    assert.Empty(t, result.Hits.Hits)
}

func refreshPeriodically(t *testing.T, interval time.Duration, index ...string) {
    g, ctx := errgroup.WithContext(context.Background())
    ctx, cancel := context.WithCancel(ctx)
    t.Cleanup(func() {
        cancel()
        assert.NoError(t, g.Wait())
    })
    g.Go(func() error {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        allowNoIndices := true
        ignoreUnavailable := true
        request := esapi.IndicesRefreshRequest{
            Index: index,
            AllowNoIndices: &allowNoIndices,
            IgnoreUnavailable: &ignoreUnavailable,
        }
        for {
            select {
            case <-ctx.Done():
                return nil
            case <-ticker.C:
            }
            if _, err := systemtest.Elasticsearch.Do(ctx, &request, nil); err != nil {
                return err
            }
        }
    })
}
34 changes: 29 additions & 5 deletions x-pack/apm-server/main.go
@@ -24,6 +24,7 @@ import (
    "github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
    "github.com/elastic/apm-server/x-pack/apm-server/cmd"
    "github.com/elastic/apm-server/x-pack/apm-server/sampling"
    "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub"
)

var (
@@ -108,6 +109,31 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
    if err != nil {
        return nil, errors.Wrap(err, "failed to create Elasticsearch client for tail-sampling")
    }

    var sampledTracesDataStream sampling.DataStreamConfig
    if args.Managed {
        // Data stream and ILM policy are managed by Fleet.
        sampledTracesDataStream = sampling.DataStreamConfig{
            Type: "traces",
            Dataset: "sampled",
            Namespace: args.Namespace,
        }
    } else {
        sampledTracesDataStream = sampling.DataStreamConfig{
            Type: "apm",
            Dataset: "sampled",
            Namespace: "traces",
        }
        if err := pubsub.SetupDataStream(context.Background(), es,
            "apm-sampled-traces", // Index template
            "apm-sampled-traces", // ILM policy
            "apm-sampled-traces", // Index pattern
        ); err != nil {
            return nil, errors.Wrap(err, "failed to create data stream for tail-sampling")
        }
        args.Logger.Infof("Created tail-sampling data stream index template")
    }

    policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
    for i, in := range tailSamplingConfig.Policies {
        policies[i] = sampling.Policy{
@@ -124,16 +150,14 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
        BeatID: args.Info.ID.String(),
        Reporter: args.Reporter,
        LocalSamplingConfig: sampling.LocalSamplingConfig{
            FlushInterval: tailSamplingConfig.Interval,
            // TODO(axw) make MaxDynamicServices configurable?
            FlushInterval: tailSamplingConfig.Interval,
            MaxDynamicServices: 1000,
            Policies: policies,
            IngestRateDecayFactor: tailSamplingConfig.IngestRateDecayFactor,
        },
        RemoteSamplingConfig: sampling.RemoteSamplingConfig{
            Elasticsearch: es,
            // TODO(axw) make index name configurable?
            SampledTracesIndex: "apm-sampled-traces",
            Elasticsearch: es,
            SampledTracesDataStream: sampledTracesDataStream,
        },
        StorageConfig: sampling.StorageConfig{
            StorageDir: paths.Resolve(paths.Data, tailSamplingConfig.StorageDir),