Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow a single node to activate stress relief mode during significant load increase #1256

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions collect/stressRelief.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (s *StressRelief) Start() error {
// register stress level metrics
s.RefineryMetrics.Register("cluster_stress_level", "gauge")
s.RefineryMetrics.Register("individual_stress_level", "gauge")
s.RefineryMetrics.Register("stress_level", "gauge")
s.RefineryMetrics.Register("stress_relief_activated", "gauge")

// We use an algorithms map so that we can name these algorithms, which makes it easier for several things:
Expand Down Expand Up @@ -159,11 +160,11 @@ func (s *StressRelief) Start() error {
tickCounter = 0
)

tick := time.NewTicker(100 * time.Millisecond)
tick := s.Clock.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
case <-tick.Chan():
currentLevel := s.Recalc()

if lastLevel != currentLevel || tickCounter == maxTicksBetweenReports {
Expand Down Expand Up @@ -390,7 +391,7 @@ func (s *StressRelief) Recalc() uint {
formula = fmt.Sprintf("%s(%v/%v)=%v", c.Algorithm, c.Numerator, c.Denominator, stress)
}
}
s.Logger.Debug().WithField("stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level")
s.Logger.Debug().WithField("individual_stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level")

s.RefineryMetrics.Gauge("individual_stress_level", float64(maximumLevel))
localLevel := uint(maximumLevel)
Expand All @@ -401,7 +402,11 @@ func (s *StressRelief) Recalc() uint {
s.lock.Lock()
defer s.lock.Unlock()

s.overallStressLevel = clusterStressLevel
// The overall stress level is the max of the individual and cluster stress levels
// If a single node is under significant stress, it can activate stress relief mode
s.overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel)))
s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel)

s.reason = reason
s.formula = formula

Expand All @@ -414,18 +419,18 @@ func (s *StressRelief) Recalc() uint {
// If it's off, should we activate it?
if !s.stressed && s.overallStressLevel >= s.activateLevel {
s.stressed = true
s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).WithField("stress_formula", s.formula).WithField("reason", s.reason).Logf("StressRelief has been activated")
s.Logger.Warn().WithField("stress_level", s.overallStressLevel).WithField("stress_formula", s.formula).WithField("reason", s.reason).Logf("StressRelief has been activated")
VinozzZ marked this conversation as resolved.
Show resolved Hide resolved
}
// We want make sure that stress relief is below the deactivate level
// for a minimum time after the last time we said it should be, so
// whenever it's above that value we push the time out.
if s.stressed && s.overallStressLevel >= s.deactivateLevel {
s.stayOnUntil = time.Now().Add(s.minDuration)
s.stayOnUntil = s.Clock.Now().Add(s.minDuration)
}
// If it's on, should we deactivate it?
if s.stressed && s.overallStressLevel < s.deactivateLevel && s.Clock.Now().After(s.stayOnUntil) {
s.stressed = false
s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).Logf("StressRelief has been deactivated")
s.Logger.Warn().WithField("stress_level", s.overallStressLevel).Logf("StressRelief has been deactivated")
}
}

Expand Down
84 changes: 84 additions & 0 deletions collect/stress_relief_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,6 +129,89 @@ func TestStressRelief_Peer(t *testing.T) {
}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
}

func TestStressRelief_OverallStressLevel(t *testing.T) {
clock := clockwork.NewFakeClock()
sr, stop := newStressRelief(t, clock, nil)
defer stop()

sr.Start()

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

cfg := config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 65,
MinimumActivationDuration: config.Duration(5 * time.Second),
}

// On startup, the stress relief should not be active
sr.UpdateFromConfig(cfg)
require.False(t, sr.Stressed())

// Test 1
// when a single peer's individual stress level is above the activation level
// the overall stress level should be above the activation level
// and the stress relief should be active
sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 965)
clock.Advance(time.Second * 1)
sr.stressLevels = make(map[string]stressReport, 100)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer%d", i)
sr.stressLevels[key] = stressReport{
key: key,
level: 10,
timestamp: sr.Clock.Now(),
}
}

localLevel := sr.Recalc()
sr.lock.RLock()
require.Equal(t, localLevel, sr.overallStressLevel)
require.True(t, sr.stressed)
sr.lock.Unlock()

// Test 2
// when a single peer's individual stress level is below the activation level
// and the rest of the cluster is above the activation level
// the single peer should remain in stress relief mode
sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer%d", i)
sr.stressLevels[key] = stressReport{
key: key,
level: 85,
timestamp: sr.Clock.Now(),
}
}
localLevel = sr.Recalc()
sr.lock.RLock()
require.Greater(t, sr.overallStressLevel, localLevel)
require.True(t, sr.stressed)
sr.lock.Unlock()

// Test 3
// Only when both the single peer's individual stress level and the cluster stress
// level is below the activation level, the stress relief should be deactivated.
sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer%d", i)
sr.stressLevels[key] = stressReport{
key: key,
level: 1,
timestamp: sr.Clock.Now(),
}
}
clock.Advance(sr.minDuration * 2)
localLevel = sr.Recalc()
sr.lock.RLock()
assert.Equal(t, sr.overallStressLevel, localLevel)
assert.False(t, sr.stressed)
sr.lock.Unlock()
}

// TestStressRelief_Sample tests that traces are sampled deterministically
// by traceID.
// The test generates 10000 traceIDs and checks that the sampling rate is
Expand Down
Loading