Skip to content

Commit

Permalink
Initial Discovery properties provider and config incorporation
Browse files Browse the repository at this point in the history
  • Loading branch information
rmfitzpatrick committed Jan 20, 2023
1 parent c04d35e commit 9b79500
Show file tree
Hide file tree
Showing 20 changed files with 995 additions and 57 deletions.
3 changes: 3 additions & 0 deletions cmd/otelcol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func main() {
ResolverSettings: confmap.ResolverSettings{
URIs: collectorSettings.ResolverURIs(),
Providers: map[string]confmap.Provider{
discovery.PropertyScheme(): configprovider.NewConfigSourceConfigMapProvider(
discovery.PropertyProvider(), zap.NewNop(), info, hooks, configsources.Get()...,
),
discovery.ConfigDScheme(): configprovider.NewConfigSourceConfigMapProvider(
discovery.ConfigDProvider(),
zap.NewNop(), // The service logger is not available yet, setting it to Nop.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ require (
github.com/Shopify/sarama v1.37.2 // indirect
github.com/Showmax/go-fqdn v1.0.0 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/alecthomas/participle/v2 v2.0.0-beta.5 // indirect
github.com/alecthomas/participle/v2 v2.0.0-beta.5
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
github.com/apache/thrift v0.17.0 // indirect
Expand Down
94 changes: 82 additions & 12 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"fmt"
"os"
"strings"
"sync"
"time"

Expand All @@ -42,11 +43,15 @@ import (

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
"github.com/signalfx/splunk-otel-collector/internal/components"
"github.com/signalfx/splunk-otel-collector/internal/confmapprovider/discovery/properties"
"github.com/signalfx/splunk-otel-collector/internal/receiver/discoveryreceiver"
"github.com/signalfx/splunk-otel-collector/internal/version"
)

const durationEnvVar = "SPLUNK_DISCOVERY_DURATION"
const (
durationEnvVar = "SPLUNK_DISCOVERY_DURATION"
logLevelEnvVar = "SPLUNK_DISCOVERY_LOG_LEVEL"
)

// discoverer provides the mechanism for a "preflight" collector service
// that will stand up the observers and discovery receivers based on the .discovery.yaml
Expand All @@ -64,9 +69,12 @@ type discoverer struct {
unexpandedReceiverEntries map[component.ID]map[component.ID]map[string]any
discoveredConfig map[component.ID]map[string]any
discoveredObservers map[component.ID]discovery.StatusType
info component.BuildInfo
duration time.Duration
mu sync.Mutex
// propertiesConf is a store of all properties from cmdline args and env vars
// that's merged with receiver/observer configs before creation
propertiesConf *confmap.Conf
info component.BuildInfo
duration time.Duration
mu sync.Mutex
}

func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
Expand All @@ -87,7 +95,6 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
if err != nil {
return (*discoverer)(nil), err
}

m := &discoverer{
logger: logger,
info: info,
Expand All @@ -102,9 +109,31 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
discoveredConfig: map[component.ID]map[string]any{},
discoveredObservers: map[component.ID]discovery.StatusType{},
}
m.propertiesConf = m.propertiesConfFromEnv()
return m, nil
}

func (d *discoverer) propertiesConfFromEnv() *confmap.Conf {
propertiesConf := confmap.New()
for _, env := range os.Environ() {
equalsIdx := strings.Index(env, "=")
if equalsIdx != -1 && len(env) > equalsIdx+1 {
envVar := env[:equalsIdx]
if envVar == logLevelEnvVar || envVar == durationEnvVar {
continue
}
if p, ok, e := properties.NewPropertyFromEnvVar(envVar, env[equalsIdx+1:]); ok {
if e != nil {
d.logger.Info(fmt.Sprintf("invalid discovery property environment variable %q", env), zap.Error(e))
continue
}
propertiesConf.Merge(confmap.NewFromStringMap(p.ToStringMap()))
}
}
}
return propertiesConf
}

// discover will create all .discovery.yaml components, start them, wait the configured
// duration, and tear them down before returning the discovery config.
func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
Expand Down Expand Up @@ -197,19 +226,38 @@ func (d *discoverer) createDiscoveryReceiversAndObservers(cfg *Config) (map[comp
discoveryReceiverRaw := map[string]any{}
receivers := map[string]any{}

receiversPropertiesConf := confmap.New()
if d.propertiesConf.IsSet("receivers") {
receiversPropertiesConf, err = d.propertiesConf.Sub("receivers")
if err != nil {
return nil, nil, fmt.Errorf("failed obtaining receivers properties config: %w", err)
}
}
for receiverID, receiver := range cfg.ReceiversToDiscover {
if ok, err = d.updateReceiverForObserver(receiverID, receiver, observerID); err != nil {
return nil, nil, err
} else if !ok {
continue
}
d.addUnexpandedReceiverConfig(receiverID, observerID, receiver.Entry.ToStringMap())
receivers[receiverID.String()] = receiver.Entry.ToStringMap()
receiverEntry := receiver.Entry.ToStringMap()
if receiversPropertiesConf.IsSet(receiverID.String()) {
receiverPropertiesConf, e := receiversPropertiesConf.Sub(receiverID.String())
if e != nil {
return nil, nil, fmt.Errorf("failed obtaining receiver properties config: %w", e)
}
entryConf := confmap.NewFromStringMap(receiverEntry)
if err = entryConf.Merge(receiverPropertiesConf); err != nil {
return nil, nil, fmt.Errorf("failed merging receiver properties config: %w", err)
}
receiverEntry = entryConf.ToStringMap()
}

d.addUnexpandedReceiverConfig(receiverID, observerID, receiverEntry)
receivers[receiverID.String()] = receiverEntry
}

discoveryReceiverRaw["receivers"] = receivers
discoveryReceiverConfMap := confmap.NewFromStringMap(discoveryReceiverRaw)

if err = d.expandConverter.Convert(context.Background(), discoveryReceiverConfMap); err != nil {
return nil, nil, fmt.Errorf("error converting environment variables in receiver config: %w", err)
}
Expand Down Expand Up @@ -241,6 +289,24 @@ func (d *discoverer) createObserver(observerID component.ID, cfg *Config) (otelc

observerConfig := observerFactory.CreateDefaultConfig()
observerCfgMap := confmap.NewFromStringMap(cfg.DiscoveryObservers[observerID].ToStringMap())

if d.propertiesConf.IsSet("extensions") {
propertiesConf, e := d.propertiesConf.Sub("extensions")
if e != nil {
return nil, fmt.Errorf("failed obtaining extensions properties config: %w", e)
}
if propertiesConf.IsSet(observerID.String()) {
propertiesConf, e = propertiesConf.Sub(observerID.String())
if e != nil {
return nil, fmt.Errorf("failed obtaining observer properties config: %w", e)
}
if err = observerCfgMap.Merge(propertiesConf); err != nil {
return nil, fmt.Errorf("failed merging observer properties config: %w", err)
}
cfg.DiscoveryObservers[observerID] = ExtensionEntry{observerCfgMap.ToStringMap()}
}
}

if err = d.expandConverter.Convert(context.Background(), observerCfgMap); err != nil {
return nil, fmt.Errorf("error converting environment variables in %q config: %w", observerID.String(), err)
}
Expand Down Expand Up @@ -323,15 +389,17 @@ func (d *discoverer) discoveryConfig(cfg *Config) (map[string]any, error) {
}
}
if receiverAdded {
dCfg.Merge(confmap.NewFromStringMap(map[string]any{
if err := dCfg.Merge(confmap.NewFromStringMap(map[string]any{
"service": map[string]any{
"pipelines": map[string]any{
"metrics": map[string]any{
"receivers": []string{"receiver_creator/discovery"},
},
},
},
}))
})); err != nil {
return nil, fmt.Errorf("failed adding receiver_creator/discovery to metrics pipeline: %w", err)
}
}

extensions := confmap.NewFromStringMap(map[string]any{"extensions": map[string]any{}})
Expand Down Expand Up @@ -573,10 +641,12 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {

func determineCurrentStatus(current, observed discovery.StatusType) discovery.StatusType {
switch {
case current == discovery.Successful:
// once successful never revert
case observed == discovery.Successful:
current = discovery.Successful
case current == discovery.Failed && observed == discovery.Partial:
current = discovery.Partial
case current == discovery.Partial:
// only update if observed successful (above)
default:
current = observed
}
Expand Down
23 changes: 23 additions & 0 deletions internal/confmapprovider/discovery/discoverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package discovery

import (
"fmt"
"os"
"strings"
"testing"
Expand All @@ -25,6 +26,8 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func TestDiscovererDurationFromEnv(t *testing.T) {
Expand Down Expand Up @@ -140,3 +143,23 @@ func TestMergeEntries(t *testing.T) {
"five.key": "five.val",
}, first)
}

func TestDetermineCurrentStatus(t *testing.T) {
for _, test := range []struct {
current, observed, expected discovery.StatusType
}{
{"failed", "failed", "failed"},
{"failed", "partial", "partial"},
{"failed", "successful", "successful"},
{"partial", "failed", "partial"},
{"partial", "partial", "partial"},
{"partial", "successful", "successful"},
{"successful", "failed", "successful"},
{"successful", "partial", "successful"},
{"successful", "successful", "successful"},
} {
t.Run(fmt.Sprintf("%s:%s->%s", test.current, test.observed, test.expected), func(t *testing.T) {
require.Equal(t, test.expected, determineCurrentStatus(test.current, test.observed))
})
}
}
57 changes: 57 additions & 0 deletions internal/confmapprovider/discovery/properties/env_var.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package properties

import (
"fmt"

"github.com/alecthomas/participle/v2"
"github.com/alecthomas/participle/v2/lexer"
)

// SPLUNK_DISCOVERY_RECEIVERS_receiver_x2d_type_x2f_receiver_x2d_name_CONFIG_field_x3a3a_subfield=VAL
// SPLUNK_DISCOVERY_EXTENSIONS_observer_x2d_type_x2f_observer_x2d_name_CONFIG_field_x3a3a_subfield=VAL

var envVarLex = lexer.MustSimple([]lexer.SimpleRule{
{Name: "Underscore", Pattern: `_`},
{Name: "String", Pattern: `[^_]+`},
})

var envVarParser = participle.MustBuild[EnvVarProperty](
participle.Lexer(envVarLex),
participle.UseLookahead(participle.MaxLookahead),
)

type EnvVarProperty struct {
ComponentType string `parser:"'SPLUNK' Underscore 'DISCOVERY' Underscore @('RECEIVERS' | 'EXTENSIONS') Underscore"`
Component EnvVarComponentID `parser:"@@"`
Key string `parser:"Underscore 'CONFIG' Underscore @(String|Underscore)+"`
Val string
}

type EnvVarComponentID struct {
Type string `parser:"@~(Underscore (?= 'CONFIG'))+"`
// _x2f_ -> '/'
Name string `parser:"(Underscore 'x2f' Underscore @(~(?= Underscore (?= 'CONFIG'))+|''))?"`
}

func NewEnvVarProperty(property, val string) (*EnvVarProperty, error) {
p, err := envVarParser.ParseString("SPLUNK_DISCOVERY", property)
if err != nil {
return nil, fmt.Errorf("invalid property env var (parsing error): %w", err)
}
p.Val = val
return p, nil
}
88 changes: 88 additions & 0 deletions internal/confmapprovider/discovery/properties/env_var_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package properties

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestValidEnvVarProperties(t *testing.T) {
for _, tt := range []struct {
expected *Property
envVar string
}{
{envVar: "SPLUNK_DISCOVERY_RECEIVERS_receiver_x2d_type_x2f__CONFIG_one",
expected: &Property{
stringMap: map[string]any{
"receivers": map[string]any{
"receiver-type": map[string]any{
"config": map[string]any{
"one": "val"},
},
},
},
ComponentType: "receivers",
Component: ComponentID{Type: "receiver-type"},
Key: "one",
Val: "val",
},
},
{envVar: "SPLUNK_DISCOVERY_EXTENSIONS_extension_x2e_type_x2f_extension____name_CONFIG_one_x3a__x3a_two",
expected: &Property{
stringMap: map[string]any{
"extensions": map[string]any{
"extension.type/extension____name": map[string]any{
"one": map[string]any{
"two": "val",
},
},
},
},
ComponentType: "extensions",
Component: ComponentID{Type: "extension.type", Name: "extension____name"},
Key: "one::two",
Val: "val",
},
},
} {
t.Run(tt.envVar, func(t *testing.T) {
p, ok, err := NewPropertyFromEnvVar(tt.envVar, "val")
require.True(t, ok)
require.NoError(t, err)
require.NotNil(t, p)
require.Equal(t, tt.expected, p)
})
}
}

func TestInvalidEnvVarProperties(t *testing.T) {
for _, tt := range []struct {
envVar, expectedError string
}{
{envVar: "SPLUNK_DISCOVERY_NOTVALIDCOMPONENT_TYPE_CONFIG_ONE", expectedError: "invalid env var property (parsing error): invalid property env var (parsing error): SPLUNK_DISCOVERY:1:18: unexpected token \"NOTVALIDCOMPONENT\" (expected (\"RECEIVERS\" | \"EXTENSIONS\") <underscore> EnvVarComponentID <underscore> \"CONFIG\" <underscore> (<string> | <underscore>)+)"},
{envVar: "SPLUNK_DISCOVERY_RECEIVERS_TYPE_NOTCONFIG_ONE", expectedError: "invalid env var property (parsing error): invalid property env var (parsing error): SPLUNK_DISCOVERY:1:46: unexpected token \"<EOF>\" (expected <underscore> \"CONFIG\" <underscore> (<string> | <underscore>)+)"},
{envVar: "SPLUNK_DISCOVERY_EXTENSIONS_TYPE_x2f_NAME_CONFIG_", expectedError: "invalid env var property (parsing error): invalid property env var (parsing error): SPLUNK_DISCOVERY:1:50: sub-expression (<string> | <underscore>)+ must match at least once"},
} {
t.Run(tt.envVar, func(t *testing.T) {
p, ok, err := NewPropertyFromEnvVar(tt.envVar, "val")
require.True(t, ok)
require.Error(t, err)
require.EqualError(t, err, tt.expectedError)
require.Nil(t, p)
})
}
}
Loading

0 comments on commit 9b79500

Please sign in to comment.