Skip to content

Commit

Permalink
Merge branch 'main' into feat/export-telemetrygen-funcs-for-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Erog38 authored Jan 17, 2025
2 parents 1580d8b + 20ecaff commit 9a2f91a
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 20 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awss3exporter-sending-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement sending queue for S3 exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37274, 36264]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
35 changes: 21 additions & 14 deletions exporter/awss3exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@ This exporter targets to support proto/json format.

The following exporter configuration parameters are supported.

| Name | Description | Default |
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| `region` | AWS region. | "us-east-1" |
| `s3_bucket` | S3 bucket | |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
| `role_arn` | the Role ARN to be assumed | |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data | `otlp_json` |
| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | |
| Name | Description | Default |
|:--------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|
| `region` | AWS region. | "us-east-1" |
| `s3_bucket` | S3 bucket | |
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
| `role_arn` | the Role ARN to be assumed | |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data | `otlp_json` |
| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | |
| `encoding_file_extension` | file format extension suffix when using the `encoding` configuration option. May be left empty for no suffix to be appended. | |
| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
| `compression` | should the file be compressed | none |
| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
| `compression` | should the file be compressed | none |
| `sending_queue` | [exporters common queuing](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | disabled |

### Marshaler

Expand Down Expand Up @@ -68,6 +69,12 @@ exporters:
s3_bucket: 'databucket'
s3_prefix: 'metric'
s3_partition: 'minute'

# Optional (disabled by default)
sending_queue:
enabled: true
num_consumers: 10
queue_size: 100
```
Logs and traces will be stored inside 'databucket' in the following path format.
Expand Down
3 changes: 3 additions & 0 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"
)

Expand Down Expand Up @@ -49,6 +50,8 @@ const (

// Config contains the main configuration options for the s3 exporter
type Config struct {
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`

S3Uploader S3UploaderConfig `mapstructure:"s3uploader"`
MarshalerName MarshalerType `mapstructure:"marshaler"`

Expand Down
27 changes: 27 additions & 0 deletions exporter/awss3exporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.uber.org/multierr"

Expand All @@ -32,7 +33,12 @@ func TestLoadConfig(t *testing.T) {

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
encoding := component.MustNewIDWithName("foo", "bar")

queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

assert.Equal(t, &Config{
QueueSettings: queueCfg,
Encoding: &encoding,
EncodingFileExtension: "baz",
S3Uploader: S3UploaderConfig{
Expand All @@ -59,9 +65,16 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

queueCfg := exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 23,
QueueSize: 42,
}

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)

assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
Expand All @@ -88,9 +101,13 @@ func TestConfigForS3CompatibleSystems(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)

assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
Expand Down Expand Up @@ -200,9 +217,13 @@ func TestMarshallerName(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)

assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
Expand All @@ -215,6 +236,7 @@ func TestMarshallerName(t *testing.T) {
e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config)

assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "bar",
Expand All @@ -239,9 +261,13 @@ func TestCompressionName(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

e := cfg.Exporters[component.MustNewID("awss3")].(*Config)

assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
Expand All @@ -255,6 +281,7 @@ func TestCompressionName(t *testing.T) {
e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config)

assert.Equal(t, &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "bar",
Expand Down
46 changes: 40 additions & 6 deletions exporter/awss3exporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func NewFactory() exporter.Factory {
}

func createDefaultConfig() component.Config {
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

return &Config{
QueueSettings: queueCfg,
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Partition: "minute",
Expand All @@ -39,19 +43,31 @@ func createLogsExporter(ctx context.Context,
params exporter.Settings,
config component.Config,
) (exporter.Logs, error) {
s3Exporter := newS3Exporter(config.(*Config), "logs", params)
cfg, err := checkAndCastConfig(config)
if err != nil {
return nil, err
}

s3Exporter := newS3Exporter(cfg, "logs", params)

return exporterhelper.NewLogs(ctx, params,
config,
s3Exporter.ConsumeLogs,
exporterhelper.WithStart(s3Exporter.start))
exporterhelper.WithStart(s3Exporter.start),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

func createMetricsExporter(ctx context.Context,
params exporter.Settings,
config component.Config,
) (exporter.Metrics, error) {
s3Exporter := newS3Exporter(config.(*Config), "metrics", params)
cfg, err := checkAndCastConfig(config)
if err != nil {
return nil, err
}

s3Exporter := newS3Exporter(cfg, "metrics", params)

if config.(*Config).MarshalerName == SumoIC {
return nil, fmt.Errorf("metrics are not supported by sumo_ic output format")
Expand All @@ -60,14 +76,21 @@ func createMetricsExporter(ctx context.Context,
return exporterhelper.NewMetrics(ctx, params,
config,
s3Exporter.ConsumeMetrics,
exporterhelper.WithStart(s3Exporter.start))
exporterhelper.WithStart(s3Exporter.start),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

func createTracesExporter(ctx context.Context,
params exporter.Settings,
config component.Config,
) (exporter.Traces, error) {
s3Exporter := newS3Exporter(config.(*Config), "traces", params)
cfg, err := checkAndCastConfig(config)
if err != nil {
return nil, err
}

s3Exporter := newS3Exporter(cfg, "traces", params)

if config.(*Config).MarshalerName == SumoIC {
return nil, fmt.Errorf("traces are not supported by sumo_ic output format")
Expand All @@ -77,5 +100,16 @@ func createTracesExporter(ctx context.Context,
params,
config,
s3Exporter.ConsumeTraces,
exporterhelper.WithStart(s3Exporter.start))
exporterhelper.WithStart(s3Exporter.start),
exporterhelper.WithQueue(cfg.QueueSettings),
)
}

// checkAndCastConfig checks the configuration type and casts it to the S3 exporter Config struct.
func checkAndCastConfig(c component.Config) (*Config, error) {
cfg, ok := c.(*Config)
if !ok {
return nil, fmt.Errorf("config structure is not of type *awss3exporter.Config")
}
return cfg, nil
}
5 changes: 5 additions & 0 deletions exporter/awss3exporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ receivers:

exporters:
awss3:
sending_queue:
enabled: true
num_consumers: 23
queue_size: 42

s3uploader:
region: 'us-east-1'
s3_bucket: 'foo'
Expand Down

0 comments on commit 9a2f91a

Please sign in to comment.