Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use pub/sub for stress relief #1221

Merged
merged 10 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
)

Expand All @@ -34,7 +35,7 @@ type Collector interface {
AddSpanFromPeer(*types.Span) error
Stressed() bool
GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string)
ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string)
ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool)
}

func GetCollectorImplementation(c config.Config) Collector {
Expand All @@ -54,6 +55,7 @@ const (
type InMemCollector struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Clock clockwork.Clock `inject:""`
Tracer trace.Tracer `inject:"tracer"`
Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
Expand Down Expand Up @@ -98,8 +100,6 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register("collector_peer_queue_length", "gauge")
i.Metrics.Register("collector_incoming_queue_length", "gauge")
i.Metrics.Register("collector_peer_queue", "histogram")
i.Metrics.Register("stress_level", "gauge")
i.Metrics.Register("stress_relief_activated", "gauge")
i.Metrics.Register("collector_cache_size", "gauge")
i.Metrics.Register("memory_heap_allocation", "gauge")
i.Metrics.Register("span_received", "counter")
Expand Down Expand Up @@ -323,12 +323,6 @@ func (i *InMemCollector) collect() {
i.Metrics.Histogram("collector_peer_queue", float64(len(i.fromPeer)))
i.Metrics.Gauge("collector_incoming_queue_length", float64(len(i.incoming)))
i.Metrics.Gauge("collector_peer_queue_length", float64(len(i.fromPeer)))
i.Metrics.Gauge("stress_level", float64(i.StressRelief.StressLevel()))
if i.StressRelief.Stressed() {
i.Metrics.Gauge("stress_relief_activated", 1)
} else {
i.Metrics.Gauge("stress_relief_activated", 0)
}

// Always drain peer channel before doing anything else. By processing peer
// traffic preferentially we avoid the situation where the cluster essentially
Expand Down Expand Up @@ -469,23 +463,42 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// cache as "kept".
// It doesn't do any logging and barely touches metrics; this is about as
// minimal as we can make it.
func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
now := time.Now()
trace := &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Dataset: sp.Dataset,
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now,
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool) {
_, span := otelutil.StartSpanWith(context.Background(), i.Tracer, "collector.ProcessSpanImmediately", "trace_id", sp.TraceID)
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
defer span.End()

if !i.StressRelief.ShouldSampleDeterministically(sp.TraceID) {
otelutil.AddSpanField(span, "nondeterministic", 1)
return false, false
}

var rate uint
record, reason, found := i.sampleTraceCache.Check(sp)
if !found {
rate, keep, reason = i.StressRelief.GetSampleRate(sp.TraceID)
now := i.Clock.Now()
trace := &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Dataset: sp.Dataset,
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now,
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
} else {
rate = record.Rate()
keep = record.Kept()
}

if !keep {
i.Metrics.Increment("dropped_from_stress")
return
return true, false
}

i.Metrics.Increment("kept_from_stress")
// ok, we're sending it, so decorate it first
sp.Event.Data["meta.stressed"] = true
if i.Config.GetAddRuleReasonToTrace() {
Expand All @@ -494,9 +507,12 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampl
if i.hostname != "" {
sp.Data["meta.refinery.local_hostname"] = i.hostname
}

i.addAdditionalAttributes(sp)
mergeTraceAndSpanSampleRates(sp, sampleRate, i.Config.GetIsDryRun())
mergeTraceAndSpanSampleRates(sp, rate, i.Config.GetIsDryRun())
i.Transmission.EnqueueSpan(sp)

return true, true
}

// dealWithSentTrace handles a span that has arrived after the sampling decision
Expand Down
2 changes: 2 additions & 0 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/facebookgo/inject"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
Expand Down Expand Up @@ -746,6 +747,7 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &config.MockConfig{}},
&inject.Object{Value: &logger.NullLogger{}},
&inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"},
&inject.Object{Value: clockwork.NewRealClock()},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
Expand Down
4 changes: 3 additions & 1 deletion collect/mockCollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func (m *MockCollector) GetStressedSampleRate(traceID string) (rate uint, keep b
return 0, false, ""
}

func (m *MockCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
func (m *MockCollector) ProcessSpanImmediately(sp *types.Span) (bool, bool) {
m.Spans <- sp

return true, true
}

func (m *MockCollector) Stressed() bool {
Expand Down
Loading
Loading