Add SRV resolver to loadbalancer exporter to use hostnames and track IPs
snuggie12 committed Feb 26, 2024
1 parent 3c97c8f commit 0beac79
Showing 7 changed files with 802 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/srv-resolver-for-loadbalancing-exporter.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: New SRV resolver for loadbalancing exporter for static hostnames with changing IPs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: ["18412"]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
43 changes: 39 additions & 4 deletions exporter/loadbalancingexporter/README.md
@@ -60,7 +60,7 @@ The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`,
Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the processor.

* The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint.
* The `resolver` accepts a `static` node, a `dns` or a `k8s` service. If all three are specified, `k8s` takes precedence.
* The `resolver` accepts a `static` node, a `dns`, `srv`, or a `k8s` service. If all four are specified, `k8s` takes precedence.
* The `hostname` property inside a `dns` node specifies the hostname to query in order to obtain the list of IP addresses.
* The `dns` node also accepts the following optional properties:
* `hostname` DNS hostname to resolve.
@@ -71,9 +71,14 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
* The `routing_key` property is used to route spans to exporters based on different parameters. This functionality is currently enabled only for `trace` pipeline types. It supports one of the following values:
* `service`: exports spans based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
* `traceID` (default): exports spans based on their `traceID`.
* If not configured, defaults to `traceID` based routing.
* `service`: exports spans based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
* `traceID` (default): exports spans based on their `traceID`.
* If not configured, defaults to `traceID` based routing.
* The `srv` node accepts the following optional properties:
* `hostname` DNS SRV hostname to resolve.
* `port` port to be used for exporting the traces to the IP addresses resolved from `hostname`. If `port` is not specified, the default port 4317 is used.
* `interval` resolver interval in Go duration format, e.g. `5s`, `30m`, `1h`. If not specified, `5s` will be used.
* `timeout` resolver timeout in Go duration format, e.g. `500ms`, `1s`, `5s`. If not specified, `1s` will be used.
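
The `interval` and `timeout` values are Go duration strings. The following is a minimal sketch, assuming the values end up being parsed by the standard library's `time.ParseDuration` (the collector's config decoding is not shown in this diff), of which strings are accepted. `validDuration` is a hypothetical helper used only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// validDuration reports whether s parses as a Go duration string.
// Hypothetical helper for illustration; it is not part of the exporter.
func validDuration(s string) bool {
	_, err := time.ParseDuration(s)
	return err == nil
}

func main() {
	// Go durations accept the units ns, us, ms, s, m and h.
	// There is no day unit, so "1d" is rejected.
	for _, s := range []string{"5s", "30m", "1h", "1d"} {
		fmt.Printf("%q valid: %v\n", s, validDuration(s))
	}
}
```

Longer periods have to be written in hours, for example `24h`.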

Simple example
```yaml
@@ -162,6 +167,36 @@ service:
- loadbalancing
```
The SRV resolver is useful when the endpoints should be hostnames rather than IP addresses, for example a `StatefulSet`-backed headless Kubernetes `Service` running with Istio. Example:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317
processors:
exporters:
  loadbalancing:
    protocol:
      otlp: {}
    resolver:
      srv:
        hostname: _<svc-port-name>._<svc-port-protocol>.<svc-name>.<svc-namespace>.svc.cluster.local
    routing_key: traceID
service:
  pipelines:
    traces:
      receivers:
        - otlp
      processors: []
      exporters:
        - loadbalancing
```
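
The `hostname` placeholder in the example above follows the usual SRV naming pattern for a named port on a headless Service. As a rough sketch, the name could be assembled as below. `srvHostname` and the sample values (`otlp-grpc`, `otel-backend`, `observability`) are hypothetical, and the `cluster.local` cluster domain is assumed from the example.

```go
package main

import "fmt"

// srvHostname builds the SRV record name for a named port of a headless
// Service. Hypothetical helper; the cluster domain is assumed to be
// "cluster.local", as in the README example.
func srvHostname(portName, proto, svc, namespace string) string {
	return fmt.Sprintf("_%s._%s.%s.%s.svc.cluster.local", portName, proto, svc, namespace)
}

func main() {
	// Sample values only; substitute the port name, protocol, Service
	// and namespace of the actual backend.
	fmt.Println(srvHostname("otlp-grpc", "tcp", "otel-backend", "observability"))
}
```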

For testing purposes, the following configuration can be used, where both the load balancer and all backends are running locally:
```yaml
receivers:
12 changes: 11 additions & 1 deletion exporter/loadbalancingexporter/config.go
@@ -35,6 +35,7 @@ type ResolverSettings struct {
Static *StaticResolver `mapstructure:"static"`
DNS *DNSResolver `mapstructure:"dns"`
K8sSvc *K8sSvcResolver `mapstructure:"k8s"`
SRV *SRVResolver `mapstructure:"srv"`
}

// StaticResolver defines the configuration for the resolver providing a fixed list of backends
@@ -50,8 +51,17 @@ type DNSResolver struct {
Timeout time.Duration `mapstructure:"timeout"`
}

// K8sSvcResolver defines the configuration for the DNS resolver
// K8sSvcResolver defines the configuration for the kubernetes Service resolver
type K8sSvcResolver struct {
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
}

// TODO: Should a common struct be used for dns-based resolvers?
// SRVResolver defines the configuration for the DNS resolver of SRV records for headless Services
type SRVResolver struct {
Hostname string `mapstructure:"hostname"`
Port string `mapstructure:"port"`
Interval time.Duration `mapstructure:"interval"`
Timeout time.Duration `mapstructure:"timeout"`
}
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/go.mod
@@ -19,6 +19,7 @@ require (
go.opentelemetry.io/otel/trace v1.23.1
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230711023510-fffb14384f22
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
9 changes: 9 additions & 0 deletions exporter/loadbalancingexporter/loadbalancer.go
@@ -85,6 +85,15 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
return nil, err
}
}
if oCfg.Resolver.SRV != nil {
srvLogger := params.Logger.With(zap.String("resolver", "DNS SRV"))

var err error
res, err = newSRVResolver(srvLogger, oCfg.Resolver.SRV.Hostname, oCfg.Resolver.SRV.Port, oCfg.Resolver.SRV.Interval, oCfg.Resolver.SRV.Timeout)
if err != nil {
return nil, err
}
}

if res == nil {
return nil, errNoResolver
252 changes: 252 additions & 0 deletions exporter/loadbalancingexporter/resolver_srv.go
@@ -0,0 +1,252 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"

import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

// Compile-time check that srvResolver implements the resolver interface.
var _ resolver = (*srvResolver)(nil)

// TODO: Should these be moved to something like resolver_common.go?
// const (
// defaultResInterval = 5 * time.Second
// defaultResTimeout = time.Second
// )

var (
errNoSRV = errors.New("no SRV record found")
errBadSRV = errors.New("SRV hostname must be in the form of _service._proto.name")
errNotSingleIP = errors.New("underlying A record must return a single IP address")

srvResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "srv")

srvResolverSuccessTrueMutators = []tag.Mutator{srvResolverMutator, successTrueMutator}
srvResolverSuccessFalseMutators = []tag.Mutator{srvResolverMutator, successFalseMutator}
)

type srvResolver struct {
logger *zap.Logger

srvService string
srvProto string
srvName string
port string
resolver multiResolver
resInterval time.Duration
resTimeout time.Duration

endpoints []string
endpointsWithIPs map[string]string
onChangeCallbacks []func([]string)

stopCh chan (struct{})
updateLock sync.Mutex
shutdownWg sync.WaitGroup
changeCallbackLock sync.RWMutex
}

type multiResolver interface {
LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error)
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
}

func newSRVResolver(logger *zap.Logger, srvHostname string, port string, interval time.Duration, timeout time.Duration) (*srvResolver, error) {
if len(srvHostname) == 0 {
return nil, errNoSRV
}
if interval == 0 {
interval = defaultResInterval
}
if timeout == 0 {
timeout = defaultResTimeout
}

service, proto, name, err := parseSRVHostname(srvHostname)
if err != nil {
return nil, err
}

return &srvResolver{
logger: logger,
srvService: service,
srvProto: proto,
srvName: name,
port: port,
resolver: &net.Resolver{},
resInterval: interval,
resTimeout: timeout,
stopCh: make(chan struct{}),
}, nil
}

func (r *srvResolver) start(ctx context.Context) error {
if _, err := r.resolve(ctx); err != nil {
r.logger.Warn("failed to resolve", zap.Error(err))
}

go r.periodicallyResolve()

r.logger.Debug("SRV resolver started",
zap.String("SRV name", r.srvName), zap.String("port", r.port),
zap.Duration("interval", r.resInterval), zap.Duration("timeout", r.resTimeout))
return nil
}

func (r *srvResolver) shutdown(_ context.Context) error {
r.changeCallbackLock.Lock()
r.onChangeCallbacks = nil
r.changeCallbackLock.Unlock()

close(r.stopCh)
r.shutdownWg.Wait()
return nil
}

func (r *srvResolver) periodicallyResolve() {
ticker := time.NewTicker(r.resInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), r.resTimeout)
if _, err := r.resolve(ctx); err != nil {
r.logger.Warn("failed to resolve", zap.Error(err))
} else {
r.logger.Debug("resolved successfully")
}
cancel()
case <-r.stopCh:
return
}
}
}

func (r *srvResolver) resolve(ctx context.Context) ([]string, error) {
r.shutdownWg.Add(1)
defer r.shutdownWg.Done()

_, srvs, err := r.resolver.LookupSRV(ctx, r.srvService, r.srvProto, r.srvName)
if err != nil {
_ = stats.RecordWithTags(ctx, srvResolverSuccessFalseMutators, mNumResolutions.M(1))
return nil, err
}

_ = stats.RecordWithTags(ctx, srvResolverSuccessTrueMutators, mNumResolutions.M(1))

// backendsWithIPs tracks the IP addresses for changes
backendsWithIPs := make(map[string]string)
for _, srv := range srvs {
target := strings.TrimSuffix(srv.Target, ".")
backendsWithIPs[target] = ""
}
// backends is what we use to compare against the current endpoints
var backends []string

// Lookup the IP addresses for the A records
for aRec := range backendsWithIPs {

// handle backends first
backend := aRec
// if a port is specified in the configuration, add it
if r.port != "" {
backend = fmt.Sprintf("%s:%s", backend, r.port)
}
backends = append(backends, backend)

ips, err := r.resolver.LookupIPAddr(ctx, aRec)
// Return the A record. If we can't resolve them, we'll try again next iteration
if err != nil {
_ = stats.RecordWithTags(ctx, srvResolverSuccessFalseMutators, mNumResolutions.M(1))
continue
}
// A headless Service SRV target returns exactly 1 IP address for its A record
if len(ips) != 1 {
return nil, errNotSingleIP
}

ip := ips[0]
if ip.IP.To4() != nil {
backendsWithIPs[aRec] = ip.String()
} else {
// it's an IPv6 address
backendsWithIPs[aRec] = fmt.Sprintf("[%s]", ip.String())
}
}

var freshBackends []string
for endpoint := range backendsWithIPs {
// If the old map doesn't have the endpoint, it's fresh
if _, ok := r.endpointsWithIPs[endpoint]; !ok {
freshBackends = append(freshBackends, endpoint)
// If the old map has the endpoint and IPs match it's still fresh
// Else freshBackends will be smaller and used later during callbacks
} else if backendsWithIPs[endpoint] == r.endpointsWithIPs[endpoint] {
freshBackends = append(freshBackends, endpoint)
}
}

// keep both in the same order
slices.Sort(freshBackends)
slices.Sort(backends)

if equalStringSlice(r.endpoints, freshBackends) {
r.logger.Debug("No change in endpoints")
return r.endpoints, nil
}

// the list has changed!
r.updateLock.Lock()
r.logger.Debug("Updating endpoints", zap.Strings("new endpoints", backends))
r.logger.Debug("Endpoints with IPs", zap.Any("old", r.endpointsWithIPs), zap.Any("new", backendsWithIPs))
r.endpoints = backends
r.endpointsWithIPs = backendsWithIPs
r.updateLock.Unlock()
_ = stats.RecordWithTags(ctx, srvResolverSuccessTrueMutators, mNumBackends.M(int64(len(backends))))

// propagate the change
r.changeCallbackLock.RLock()
for _, callback := range r.onChangeCallbacks {
// If backends != freshBackends it means an endpoint needs to be refreshed
if !equalStringSlice(r.endpoints, freshBackends) {
r.logger.Debug("Stale endpoints present", zap.Strings("fresh endpoints", freshBackends))
callback(freshBackends)
}
callback(r.endpoints)
}
r.changeCallbackLock.RUnlock()

return r.endpoints, nil
}
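
The freshness check inside `resolve` treats a hostname as fresh when it is either new or still maps to the same IP as on the previous run; a hostname whose IP changed is left out so that it can be dropped and re-added by the callbacks. The sketch below isolates that comparison. `freshEndpoints` and the sample hostnames and addresses are hypothetical, and the standard library's `sort` package is used in place of `golang.org/x/exp/slices` to keep the example dependency-free.

```go
package main

import (
	"fmt"
	"sort"
)

// freshEndpoints returns, in sorted order, the hostnames in next that are
// either new or whose IP is unchanged relative to prev.
// Hypothetical helper mirroring the comparison in resolve.
func freshEndpoints(prev, next map[string]string) []string {
	fresh := make([]string, 0, len(next))
	for host, ip := range next {
		oldIP, seen := prev[host]
		if !seen || oldIP == ip {
			fresh = append(fresh, host)
		}
	}
	sort.Strings(fresh)
	return fresh
}

func main() {
	prev := map[string]string{"pod-0.svc": "10.0.0.1", "pod-1.svc": "10.0.0.2"}
	next := map[string]string{"pod-0.svc": "10.0.0.1", "pod-1.svc": "10.0.0.9", "pod-2.svc": "10.0.0.3"}
	// pod-1.svc changed IP, so it is omitted from the fresh list.
	fmt.Println(freshEndpoints(prev, next))
}
```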

func (r *srvResolver) onChange(f func([]string)) {
r.changeCallbackLock.Lock()
defer r.changeCallbackLock.Unlock()
r.onChangeCallbacks = append(r.onChangeCallbacks, f)
}

func parseSRVHostname(srvHostname string) (service string, proto string, name string, err error) {
parts := strings.Split(srvHostname, ".")
if len(parts) < 3 {
return "", "", "", errBadSRV
}

service = strings.TrimPrefix(parts[0], "_")
proto = strings.TrimPrefix(parts[1], "_")
name = strings.Join(parts[2:], ".")

return service, proto, name, nil
}
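
As a usage illustration, the sketch below copies `parseSRVHostname` from the diff into a standalone program and runs it on two sample inputs (the hostnames are made up). Note that the function only checks the number of dot-separated parts: because `strings.TrimPrefix` is a no-op when the prefix is absent, a name without leading underscores is still accepted.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errBadSRV = errors.New("SRV hostname must be in the form of _service._proto.name")

// parseSRVHostname is copied from the diff: it splits an SRV name into the
// service, protocol and remaining domain name.
func parseSRVHostname(srvHostname string) (service string, proto string, name string, err error) {
	parts := strings.Split(srvHostname, ".")
	if len(parts) < 3 {
		return "", "", "", errBadSRV
	}

	service = strings.TrimPrefix(parts[0], "_")
	proto = strings.TrimPrefix(parts[1], "_")
	name = strings.Join(parts[2:], ".")

	return service, proto, name, nil
}

func main() {
	// A well-formed SRV name splits into its three components.
	service, proto, name, err := parseSRVHostname("_otlp._tcp.backend.default.svc.cluster.local")
	fmt.Println(service, proto, name, err)

	// Fewer than three parts is rejected with errBadSRV.
	_, _, _, err = parseSRVHostname("backend.local")
	fmt.Println(err)
}
```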