Skip to content

Commit

Permalink
Merge branch 'master' into metrics-intake-type-unit
Browse files Browse the repository at this point in the history
  • Loading branch information
axw authored May 27, 2021
2 parents 0af5a47 + 94e3201 commit abfcf29
Show file tree
Hide file tree
Showing 10 changed files with 622 additions and 300 deletions.
7 changes: 7 additions & 0 deletions apmpackage/apm/0.2.0/data_stream/sampled_traces/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@ title: APM tail-sampled traces
type: traces
dataset: sampled
ilm_policy: traces-apm.sampled-default_policy
elasticsearch:
index_template:
settings:
# Create a single shard per index, so we can use
# global checkpoints as a way of limiting search
# results.
number_of_shards: 1
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
==== Bug fixes
* Don't auto-disable ILM due to a failure to communicate with Elasticsearch {pull}5264[5264]
* Fix panic due to misaligned 64-bit access on 32-bit architectures {pull}5277[5277]
* Fixed tail-based sampling pubsub to use _seq_no correctly {pull}5126[5126]
* Fix document grouping of translated OpenTelemetry Java metrics {pull}5309[5309]
* OpenTelemetry: record array attributes as labels {pull}5286[5286]
* model/modeldecoder: fix 32-bit timestamp decoding {pull}5308[5308]
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
kibana: { condition: service_healthy }

elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.0.0-394043ad-SNAPSHOT
image: docker.elastic.co/elasticsearch/elasticsearch:8.0.0-5197c308-SNAPSHOT
ports:
- 9200:9200
healthcheck:
Expand Down Expand Up @@ -61,7 +61,7 @@ services:
- "./testing/docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles"

kibana:
image: docker.elastic.co/kibana/kibana:8.0.0-394043ad-SNAPSHOT
image: docker.elastic.co/kibana/kibana:8.0.0-5197c308-SNAPSHOT
ports:
- 5601:5601
healthcheck:
Expand All @@ -85,7 +85,7 @@ services:
package-registry: { condition: service_healthy }

fleet-server:
image: docker.elastic.co/beats/elastic-agent:8.0.0-394043ad-SNAPSHOT
image: docker.elastic.co/beats/elastic-agent:8.0.0-5197c308-SNAPSHOT
ports:
- 8220:8220
healthcheck:
Expand Down
4 changes: 4 additions & 0 deletions systemtest/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package systemtest_test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -226,6 +227,9 @@ func refreshPeriodically(t *testing.T, interval time.Duration, index ...string)
case <-ticker.C:
}
if _, err := systemtest.Elasticsearch.Do(ctx, &request, nil); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}
}
Expand Down
86 changes: 86 additions & 0 deletions x-pack/apm-server/sampling/pubsub/checkpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pubsub

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/pkg/errors"

"github.com/elastic/go-elasticsearch/v7/esapi"

"github.com/elastic/apm-server/elasticsearch"
)

// getGlobalCheckpoints returns the current global checkpoint for each index
// underlying dataStream. Each index is required to have a single (primary) shard.
func getGlobalCheckpoints(
ctx context.Context,
client elasticsearch.Client,
dataStream string,
) (map[string]int64, error) {
indexGlobalCheckpoints := make(map[string]int64)
resp, err := esapi.IndicesStatsRequest{
Index: []string{dataStream},
Level: "shards",
// By default all metrics are returned; query just the "get" metric,
// which is very cheap.
Metric: []string{"get"},
}.Do(ctx, client)
if err != nil {
return nil, errors.New("index stats request failed")
}
defer resp.Body.Close()
if resp.IsError() {
switch resp.StatusCode {
case http.StatusNotFound:
// Data stream does not yet exist.
return indexGlobalCheckpoints, nil
}
message, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("index stats request failed: %s", message)
}

var stats dataStreamStats
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return nil, err
}

for index, indexStats := range stats.Indices {
if n := len(indexStats.Shards); n > 1 {
return nil, fmt.Errorf("expected 1 shard, got %d for index %q", n, index)
}
for _, shardStats := range indexStats.Shards {
for _, shardStats := range shardStats {
if shardStats.Routing.Primary {
indexGlobalCheckpoints[index] = shardStats.SeqNo.GlobalCheckpoint
break
}
}
}
}
return indexGlobalCheckpoints, nil
}

type dataStreamStats struct {
Indices map[string]indexStats `json:"indices"`
}

type indexStats struct {
Shards map[string][]shardStats `json:"shards"`
}

type shardStats struct {
Routing struct {
Primary bool `json:"primary"`
} `json:"routing"`
SeqNo struct {
GlobalCheckpoint int64 `json:"global_checkpoint"`
} `json:"seq_no"`
}
3 changes: 2 additions & 1 deletion x-pack/apm-server/sampling/pubsub/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ const dataStreamIndexTemplate = `{
"data_stream": {},
"template": {
"settings": {
"index.lifecycle.name": %q
"index.lifecycle.name": %q,
"index.number_of_shards": 1
},
"mappings": {
"properties": {
Expand Down
Loading

0 comments on commit abfcf29

Please sign in to comment.