Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: introduce in-memory stats for testing #2735

Merged
merged 6 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions services/stats/memstats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package memstats

import (
"context"
"sync"
"time"

"github.com/rudderlabs/rudder-server/services/stats"
)

var _ stats.Stats = (*Store)(nil)

var _ stats.Measurement = (*Measurement)(nil)

type Store struct {
Copy link
Contributor

@atzoum atzoum Nov 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all fields in the Store struct could be unexported and intialization could be taken care once and for all by a producer function using Opts if necessary. This would eliminate the need for the init method being called by other methods and the sync.Once field e.g.

type Opt func(*Store)

var WithNow = func(now func() time.Time) Opt {
	return func(s *Store) {
		s.now = now
	}
}

// New returns a new stats.Stats implementation that stores stats in-memory.
func New(opts ...Opt) *Store {
	s := &Store{
		byKey: make(map[string]*measurement),
		now:   time.Now,
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

type Store struct {
	mu    sync.Mutex
	byKey map[string]*measurement
	now func() time.Time
}

and other packages could simply do

store := memstats.New(
		memstats.WithNow(func() time.Time {
			return now
		}))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggestion and my implementation are good patterns to initialise components in Go. I use and like both depending on the situation.

I don't see it as a problem having unexported fields for options; keep in mind most standard libraries are using this pattern.

The downside of the lazy init approach is that we have to call .init in every exported method. The with approach also has an overhead of as well but is safer indeed.

mu sync.Mutex
byKey map[string]*Measurement
now func() time.Time
}

type Measurement struct {
mu sync.Mutex
startTime time.Time
now func() time.Time

tags stats.Tags
name string
mType string

sum float64
values []float64
durations []time.Duration
}

func (m *Measurement) LastValue() float64 {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.values) == 0 {
return 0
}

return m.values[len(m.values)-1]
}

func (m *Measurement) Values() []float64 {
m.mu.Lock()
defer m.mu.Unlock()

s := make([]float64, len(m.values))
copy(s, m.values)

return s
}

func (m *Measurement) LastDuration() time.Duration {
m.mu.Lock()
defer m.mu.Unlock()

if len(m.durations) == 0 {
return 0
}

return m.durations[len(m.durations)-1]
}

func (m *Measurement) Durations() []time.Duration {
m.mu.Lock()
defer m.mu.Unlock()

s := make([]time.Duration, len(m.durations))
copy(s, m.durations)

return s
}

// Count implements stats.Measurement
func (m *Measurement) Count(n int) {
m.mu.Lock()
defer m.mu.Unlock()

if m.mType != stats.CountType {
panic("operation Count not supported for measurement type:" + m.mType)
}

m.sum += float64(n)
m.values = append(m.values, m.sum)
}

// Increment implements stats.Measurement
func (m *Measurement) Increment() {
if m.mType != stats.CountType {
panic("operation Increment not supported for measurement type:" + m.mType)
}

m.Count(1)
}

// Gauge implements stats.Measurement
func (m *Measurement) Gauge(value interface{}) {
if m.mType != stats.GaugeType {
panic("operation Gauge not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.values = append(m.values, value.(float64))
}

// Observe implements stats.Measurement
func (m *Measurement) Observe(value float64) {
if m.mType != stats.HistogramType {
panic("operation Observe not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.values = append(m.values, value)
}

// Start implements stats.Measurement
func (m *Measurement) Start() {
if m.mType != stats.TimerType {
panic("operation Start not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.startTime = m.now()
}

// End implements stats.Measurement
func (m *Measurement) End() {
if m.mType != stats.TimerType {
panic("operation End not supported for measurement type:" + m.mType)
}

m.SendTiming(m.now().Sub(m.startTime))
}

// Since implements stats.Measurement
func (m *Measurement) Since(start time.Time) {
if m.mType != stats.TimerType {
panic("operation Since not supported for measurement type:" + m.mType)
}

m.SendTiming(m.now().Sub(start))
}

// SendTiming implements stats.Measurement
func (m *Measurement) SendTiming(duration time.Duration) {
if m.mType != stats.TimerType {
panic("operation SendTiming not supported for measurement type:" + m.mType)
}

m.mu.Lock()
defer m.mu.Unlock()

m.durations = append(m.durations, duration)
}

type Opts func(*Store)

func WithNow(nowFn func() time.Time) Opts {
return func(s *Store) {
s.now = nowFn
}
}

func New(opts ...Opts) *Store {
s := &Store{
byKey: make(map[string]*Measurement),
now: time.Now,
}

for _, opt := range opts {
opt(s)
}

return s
}

// NewStat implements stats.Stats
func (ms *Store) NewStat(name, statType string) (m stats.Measurement) {
return ms.NewTaggedStat(name, statType, nil)
}

// NewTaggedStat implements stats.Stats
func (ms *Store) NewTaggedStat(name, statType string, tags stats.Tags) stats.Measurement {
return ms.NewSampledTaggedStat(name, statType, tags)
}

// NewSampledTaggedStat implements stats.Stats
func (ms *Store) NewSampledTaggedStat(name, statType string, tags stats.Tags) stats.Measurement {
ms.mu.Lock()
defer ms.mu.Unlock()

m := &Measurement{
name: name,
tags: tags,
mType: statType,

now: ms.now,
}

ms.byKey[ms.getKey(name, tags)] = m
return m
}

// Get the stored measurement with the name and tags.
// If no measurement is found, nil is returned.
func (ms *Store) Get(name string, tags stats.Tags) *Measurement {
ms.mu.Lock()
defer ms.mu.Unlock()

return ms.byKey[ms.getKey(name, tags)]
}

// Start implements stats.Stats
func (*Store) Start(_ context.Context) {}

// Stop implements stats.Stats
func (*Store) Stop() {}

// getKey maps name and tags, to a store lookup key.
func (*Store) getKey(name string, tags stats.Tags) string {
return name + tags.String()
}
143 changes: 143 additions & 0 deletions services/stats/memstats/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package memstats_test

import (
"context"
"testing"
"time"

"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/stats/memstats"
"github.com/stretchr/testify/require"
)

func TestStats(t *testing.T) {
now := time.Now()

store := memstats.New(
memstats.WithNow(func() time.Time {
return now
}),
)

commonTags := stats.Tags{"tag1": "value1"}

t.Run("test Count", func(t *testing.T) {
name := "testCount"

m := store.NewTaggedStat(name, stats.CountType, commonTags)

m.Increment()

require.Equal(t, 1.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values())

m.Count(2)

require.Equal(t, 3.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0, 3.0}, store.Get(name, commonTags).Values())
})

t.Run("test Gauge", func(t *testing.T) {
name := "testGauge"
m := store.NewTaggedStat(name, stats.GaugeType, commonTags)

m.Gauge(1.0)

require.Equal(t, 1.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values())

m.Gauge(2.0)

require.Equal(t, 2.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values())
})

t.Run("test Histogram", func(t *testing.T) {
name := "testHistogram"
m := store.NewTaggedStat(name, stats.HistogramType, commonTags)

m.Observe(1.0)

require.Equal(t, 1.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0}, store.Get(name, commonTags).Values())

m.Observe(2.0)

require.Equal(t, 2.0, store.Get(name, commonTags).LastValue())
require.Equal(t, []float64{1.0, 2.0}, store.Get(name, commonTags).Values())
})

t.Run("test Timer", func(t *testing.T) {
name := "testTimer"

m := store.NewTaggedStat(name, stats.TimerType, commonTags)

m.SendTiming(time.Second)
require.Equal(t, time.Second, store.Get(name, commonTags).LastDuration())
require.Equal(t, []time.Duration{time.Second}, store.Get(name, commonTags).Durations())

m.SendTiming(time.Minute)
require.Equal(t, time.Minute, store.Get(name, commonTags).LastDuration())
require.Equal(t,
[]time.Duration{time.Second, time.Minute},
store.Get(name, commonTags).Durations(),
)

m.Start()
now = now.Add(time.Hour)
m.End()
require.Equal(t, time.Hour, store.Get(name, commonTags).LastDuration())
require.Equal(t,
[]time.Duration{time.Second, time.Minute, time.Hour},
store.Get(name, commonTags).Durations(),
)

m.Since(now.Add(-time.Minute))
require.Equal(t, time.Minute, store.Get(name, commonTags).LastDuration())
require.Equal(t,
[]time.Duration{time.Second, time.Minute, time.Hour, time.Minute},
store.Get(name, commonTags).Durations(),
)
})

t.Run("invalid operations", func(t *testing.T) {
require.PanicsWithValue(t, "operation Count not supported for measurement type:gauge", func() {
store.NewTaggedStat("invalid_count", stats.GaugeType, commonTags).Count(1)
})
require.PanicsWithValue(t, "operation Increment not supported for measurement type:gauge", func() {
store.NewTaggedStat("invalid_inc", stats.GaugeType, commonTags).Increment()
})
require.PanicsWithValue(t, "operation Gauge not supported for measurement type:count", func() {
store.NewTaggedStat("invalid_gauge", stats.CountType, commonTags).Gauge(1)
})
require.PanicsWithValue(t, "operation SendTiming not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_send_timing", stats.HistogramType, commonTags).SendTiming(time.Second)
})
require.PanicsWithValue(t, "operation Start not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_start", stats.HistogramType, commonTags).Start()
})
require.PanicsWithValue(t, "operation End not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_end", stats.HistogramType, commonTags).End()
})
require.PanicsWithValue(t, "operation Since not supported for measurement type:histogram", func() {
store.NewTaggedStat("invalid_since", stats.HistogramType, commonTags).Since(time.Now())
})
require.PanicsWithValue(t, "operation Observe not supported for measurement type:timer", func() {
store.NewTaggedStat("invalid_observe", stats.TimerType, commonTags).Observe(1)
})
})

t.Run("no op", func(t *testing.T) {
store.Start(context.Background())
store.Stop()
})

t.Run("no tags", func(t *testing.T) {
name := "no_tags"
m := store.NewStat(name, stats.CountType)

m.Increment()

require.Equal(t, 1.0, store.Get(name, nil).LastValue())
})
}