Skip to content

Commit

Permalink
Merge branch 'main' into 40745-remove-functionbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ authored Nov 8, 2024
2 parents 0648028 + bfaa70f commit 59add13
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/updatecli.d/bump-golang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ targets:
scmid: githubConfig
kind: file
spec:
content: 'go {{ source "gomod" }}'
content: 'go {{ source "latestGoVersion" }}'
file: go.mod
matchpattern: 'go \d+.\d+.\d+'
update-go-version:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968]
- Add `id` field to all the vSphere metricsets. {pull}41097[41097]
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289]
- Add support for region/zone for Vertex AI service in GCP module {pull}41551[41551]

*Metricbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1865,11 +1865,11 @@ SOFTWARE.

--------------------------------------------------------------------------------
Dependency : github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4
Version: v4.6.0
Version: v4.8.0
Licence type (autodetected): MIT
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4@v4.6.0/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4@v4.8.0/LICENSE.txt:

MIT License

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/elastic/beats/v7

go 1.22.0
go 1.22.8

require (
cloud.google.com/go/bigquery v1.62.0
Expand Down Expand Up @@ -168,7 +168,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.1
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xP
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1 h1:0f6XnzroY1yCQQwxGf/n/2xlaBF02Qhof2as99dGNsY=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1/go.mod h1:vMGz6NOUGJ9h5ONl2kkyaqq5E0g7s4CHNSrXN5fl8UY=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0 h1:AAIdAyPkFff6XTct2lQCxOWN/+LnA41S7kIkzKaMbyE=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0/go.mod h1:noQIdW75SiQFB3mSFJBr4iRRH83S9skaFiBv4C0uEs0=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0 h1:0nGmzwBv5ougvzfGPCO2ljFRHvun57KpNrVCMrlk0ns=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0/go.mod h1:gYq8wyDgv6JLhGbAU6gg8amCPgQWRE+aCvrV2gyzdfs=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.1 h1:ehSLdbLah6kk6HTVc6e/lrbmbz7MMbpNxkOd3OYlhB0=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.1/go.mod h1:Am1cUioOk0HdZIsjpXJkQ4RIeQbwYsW6LkNIc5z/5XY=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 h1:+dggnR89/BIIlRlQ6d19dkhhdd/mQUiQbXhyHUFiB4w=
Expand Down
28 changes: 28 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package otelconsumer
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"go.opentelemetry.io/collector/consumer"
Expand All @@ -42,6 +44,7 @@ type otelConsumer struct {
observer outputs.Observer
logsConsumer consumer.Logs
beatInfo beat.Info
log *logp.Logger
}

func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) {
Expand All @@ -50,6 +53,7 @@ func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.O
observer: observer,
logsConsumer: beat.LogConsumer,
beatInfo: beat,
log: logp.NewLogger("otelconsumer"),
}

ocConfig := defaultConfig()
Expand Down Expand Up @@ -99,6 +103,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)

err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
if err != nil {
// If the batch is too large, the elasticsearchexporter will
// return a 413 error.
//
// At the moment, the exporter does not support batch splitting
// on error so we do it here.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163.
if strings.Contains(err.Error(), "Request Entity Too Large") {
// Try and split the batch into smaller batches and retry
if batch.SplitRetry() {
st.BatchSplit()
st.RetryableErrors(len(events))
} else {
// If the batch could not be split, there is no option left but
// to drop it and log the error state.
batch.Drop()
st.PermanentErrors(len(events))
out.log.Errorf("the batch is too large to be sent: %v", err)
}

// Don't propagate the error, the batch was split and retried.
return nil
}

// Permanent errors shouldn't be retried. This tipically means
// the data cannot be serialized by the exporter that is attached
// to the pipeline or when the destination refuses the data because
Expand Down
31 changes: 31 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand All @@ -46,12 +47,15 @@ func TestPublish(t *testing.T) {
makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer {
t.Helper()

assert.NoError(t, logp.TestingSetup(logp.WithSelectors("otelconsumer")))

logConsumer, err := consumer.NewLogs(consumeFn)
assert.NoError(t, err)
consumer := &otelConsumer{
observer: outputs.NewNilObserver(),
logsConsumer: logConsumer,
beatInfo: beat.Info{},
log: logp.NewLogger("otelconsumer"),
}
return consumer
}
Expand Down Expand Up @@ -86,6 +90,33 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("split batch on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
})

t.Run("drop batch if can't split on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 2)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag)
})

t.Run("drop batch on permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

Expand Down
16 changes: 9 additions & 7 deletions x-pack/metricbeat/module/gcp/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
ServiceDataproc = "dataproc"
ServiceCloudSQL = "cloudsql"
ServiceRedis = "redis"
ServiceAIPlatform = "aiplatform"
)

// Paths within the GCP monitoring.TimeSeries response, if converted to JSON, where you can find each ECS field required for the output event
Expand Down Expand Up @@ -82,13 +83,14 @@ const (

// NOTE: if you are adding labels make sure to update tests in metrics/metrics_requester_test.go.
const (
DefaultResourceLabel = "resource.label.zone"
ComputeResourceLabel = "resource.labels.zone"
GKEResourceLabel = "resource.label.location"
StorageResourceLabel = "resource.label.location"
CloudSQLResourceLabel = "resource.labels.region"
DataprocResourceLabel = "resource.label.region"
RedisResourceLabel = "resource.label.region"
DefaultResourceLabel = "resource.label.zone"
ComputeResourceLabel = "resource.labels.zone"
GKEResourceLabel = "resource.label.location"
StorageResourceLabel = "resource.label.location"
CloudSQLResourceLabel = "resource.labels.region"
DataprocResourceLabel = "resource.label.region"
RedisResourceLabel = "resource.label.region"
AIPlatformResourceLabel = "resource.label.location"
)

// AlignersMapToGCP map contains available perSeriesAligner
Expand Down
2 changes: 2 additions & 0 deletions x-pack/metricbeat/module/gcp/metrics/metrics_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func getServiceLabelFor(serviceName string) string {
return gcp.CloudSQLResourceLabel
case gcp.ServiceRedis:
return gcp.RedisResourceLabel
case gcp.ServiceAIPlatform:
return gcp.AIPlatformResourceLabel
default:
return gcp.DefaultResourceLabel
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func TestIsAGlobalService(t *testing.T) {
{"Dataproc service", gcp.ServiceDataproc, false},
{"CloudSQL service", gcp.ServiceCloudSQL, false},
{"Redis service", gcp.ServiceRedis, false},
{"AIPlatform service", gcp.ServiceAIPlatform, false},
}
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
Expand Down Expand Up @@ -249,6 +250,7 @@ func TestGetServiceLabelFor(t *testing.T) {
{"Dataproc service", gcp.ServiceDataproc, "resource.label.region"},
{"CloudSQL service", gcp.ServiceCloudSQL, "resource.labels.region"},
{"Redis service", gcp.ServiceRedis, "resource.label.region"},
{"AIPlatform service", gcp.ServiceAIPlatform, "resource.label.location"},
}

for _, c := range cases {
Expand Down

0 comments on commit 59add13

Please sign in to comment.