diff --git a/.chloggen/telemetrygen-public-api.yaml b/.chloggen/telemetrygen-public-api.yaml new file mode 100644 index 000000000000..c68c46db5ea9 --- /dev/null +++ b/.chloggen/telemetrygen-public-api.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: telemetrygen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Exported the API for telemetrygen for test uses. Additionally added new E2E tests and fixed race condition + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36984] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/telemetrygen/config.go b/cmd/telemetrygen/config.go index 80aaaaa1e5d4..d29fbe2ec992 100644 --- a/cmd/telemetrygen/config.go +++ b/cmd/telemetrygen/config.go @@ -12,10 +12,10 @@ import ( "github.com/spf13/cobra" - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/logs" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/traces" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/pkg/logs" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/pkg/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/pkg/traces" ) var ( @@ -64,13 +64,13 @@ var logsCmd = &cobra.Command{ func init() { rootCmd.AddCommand(tracesCmd, metricsCmd, logsCmd) - tracesCfg = new(traces.Config) + tracesCfg = traces.NewConfig() tracesCfg.Flags(tracesCmd.Flags()) - metricsCfg = new(metrics.Config) + metricsCfg = metrics.NewConfig() metricsCfg.Flags(metricsCmd.Flags()) - logsCfg = new(logs.Config) + logsCfg = logs.NewConfig() logsCfg.Flags(logsCmd.Flags()) // Disabling completion command for end user @@ -81,7 +81,7 @@ func init() { // Execute tries to run the input command func Execute() { if err := rootCmd.Execute(); err != nil { - // TODO: Uncomment the line below when using Run instead of RunE in the xxxCmd functions + // TODO: Uncomment the line below when using run instead of RunE in the xxxCmd functions // fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/cmd/telemetrygen/internal/common/config.go b/cmd/telemetrygen/internal/common/config.go index c6592b6fa582..44ca9aa936d8 100644 --- a/cmd/telemetrygen/internal/common/config.go +++ b/cmd/telemetrygen/internal/common/config.go @@ -148,40 +148,59 @@ func (c *Config) GetHeaders() map[string]string { // CommonFlags registers common config flags. func (c *Config) CommonFlags(fs *pflag.FlagSet) { - fs.IntVar(&c.WorkerCount, "workers", 1, "Number of workers (goroutines) to run") - fs.Float64Var(&c.Rate, "rate", 0, "Approximately how many metrics/spans/logs per second each worker should generate. Zero means no throttling.") - fs.DurationVar(&c.TotalDuration, "duration", 0, "For how long to run the test") - fs.DurationVar(&c.ReportingInterval, "interval", 1*time.Second, "Reporting interval") + fs.IntVar(&c.WorkerCount, "workers", c.WorkerCount, "Number of workers (goroutines) to run") + fs.Float64Var(&c.Rate, "rate", c.Rate, "Approximately how many metrics/spans/logs per second each worker should generate. Zero means no throttling.") + fs.DurationVar(&c.TotalDuration, "duration", c.TotalDuration, "For how long to run the test") + fs.DurationVar(&c.ReportingInterval, "interval", c.ReportingInterval, "Reporting interval") - fs.StringVar(&c.CustomEndpoint, "otlp-endpoint", "", "Destination endpoint for exporting logs, metrics and traces") - fs.BoolVar(&c.Insecure, "otlp-insecure", false, "Whether to enable client transport security for the exporter's grpc or http connection") - fs.BoolVar(&c.InsecureSkipVerify, "otlp-insecure-skip-verify", false, "Whether a client verifies the server's certificate chain and host name") - fs.BoolVar(&c.UseHTTP, "otlp-http", false, "Whether to use HTTP exporter rather than a gRPC one") + fs.StringVar(&c.CustomEndpoint, "otlp-endpoint", c.CustomEndpoint, "Destination endpoint for exporting logs, metrics and traces") + fs.BoolVar(&c.Insecure, "otlp-insecure", c.Insecure, "Whether to enable client transport security for the exporter's grpc or http connection") + fs.BoolVar(&c.InsecureSkipVerify, "otlp-insecure-skip-verify", c.InsecureSkipVerify, "Whether a client verifies the server's certificate chain and host name") + fs.BoolVar(&c.UseHTTP, "otlp-http", c.UseHTTP, "Whether to use HTTP exporter rather than a gRPC one") // custom headers - c.Headers = make(KeyValue) fs.Var(&c.Headers, "otlp-header", "Custom header to be passed along with each OTLP request. The value is expected in the format key=\"value\". "+ "Note you may need to escape the quotes when using the tool from a cli. "+ `Flag may be repeated to set multiple headers (e.g --otlp-header key1=\"value1\" --otlp-header key2=\"value2\")`) // custom resource attributes - c.ResourceAttributes = make(KeyValue) fs.Var(&c.ResourceAttributes, "otlp-attributes", "Custom resource attributes to use. The value is expected in the format key=\"value\". "+ "You can use key=true or key=false. to set boolean attribute."+ "Note you may need to escape the quotes when using the tool from a cli. "+ `Flag may be repeated to set multiple attributes (e.g --otlp-attributes key1=\"value1\" --otlp-attributes key2=\"value2\" --telemetry-attributes key3=true)`) - c.TelemetryAttributes = make(KeyValue) fs.Var(&c.TelemetryAttributes, "telemetry-attributes", "Custom telemetry attributes to use. The value is expected in the format key=\"value\". "+ "You can use key=true or key=false. to set boolean attribute."+ "Note you may need to escape the quotes when using the tool from a cli. "+ `Flag may be repeated to set multiple attributes (e.g --telemetry-attributes key1=\"value1\" --telemetry-attributes key2=\"value2\" --telemetry-attributes key3=true)`) // TLS CA configuration - fs.StringVar(&c.CaFile, "ca-cert", "", "Trusted Certificate Authority to verify server certificate") + fs.StringVar(&c.CaFile, "ca-cert", c.CaFile, "Trusted Certificate Authority to verify server certificate") // mTLS configuration - fs.BoolVar(&c.ClientAuth.Enabled, "mtls", false, "Whether to require client authentication for mTLS") - fs.StringVar(&c.ClientAuth.ClientCertFile, "client-cert", "", "Client certificate file") - fs.StringVar(&c.ClientAuth.ClientKeyFile, "client-key", "", "Client private key file") + fs.BoolVar(&c.ClientAuth.Enabled, "mtls", c.ClientAuth.Enabled, "Whether to require client authentication for mTLS") + fs.StringVar(&c.ClientAuth.ClientCertFile, "client-cert", c.ClientAuth.ClientCertFile, "Client certificate file") + fs.StringVar(&c.ClientAuth.ClientKeyFile, "client-key", c.ClientAuth.ClientKeyFile, "Client private key file") +} + +// SetDefaults is here to mirror the defaults for flags above, +// This allows for us to have a single place to change the defaults +// while exposing the API for use. +func (c *Config) SetDefaults() { + c.WorkerCount = 1 + c.Rate = 0 + c.TotalDuration = 0 + c.ReportingInterval = 1 * time.Second + c.CustomEndpoint = "" + c.Insecure = false + c.InsecureSkipVerify = false + c.UseHTTP = false + c.HTTPPath = "" + c.Headers = make(KeyValue) + c.ResourceAttributes = make(KeyValue) + c.TelemetryAttributes = make(KeyValue) + c.CaFile = "" + c.ClientAuth.Enabled = false + c.ClientAuth.ClientCertFile = "" + c.ClientAuth.ClientKeyFile = "" } diff --git a/cmd/telemetrygen/internal/e2etest/go.mod b/cmd/telemetrygen/internal/e2etest/go.mod index 0583717d93c4..c15474a39490 100644 --- a/cmd/telemetrygen/internal/e2etest/go.mod +++ b/cmd/telemetrygen/internal/e2etest/go.mod @@ -65,14 +65,21 @@ require ( go.opentelemetry.io/collector/pipeline v0.119.0 // indirect go.opentelemetry.io/collector/receiver v0.119.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect + go.opentelemetry.io/collector/semconv v0.119.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.10.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.10.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect + go.opentelemetry.io/otel/log v0.10.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect + go.opentelemetry.io/otel/sdk/log v0.10.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect diff --git a/cmd/telemetrygen/internal/e2etest/go.sum b/cmd/telemetrygen/internal/e2etest/go.sum index 440a48cb41d1..531f614caf68 100644 --- a/cmd/telemetrygen/internal/e2etest/go.sum +++ b/cmd/telemetrygen/internal/e2etest/go.sum @@ -136,22 +136,36 @@ go.opentelemetry.io/collector/receiver/receivertest v0.119.0 h1:thZkyftPCNit/m2b go.opentelemetry.io/collector/receiver/receivertest v0.119.0/go.mod h1:DZM70vofnquGkQiTfT5ZSFZlohxANl9XOrVq9h5IKnc= go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 h1:ZcTO+h+r9TyR1XgMhA7FTSTV9RF+z/IDPrcRIg1l56U= go.opentelemetry.io/collector/receiver/xreceiver v0.119.0/go.mod h1:AkoWhnYFMygK7Tlzez398ti20NqydX8wxPVWU86+baE= +go.opentelemetry.io/collector/semconv v0.119.0 h1:xo+V3a7hnK0I6fxAWCXT8BIT1PCBYd4emolhoKSDUlI= +go.opentelemetry.io/collector/semconv v0.119.0/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 h1:rgMkmiGfix9vFJDcDi1PK8WEQP4FLQwLDfhp5ZLpFeE= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0/go.mod h1:ijPqXp5P6IRRByFVVg9DY8P5HkxkHE5ARIa+86aXPf4= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.10.0 h1:5dTKu4I5Dn4P2hxyW3l3jTaZx9ACgg0ECos1eAVrheY= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.10.0/go.mod h1:P5HcUI8obLrCCmM3sbVBohZFH34iszk/+CPWuakZWL8= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.10.0 h1:q/heq5Zh8xV1+7GoMGJpTxM2Lhq5+bFxB29tshuRuw0= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.10.0/go.mod h1:leO2CSTg0Y+LyvmR7Wm4pUxE8KAmaM2GCVx7O+RATLA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0 h1:ajl4QczuJVA2TU9W9AGw++86Xga/RKt//16z/yxPgdk= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0/go.mod h1:Vn3/rlOJ3ntf/Q3zAI0V5lDnTbHGaUsNUeF6nZmm7pA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0 h1:opwv08VbCZ8iecIWs+McMdHRcAXzjAeda3uG2kI/hcA= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.34.0/go.mod h1:oOP3ABpW7vFHulLpE8aYtNBodrHhMTrvfxUXGvqm7Ac= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0uaNS4c98WRNUEx5U3aDlrDOI5Rs+1Vifcw4DJ8U= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3SPM81McUZHYjRS5pEgNgnmzGJ5tRpU5krWnV8Bs= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0/go.mod h1:9cKLGBDzI/F3NoHLQGm4ZrYdIHsvGt6ej6hUowxY0J4= +go.opentelemetry.io/otel/log v0.10.0 h1:1CXmspaRITvFcjA4kyVszuG4HjA61fPDxMb7q3BuyF0= +go.opentelemetry.io/otel/log v0.10.0/go.mod h1:PbVdm9bXKku/gL0oFfUF4wwsQsOPlpo4VEqjvxih+FM= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/log v0.10.0 h1:lR4teQGWfeDVGoute6l0Ou+RpFqQ9vaPdrNJlST0bvw= +go.opentelemetry.io/otel/sdk/log v0.10.0/go.mod h1:A+V1UTWREhWAittaQEG4bYm4gAZa6xnvVu+xKrIRkzo= go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= diff --git a/cmd/telemetrygen/internal/e2etest/logs_test.go b/cmd/telemetrygen/internal/e2etest/logs_test.go new file mode 100644 index 000000000000..ffbefb45177e --- /dev/null +++ b/cmd/telemetrygen/internal/e2etest/logs_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package e2etest + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/pkg/logs" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" +) + +func TestGenerateLogs(t *testing.T) { + f := otlpreceiver.NewFactory() + sink := &consumertest.LogsSink{} + rCfg := f.CreateDefaultConfig() + endpoint := testutil.GetAvailableLocalAddress(t) + rCfg.(*otlpreceiver.Config).GRPC.NetAddr.Endpoint = endpoint + r, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), rCfg, sink) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, r.Shutdown(context.Background())) + }() + cfg := logs.NewConfig() + cfg.WorkerCount = 10 + cfg.Rate = 10 + cfg.TotalDuration = 10 * time.Second + cfg.ReportingInterval = 10 + cfg.CustomEndpoint = endpoint + cfg.Insecure = true + cfg.SkipSettingGRPCLogger = true + cfg.NumLogs = 6000 + go func() { + err = logs.Start(cfg) + assert.NoError(t, err) + }() + require.Eventually(t, func() bool { + return len(sink.AllLogs()) > 0 + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/cmd/telemetrygen/internal/e2etest/metrics_test.go b/cmd/telemetrygen/internal/e2etest/metrics_test.go new file mode 100644 index 000000000000..73e1c6102543 --- /dev/null +++ b/cmd/telemetrygen/internal/e2etest/metrics_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package e2etest + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/pkg/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" +) + +func TestGenerateMetrics(t *testing.T) { + f := otlpreceiver.NewFactory() + sink := &consumertest.MetricsSink{} + rCfg := f.CreateDefaultConfig() + endpoint := testutil.GetAvailableLocalAddress(t) + rCfg.(*otlpreceiver.Config).GRPC.NetAddr.Endpoint = endpoint + r, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(), rCfg, sink) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, r.Shutdown(context.Background())) + }() + cfg := metrics.NewConfig() + cfg.WorkerCount = 10 + cfg.Rate = 10 + cfg.TotalDuration = 10 * time.Second + cfg.ReportingInterval = 10 + cfg.CustomEndpoint = endpoint + cfg.Insecure = true + cfg.SkipSettingGRPCLogger = true + cfg.NumMetrics = 6000 + go func() { + err = metrics.Start(cfg) + assert.NoError(t, err) + }() + require.Eventually(t, func() bool { + return len(sink.AllMetrics()) > 0 + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/cmd/telemetrygen/internal/e2etest/e2e_test.go b/cmd/telemetrygen/internal/e2etest/traces_test.go similarity index 68% rename from cmd/telemetrygen/internal/e2etest/e2e_test.go rename to cmd/telemetrygen/internal/e2etest/traces_test.go index 0d76261a9350..4b7e18deefba 100644 --- a/cmd/telemetrygen/internal/e2etest/e2e_test.go +++ b/cmd/telemetrygen/internal/e2etest/traces_test.go @@ -15,8 +15,7 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver" "go.opentelemetry.io/collector/receiver/receivertest" - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/traces" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/pkg/traces" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" ) @@ -33,25 +32,15 @@ func TestGenerateTraces(t *testing.T) { defer func() { require.NoError(t, r.Shutdown(context.Background())) }() - cfg := &traces.Config{ - Config: common.Config{ - WorkerCount: 10, - Rate: 10, - TotalDuration: 10 * time.Second, - ReportingInterval: 10, - CustomEndpoint: endpoint, - Insecure: true, - UseHTTP: false, - Headers: nil, - ResourceAttributes: nil, - SkipSettingGRPCLogger: true, - }, - NumTraces: 6000, - ServiceName: "foo", - StatusCode: "0", - LoadSize: 0, - Batch: true, - } + cfg := traces.NewConfig() + cfg.WorkerCount = 10 + cfg.Rate = 10 + cfg.TotalDuration = 10 * time.Second + cfg.ReportingInterval = 10 + cfg.CustomEndpoint = endpoint + cfg.Insecure = true + cfg.SkipSettingGRPCLogger = true + cfg.NumTraces = 6000 go func() { err = traces.Start(cfg) assert.NoError(t, err) diff --git a/cmd/telemetrygen/internal/logs/config.go b/cmd/telemetrygen/internal/logs/config.go deleted file mode 100644 index dd17973bb66d..000000000000 --- a/cmd/telemetrygen/internal/logs/config.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package logs - -import ( - "fmt" - - "github.com/spf13/pflag" - - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" -) - -// Config describes the test scenario. -type Config struct { - common.Config - NumLogs int - Body string - SeverityText string - SeverityNumber int32 - TraceID string - SpanID string -} - -// Flags registers config flags. -func (c *Config) Flags(fs *pflag.FlagSet) { - c.CommonFlags(fs) - - fs.StringVar(&c.HTTPPath, "otlp-http-url-path", "/v1/logs", "Which URL path to write to") - - fs.IntVar(&c.NumLogs, "logs", 1, "Number of logs to generate in each worker (ignored if duration is provided)") - fs.StringVar(&c.Body, "body", "the message", "Body of the log") - fs.StringVar(&c.SeverityText, "severity-text", "Info", "Severity text of the log") - fs.Int32Var(&c.SeverityNumber, "severity-number", 9, "Severity number of the log, range from 1 to 24 (inclusive)") - fs.StringVar(&c.TraceID, "trace-id", "", "TraceID of the log") - fs.StringVar(&c.SpanID, "span-id", "", "SpanID of the log") -} - -// Validate validates the test scenario parameters. -func (c *Config) Validate() error { - if c.TotalDuration <= 0 && c.NumLogs <= 0 { - return fmt.Errorf("either `logs` or `duration` must be greater than 0") - } - - if c.TraceID != "" { - if err := common.ValidateTraceID(c.TraceID); err != nil { - return err - } - } - - if c.SpanID != "" { - if err := common.ValidateSpanID(c.SpanID); err != nil { - return err - } - } - - return nil -} diff --git a/cmd/telemetrygen/internal/traces/config.go b/cmd/telemetrygen/internal/traces/config.go deleted file mode 100644 index 5bb63dffbd9a..000000000000 --- a/cmd/telemetrygen/internal/traces/config.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package traces - -import ( - "fmt" - "time" - - "github.com/spf13/pflag" - - "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" -) - -// Config describes the test scenario. -type Config struct { - common.Config - NumTraces int - NumChildSpans int - PropagateContext bool - ServiceName string - StatusCode string - Batch bool - LoadSize int - - SpanDuration time.Duration -} - -// Flags registers config flags. -func (c *Config) Flags(fs *pflag.FlagSet) { - c.CommonFlags(fs) - - fs.StringVar(&c.HTTPPath, "otlp-http-url-path", "/v1/traces", "Which URL path to write to") - - fs.IntVar(&c.NumTraces, "traces", 1, "Number of traces to generate in each worker (ignored if duration is provided)") - fs.IntVar(&c.NumChildSpans, "child-spans", 1, "Number of child spans to generate for each trace") - fs.BoolVar(&c.PropagateContext, "marshal", false, "Whether to marshal trace context via HTTP headers") - fs.StringVar(&c.ServiceName, "service", "telemetrygen", "Service name to use") - fs.StringVar(&c.StatusCode, "status-code", "0", "Status code to use for the spans, one of (Unset, Error, Ok) or the equivalent integer (0,1,2)") - fs.BoolVar(&c.Batch, "batch", true, "Whether to batch traces") - fs.IntVar(&c.LoadSize, "size", 0, "Desired minimum size in MB of string data for each trace generated. This can be used to test traces with large payloads, i.e. when testing the OTLP receiver endpoint max receive size.") - fs.DurationVar(&c.SpanDuration, "span-duration", 123*time.Microsecond, "The duration of each generated span.") -} - -// Validate validates the test scenario parameters. -func (c *Config) Validate() error { - if c.TotalDuration <= 0 && c.NumTraces <= 0 { - return fmt.Errorf("either `traces` or `duration` must be greater than 0") - } - return nil -} diff --git a/cmd/telemetrygen/pkg/logs/config.go b/cmd/telemetrygen/pkg/logs/config.go new file mode 100644 index 000000000000..a624a9d16eb0 --- /dev/null +++ b/cmd/telemetrygen/pkg/logs/config.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package logs + +import ( + "fmt" + + "github.com/spf13/pflag" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" +) + +// Config describes the test scenario. +type Config struct { + common.Config + NumLogs int + Body string + SeverityText string + SeverityNumber int32 + TraceID string + SpanID string +} + +func NewConfig() *Config { + cfg := &Config{} + cfg.SetDefaults() + return cfg +} + +// Flags registers config flags. +func (c *Config) Flags(fs *pflag.FlagSet) { + c.CommonFlags(fs) + + fs.StringVar(&c.HTTPPath, "otlp-http-url-path", c.HTTPPath, "Which URL path to write to") + + fs.IntVar(&c.NumLogs, "logs", c.NumLogs, "Number of logs to generate in each worker (ignored if duration is provided)") + fs.StringVar(&c.Body, "body", c.Body, "Body of the log") + fs.StringVar(&c.SeverityText, "severity-text", c.SeverityText, "Severity text of the log") + fs.Int32Var(&c.SeverityNumber, "severity-number", c.SeverityNumber, "Severity number of the log, range from 1 to 24 (inclusive)") + fs.StringVar(&c.TraceID, "trace-id", c.TraceID, "TraceID of the log") + fs.StringVar(&c.SpanID, "span-id", c.SpanID, "SpanID of the log") +} + +// SetDefaults sets the default values for the configuration +// This is called before parsing the command line flags and when +// calling NewConfig() +func (c *Config) SetDefaults() { + c.Config.SetDefaults() + c.HTTPPath = "/v1/logs" + c.NumLogs = 1 + c.Body = "the message" + c.SeverityText = "Info" + c.SeverityNumber = 9 + c.TraceID = "" + c.SpanID = "" +} + +// Validate validates the test scenario parameters. +func (c *Config) Validate() error { + if c.TotalDuration <= 0 && c.NumLogs <= 0 { + return fmt.Errorf("either `logs` or `duration` must be greater than 0") + } + + if c.TraceID != "" { + if err := common.ValidateTraceID(c.TraceID); err != nil { + return err + } + } + + if c.SpanID != "" { + if err := common.ValidateSpanID(c.SpanID); err != nil { + return err + } + } + + return nil +} diff --git a/cmd/telemetrygen/internal/logs/exporter.go b/cmd/telemetrygen/pkg/logs/exporter.go similarity index 100% rename from cmd/telemetrygen/internal/logs/exporter.go rename to cmd/telemetrygen/pkg/logs/exporter.go diff --git a/cmd/telemetrygen/internal/logs/logs.go b/cmd/telemetrygen/pkg/logs/logs.go similarity index 70% rename from cmd/telemetrygen/internal/logs/logs.go rename to cmd/telemetrygen/pkg/logs/logs.go index ff428fc93e1b..b96ffc41948f 100644 --- a/cmd/telemetrygen/internal/logs/logs.go +++ b/cmd/telemetrygen/pkg/logs/logs.go @@ -29,45 +29,18 @@ func Start(cfg *Config) error { if err != nil { return err } - expFunc := func() (sdklog.Exporter, error) { - var exp sdklog.Exporter - if cfg.UseHTTP { - var exporterOpts []otlploghttp.Option - - logger.Info("starting HTTP exporter") - exporterOpts, err = httpExporterOptions(cfg) - if err != nil { - return nil, err - } - exp, err = otlploghttp.New(context.Background(), exporterOpts...) - if err != nil { - return nil, fmt.Errorf("failed to obtain OTLP HTTP exporter: %w", err) - } - } else { - var exporterOpts []otlploggrpc.Option - logger.Info("starting gRPC exporter") - exporterOpts, err = grpcExporterOptions(cfg) - if err != nil { - return nil, err - } - exp, err = otlploggrpc.New(context.Background(), exporterOpts...) - if err != nil { - return nil, fmt.Errorf("failed to obtain OTLP gRPC exporter: %w", err) - } - } - return exp, err - } + logger.Info("starting the logs generator with configuration", zap.Any("config", cfg)) - if err = Run(cfg, expFunc, logger); err != nil { + if err = run(cfg, exporterFactory(cfg, logger), logger); err != nil { return err } return nil } -// Run executes the test scenario. -func Run(c *Config, exp func() (sdklog.Exporter, error), logger *zap.Logger) error { +// run executes the test scenario. +func run(c *Config, expF exporterFunc, logger *zap.Logger) error { if err := c.Validate(); err != nil { return err } @@ -111,7 +84,17 @@ func Run(c *Config, exp func() (sdklog.Exporter, error), logger *zap.Logger) err traceID: c.TraceID, spanID: c.SpanID, } - + exp, err := expF() + if err != nil { + w.logger.Error("failed to create the exporter", zap.Error(err)) + return err + } + defer func() { + w.logger.Info("stopping the exporter") + if tempError := exp.Shutdown(context.Background()); tempError != nil { + w.logger.Error("failed to stop the exporter", zap.Error(tempError)) + } + }() go w.simulateLogs(res, exp, c.GetTelemetryAttributes()) } if c.TotalDuration > 0 { @@ -122,6 +105,45 @@ func Run(c *Config, exp func() (sdklog.Exporter, error), logger *zap.Logger) err return nil } +type exporterFunc func() (sdklog.Exporter, error) + +func exporterFactory(cfg *Config, logger *zap.Logger) exporterFunc { + return func() (sdklog.Exporter, error) { + return createExporter(cfg, logger) + } +} + +func createExporter(cfg *Config, logger *zap.Logger) (sdklog.Exporter, error) { + var exp sdklog.Exporter + var err error + if cfg.UseHTTP { + var exporterOpts []otlploghttp.Option + + logger.Info("starting HTTP exporter") + exporterOpts, err = httpExporterOptions(cfg) + if err != nil { + return nil, err + } + exp, err = otlploghttp.New(context.Background(), exporterOpts...) + if err != nil { + return nil, fmt.Errorf("failed to obtain OTLP HTTP exporter: %w", err) + } + } else { + var exporterOpts []otlploggrpc.Option + + logger.Info("starting gRPC exporter") + exporterOpts, err = grpcExporterOptions(cfg) + if err != nil { + return nil, err + } + exp, err = otlploggrpc.New(context.Background(), exporterOpts...) + if err != nil { + return nil, fmt.Errorf("failed to obtain OTLP gRPC exporter: %w", err) + } + } + return exp, err +} + func parseSeverity(severityText string, severityNumber int32) (string, log.Severity, error) { sn := log.Severity(severityNumber) if sn < log.SeverityTrace1 || sn > log.SeverityFatal4 { diff --git a/cmd/telemetrygen/internal/logs/package_test.go b/cmd/telemetrygen/pkg/logs/package_test.go similarity index 100% rename from cmd/telemetrygen/internal/logs/package_test.go rename to cmd/telemetrygen/pkg/logs/package_test.go diff --git a/cmd/telemetrygen/internal/logs/worker.go b/cmd/telemetrygen/pkg/logs/worker.go similarity index 84% rename from cmd/telemetrygen/internal/logs/worker.go rename to cmd/telemetrygen/pkg/logs/worker.go index 26de676e8efe..a449f7912742 100644 --- a/cmd/telemetrygen/internal/logs/worker.go +++ b/cmd/telemetrygen/pkg/logs/worker.go @@ -35,23 +35,10 @@ type worker struct { spanID string // spanID string } -func (w worker) simulateLogs(res *resource.Resource, exporterFunc func() (sdklog.Exporter, error), telemetryAttributes []attribute.KeyValue) { +func (w worker) simulateLogs(res *resource.Resource, exporter sdklog.Exporter, telemetryAttributes []attribute.KeyValue) { limiter := rate.NewLimiter(w.limitPerSecond, 1) var i int64 - exporter, err := exporterFunc() - if err != nil { - w.logger.Error("failed to create the exporter", zap.Error(err)) - return - } - - defer func() { - w.logger.Info("stopping the exporter") - if tempError := exporter.Shutdown(context.Background()); tempError != nil { - w.logger.Error("failed to stop the exporter", zap.Error(tempError)) - } - }() - for w.running.Load() { var tid trace.TraceID var sid trace.SpanID diff --git a/cmd/telemetrygen/internal/logs/worker_test.go b/cmd/telemetrygen/pkg/logs/worker_test.go similarity index 93% rename from cmd/telemetrygen/internal/logs/worker_test.go rename to cmd/telemetrygen/pkg/logs/worker_test.go index 2d1095afe326..1ba828323edc 100644 --- a/cmd/telemetrygen/internal/logs/worker_test.go +++ b/cmd/telemetrygen/pkg/logs/worker_test.go @@ -58,7 +58,7 @@ func TestFixedNumberOfLogs(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -82,7 +82,7 @@ func TestRateOfLogs(t *testing.T) { } // test - require.NoError(t, Run(cfg, expFunc, zap.NewNop())) + require.NoError(t, run(cfg, expFunc, zap.NewNop())) // verify // the minimum acceptable number of logs for the rate of 10/sec for half a second @@ -107,7 +107,7 @@ func TestUnthrottled(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) assert.Greater(t, len(m.logs), 100, "there should have been more than 100 logs, had %d", len(m.logs)) } @@ -129,7 +129,7 @@ func TestCustomBody(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) assert.Equal(t, "custom body", m.logs[0].Body().AsString()) } @@ -144,7 +144,7 @@ func TestLogsWithNoTelemetryAttributes(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -166,7 +166,7 @@ func TestLogsWithOneTelemetryAttributes(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -195,7 +195,7 @@ func TestLogsWithMultipleTelemetryAttributes(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -219,7 +219,7 @@ func TestLogsWithTraceIDAndSpanID(t *testing.T) { // test logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) // verify require.Len(t, m.logs, qty) @@ -276,7 +276,7 @@ func TestValidate(t *testing.T) { return m, nil } logger, _ := zap.NewDevelopment() - require.EqualError(t, Run(tt.cfg, expFunc, logger), tt.wantErrMessage) + require.EqualError(t, run(tt.cfg, expFunc, logger), tt.wantErrMessage) }) } } diff --git a/cmd/telemetrygen/internal/metrics/config.go b/cmd/telemetrygen/pkg/metrics/config.go similarity index 55% rename from cmd/telemetrygen/internal/metrics/config.go rename to cmd/telemetrygen/pkg/metrics/config.go index 16c1306a9071..3ccd66251a8f 100644 --- a/cmd/telemetrygen/internal/metrics/config.go +++ b/cmd/telemetrygen/pkg/metrics/config.go @@ -16,26 +16,44 @@ type Config struct { common.Config NumMetrics int MetricName string - MetricType metricType + MetricType MetricType SpanID string TraceID string } +// NewConfig creates a new Config with default values. +func NewConfig() *Config { + cfg := &Config{} + cfg.SetDefaults() + return cfg +} + // Flags registers config flags. func (c *Config) Flags(fs *pflag.FlagSet) { - // Use Gauge as default metric type. - c.MetricName = "gen" - c.MetricType = metricTypeGauge - c.CommonFlags(fs) - fs.StringVar(&c.HTTPPath, "otlp-http-url-path", "/v1/metrics", "Which URL path to write to") + fs.StringVar(&c.HTTPPath, "otlp-http-url-path", c.HTTPPath, "Which URL path to write to") fs.Var(&c.MetricType, "metric-type", "Metric type enum. must be one of 'Gauge' or 'Sum'") - fs.IntVar(&c.NumMetrics, "metrics", 1, "Number of metrics to generate in each worker (ignored if duration is provided)") + fs.IntVar(&c.NumMetrics, "metrics", c.NumMetrics, "Number of metrics to generate in each worker (ignored if duration is provided)") - fs.StringVar(&c.TraceID, "trace-id", "", "TraceID to use as exemplar") - fs.StringVar(&c.SpanID, "span-id", "", "SpanID to use as exemplar") + fs.StringVar(&c.TraceID, "trace-id", c.TraceID, "TraceID to use as exemplar") + fs.StringVar(&c.SpanID, "span-id", c.SpanID, "SpanID to use as exemplar") +} + +// SetDefaults sets the default values for the configuration +// This is called before parsing the command line flags and when +// calling NewConfig() +func (c *Config) SetDefaults() { + c.Config.SetDefaults() + c.HTTPPath = "/v1/metrics" + c.NumMetrics = 1 + + // Use Gauge as default metric type. + c.MetricType = MetricTypeGauge + c.MetricName = "gen" + c.TraceID = "" + c.SpanID = "" } // Validate validates the test scenario parameters. diff --git a/cmd/telemetrygen/internal/metrics/exporter.go b/cmd/telemetrygen/pkg/metrics/exporter.go similarity index 100% rename from cmd/telemetrygen/internal/metrics/exporter.go rename to cmd/telemetrygen/pkg/metrics/exporter.go diff --git a/cmd/telemetrygen/internal/metrics/metrics.go b/cmd/telemetrygen/pkg/metrics/metrics.go similarity index 62% rename from cmd/telemetrygen/internal/metrics/metrics.go rename to cmd/telemetrygen/pkg/metrics/metrics.go index ec05571412e5..bfbc9ab529e9 100644 --- a/cmd/telemetrygen/internal/metrics/metrics.go +++ b/cmd/telemetrygen/pkg/metrics/metrics.go @@ -29,47 +29,18 @@ func Start(cfg *Config) error { if err != nil { return err } - logger.Info("starting the metrics generator with configuration", zap.Any("config", cfg)) - - expFunc := func() (sdkmetric.Exporter, error) { - var exp sdkmetric.Exporter - if cfg.UseHTTP { - var exporterOpts []otlpmetrichttp.Option - logger.Info("starting HTTP exporter") - exporterOpts, err = httpExporterOptions(cfg) - if err != nil { - return nil, err - } - exp, err = otlpmetrichttp.New(context.Background(), exporterOpts...) - if err != nil { - return nil, fmt.Errorf("failed to obtain OTLP HTTP exporter: %w", err) - } - } else { - var exporterOpts []otlpmetricgrpc.Option - - logger.Info("starting gRPC exporter") - exporterOpts, err = grpcExporterOptions(cfg) - if err != nil { - return nil, err - } - exp, err = otlpmetricgrpc.New(context.Background(), exporterOpts...) - if err != nil { - return nil, fmt.Errorf("failed to obtain OTLP gRPC exporter: %w", err) - } - } - return exp, err - } + logger.Info("starting the metrics generator with configuration", zap.Any("config", cfg)) - if err = Run(cfg, expFunc, logger); err != nil { + if err = run(cfg, exporterFactory(cfg, logger), logger); err != nil { return err } return nil } -// Run executes the test scenario. -func Run(c *Config, exp func() (sdkmetric.Exporter, error), logger *zap.Logger) error { +// run executes the test scenario. +func run(c *Config, expF exporterFunc, logger *zap.Logger) error { if err := c.Validate(); err != nil { return err } @@ -106,6 +77,18 @@ func Run(c *Config, exp func() (sdkmetric.Exporter, error), logger *zap.Logger) logger: logger.With(zap.Int("worker", i)), index: i, } + exp, err := expF() + if err != nil { + w.logger.Error("failed to create the exporter", zap.Error(err)) + return err + } + + defer func() { + w.logger.Info("stopping the exporter") + if tempError := exp.Shutdown(context.Background()); tempError != nil { + w.logger.Error("failed to stop the exporter", zap.Error(tempError)) + } + }() go w.simulateMetrics(res, exp, c.GetTelemetryAttributes()) } @@ -117,6 +100,45 @@ func Run(c *Config, exp func() (sdkmetric.Exporter, error), logger *zap.Logger) return nil } +type exporterFunc func() (sdkmetric.Exporter, error) + +func exporterFactory(cfg *Config, logger *zap.Logger) exporterFunc { + return func() (sdkmetric.Exporter, error) { + return createExporter(cfg, logger) + } +} + +func createExporter(cfg *Config, logger *zap.Logger) (sdkmetric.Exporter, error) { + var exp sdkmetric.Exporter + var err error + if cfg.UseHTTP { + var exporterOpts []otlpmetrichttp.Option + + logger.Info("starting HTTP exporter") + exporterOpts, err = httpExporterOptions(cfg) + if err != nil { + return nil, err + } + exp, err = otlpmetrichttp.New(context.Background(), exporterOpts...) + if err != nil { + return nil, fmt.Errorf("failed to obtain OTLP HTTP exporter: %w", err) + } + } else { + var exporterOpts []otlpmetricgrpc.Option + + logger.Info("starting gRPC exporter") + exporterOpts, err = grpcExporterOptions(cfg) + if err != nil { + return nil, err + } + exp, err = otlpmetricgrpc.New(context.Background(), exporterOpts...) + if err != nil { + return nil, fmt.Errorf("failed to obtain OTLP gRPC exporter: %w", err) + } + } + return exp, err +} + func exemplarsFromConfig(c *Config) []metricdata.Exemplar[int64] { if c.TraceID != "" || c.SpanID != "" { var exemplars []metricdata.Exemplar[int64] diff --git a/cmd/telemetrygen/internal/metrics/metrics_test.go b/cmd/telemetrygen/pkg/metrics/metrics_test.go similarity index 100% rename from cmd/telemetrygen/internal/metrics/metrics_test.go rename to cmd/telemetrygen/pkg/metrics/metrics_test.go diff --git a/cmd/telemetrygen/internal/metrics/metrics_types.go b/cmd/telemetrygen/pkg/metrics/metrics_types.go similarity index 59% rename from cmd/telemetrygen/internal/metrics/metrics_types.go rename to cmd/telemetrygen/pkg/metrics/metrics_types.go index e07b26720acb..af2d3db623ae 100644 --- a/cmd/telemetrygen/internal/metrics/metrics_types.go +++ b/cmd/telemetrygen/pkg/metrics/metrics_types.go @@ -7,24 +7,24 @@ import ( "errors" ) -type metricType string +type MetricType string const ( - metricTypeGauge = "Gauge" - metricTypeSum = "Sum" - metricTypeHistogram = "Histogram" + MetricTypeGauge MetricType = "Gauge" + MetricTypeSum MetricType = "Sum" + MetricTypeHistogram MetricType = "Histogram" ) // String is used both by fmt.Print and by Cobra in help text -func (e *metricType) String() string { +func (e *MetricType) String() string { return string(*e) } // Set must have pointer receiver so it doesn't change the value of a copy -func (e *metricType) Set(v string) error { +func (e *MetricType) Set(v string) error { switch v { case "Gauge", "Sum", "Histogram": - *e = metricType(v) + *e = MetricType(v) return nil default: return errors.New(`must be one of "Gauge", "Sum", "Histogram"`) @@ -32,6 +32,6 @@ func (e *metricType) Set(v string) error { } // Type is only used in help text -func (e *metricType) Type() string { - return "metricType" +func (e *MetricType) Type() string { + return "MetricType" } diff --git a/cmd/telemetrygen/internal/metrics/package_test.go b/cmd/telemetrygen/pkg/metrics/package_test.go similarity index 100% rename from cmd/telemetrygen/internal/metrics/package_test.go rename to cmd/telemetrygen/pkg/metrics/package_test.go diff --git a/cmd/telemetrygen/internal/metrics/worker.go b/cmd/telemetrygen/pkg/metrics/worker.go similarity index 88% rename from cmd/telemetrygen/internal/metrics/worker.go rename to cmd/telemetrygen/pkg/metrics/worker.go index 02a0c7ded490..2483670e55f9 100644 --- a/cmd/telemetrygen/internal/metrics/worker.go +++ b/cmd/telemetrygen/pkg/metrics/worker.go @@ -20,7 +20,7 @@ import ( type worker struct { running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test metricName string // name of metric to generate - metricType metricType // type of metric to generate + metricType MetricType // type of metric to generate exemplars []metricdata.Exemplar[int64] // exemplars to attach to the metric numMetrics int // how many metrics the worker has to generate (only when duration==0) totalDuration time.Duration // how long to run the test for (overrides `numMetrics`) @@ -76,28 +76,15 @@ var histogramBucketSamples = []struct { }, } -func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdkmetric.Exporter, error), signalAttrs []attribute.KeyValue) { +func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Exporter, signalAttrs []attribute.KeyValue) { limiter := rate.NewLimiter(w.limitPerSecond, 1) - exporter, err := exporterFunc() - if err != nil { - w.logger.Error("failed to create the exporter", zap.Error(err)) - return - } - - defer func() { - w.logger.Info("stopping the exporter") - if tempError := exporter.Shutdown(context.Background()); tempError != nil { - w.logger.Error("failed to stop the exporter", zap.Error(tempError)) - } - }() - var i int64 for w.running.Load() { var metrics []metricdata.Metrics switch w.metricType { - case metricTypeGauge: + case MetricTypeGauge: metrics = append(metrics, metricdata.Metrics{ Name: w.metricName, Data: metricdata.Gauge[int64]{ @@ -111,7 +98,7 @@ func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdk }, }, }) - case metricTypeSum: + case MetricTypeSum: metrics = append(metrics, metricdata.Metrics{ Name: w.metricName, Data: metricdata.Sum[int64]{ @@ -128,7 +115,7 @@ func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdk }, }, }) - case metricTypeHistogram: + case MetricTypeHistogram: iteration := uint64(i) % 10 sum := histogramBucketSamples[iteration].sum bucketCounts := histogramBucketSamples[iteration].bucketCounts diff --git a/cmd/telemetrygen/internal/metrics/worker_test.go b/cmd/telemetrygen/pkg/metrics/worker_test.go similarity index 88% rename from cmd/telemetrygen/internal/metrics/worker_test.go rename to cmd/telemetrygen/pkg/metrics/worker_test.go index 7574ac203757..fe83abdcc81f 100644 --- a/cmd/telemetrygen/internal/metrics/worker_test.go +++ b/cmd/telemetrygen/pkg/metrics/worker_test.go @@ -57,7 +57,7 @@ func TestFixedNumberOfMetrics(t *testing.T) { WorkerCount: 1, }, NumMetrics: 5, - MetricType: metricTypeSum, + MetricType: MetricTypeSum, } m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { @@ -66,7 +66,7 @@ func TestFixedNumberOfMetrics(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) // assert @@ -81,7 +81,7 @@ func TestRateOfMetrics(t *testing.T) { TotalDuration: time.Second / 2, WorkerCount: 1, }, - MetricType: metricTypeSum, + MetricType: MetricTypeSum, } m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { @@ -89,7 +89,7 @@ func TestRateOfMetrics(t *testing.T) { } // act - require.NoError(t, Run(cfg, expFunc, zap.NewNop())) + require.NoError(t, run(cfg, expFunc, zap.NewNop())) // assert // the minimum acceptable number of metrics for the rate of 10/sec for half a second @@ -105,7 +105,7 @@ func TestUnthrottled(t *testing.T) { TotalDuration: 1 * time.Second, WorkerCount: 1, }, - MetricType: metricTypeSum, + MetricType: MetricTypeSum, } m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { @@ -114,7 +114,7 @@ func TestUnthrottled(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) // assert assert.Greater(t, len(m.rms), 100, "there should have been more than 100 metrics, had %d", len(m.rms)) @@ -123,7 +123,7 @@ func TestUnthrottled(t *testing.T) { func TestSumNoTelemetryAttrs(t *testing.T) { // arrange qty := 2 - cfg := configWithNoAttributes(metricTypeSum, qty) + cfg := configWithNoAttributes(MetricTypeSum, qty) m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { return m, nil @@ -131,7 +131,7 @@ func TestSumNoTelemetryAttrs(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -151,7 +151,7 @@ func TestSumNoTelemetryAttrs(t *testing.T) { func TestGaugeNoTelemetryAttrs(t *testing.T) { // arrange qty := 2 - cfg := configWithNoAttributes(metricTypeGauge, qty) + cfg := configWithNoAttributes(MetricTypeGauge, qty) m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { return m, nil @@ -159,7 +159,7 @@ func TestGaugeNoTelemetryAttrs(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -179,7 +179,7 @@ func TestGaugeNoTelemetryAttrs(t *testing.T) { func TestSumSingleTelemetryAttr(t *testing.T) { // arrange qty := 2 - cfg := configWithOneAttribute(metricTypeSum, qty) + cfg := configWithOneAttribute(MetricTypeSum, qty) m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { return m, nil @@ -187,7 +187,7 @@ func TestSumSingleTelemetryAttr(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -209,7 +209,7 @@ func TestSumSingleTelemetryAttr(t *testing.T) { func TestGaugeSingleTelemetryAttr(t *testing.T) { // arrange qty := 2 - cfg := configWithOneAttribute(metricTypeGauge, qty) + cfg := configWithOneAttribute(MetricTypeGauge, qty) m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { return m, nil @@ -217,7 +217,7 @@ func TestGaugeSingleTelemetryAttr(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -239,7 +239,7 @@ func TestGaugeSingleTelemetryAttr(t *testing.T) { func TestSumMultipleTelemetryAttr(t *testing.T) { // arrange qty := 2 - cfg := configWithMultipleAttributes(metricTypeSum, qty) + cfg := configWithMultipleAttributes(MetricTypeSum, qty) m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { return m, nil @@ -247,7 +247,7 @@ func TestSumMultipleTelemetryAttr(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -271,7 +271,7 @@ func TestSumMultipleTelemetryAttr(t *testing.T) { func TestGaugeMultipleTelemetryAttr(t *testing.T) { // arrange qty := 2 - cfg := configWithMultipleAttributes(metricTypeGauge, qty) + cfg := configWithMultipleAttributes(MetricTypeGauge, qty) m := &mockExporter{} expFunc := func() (sdkmetric.Exporter, error) { return m, nil @@ -279,7 +279,7 @@ func TestGaugeMultipleTelemetryAttr(t *testing.T) { // act logger, _ := zap.NewDevelopment() - require.NoError(t, Run(cfg, expFunc, logger)) + require.NoError(t, run(cfg, expFunc, logger)) time.Sleep(1 * time.Second) @@ -312,7 +312,7 @@ func TestValidate(t *testing.T) { Config: common.Config{ WorkerCount: 1, }, - MetricType: metricTypeSum, + MetricType: MetricTypeSum, TraceID: "123", }, wantErrMessage: "either `metrics` or `duration` must be greater than 0", @@ -324,7 +324,7 @@ func TestValidate(t *testing.T) { WorkerCount: 1, }, NumMetrics: 5, - MetricType: metricTypeSum, + MetricType: MetricTypeSum, TraceID: "123", }, wantErrMessage: "TraceID must be a 32 character hex string, like: 'ae87dadd90e9935a4bc9660628efd569'", @@ -336,7 +336,7 @@ func TestValidate(t *testing.T) { WorkerCount: 1, }, NumMetrics: 5, - MetricType: metricTypeSum, + MetricType: MetricTypeSum, TraceID: "ae87dadd90e9935a4bc9660628efd569", SpanID: "123", }, @@ -350,12 +350,12 @@ func TestValidate(t *testing.T) { return m, nil } logger, _ := zap.NewDevelopment() - require.EqualError(t, Run(tt.cfg, expFunc, logger), tt.wantErrMessage) + require.EqualError(t, run(tt.cfg, expFunc, logger), tt.wantErrMessage) }) } } -func configWithNoAttributes(metric metricType, qty int) *Config { +func configWithNoAttributes(metric MetricType, qty int) *Config { return &Config{ Config: common.Config{ WorkerCount: 1, @@ -367,7 +367,7 @@ func configWithNoAttributes(metric metricType, qty int) *Config { } } -func configWithOneAttribute(metric metricType, qty int) *Config { +func configWithOneAttribute(metric MetricType, qty int) *Config { return &Config{ Config: common.Config{ WorkerCount: 1, @@ -379,7 +379,7 @@ func configWithOneAttribute(metric metricType, qty int) *Config { } } -func configWithMultipleAttributes(metric metricType, qty int) *Config { +func configWithMultipleAttributes(metric MetricType, qty int) *Config { kvs := common.KeyValue{telemetryAttrKeyOne: telemetryAttrValueOne, telemetryAttrKeyTwo: telemetryAttrValueTwo} return &Config{ Config: common.Config{ diff --git a/cmd/telemetrygen/pkg/traces/config.go b/cmd/telemetrygen/pkg/traces/config.go new file mode 100644 index 000000000000..5ff431132911 --- /dev/null +++ b/cmd/telemetrygen/pkg/traces/config.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package traces + +import ( + "fmt" + "time" + + "github.com/spf13/pflag" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" +) + +// Config describes the test scenario. +type Config struct { + common.Config + NumTraces int + NumChildSpans int + PropagateContext bool + ServiceName string + StatusCode string + Batch bool + LoadSize int + + SpanDuration time.Duration +} + +func NewConfig() *Config { + cfg := &Config{} + cfg.SetDefaults() + return cfg +} + +// Flags registers config flags. +func (c *Config) Flags(fs *pflag.FlagSet) { + c.CommonFlags(fs) + + fs.StringVar(&c.HTTPPath, "otlp-http-url-path", c.HTTPPath, "Which URL path to write to") + + fs.IntVar(&c.NumTraces, "traces", c.NumTraces, "Number of traces to generate in each worker (ignored if duration is provided)") + fs.IntVar(&c.NumChildSpans, "child-spans", c.NumChildSpans, "Number of child spans to generate for each trace") + fs.BoolVar(&c.PropagateContext, "marshal", c.PropagateContext, "Whether to marshal trace context via HTTP headers") + fs.StringVar(&c.ServiceName, "service", c.ServiceName, "Service name to use") + fs.StringVar(&c.StatusCode, "status-code", c.StatusCode, "Status code to use for the spans, one of (Unset, Error, Ok) or the equivalent integer (0,1,2)") + fs.BoolVar(&c.Batch, "batch", c.Batch, "Whether to batch traces") + fs.IntVar(&c.LoadSize, "size", c.LoadSize, "Desired minimum size in MB of string data for each trace generated. This can be used to test traces with large payloads, i.e. when testing the OTLP receiver endpoint max receive size.") + fs.DurationVar(&c.SpanDuration, "span-duration", c.SpanDuration, "The duration of each generated span.") +} + +// SetDefaults sets the default values for the configuration +// This is called before parsing the command line flags and when +// calling NewConfig() +func (c *Config) SetDefaults() { + c.Config.SetDefaults() + c.HTTPPath = "/v1/traces" + c.NumTraces = 1 + c.NumChildSpans = 1 + c.PropagateContext = false + c.ServiceName = "telemetrygen" + c.StatusCode = "0" + c.Batch = true + c.LoadSize = 0 + c.SpanDuration = 123 * time.Microsecond +} + +// Validate validates the test scenario parameters. +func (c *Config) Validate() error { + if c.TotalDuration <= 0 && c.NumTraces <= 0 { + return fmt.Errorf("either `traces` or `duration` must be greater than 0") + } + return nil +} diff --git a/cmd/telemetrygen/internal/traces/exporter.go b/cmd/telemetrygen/pkg/traces/exporter.go similarity index 100% rename from cmd/telemetrygen/internal/traces/exporter.go rename to cmd/telemetrygen/pkg/traces/exporter.go diff --git a/cmd/telemetrygen/internal/traces/exporter_test.go b/cmd/telemetrygen/pkg/traces/exporter_test.go similarity index 100% rename from cmd/telemetrygen/internal/traces/exporter_test.go rename to cmd/telemetrygen/pkg/traces/exporter_test.go diff --git a/cmd/telemetrygen/internal/traces/package_test.go b/cmd/telemetrygen/pkg/traces/package_test.go similarity index 100% rename from cmd/telemetrygen/internal/traces/package_test.go rename to cmd/telemetrygen/pkg/traces/package_test.go diff --git a/cmd/telemetrygen/internal/traces/traces.go b/cmd/telemetrygen/pkg/traces/traces.go similarity index 97% rename from cmd/telemetrygen/internal/traces/traces.go rename to cmd/telemetrygen/pkg/traces/traces.go index 29c0aacc5d27..1a281e5faa53 100644 --- a/cmd/telemetrygen/internal/traces/traces.go +++ b/cmd/telemetrygen/pkg/traces/traces.go @@ -72,6 +72,7 @@ func Start(cfg *Config) error { ssp = sdktrace.NewBatchSpanProcessor(exp, sdktrace.WithBatchTimeout(time.Second)) defer func() { logger.Info("stop the batch span processor") + if tempError := ssp.Shutdown(context.Background()); tempError != nil { logger.Error("failed to stop the batch span processor", zap.Error(tempError)) } @@ -90,9 +91,10 @@ func Start(cfg *Config) error { if cfg.Batch { tracerProvider.RegisterSpanProcessor(ssp) } + otel.SetTracerProvider(tracerProvider) - if err = Run(cfg, logger); err != nil { + if err = run(cfg, logger); err != nil { logger.Error("failed to execute the test scenario.", zap.Error(err)) return err } @@ -100,8 +102,8 @@ func Start(cfg *Config) error { return nil } -// Run executes the test scenario. -func Run(c *Config, logger *zap.Logger) error { +// run executes the test scenario. +func run(c *Config, logger *zap.Logger) error { if err := c.Validate(); err != nil { return err } diff --git a/cmd/telemetrygen/internal/traces/worker.go b/cmd/telemetrygen/pkg/traces/worker.go similarity index 100% rename from cmd/telemetrygen/internal/traces/worker.go rename to cmd/telemetrygen/pkg/traces/worker.go diff --git a/cmd/telemetrygen/internal/traces/worker_test.go b/cmd/telemetrygen/pkg/traces/worker_test.go similarity index 94% rename from cmd/telemetrygen/internal/traces/worker_test.go rename to cmd/telemetrygen/pkg/traces/worker_test.go index 8c85b90cb305..9fc8894611b8 100644 --- a/cmd/telemetrygen/internal/traces/worker_test.go +++ b/cmd/telemetrygen/pkg/traces/worker_test.go @@ -44,7 +44,7 @@ func TestFixedNumberOfTraces(t *testing.T) { } // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify assert.Len(t, syncer.spans, 2) // each trace has two spans @@ -69,7 +69,7 @@ func TestNumberOfSpans(t *testing.T) { expectedNumSpans := cfg.NumChildSpans + 1 // each trace has 1 + NumChildSpans spans // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify assert.Len(t, syncer.spans, expectedNumSpans) @@ -96,7 +96,7 @@ func TestRateOfSpans(t *testing.T) { require.Empty(t, syncer.spans) // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify // the minimum acceptable number of spans for the rate of 10/sec for half a second @@ -128,7 +128,7 @@ func TestSpanDuration(t *testing.T) { require.Empty(t, syncer.spans) // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) for _, span := range syncer.spans { startTime, endTime := span.StartTime(), span.EndTime() @@ -157,7 +157,7 @@ func TestUnthrottled(t *testing.T) { require.Empty(t, syncer.spans) // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify // the minimum acceptable number of spans -- the real number should be > 10k, but CI env might be slower @@ -181,7 +181,7 @@ func TestSpanKind(t *testing.T) { } // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify that the default Span Kind is being overridden for _, span := range syncer.spans { @@ -232,13 +232,13 @@ func TestSpanStatuses(t *testing.T) { // test the program given input, including erroneous inputs if tt.validInput { - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify that the default the span status is set as expected for _, span := range syncer.spans { assert.Equalf(t, span.Status().Code, tt.spanStatus, "span status: %v and expected status %v", span.Status().Code, tt.spanStatus) } } else { - require.Error(t, Run(cfg, zap.NewNop())) + require.Error(t, run(cfg, zap.NewNop())) } }) } @@ -256,7 +256,7 @@ func TestSpansWithNoAttrs(t *testing.T) { cfg := configWithNoAttributes(2, "") // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify assert.Len(t, syncer.spans, 4) // each trace has two spans @@ -278,7 +278,7 @@ func TestSpansWithOneAttrs(t *testing.T) { cfg := configWithOneAttribute(2, "") // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify assert.Len(t, syncer.spans, 4) // each trace has two spans @@ -300,7 +300,7 @@ func TestSpansWithMultipleAttrs(t *testing.T) { cfg := configWithMultipleAttributes(2, "") // test - require.NoError(t, Run(cfg, zap.NewNop())) + require.NoError(t, run(cfg, zap.NewNop())) // verify assert.Len(t, syncer.spans, 4) // each trace has two spans @@ -335,7 +335,7 @@ func TestValidate(t *testing.T) { tracerProvider.RegisterSpanProcessor(sp) otel.SetTracerProvider(tracerProvider) logger, _ := zap.NewDevelopment() - require.EqualError(t, Run(tt.cfg, logger), tt.wantErrMessage) + require.EqualError(t, run(tt.cfg, logger), tt.wantErrMessage) }) } }