Commit

Add write/read latency histogram metrics to DB client and emit from coordinator (#1533)
robskillington authored Apr 4, 2019
1 parent 2d598d8 commit 5a72ada
Showing 4 changed files with 75 additions and 45 deletions.
42 changes: 37 additions & 5 deletions src/dbnode/client/session.go
@@ -161,10 +161,12 @@ type sessionMetrics struct {
sync.RWMutex
writeSuccess tally.Counter
writeErrors tally.Counter
writeLatencyHistogram tally.Histogram
writeNodesRespondingErrors []tally.Counter
writeNodesRespondingBadRequestErrors []tally.Counter
fetchSuccess tally.Counter
fetchErrors tally.Counter
fetchLatencyHistogram tally.Histogram
fetchNodesRespondingErrors []tally.Counter
fetchNodesRespondingBadRequestErrors []tally.Counter
topologyUpdatedSuccess tally.Counter
@@ -176,8 +178,10 @@ func newSessionMetrics(scope tally.Scope) sessionMetrics {
return sessionMetrics{
writeSuccess: scope.Counter("write.success"),
writeErrors: scope.Counter("write.errors"),
writeLatencyHistogram: histogramWithDurationBuckets(scope, "write.latency"),
fetchSuccess: scope.Counter("fetch.success"),
fetchErrors: scope.Counter("fetch.errors"),
fetchLatencyHistogram: histogramWithDurationBuckets(scope, "fetch.latency"),
topologyUpdatedSuccess: scope.Counter("topology.updated-success"),
topologyUpdatedError: scope.Counter("topology.updated-error"),
streamFromPeersMetrics: make(map[shardMetricsKey]streamFromPeersMetrics),
@@ -398,7 +402,7 @@ func (s *session) newPeerMetadataStreamingProgressMetrics(
return &m
}

func (s *session) incWriteMetrics(consistencyResultErr error, respErrs int32) {
func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, start time.Time) {
if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 {
if IsBadRequestError(consistencyResultErr) {
s.metrics.writeNodesRespondingBadRequestErrors[idx].Inc(1)
@@ -411,9 +415,10 @@ func (s *session) incWriteMetrics(consistencyResultErr error, respErrs int32) {
} else {
s.metrics.writeErrors.Inc(1)
}
s.metrics.writeLatencyHistogram.RecordDuration(s.nowFn().Sub(start))
}

func (s *session) incFetchMetrics(consistencyResultErr error, respErrs int32) {
func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32, start time.Time) {
if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 {
if IsBadRequestError(consistencyResultErr) {
s.metrics.fetchNodesRespondingBadRequestErrors[idx].Inc(1)
@@ -426,6 +431,7 @@ func (s *session) incFetchMetrics(consistencyResultErr error, respErrs int32) {
} else {
s.metrics.fetchErrors.Inc(1)
}
s.metrics.fetchLatencyHistogram.RecordDuration(s.nowFn().Sub(start))
}

func (s *session) nodesRespondingErrorsMetricIndex(respErrs int32) int32 {
@@ -925,6 +931,8 @@ func (s *session) writeAttempt(
unit xtime.Unit,
annotation []byte,
) error {
startWriteAttempt := s.nowFn()

timeType, timeTypeErr := convert.ToTimeType(unit)
if timeTypeErr != nil {
return timeTypeErr
@@ -956,7 +964,7 @@
err = s.writeConsistencyResult(state.consistencyLevel, majority, enqueued,
enqueued-state.pending, int32(len(state.errors)), state.errors)

s.incWriteMetrics(err, int32(len(state.errors)))
s.recordWriteMetrics(err, int32(len(state.errors)), startWriteAttempt)

// must Unlock before decRef'ing, as the latter releases the writeState back into a
// pool if ref count == 0.
@@ -1361,7 +1369,7 @@ func (s *session) newFetchStateWithRLock(
fetchState.incRef()
if err := hq.Enqueue(op); err != nil {
fetchState.Unlock()
closer() // release the ref for the current go-routine
closer() // release the ref for the current go-routine
fetchState.decRef() // release the ref for the hostQueue
fetchState.decRef() // release the ref for the current go-routine

@@ -1399,6 +1407,7 @@ func (s *session) fetchIDsAttempt(
consistencyLevel topology.ReadConsistencyLevel
fetchBatchOpsByHostIdx [][]*fetchBatchOp
success = false
startFetchAttempt = s.nowFn()
)

// NB(prateek): need to make a copy of inputNamespace and inputIDs to control
@@ -1492,7 +1501,7 @@
responded := enqueued - atomic.LoadInt32(&pending)
err := s.readConsistencyResult(consistencyLevel, majority, enqueued,
responded, errsLen, reportErrors)
s.incFetchMetrics(err, errsLen)
s.recordFetchMetrics(err, errsLen, startFetchAttempt)
if err != nil {
resultErrLock.Lock()
if resultErr == nil {
@@ -3837,3 +3846,26 @@ func newTagsFromEncodedTags(

return tags, err
}

const (
// histogramDurationBucketsVersion must be bumped if histogramDurationBuckets is changed
// to namespace the different buckets from each other so they don't overlap and cause the
// histogram function to error out due to overlapping buckets in the same query.
histogramDurationBucketsVersion = "v1"
// histogramDurationBucketsVersionTag is the tag for the version of the buckets in use.
histogramDurationBucketsVersionTag = "schema"
)

// histogramDurationBuckets is a high resolution set of duration buckets.
func histogramDurationBuckets() tally.DurationBuckets {
return append(tally.DurationBuckets{0},
tally.MustMakeExponentialDurationBuckets(time.Millisecond, 1.25, 60)...)
}

// histogramWithDurationBuckets returns a histogram with the standard duration buckets.
func histogramWithDurationBuckets(scope tally.Scope, name string) tally.Histogram {
sub := scope.Tagged(map[string]string{
histogramDurationBucketsVersionTag: histogramDurationBucketsVersion,
})
return sub.Histogram(name, histogramDurationBuckets())
}
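
For context, the histogramDurationBuckets helper above yields 61 buckets covering zero up to several minutes, and histogramWithDurationBuckets attaches the schema tag so a future change of bucket layout can be kept separate in queries. The following standalone sketch (an illustration only, assuming the github.com/uber-go/tally package this file already imports) reproduces the same bucket construction and prints its range:

package main

import (
	"fmt"
	"time"

	"github.com/uber-go/tally"
)

func main() {
	// Same construction as histogramDurationBuckets: an explicit zero bucket
	// followed by 60 exponential buckets starting at 1ms with factor 1.25.
	buckets := append(tally.DurationBuckets{0},
		tally.MustMakeExponentialDurationBuckets(time.Millisecond, 1.25, 60)...)

	fmt.Println("bucket count:", len(buckets))       // 61
	fmt.Println("first non-zero:", buckets[1])       // 1ms
	fmt.Println("largest:", buckets[len(buckets)-1]) // roughly 8-9 minutes
}

The largest boundary is 1ms * 1.25^59, a little under nine minutes, so the buckets give millisecond resolution at the low end while still covering very slow writes and fetches.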
58 changes: 26 additions & 32 deletions src/query/server/server.go
@@ -225,23 +225,16 @@ func Run(runOpts RunOptions) {

logger.Info("setup grpc backend")
} else {
m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg, runOpts.DBClient, logger)
m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg,
runOpts.DBClient, instrumentOptions)
if err != nil {
logger.Fatal("unable to init clusters", zap.Error(err))
}

var cleanup cleanupFn
backendStorage, clusterClient, downsampler, cleanup, err = newM3DBStorage(
runOpts,
cfg,
tagOptions,
logger,
m3dbClusters,
m3dbPoolWrapper,
instrumentOptions,
readWorkerPool,
writeWorkerPool,
)
runOpts, cfg, tagOptions, m3dbClusters, m3dbPoolWrapper,
readWorkerPool, writeWorkerPool, instrumentOptions)
if err != nil {
logger.Fatal("unable to setup m3db backend", zap.Error(err))
}
@@ -351,14 +344,14 @@ func newM3DBStorage(
runOpts RunOptions,
cfg config.Configuration,
tagOptions models.TagOptions,
logger *zap.Logger,
clusters m3.Clusters,
poolWrapper *pools.PoolWrapper,
instrumentOptions instrument.Options,
readWorkerPool xsync.PooledWorkerPool,
writeWorkerPool xsync.PooledWorkerPool,
instrumentOptions instrument.Options,
) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) {
var (
logger = instrumentOptions.ZapLogger()
clusterClient clusterclient.Client
clusterClientWaitCh <-chan struct{}
)
@@ -395,15 +388,8 @@
}
}

fanoutStorage, storageCleanup, err := newStorages(
logger,
clusters,
cfg,
tagOptions,
poolWrapper,
readWorkerPool,
writeWorkerPool,
)
fanoutStorage, storageCleanup, err := newStorages(clusters, cfg, tagOptions,
poolWrapper, readWorkerPool, writeWorkerPool, instrumentOptions)
if err != nil {
return nil, nil, nil, nil, errors.Wrap(err, "unable to set up storages")
}
@@ -546,18 +532,22 @@ func newDownsamplerAutoMappingRules(
func initClusters(
cfg config.Configuration,
dbClientCh <-chan client.Client,
logger *zap.Logger,
instrumentOpts instrument.Options,
) (m3.Clusters, *pools.PoolWrapper, error) {
instrumentOpts = instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().SubScope("m3db-client"))

var (
logger = instrumentOpts.ZapLogger()
clusters m3.Clusters
poolWrapper *pools.PoolWrapper
err error
)

if len(cfg.Clusters) > 0 {
clusters, err = cfg.Clusters.NewClusters(m3.ClustersStaticConfigurationOptions{
AsyncSessions: true,
})
clusters, err = cfg.Clusters.NewClusters(instrumentOpts,
m3.ClustersStaticConfigurationOptions{
AsyncSessions: true,
})
if err != nil {
return nil, nil, errors.Wrap(err, "unable to connect to clusters")
}
@@ -582,9 +572,10 @@
},
}

clusters, err = clustersCfg.NewClusters(m3.ClustersStaticConfigurationOptions{
ProvidedSession: session,
})
clusters, err = clustersCfg.NewClusters(instrumentOpts,
m3.ClustersStaticConfigurationOptions{
ProvidedSession: session,
})
if err != nil {
return nil, nil, errors.Wrap(err, "unable to connect to clusters")
}
@@ -605,15 +596,18 @@
}

func newStorages(
logger *zap.Logger,
clusters m3.Clusters,
cfg config.Configuration,
tagOptions models.TagOptions,
poolWrapper *pools.PoolWrapper,
readWorkerPool xsync.PooledWorkerPool,
writeWorkerPool xsync.PooledWorkerPool,
instrumentOpts instrument.Options,
) (storage.Storage, cleanupFn, error) {
cleanup := func() error { return nil }
var (
logger = instrumentOpts.ZapLogger()
cleanup = func() error { return nil }
)

// Setup query conversion cache.
conversionCacheConfig := cfg.Cache.QueryConversionCacheConfiguration()
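
Because initClusters now sub-scopes the instrument options under "m3db-client", every metric the DB client session emits (including the new write.latency and fetch.latency histograms) is nested under that scope on the coordinator. A small illustrative sketch of the naming effect, using a tally test scope (the "coordinator" prefix here is a stand-in, not the coordinator's actual root scope):

package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

func main() {
	// Stand-in root scope; the real coordinator configures its own scope.
	root := tally.NewTestScope("coordinator", nil)

	// Mirrors initClusters: DB client metrics land in an "m3db-client" sub-scope.
	clientScope := root.SubScope("m3db-client")
	clientScope.Counter("write.success").Inc(1)

	for key, c := range root.Snapshot().Counters() {
		// The key carries the nested name, e.g. coordinator.m3db-client.write.success
		fmt.Println(key, c.Value())
	}
}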
4 changes: 3 additions & 1 deletion src/query/storage/m3/cluster_test.go
@@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@@ -97,7 +98,8 @@ func TestNewClustersFromConfig(t *testing.T) {
},
}

clusters, err := cfg.NewClusters(ClustersStaticConfigurationOptions{})
clusters, err := cfg.NewClusters(instrument.NewOptions(),
ClustersStaticConfigurationOptions{})
require.NoError(t, err)

// Resolve expected clusters and check attributes
16 changes: 9 additions & 7 deletions src/query/storage/m3/config.go
@@ -30,13 +30,12 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/stores/m3db"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
)

var (
errNotAggregatedClusterNamespace = goerrors.New("not an aggregated cluster namespace")
errBothNamespaceTypeNewAndDeprecatedFieldsSet = goerrors.New("cannot specify both deprecated and non-deprecated fields for namespace type")

defaultNewClientConfigurationParams = client.ConfigurationParameters{}
)

// ClustersStaticConfiguration is a set of static cluster configurations.
@@ -169,6 +168,7 @@ type ClustersStaticConfigurationOptions struct {

// NewClusters instantiates a new Clusters instance.
func (c ClustersStaticConfiguration) NewClusters(
instrumentOpts instrument.Options,
opts ClustersStaticConfigurationOptions,
) (Clusters, error) {
var (
@@ -181,20 +181,22 @@
)
for _, clusterCfg := range c {
var (
client client.Client
result client.Client
err error
)

if opts.ProvidedSession == nil {
// NB(r): If session is already provided, do not create a client
client, err = clusterCfg.newClient(defaultNewClientConfigurationParams)
// NB(r): Only create client session if not already provided.
result, err = clusterCfg.newClient(client.ConfigurationParameters{
InstrumentOptions: instrumentOpts,
})
if err != nil {
return nil, err
}
}

aggregatedClusterNamespacesCfg := &aggregatedClusterNamespacesConfiguration{
client: client,
client: result,
}

for _, n := range clusterCfg.Namespaces {
@@ -211,7 +213,7 @@
"can be specified: specified %d", numUnaggregatedClusterNamespaces)
}

unaggregatedClusterNamespaceCfg.client = client
unaggregatedClusterNamespaceCfg.client = result
unaggregatedClusterNamespaceCfg.namespace = n

case storage.AggregatedMetricsType: