-
Notifications
You must be signed in to change notification settings - Fork 322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: introduce in-memory stats for testing #2735
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
73fc1f6
chore: introduce memory stats for testing
lvrach 0d8d0ed
chore: keep record of stats values/durations
lvrach ce8226b
refactor: use thread safe methods to get values & durations
lvrach d755518
refactor: use New creation pattern
lvrach 0daa80e
chore: fmt
lvrach 7f0c52e
Merge branch 'master' into chore.memstats
lvrach File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
package memstats | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/rudderlabs/rudder-server/services/stats" | ||
) | ||
|
||
var _ stats.Stats = (*Store)(nil) | ||
|
||
var _ stats.Measurement = (*Measurement)(nil) | ||
|
||
type Store struct { | ||
mu sync.Mutex | ||
byKey map[string]*Measurement | ||
now func() time.Time | ||
} | ||
|
||
type Measurement struct { | ||
mu sync.Mutex | ||
startTime time.Time | ||
now func() time.Time | ||
|
||
tags stats.Tags | ||
name string | ||
mType string | ||
|
||
sum float64 | ||
values []float64 | ||
durations []time.Duration | ||
} | ||
|
||
func (m *Measurement) LastValue() float64 { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
if len(m.values) == 0 { | ||
return 0 | ||
} | ||
|
||
return m.values[len(m.values)-1] | ||
} | ||
|
||
func (m *Measurement) Values() []float64 { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
s := make([]float64, len(m.values)) | ||
copy(s, m.values) | ||
|
||
return s | ||
} | ||
|
||
func (m *Measurement) LastDuration() time.Duration { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
if len(m.durations) == 0 { | ||
return 0 | ||
} | ||
|
||
return m.durations[len(m.durations)-1] | ||
} | ||
|
||
func (m *Measurement) Durations() []time.Duration { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
s := make([]time.Duration, len(m.durations)) | ||
copy(s, m.durations) | ||
|
||
return s | ||
} | ||
|
||
// Count implements stats.Measurement | ||
func (m *Measurement) Count(n int) { | ||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
if m.mType != stats.CountType { | ||
panic("operation Count not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.sum += float64(n) | ||
m.values = append(m.values, m.sum) | ||
} | ||
|
||
// Increment implements stats.Measurement | ||
func (m *Measurement) Increment() { | ||
if m.mType != stats.CountType { | ||
panic("operation Increment not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.Count(1) | ||
} | ||
|
||
// Gauge implements stats.Measurement | ||
func (m *Measurement) Gauge(value interface{}) { | ||
if m.mType != stats.GaugeType { | ||
panic("operation Gauge not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
m.values = append(m.values, value.(float64)) | ||
} | ||
|
||
// Observe implements stats.Measurement | ||
func (m *Measurement) Observe(value float64) { | ||
if m.mType != stats.HistogramType { | ||
panic("operation Observe not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
m.values = append(m.values, value) | ||
} | ||
|
||
// Start implements stats.Measurement | ||
func (m *Measurement) Start() { | ||
if m.mType != stats.TimerType { | ||
panic("operation Start not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
m.startTime = m.now() | ||
} | ||
|
||
// End implements stats.Measurement | ||
func (m *Measurement) End() { | ||
if m.mType != stats.TimerType { | ||
panic("operation End not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.SendTiming(m.now().Sub(m.startTime)) | ||
} | ||
|
||
// Since implements stats.Measurement | ||
func (m *Measurement) Since(start time.Time) { | ||
if m.mType != stats.TimerType { | ||
panic("operation Since not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.SendTiming(m.now().Sub(start)) | ||
} | ||
|
||
// SendTiming implements stats.Measurement | ||
func (m *Measurement) SendTiming(duration time.Duration) { | ||
if m.mType != stats.TimerType { | ||
panic("operation SendTiming not supported for measurement type:" + m.mType) | ||
} | ||
|
||
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
m.durations = append(m.durations, duration) | ||
} | ||
|
||
type Opts func(*Store) | ||
|
||
func WithNow(nowFn func() time.Time) Opts { | ||
return func(s *Store) { | ||
s.now = nowFn | ||
} | ||
} | ||
|
||
func New(opts ...Opts) *Store { | ||
s := &Store{ | ||
byKey: make(map[string]*Measurement), | ||
now: time.Now, | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(s) | ||
} | ||
|
||
return s | ||
} | ||
|
||
// NewStat implements stats.Stats | ||
func (ms *Store) NewStat(name, statType string) (m stats.Measurement) { | ||
return ms.NewTaggedStat(name, statType, nil) | ||
} | ||
|
||
// NewTaggedStat implements stats.Stats | ||
func (ms *Store) NewTaggedStat(name, statType string, tags stats.Tags) stats.Measurement { | ||
return ms.NewSampledTaggedStat(name, statType, tags) | ||
} | ||
|
||
// NewSampledTaggedStat implements stats.Stats | ||
func (ms *Store) NewSampledTaggedStat(name, statType string, tags stats.Tags) stats.Measurement { | ||
ms.mu.Lock() | ||
defer ms.mu.Unlock() | ||
|
||
m := &Measurement{ | ||
name: name, | ||
tags: tags, | ||
mType: statType, | ||
|
||
now: ms.now, | ||
} | ||
|
||
ms.byKey[ms.getKey(name, tags)] = m | ||
return m | ||
} | ||
|
||
// Get the stored measurement with the name and tags. | ||
// If no measurement is found, nil is returned. | ||
func (ms *Store) Get(name string, tags stats.Tags) *Measurement { | ||
ms.mu.Lock() | ||
defer ms.mu.Unlock() | ||
|
||
return ms.byKey[ms.getKey(name, tags)] | ||
} | ||
|
||
// Start implements stats.Stats | ||
func (*Store) Start(_ context.Context) {} | ||
|
||
// Stop implements stats.Stats | ||
func (*Store) Stop() {} | ||
|
||
// getKey maps name and tags, to a store lookup key. | ||
func (*Store) getKey(name string, tags stats.Tags) string { | ||
return name + tags.String() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package memstats_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/rudderlabs/rudder-server/services/stats" | ||
"github.com/rudderlabs/rudder-server/services/stats/memstats" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestStats(t *testing.T) { | ||
now := time.Now() | ||
|
||
store := memstats.New( | ||
memstats.WithNow(func() time.Time { | ||
return now | ||
}), | ||
) | ||
|
||
commonTags := stats.Tags{"tag1": "value1"} | ||
|
||
t.Run("test Count", func(t *testing.T) { | ||
name := "testCount" | ||
|
||
m := store.NewTaggedStat(name, stats.CountType, commonTags) | ||
|
||
m.Increment() | ||
|
||
require.Equal(t, 1.0, store.Get(name, commonTags).LastValue()) | ||
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values()) | ||
|
||
m.Count(2) | ||
|
||
require.Equal(t, 3.0, store.Get(name, commonTags).LastValue()) | ||
require.Equal(t, []float64{1.0, 3.0}, store.Get(name, commonTags).Values()) | ||
}) | ||
|
||
t.Run("test Gauge", func(t *testing.T) { | ||
name := "testGauge" | ||
m := store.NewTaggedStat(name, stats.GaugeType, commonTags) | ||
|
||
m.Gauge(1.0) | ||
|
||
require.Equal(t, 1.0, store.Get(name, commonTags).LastValue()) | ||
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values()) | ||
|
||
m.Gauge(2.0) | ||
|
||
require.Equal(t, 2.0, store.Get(name, commonTags).LastValue()) | ||
require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values()) | ||
}) | ||
|
||
t.Run("test Histogram", func(t *testing.T) { | ||
name := "testHistogram" | ||
m := store.NewTaggedStat(name, stats.HistogramType, commonTags) | ||
|
||
m.Observe(1.0) | ||
|
||
require.Equal(t, 1.0, store.Get(name, commonTags).LastValue()) | ||
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values()) | ||
|
||
m.Observe(2.0) | ||
|
||
require.Equal(t, 2.0, store.Get(name, commonTags).LastValue()) | ||
require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values()) | ||
}) | ||
|
||
t.Run("test Timer", func(t *testing.T) { | ||
name := "testTimer" | ||
|
||
m := store.NewTaggedStat(name, stats.TimerType, commonTags) | ||
|
||
m.SendTiming(time.Second) | ||
require.Equal(t, time.Second, store.Get(name, commonTags).LastDuration()) | ||
require.Equal(t, []time.Duration{time.Second}, store.Get(name, commonTags).Durations()) | ||
|
||
m.SendTiming(time.Minute) | ||
require.Equal(t, time.Minute, store.Get(name, commonTags).LastDuration()) | ||
require.Equal(t, | ||
[]time.Duration{time.Second, time.Minute}, | ||
store.Get(name, commonTags).Durations(), | ||
) | ||
|
||
m.Start() | ||
now = now.Add(time.Hour) | ||
m.End() | ||
require.Equal(t, time.Hour, store.Get(name, commonTags).LastDuration()) | ||
require.Equal(t, | ||
[]time.Duration{time.Second, time.Minute, time.Hour}, | ||
store.Get(name, commonTags).Durations(), | ||
) | ||
|
||
m.Since(now.Add(-time.Minute)) | ||
require.Equal(t, time.Minute, store.Get(name, commonTags).LastDuration()) | ||
require.Equal(t, | ||
[]time.Duration{time.Second, time.Minute, time.Hour, time.Minute}, | ||
store.Get(name, commonTags).Durations(), | ||
) | ||
}) | ||
|
||
t.Run("invalid operations", func(t *testing.T) { | ||
require.PanicsWithValue(t, "operation Count not supported for measurement type:gauge", func() { | ||
store.NewTaggedStat("invalid_count", stats.GaugeType, commonTags).Count(1) | ||
}) | ||
require.PanicsWithValue(t, "operation Increment not supported for measurement type:gauge", func() { | ||
store.NewTaggedStat("invalid_inc", stats.GaugeType, commonTags).Increment() | ||
}) | ||
require.PanicsWithValue(t, "operation Gauge not supported for measurement type:count", func() { | ||
store.NewTaggedStat("invalid_gauge", stats.CountType, commonTags).Gauge(1) | ||
}) | ||
require.PanicsWithValue(t, "operation SendTiming not supported for measurement type:histogram", func() { | ||
store.NewTaggedStat("invalid_send_timing", stats.HistogramType, commonTags).SendTiming(time.Second) | ||
}) | ||
require.PanicsWithValue(t, "operation Start not supported for measurement type:histogram", func() { | ||
store.NewTaggedStat("invalid_start", stats.HistogramType, commonTags).Start() | ||
}) | ||
require.PanicsWithValue(t, "operation End not supported for measurement type:histogram", func() { | ||
store.NewTaggedStat("invalid_end", stats.HistogramType, commonTags).End() | ||
}) | ||
require.PanicsWithValue(t, "operation Since not supported for measurement type:histogram", func() { | ||
store.NewTaggedStat("invalid_since", stats.HistogramType, commonTags).Since(time.Now()) | ||
}) | ||
require.PanicsWithValue(t, "operation Observe not supported for measurement type:timer", func() { | ||
store.NewTaggedStat("invalid_observe", stats.TimerType, commonTags).Observe(1) | ||
}) | ||
}) | ||
|
||
t.Run("no op", func(t *testing.T) { | ||
store.Start(context.Background()) | ||
store.Stop() | ||
}) | ||
|
||
t.Run("no tags", func(t *testing.T) { | ||
name := "no_tags" | ||
m := store.NewStat(name, stats.CountType) | ||
|
||
m.Increment() | ||
|
||
require.Equal(t, 1.0, store.Get(name, nil).LastValue()) | ||
}) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all fields in the
Store
struct could be unexported and intialization could be taken care once and for all by a producer function usingOpts
if necessary. This would eliminate the need for theinit
method being called by other methods and thesync.Once
field e.g.and other packages could simply do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your suggestion and my implementation are good patterns to initialise components in Go. I use and like both depending on the situation.
I don't see it as a problem having unexported fields for options; keep in mind most standard libraries are using this pattern.
The downside of the lazy init approach is that we have to call
.init
in every exported method. Thewith
approach also has an overhead of as well but is safer indeed.