
[receiver/kafkareceiver]: allow tunable fetch sizes #1

Merged
merged 1 commit into linode-obs:main from feat/kafkareceiver-fetch-configurable on Aug 6, 2024

Conversation

cxdy
Member

@cxdy cxdy commented Jul 23, 2024

Description:
This commit adds the ability to tune the minimum, default and maximum fetch sizes for the Kafka Receiver in the OpenTelemetry configuration file.

The defaults are kept consistent with those imposed by sarama.
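
For context, a minimal sketch of how these options might sit in a Collector configuration; the `brokers` and `topic` fields are ordinary kafka receiver settings included here only for framing, and all values are illustrative:

```yaml
receivers:
  kafka:
    brokers: ["kafka:9092"]   # illustrative broker address
    topic: otlp_logs          # illustrative topic name
    # New options added by this PR (sizes in bytes):
    min_fetch_size: 1048576      # 1MB
    default_fetch_size: 15728640 # 15MB
    max_fetch_size: 31457280     # 30MB
```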

Link to tracking Issue:
Resolves open-telemetry#22741

Testing:
Built the binary with `make docker-otelcontribcol`, uploaded it to Docker Hub to pull the image into our Kubernetes deployment, and hit the cluster with 200k logs per second with the following settings:

```yaml
default_fetch_size: 15728640 # 15MB
max_fetch_size: 31457280     # 30MB
min_fetch_size: 1048576      # 1MB
```

These limits are arbitrarily high; I simply wanted to set them higher and see how far I could push it in our cluster.

No new tests were added, but the existing configuration tests in config.go and factory.go were extended.

Documentation:
Added the new configuration options to the receiver's README

@cxdy cxdy changed the title from [receiver/kafkaexporter]: allow tunable fetch sizes to [receiver/kafkareceiver]: allow tunable fetch sizes on Jul 23, 2024
This commit adds the ability to tune the minimum, default and maximum fetch sizes for Kafka in the OpenTelemetry configuration file.
@cxdy cxdy force-pushed the feat/kafkareceiver-fetch-configurable branch from 5320cc4 to 8c7e4d8 on August 6, 2024 17:56
@cxdy cxdy merged commit 71cdb84 into linode-obs:main Aug 6, 2024
@cxdy cxdy deleted the feat/kafkareceiver-fetch-configurable branch August 6, 2024 18:11
cxdy pushed a commit that referenced this pull request Oct 19, 2024
… Histo --> Histogram (open-telemetry#33824)

## Description

This PR adds a custom metric function to the transformprocessor to
convert exponential histograms to explicit histograms.

Link to tracking issue: Resolves open-telemetry#33827

**Function Name**
```
convert_exponential_histogram_to_explicit_histogram
```

**Arguments:**

- `distribution` (_upper, midpoint, uniform, random_)
- `ExplicitBoundaries: []float64`

**Usage example:**

```yaml
processors:
  transform:
    error_mode: propagate
    metric_statements:
    - context: metric
      statements:
        - convert_exponential_histogram_to_explicit_histogram("random", [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]) 
```

**Converts:**

```
Resource SchemaURL: 
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope  
Metric #0
Descriptor:
     -> Name: response_time
     -> Description: 
     -> Unit: 
     -> DataType: ExponentialHistogram
     -> AggregationTemporality: Delta
ExponentialHistogramDataPoints #0
Data point attributes:
     -> metric_type: Str(timing)
StartTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-07-31 09:35:25.212037 +0000 UTC
Count: 44
Sum: 999.000000
Min: 40.000000
Max: 245.000000
Bucket (32.000000, 64.000000], Count: 10
Bucket (64.000000, 128.000000], Count: 22
Bucket (128.000000, 256.000000], Count: 12
        {"kind": "exporter", "data_type": "metrics", "name": "debug"}
```

**To:**

```
Resource SchemaURL: 
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope  
Metric #0
Descriptor:
     -> Name: response_time
     -> Description: 
     -> Unit: 
     -> DataType: Histogram
     -> AggregationTemporality: Delta
HistogramDataPoints #0
Data point attributes:
     -> metric_type: Str(timing)
StartTimestamp: 1970-01-01 00:00:00 +0000 UTC
Timestamp: 2024-07-30 21:37:07.830902 +0000 UTC
Count: 44
Sum: 999.000000
Min: 40.000000
Max: 245.000000
ExplicitBounds #0: 10.000000
ExplicitBounds #1: 20.000000
ExplicitBounds #2: 30.000000
ExplicitBounds #3: 40.000000
ExplicitBounds #4: 50.000000
ExplicitBounds #5: 60.000000
ExplicitBounds #6: 70.000000
ExplicitBounds #7: 80.000000
ExplicitBounds #8: 90.000000
ExplicitBounds #9: 100.000000
Buckets #0, Count: 0
Buckets #1, Count: 0
Buckets #2, Count: 0
Buckets #3, Count: 2
Buckets #4, Count: 5
Buckets #5, Count: 0
Buckets #6, Count: 3
Buckets #7, Count: 7
Buckets #8, Count: 2
Buckets #9, Count: 4
Buckets #10, Count: 21
        {"kind": "exporter", "data_type": "metrics", "name": "debug"}
```

### Testing

- Several unit tests have been created. We have also tested by ingesting
and converting exponential histograms from the `statsdreceiver`, as well
as directly via the `otlpreceiver` over gRPC, for several hours with a
large amount of data.

- We have clients that have been running this solution in production for
a number of weeks.

### Readme description:

### convert_exponential_hist_to_explicit_hist

`convert_exponential_hist_to_explicit_hist([ExplicitBounds])`

The `convert_exponential_hist_to_explicit_hist` function converts an
ExponentialHistogram to an Explicit (_normal_) Histogram.

`ExplicitBounds` represents the list of bucket boundaries for the new
histogram. This argument is __required__ and __cannot be empty__.
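
A minimal usage sketch following the signature above (the bucket bounds are illustrative; note that the fuller example earlier in this PR also passes a distribution argument, so the exact argument list may differ):

```yaml
processors:
  transform:
    metric_statements:
      - context: metric
        statements:
          - convert_exponential_hist_to_explicit_hist([10.0, 50.0, 100.0, 500.0])
```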

__WARNING:__

The process of converting an ExponentialHistogram to an Explicit
Histogram is not perfect and may result in a loss of precision. It is
important to define an appropriate set of bucket boundaries to minimize
this loss. For example, selecting boundaries that are too high or too
low may result in histogram buckets that are too wide or too narrow,
respectively.

---------

Co-authored-by: Kent Quirk <[email protected]>
Co-authored-by: Tyler Helmuth <[email protected]>
cxdy pushed a commit that referenced this pull request Oct 19, 2024
…etry#35544)

**Description:**

As described in
open-telemetry#35491,
it is useful to give users the option to define
`receiver_creator` templates per container.

In this regard, the current PR introduces a new type of Endpoint called
`PodContainer` that matches the rule type `pod.container`. This Endpoint
is emitted for each container of the Pod, similarly to how `Port`
Endpoints are emitted for each container that defines a port.

A complete example of how to use this feature to apply different parsing
to each of the Pod's containers is provided in the `How to test this
manually` section; a focused snippet of the new rule type follows below.
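
A focused illustration of the new rule type, extracted from the full example further down (the `filelog` settings are trimmed for brevity):

```yaml
receivers:
  receiver_creator/logs:
    watch_observers: [ k8s_observer ]
    receivers:
      filelog/busybox:
        # `pod.container` endpoints are emitted once per container of the Pod
        rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "busybox"
        config:
          include:
            - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
```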

**Link to tracking Issue:** Fixes
open-telemetry#35491

**Testing:** TBA

**Documentation:** TBA


### How to test this manually

1. Use the following values file to deploy the Collector's Helm chart
```yaml
mode: daemonset

image:
  repository: otelcontribcol-dev
  tag: "latest"
  pullPolicy: IfNotPresent

command:
  name: otelcontribcol

clusterRole:
  create: true
  rules:
   - apiGroups:
     - ''
     resources:
     - 'pods'
     - 'nodes'
     verbs:
     - 'get'
     - 'list'
     - 'watch'
   - apiGroups: [ "" ]
     resources: [ "nodes/proxy"]
     verbs: [ "get" ]
   - apiGroups:
       - ""
     resources:
       - nodes/stats
     verbs:
       - get
   - nonResourceURLs:
       - "/metrics"
     verbs:
       - get

extraVolumeMounts:
 - name: varlogpods
   mountPath: /var/log/pods
   readOnly: true

extraVolumes:
  - name: varlogpods
    hostPath:
      path: /var/log/pods

config:
  extensions:
    k8s_observer:
      auth_type: serviceAccount
      node: ${env:K8S_NODE_NAME}
      observe_nodes: true
  exporters:
    debug:
      verbosity: basic
  receivers:
    receiver_creator/logs:
      watch_observers: [ k8s_observer ]
      receivers:
        filelog/busybox:
          rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "busybox"
          config:
            include:
              - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
            include_file_name: false
            include_file_path: true
            operators:
              - id: container-parser
                type: container
              - type: add
                field: attributes.log.template
                value: busybox
        filelog/lazybox:
          rule: type == "pod.container" && pod.labels["otel.logs"] == "true" && container_name == "lazybox"
          config:
            include:
              - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
            include_file_name: false
            include_file_path: true
            operators:
              - id: container-parser
                type: container
              - type: add
                field: attributes.log.template
                value: lazybox
  service:
    extensions: [health_check, k8s_observer]
    pipelines:
      logs:
        receivers: [receiver_creator/logs]
        processors: [batch]
        exporters: [debug]
```
2. Follow the logs of the Collector's Pod, e.g. `k logs -f daemonset-opentelemetry-collector-agent-2hrg5`
3. Deploy a sample Pod which consists of 2 different containers:

```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: daemonset-logs
  labels:
    app: daemonset-logs
spec:
  selector:
    matchLabels:
      app.kubernetes.io/component: migration-logger
      otel.logs: "true"
  template:
    metadata:
      labels:
        app.kubernetes.io/component: migration-logger
        otel.logs: "true"
    spec:
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      containers:
        - name: lazybox
          image: busybox
          args:
            - /bin/sh
            - -c
            - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 0.1s; done
        - name: busybox
          image: busybox
          args:
            - /bin/sh
            - -c
            - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 0.1s; done
```

Verify in the logs that only 2 filelog receivers are started, one per
container:

```console
2024-10-02T12:05:17.506Z	info	[email protected]/observerhandler.go:96	starting receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox", "endpoint": "10.244.0.13", "endpoint_id": "k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox"}
2024-10-02T12:05:17.508Z	info	adapter/receiver.go:47	Starting stanza receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox"}
2024-10-02T12:05:17.508Z	info	[email protected]/observerhandler.go:96	starting receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox", "endpoint": "10.244.0.13", "endpoint_id": "k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox"}
2024-10-02T12:05:17.510Z	info	adapter/receiver.go:47	Starting stanza receiver	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox"}
2024-10-02T12:05:17.709Z	info	fileconsumer/file.go:256	Started watching file	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/lazybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/lazybox", "component": "fileconsumer", "path": "/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/lazybox/0.log"}
2024-10-02T12:05:17.712Z	info	fileconsumer/file.go:256	Started watching file	{"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/busybox/receiver_creator/logs{endpoint=\"10.244.0.13\"}/k8s_observer/01543800-cfea-4c10-8220-387e60f65151/busybox", "component": "fileconsumer", "path": "/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/busybox/0.log"}
```

In addition, verify that the proper attributes are added per container
according to the 2 different filelog receiver definitions:


```console
2024-10-02T12:23:55.117Z	info	ResourceLog #0
Resource SchemaURL: 
Resource attributes:
     -> k8s.pod.name: Str(daemonset-logs-sz4zk)
     -> k8s.container.restart_count: Str(0)
     -> k8s.pod.uid: Str(01543800-cfea-4c10-8220-387e60f65151)
     -> k8s.container.name: Str(lazybox)
     -> k8s.namespace.name: Str(default)
     -> container.id: Str(63a8e69bdc6ee95ee7918baf913a548190f32838adeb0e6189a8210e05157b40)
     -> container.image.name: Str(busybox)
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2024-10-02 12:23:54.896772888 +0000 UTC
Timestamp: 2024-10-02 12:23:54.750904381 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(otel logs at 12:23:54)
Attributes:
     -> log.iostream: Str(stdout)
     -> logtag: Str(F)
     -> log: Map({"template":"lazybox"})
     -> log.file.path: Str(/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/lazybox/0.log)
Trace ID: 
Span ID: 
Flags: 0
ResourceLog #1
Resource SchemaURL: 
Resource attributes:
     -> k8s.container.restart_count: Str(0)
     -> k8s.pod.uid: Str(01543800-cfea-4c10-8220-387e60f65151)
     -> k8s.container.name: Str(busybox)
     -> k8s.namespace.name: Str(default)
     -> k8s.pod.name: Str(daemonset-logs-sz4zk)
     -> container.id: Str(47163758424f2bc5382b1e9702301be23cab368b590b5fbf0b30affa09b4a199)
     -> container.image.name: Str(busybox)
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2024-10-02 12:23:54.897788935 +0000 UTC
Timestamp: 2024-10-02 12:23:54.749885634 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(otel logs at 12:23:54)
Attributes:
     -> log.file.path: Str(/var/log/pods/default_daemonset-logs-sz4zk_01543800-cfea-4c10-8220-387e60f65151/busybox/0.log)
     -> logtag: Str(F)
     -> log.iostream: Str(stdout)
     -> log: Map({"template":"busybox"})
Trace ID: 
Span ID: 
Flags: 0
```

Signed-off-by: ChrsMark <[email protected]>