Skip to content

Commit

Permalink
[podmanreceiver] add scraper's shutdown method (#32981)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

- Adds the shutdown scraper method. The implementation cancels the
`containerEventLoop` go routine. Similar to the dockerstats:
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/dockerstatsreceiver/receiver.go#L76
- Moves the configuration validation to the start method, in alignment
with the dockerstats receiver:
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/dockerstatsreceiver/receiver.go#L55

**Link to tracking Issue:** Related issue
#29994

**Testing:** <Describe what testing was performed and which tests were
added.>
Use scraper instead of metrics receiver interface.

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Curtis Robert <[email protected]>
  • Loading branch information
rogercoll and crobert-1 authored May 31, 2024
1 parent bf7cd57 commit e13b1a3
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 73 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_shutdown_podman.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: podmanreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add scraper's shutdown method

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29994]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
12 changes: 0 additions & 12 deletions receiver/podmanreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
package podmanreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver"

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/scraperhelper"

Expand Down Expand Up @@ -42,13 +40,3 @@ func createDefaultConfig() *Config {
func createDefaultReceiverConfig() component.Config {
return createDefaultConfig()
}

func createMetricsReceiver(
ctx context.Context,
params receiver.CreateSettings,
config component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
podmanConfig := config.(*Config)
return newMetricsReceiver(ctx, params, podmanConfig, consumer, nil)
}
14 changes: 0 additions & 14 deletions receiver/podmanreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,3 @@ func TestCreateReceiver(t *testing.T) {
assert.NoError(t, err, "Metric receiver creation failed")
assert.NotNil(t, metricReceiver, "Receiver creation failed")
}

func TestCreateInvalidEndpoint(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig()
receiverCfg := config.(*Config)

receiverCfg.Endpoint = ""

params := receivertest.NewNopCreateSettings()
recv, err := factory.CreateMetricsReceiver(context.Background(), params, receiverCfg, consumertest.NewNop())
assert.Nil(t, recv)
assert.Error(t, err)
assert.Equal(t, "config.Endpoint must be specified", err.Error())
}
48 changes: 34 additions & 14 deletions receiver/podmanreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,48 @@ type metricsReceiver struct {
clientFactory clientFactory
scraper *ContainerScraper
mb *metadata.MetricsBuilder
cancel context.CancelFunc
}

func newMetricsReceiver(
_ context.Context,
set receiver.CreateSettings,
config *Config,
nextConsumer consumer.Metrics,
clientFactory clientFactory,
) (receiver.Metrics, error) {
err := config.Validate()
if err != nil {
return nil, err
}

) *metricsReceiver {
if clientFactory == nil {
clientFactory = newLibpodClient
}

recv := &metricsReceiver{
return &metricsReceiver{
config: config,
clientFactory: clientFactory,
set: set,
mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, set),
}
}

scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start))
func createMetricsReceiver(
_ context.Context,
params receiver.CreateSettings,
config component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
podmanConfig := config.(*Config)

recv := newMetricsReceiver(params, podmanConfig, nil)
scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start), scraperhelper.WithShutdown(recv.shutdown))
if err != nil {
return nil, err
}
return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, set, nextConsumer, scraperhelper.AddScraper(scrp))
return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, params, consumer, scraperhelper.AddScraper(scrp))
}

func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error {
var err error
err := r.config.Validate()
if err != nil {
return err
}

podmanClient, err := r.clientFactory(r.set.Logger, r.config)
if err != nil {
return err
Expand All @@ -72,7 +80,20 @@ func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error {
if err = r.scraper.loadContainerList(ctx); err != nil {
return err
}
go r.scraper.containerEventLoop(ctx)

// context for long-running operation
cctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel

go r.scraper.containerEventLoop(cctx)

return nil
}

func (r *metricsReceiver) shutdown(context.Context) error {
if r.cancel != nil {
r.cancel()
}
return nil
}

Expand Down Expand Up @@ -136,7 +157,6 @@ func (r *metricsReceiver) recordCPUMetrics(now pcommon.Timestamp, stats *contain
for i, cpu := range stats.PerCPU {
r.mb.RecordContainerCPUUsagePercpuDataPoint(now, int64(toSecondsWithNanosecondPrecision(cpu)), fmt.Sprintf("cpu%d", i))
}

}

func (r *metricsReceiver) recordNetworkMetrics(now pcommon.Timestamp, stats *containerStats) {
Expand Down
43 changes: 13 additions & 30 deletions receiver/podmanreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/zap"
Expand All @@ -31,21 +28,20 @@ func TestNewReceiver(t *testing.T) {
InitialDelay: time.Second,
},
}
nextConsumer := consumertest.NewNop()
mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), config, nextConsumer, nil)

mr := newMetricsReceiver(receivertest.NewNopCreateSettings(), config, nil)
assert.NotNil(t, mr)
assert.NoError(t, err)
}

func TestNewReceiverErrors(t *testing.T) {
r, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil)
assert.Nil(t, r)
func TestErrorsInStart(t *testing.T) {
recv := newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{}, nil)
assert.NotNil(t, recv)
err := recv.start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.Equal(t, "config.Endpoint must be specified", err.Error())

r, err = newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{Endpoint: "someEndpoint"}, consumertest.NewNop(), nil)
assert.Nil(t, r)
recv = newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{Endpoint: "someEndpoint"}, nil)
assert.NotNil(t, recv)
err = recv.start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.Equal(t, "config.CollectionInterval must be specified", err.Error())
}
Expand All @@ -55,13 +51,11 @@ func TestScraperLoop(t *testing.T) {
cfg.CollectionInterval = 100 * time.Millisecond

client := make(mockClient)
consumer := make(mockConsumer)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

r, err := newMetricsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, consumer, client.factory)
require.NoError(t, err)
r := newMetricsReceiver(receivertest.NewNopCreateSettings(), cfg, client.factory)
assert.NotNil(t, r)

go func() {
Expand All @@ -74,14 +68,14 @@ func TestScraperLoop(t *testing.T) {
}
}()

assert.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
assert.NoError(t, r.start(ctx, componenttest.NewNopHost()))
defer func() { assert.NoError(t, r.shutdown(ctx)) }()

md := <-consumer
md, err := r.scrape(ctx)
assert.NoError(t, err)
assert.Equal(t, 1, md.ResourceMetrics().Len())

assertStatsEqualToMetrics(t, genContainerStats(), md)

assert.NoError(t, r.Shutdown(ctx))
}

type mockClient chan containerStatsReport
Expand All @@ -102,21 +96,10 @@ func (c mockClient) ping(context.Context) error {
return nil
}

type mockConsumer chan pmetric.Metrics

func (c mockClient) list(context.Context, url.Values) ([]container, error) {
return []container{{ID: "c1", Image: "localimage"}}, nil
}

func (c mockClient) events(context.Context, url.Values) (<-chan event, <-chan error) {
return nil, nil
}

func (m mockConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{}
}

func (m mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
m <- md
return nil
}
13 changes: 12 additions & 1 deletion receiver/podmanreceiver/receiver_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,27 @@ import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

func newMetricsReceiver(
_ context.Context,
_ receiver.CreateSettings,
_ *Config,
_ consumer.Metrics,
_ any,
) (receiver.Metrics, error) {
return nil, fmt.Errorf("podman receiver is not supported on windows")
}

func createMetricsReceiver(
_ context.Context,
params receiver.CreateSettings,
config component.Config,
consumer consumer.Metrics,
) (receiver.Metrics, error) {
podmanConfig := config.(*Config)

return newMetricsReceiver(params, podmanConfig, nil, consumer)
}
3 changes: 1 addition & 2 deletions receiver/podmanreceiver/receiver_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package podmanreceiver

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -13,7 +12,7 @@ import (
)

func TestNewReceiver(t *testing.T) {
mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil)
mr, err := newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil)
assert.Nil(t, mr)
assert.Error(t, err)
assert.Equal(t, "podman receiver is not supported on windows", err.Error())
Expand Down

0 comments on commit e13b1a3

Please sign in to comment.