diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 31d997d2168..1aa253837da 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -31,10 +31,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "go.elastic.co/apm/module/apmgrpc/v2" - "go.elastic.co/apm/module/apmotel/v2" "go.elastic.co/apm/v2" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/sdk/metric" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -43,7 +40,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/outputs" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" @@ -65,6 +61,7 @@ import ( "github.com/elastic/apm-server/internal/beater/ratelimit" "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/internal/idxmgmt" + "github.com/elastic/apm-server/internal/instrumentation" "github.com/elastic/apm-server/internal/kibana" srvmodelprocessor "github.com/elastic/apm-server/internal/model/modelprocessor" "github.com/elastic/apm-server/internal/publish" @@ -296,39 +293,20 @@ func (s *Runner) Run(ctx context.Context) error { } } - instrumentation, err := instrumentation.New(s.rawConfig, "apm-server", version.Version) - if err != nil { - return err - } - tracer := instrumentation.Tracer() - tracerServerListener := instrumentation.Listener() - if tracerServerListener != nil { - defer tracerServerListener.Close() - } - defer tracer.Close() - - tracerProvider, err := apmotel.NewTracerProvider(apmotel.WithAPMTracer(tracer)) - if err != nil { - return err - } - otel.SetTracerProvider(tracerProvider) - - exporter, err := apmotel.NewGatherer() + provider, err := instrumentation.New( + instrumentation.WithBaseCfg(s.rawConfig), + instrumentation.IsManaged(inElasticCloud), + ) 
if err != nil { - return err + return fmt.Errorf("failed to create tracing provider: %w", err) } - meterProvider := metric.NewMeterProvider( - metric.WithReader(exporter), - ) - otel.SetMeterProvider(meterProvider) - tracer.RegisterMetricsGatherer(exporter) // Ensure the libbeat output and go-elasticsearch clients do not index // any events to Elasticsearch before the integration is ready. publishReady := make(chan struct{}) drain := make(chan struct{}) g.Go(func() error { - if err := s.waitReady(ctx, kibanaClient, tracer); err != nil { + if err := s.waitReady(ctx, kibanaClient, provider); err != nil { // One or more preconditions failed; drop events. close(drain) return errors.Wrap(err, "error waiting for server to be ready") @@ -370,7 +348,7 @@ func (s *Runner) Run(ctx context.Context) error { fetcher, cancel, err := newSourcemapFetcher( s.config.RumConfig.SourceMapping, kibanaClient, newElasticsearchClient, - tracer, + provider.Tracer(), ) if err != nil { return err @@ -400,7 +378,7 @@ func (s *Runner) Run(ctx context.Context) error { // even if TLS is enabled, as TLS is handled by the net/http server. gRPCLogger := s.logger.Named("grpc") grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor( - apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)), + apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(provider.Tracer())), interceptors.ClientMetadata(), interceptors.Logging(gRPCLogger), interceptors.Metrics(gRPCLogger, nil), @@ -412,7 +390,7 @@ func (s *Runner) Run(ctx context.Context) error { // Create the BatchProcessor chain that is used to process all events, // including the metrics aggregated by APM Server. 
finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor( - tracer, newElasticsearchClient, memLimitGB, + provider, newElasticsearchClient, memLimitGB, ) if err != nil { return err @@ -442,7 +420,7 @@ func (s *Runner) Run(ctx context.Context) error { s.config, kibanaClient, newElasticsearchClient, - tracer, + provider.Tracer(), ) if err != nil { return err @@ -467,7 +445,7 @@ func (s *Runner) Run(ctx context.Context) error { Config: s.config, Namespace: s.config.DataStreams.Namespace, Logger: s.logger, - Tracer: tracer, + Tracer: provider.Tracer(), Authenticator: authenticator, RateLimitStore: ratelimitStore, BatchProcessor: batchProcessor, @@ -519,6 +497,8 @@ func (s *Runner) Run(ctx context.Context) error { g.Go(func() error { return runServer(ctx, serverParams) }) + + tracerServerListener := provider.Listener() if tracerServerListener != nil { tracerServer, err := newTracerServer(s.config, tracerServerListener, s.logger, serverParams.BatchProcessor, serverParams.Semaphore) if err != nil { @@ -568,7 +548,7 @@ func linearScaledValue(perGBIncrement, memLimitGB, constant float64) int { func (s *Runner) waitReady( ctx context.Context, kibanaClient *kibana.Client, - tracer *apm.Tracer, + provider *instrumentation.Provider, ) error { var preconditions []func(context.Context) error var esOutputClient *elasticsearch.Client @@ -641,14 +621,14 @@ func (s *Runner) waitReady( } return nil } - return waitReady(ctx, s.config.WaitReadyInterval, tracer, s.logger, check) + return waitReady(ctx, s.config.WaitReadyInterval, provider, s.logger, check) } // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events, // and a cleanup function which should be called on server shutdown. If the output is // "elasticsearch", then we use docappender; otherwise we use the libbeat publisher. 
func (s *Runner) newFinalBatchProcessor( - tracer *apm.Tracer, + provider *instrumentation.Provider, newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error), memLimit float64, ) (modelpb.BatchProcessor, func(context.Context) error, error) { @@ -656,7 +636,7 @@ func (s *Runner) newFinalBatchProcessor( monitoring.Default.Remove("libbeat") libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat") if s.elasticsearchOutputConfig == nil { - return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry) + return s.newLibbeatFinalBatchProcessor(provider, libbeatMonitoringRegistry) } stateRegistry := monitoring.GetNamespace("state").GetRegistry() @@ -708,7 +688,7 @@ func (s *Runner) newFinalBatchProcessor( CompressionLevel: esConfig.CompressionLevel, FlushBytes: flushBytes, FlushInterval: esConfig.FlushInterval, - Tracer: tracer, + Tracer: provider.Tracer(), MaxRequests: esConfig.MaxRequests, Scaling: scalingCfg, Logger: zap.New(s.logger.Core(), zap.WithCaller(true)), @@ -807,7 +787,7 @@ func docappenderConfig( } func (s *Runner) newLibbeatFinalBatchProcessor( - tracer *apm.Tracer, + provider *instrumentation.Provider, libbeatMonitoringRegistry *monitoring.Registry, ) (modelpb.BatchProcessor, func(context.Context) error, error) { // When the publisher stops cleanly it will close its pipeline client, @@ -830,7 +810,7 @@ func (s *Runner) newLibbeatFinalBatchProcessor( Metrics: libbeatMonitoringRegistry, Telemetry: stateRegistry, Logger: logp.L().Named("publisher"), - Tracer: tracer, + Tracer: provider.Tracer(), } outputFactory := func(stats outputs.Observer) (string, outputs.Group, error) { if !s.outputConfig.IsSet() { @@ -850,7 +830,7 @@ func (s *Runner) newLibbeatFinalBatchProcessor( return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err) } pipelineConnector := pipetool.WithACKer(pipeline, acker) - publisher, err := publish.NewPublisher(pipelineConnector, tracer) + publisher, err := 
publish.NewPublisher(pipelineConnector) if err != nil { return nil, nil, err } diff --git a/internal/beater/waitready.go b/internal/beater/waitready.go index 7bfaa6b04fd..8102c4e4adf 100644 --- a/internal/beater/waitready.go +++ b/internal/beater/waitready.go @@ -25,6 +25,7 @@ import ( "go.elastic.co/apm/v2" + "github.com/elastic/apm-server/internal/instrumentation" "github.com/elastic/elastic-agent-libs/logp" ) @@ -33,11 +34,12 @@ import ( func waitReady( ctx context.Context, interval time.Duration, - tracer *apm.Tracer, + provider *instrumentation.Provider, logger *logp.Logger, check func(context.Context) error, ) error { logger.Info("blocking ingestion until all preconditions are satisfied") + tracer := provider.Tracer() tx := tracer.StartTransaction("wait_for_preconditions", "init") defer tx.End() ctx = apm.ContextWithTransaction(ctx, tx)
diff --git a/internal/instrumentation/config.go b/internal/instrumentation/config.go
new file mode 100644
index 00000000000..92df0194043
--- /dev/null
+++ b/internal/instrumentation/config.go
@@ -0,0 +1,48 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package instrumentation
+
+import "github.com/elastic/elastic-agent-libs/config"
+
+// cfg collects the settings applied by the Options passed to New.
+type cfg struct {
+	// isManaged selects managed mode; it is set by IsManaged.
+	isManaged bool
+	// baseCfg is the configuration the initial tracer is built from; it
+	// is set by WithBaseCfg.
+	baseCfg *config.C
+}
+
+// Option changes one of the settings New uses to build a Provider.
+type Option func(*cfg)
+
+// WithBaseCfg returns an Option that sets the base configuration from
+// which New builds the initial tracer.
+func WithBaseCfg(baseCfg *config.C) Option {
+	return func(c *cfg) {
+		c.baseCfg = baseCfg
+	}
+}
+
+// IsManaged returns an Option that sets whether the Provider follows APM
+// configuration changes delivered by the Elastic Agent client.
+func IsManaged(t bool) Option {
+	return func(c *cfg) {
+		c.isManaged = t
+	}
+}
diff --git a/internal/instrumentation/provider.go b/internal/instrumentation/provider.go
new file mode 100644
index 00000000000..bbee460823a
--- /dev/null
+++ b/internal/instrumentation/provider.go
@@ -0,0 +1,235 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package instrumentation
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"sync"
+	"sync/atomic"
+
+	"go.elastic.co/apm/module/apmotel/v2"
+	"go.elastic.co/apm/v2"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"google.golang.org/protobuf/encoding/protojson"
+
+	"github.com/elastic/apm-server/internal/version"
+	"github.com/elastic/beats/v7/libbeat/instrumentation"
+	"github.com/elastic/elastic-agent-client/v7/pkg/client"
+	"github.com/elastic/elastic-agent-libs/config"
+)
+
+// Provider owns the APM tracer and tracer listener that APM Server uses to
+// instrument itself. In managed mode the tracer is rebuilt whenever the
+// Elastic Agent client reports an APM configuration change.
+type Provider struct {
+	// tracer holds the current *apm.Tracer; updateTracer replaces it.
+	tracer atomic.Value
+
+	// mu guards listener, which the watcher goroutine started by New may
+	// replace while Listener is being called.
+	mu       sync.Mutex
+	listener net.Listener
+
+	// cancel stops the Elastic Agent client and the watcher goroutine, and
+	// done is closed once that goroutine has returned. Both are nil when
+	// the Provider is not managed.
+	cancel context.CancelFunc
+	done   chan struct{}
+}
+
+// New returns a Provider configured by opts.
+//
+// A tracer is always built from the base configuration first, so Tracer is
+// usable as soon as New returns. With IsManaged(true), New additionally
+// starts an Elastic Agent V2 client that reads its connection details from
+// stdin, and rebuilds the tracer on every APM configuration change. Close
+// releases the resources held by the returned Provider.
+func New(opts ...Option) (*Provider, error) {
+	cfg := &cfg{}
+	for _, opt := range opts {
+		opt(cfg)
+	}
+
+	p := &Provider{}
+	// Build the initial tracer in every mode: callers use Tracer right
+	// after New returns, before any configuration change is delivered.
+	if err := p.updateTracer(cfg.baseCfg); err != nil {
+		return nil, fmt.Errorf("failed to update tracer: %w", err)
+	}
+	if !cfg.isManaged {
+		return p, nil
+	}
+
+	ver := client.VersionInfo{
+		Name:    "apm-server",
+		Version: version.Version,
+		//BuildHash: version.CommitHash(),
+	}
+	c, _, err := client.NewV2FromReader(os.Stdin, ver)
+	if err != nil {
+		// Release the tracer built above; the client error is reported.
+		_ = p.Close()
+		return nil, fmt.Errorf("failed to create new v2 client: %w", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	if err := c.Start(ctx); err != nil {
+		cancel()
+		// Release the tracer built above; the client error is reported.
+		_ = p.Close()
+		return nil, fmt.Errorf("failed to start agent v2 client: %w", err)
+	}
+	p.cancel = cancel
+	p.done = make(chan struct{})
+
+	go func() {
+		defer close(p.done)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case change := <-c.UnitChanges():
+				// Triggers is tested as a bitmask so that an APM change
+				// reported together with other triggers is not missed.
+				if change.Unit == nil || change.Triggers&client.TriggeredAPMChange == 0 {
+					continue
+				}
+				// The generated getter tolerates a nil APMConfig.
+				apmConfig := change.Unit.Expected().APMConfig.GetElastic()
+				bb, err := protojson.Marshal(apmConfig)
+				if err == nil {
+					err = p.updateTracerFromJSON(bb)
+				}
+				if err != nil {
+					// Keep the previous tracer instead of crashing the
+					// server on a configuration that cannot be applied.
+					fmt.Fprintf(os.Stderr, "failed to apply APM config change: %+v\n", err)
+				}
+			case err := <-c.Errors():
+				if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
+					fmt.Fprintf(os.Stderr, "GRPC client error: %+v\n", err)
+				}
+			}
+		}
+	}()
+
+	return p, nil
+}
+
+// updateTracerFromJSON rebuilds the tracer from a JSON-encoded APM
+// configuration.
+func (p *Provider) updateTracerFromJSON(raw []byte) error {
+	var m map[string]any
+	if err := json.Unmarshal(raw, &m); err != nil {
+		return fmt.Errorf("failed to decode APM config: %w", err)
+	}
+	newCfg, err := config.NewConfigFrom(m)
+	if err != nil {
+		return fmt.Errorf("failed to build config from APM config: %w", err)
+	}
+	return p.updateTracer(newCfg)
+}
+
+// updateTracer builds a tracer and listener from rawConfig, installs the
+// global OpenTelemetry tracer and meter providers backed by that tracer,
+// and publishes the result through Tracer and Listener. A nil rawConfig is
+// treated as an empty configuration.
+func (p *Provider) updateTracer(rawConfig *config.C) error {
+	if rawConfig == nil {
+		rawConfig = config.NewConfig()
+	}
+	instr, err := instrumentation.New(rawConfig, "apm-server", version.Version)
+	if err != nil {
+		return fmt.Errorf("failed to create instrumentation: %w", err)
+	}
+	tracer := instr.Tracer()
+
+	tracerProvider, err := apmotel.NewTracerProvider(apmotel.WithAPMTracer(tracer))
+	if err != nil {
+		return fmt.Errorf("failed to create trace provider: %w", err)
+	}
+	exporter, err := apmotel.NewGatherer()
+	if err != nil {
+		return fmt.Errorf("failed to create gatherer: %w", err)
+	}
+	meterProvider := metric.NewMeterProvider(
+		metric.WithReader(exporter),
+	)
+
+	// Swap the globals only after every component has been built, so a
+	// failed update leaves the previous setup in place.
+	otel.SetTracerProvider(tracerProvider)
+	otel.SetMeterProvider(meterProvider)
+	tracer.RegisterMetricsGatherer(exporter)
+
+	// NOTE(review): the tracer and listener being replaced are not closed
+	// here because earlier Tracer and Listener results may still be in use
+	// by callers; confirm how they should be retired.
+	p.mu.Lock()
+	p.listener = instr.Listener()
+	p.mu.Unlock()
+	p.tracer.Store(tracer)
+
+	return nil
+}
+
+// Tracer returns the current tracer. It returns apm.DefaultTracer() when
+// no tracer has been stored, which only happens for a Provider that was
+// not created by New.
+func (p *Provider) Tracer() *apm.Tracer {
+	if tracer, ok := p.tracer.Load().(*apm.Tracer); ok {
+		return tracer
+	}
+	return apm.DefaultTracer()
+}
+
+// Listener returns the current tracer listener, which may be nil.
+func (p *Provider) Listener() net.Listener {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.listener
+}
+
+// Close stops the Elastic Agent client when one was started, then closes
+// the current listener and tracer. It returns the error reported by the
+// listener, except when the listener had already been closed.
+func (p *Provider) Close() error {
+	if p.cancel != nil {
+		p.cancel()
+		// Wait for the watcher so it cannot swap the tracer after Close.
+		<-p.done
+	}
+
+	var err error
+	if listener := p.Listener(); listener != nil {
+		if cerr := listener.Close(); cerr != nil && !errors.Is(cerr, net.ErrClosed) {
+			err = fmt.Errorf("failed to close tracer listener: %w", cerr)
+		}
+	}
+	if tracer, ok := p.tracer.Load().(*apm.Tracer); ok {
+		tracer.Close()
+	}
+	return err
+}
diff --git a/internal/publish/pub.go b/internal/publish/pub.go index 5f9632d9fb3..c781f16dddb 100644 --- a/internal/publish/pub.go +++ b/internal/publish/pub.go @@ -25,7 +25,6 @@ import ( "time" "github.com/pkg/errors" - "go.elastic.co/apm/v2" "go.elastic.co/fastjson" "github.com/elastic/beats/v7/libbeat/beat" @@ -47,7 +46,6 @@ type Reporter func(context.Context, PendingReq) error // concurrent HTTP requests trying to publish at the same time is limited. type Publisher struct { stopped chan struct{} - tracer *apm.Tracer client beat.Client mu sync.RWMutex @@ -73,10 +71,9 @@ var ( // // GOMAXPROCS goroutines are started for forwarding events to libbeat. // Stop must be called to close the beat.Client and free resources.
-func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer) (*Publisher, error) { +func NewPublisher(pipeline beat.Pipeline) (*Publisher, error) { processingCfg := beat.ProcessingConfig{} p := &Publisher{ - tracer: tracer, stopped: make(chan struct{}), // One request will be actively processed by the diff --git a/internal/publish/pub_test.go b/internal/publish/pub_test.go index 37c1be50675..f2a1fa63e8d 100644 --- a/internal/publish/pub_test.go +++ b/internal/publish/pub_test.go @@ -31,8 +31,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.elastic.co/apm/v2/apmtest" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/outputs" @@ -52,7 +50,7 @@ func TestPublisherStop(t *testing.T) { // Create a pipeline with a limited queue size and no outputs, // so we can simulate a pipeline that blocks indefinitely. pipeline, client := newBlockingPipeline(t) - publisher, err := publish.NewPublisher(pipeline, apmtest.DiscardTracer) + publisher, err := publish.NewPublisher(pipeline) require.NoError(t, err) defer func() { cancelledContext, cancel := context.WithCancel(context.Background()) @@ -88,7 +86,7 @@ func TestPublisherStop(t *testing.T) { func TestPublisherStopShutdownInactive(t *testing.T) { pipeline, _ := newBlockingPipeline(t) - publisher, err := publish.NewPublisher(pipeline, apmtest.DiscardTracer) + publisher, err := publish.NewPublisher(pipeline) require.NoError(t, err) // There are no active events, so the publisher should stop immediately @@ -153,7 +151,6 @@ func BenchmarkPublisher(b *testing.B) { acker.Open() publisher, err := publish.NewPublisher( pipetool.WithACKer(pipeline, acker), - apmtest.DiscardTracer, ) require.NoError(b, err)