From 5a72ada6da1c74f6e71d977f9e3cfbe15b55208b Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 4 Apr 2019 17:18:22 -0400 Subject: [PATCH] Add write read latency histogram metrics to DB client and emit from coordinator (#1533) --- src/dbnode/client/session.go | 42 +++++++++++++++++--- src/query/server/server.go | 58 +++++++++++++--------------- src/query/storage/m3/cluster_test.go | 4 +- src/query/storage/m3/config.go | 16 ++++---- 4 files changed, 75 insertions(+), 45 deletions(-) diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index e6bed8c4af..93acfdd26f 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -161,10 +161,12 @@ type sessionMetrics struct { sync.RWMutex writeSuccess tally.Counter writeErrors tally.Counter + writeLatencyHistogram tally.Histogram writeNodesRespondingErrors []tally.Counter writeNodesRespondingBadRequestErrors []tally.Counter fetchSuccess tally.Counter fetchErrors tally.Counter + fetchLatencyHistogram tally.Histogram fetchNodesRespondingErrors []tally.Counter fetchNodesRespondingBadRequestErrors []tally.Counter topologyUpdatedSuccess tally.Counter @@ -176,8 +178,10 @@ func newSessionMetrics(scope tally.Scope) sessionMetrics { return sessionMetrics{ writeSuccess: scope.Counter("write.success"), writeErrors: scope.Counter("write.errors"), + writeLatencyHistogram: histogramWithDurationBuckets(scope, "write.latency"), fetchSuccess: scope.Counter("fetch.success"), fetchErrors: scope.Counter("fetch.errors"), + fetchLatencyHistogram: histogramWithDurationBuckets(scope, "fetch.latency"), topologyUpdatedSuccess: scope.Counter("topology.updated-success"), topologyUpdatedError: scope.Counter("topology.updated-error"), streamFromPeersMetrics: make(map[shardMetricsKey]streamFromPeersMetrics), @@ -398,7 +402,7 @@ func (s *session) newPeerMetadataStreamingProgressMetrics( return &m } -func (s *session) incWriteMetrics(consistencyResultErr error, respErrs int32) { +func (s *session) recordWriteMetrics(consistencyResultErr error, respErrs int32, start time.Time) { if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 { if IsBadRequestError(consistencyResultErr) { s.metrics.writeNodesRespondingBadRequestErrors[idx].Inc(1) @@ -411,9 +415,10 @@ func (s *session) incWriteMetrics(consistencyResultErr error, respErrs int32) { } else { s.metrics.writeErrors.Inc(1) } + s.metrics.writeLatencyHistogram.RecordDuration(s.nowFn().Sub(start)) } -func (s *session) incFetchMetrics(consistencyResultErr error, respErrs int32) { +func (s *session) recordFetchMetrics(consistencyResultErr error, respErrs int32, start time.Time) { if idx := s.nodesRespondingErrorsMetricIndex(respErrs); idx >= 0 { if IsBadRequestError(consistencyResultErr) { s.metrics.fetchNodesRespondingBadRequestErrors[idx].Inc(1) @@ -426,6 +431,7 @@ func (s *session) incFetchMetrics(consistencyResultErr error, respErrs int32) { } else { s.metrics.fetchErrors.Inc(1) } + s.metrics.fetchLatencyHistogram.RecordDuration(s.nowFn().Sub(start)) } func (s *session) nodesRespondingErrorsMetricIndex(respErrs int32) int32 { @@ -925,6 +931,8 @@ func (s *session) writeAttempt( unit xtime.Unit, annotation []byte, ) error { + startWriteAttempt := s.nowFn() + timeType, timeTypeErr := convert.ToTimeType(unit) if timeTypeErr != nil { return timeTypeErr @@ -956,7 +964,7 @@ func (s *session) writeAttempt( err = s.writeConsistencyResult(state.consistencyLevel, majority, enqueued, enqueued-state.pending, int32(len(state.errors)), state.errors) - s.incWriteMetrics(err, 
int32(len(state.errors))) + s.recordWriteMetrics(err, int32(len(state.errors)), startWriteAttempt) // must Unlock before decRef'ing, as the latter releases the writeState back into a // pool if ref count == 0. @@ -1361,7 +1369,7 @@ func (s *session) newFetchStateWithRLock( fetchState.incRef() if err := hq.Enqueue(op); err != nil { fetchState.Unlock() - closer() // release the ref for the current go-routine + closer() // release the ref for the current go-routine fetchState.decRef() // release the ref for the hostQueue fetchState.decRef() // release the ref for the current go-routine @@ -1399,6 +1407,7 @@ func (s *session) fetchIDsAttempt( consistencyLevel topology.ReadConsistencyLevel fetchBatchOpsByHostIdx [][]*fetchBatchOp success = false + startFetchAttempt = s.nowFn() ) // NB(prateek): need to make a copy of inputNamespace and inputIDs to control @@ -1492,7 +1501,7 @@ func (s *session) fetchIDsAttempt( responded := enqueued - atomic.LoadInt32(&pending) err := s.readConsistencyResult(consistencyLevel, majority, enqueued, responded, errsLen, reportErrors) - s.incFetchMetrics(err, errsLen) + s.recordFetchMetrics(err, errsLen, startFetchAttempt) if err != nil { resultErrLock.Lock() if resultErr == nil { @@ -3837,3 +3846,26 @@ func newTagsFromEncodedTags( return tags, err } + +const ( + // histogramDurationBucketsVersion must be bumped if histogramDurationBuckets is changed + // to namespace the different buckets from each other so they don't overlap and cause the + // histogram function to error out due to overlapping buckets in the same query. + histogramDurationBucketsVersion = "v1" + // histogramDurationBucketsVersionTag is the tag for the version of the buckets in use. + histogramDurationBucketsVersionTag = "schema" +) + +// histogramDurationBuckets is a high resolution set of duration buckets. +func histogramDurationBuckets() tally.DurationBuckets { + return append(tally.DurationBuckets{0}, + tally.MustMakeExponentialDurationBuckets(time.Millisecond, 1.25, 60)...) +} + +// histogramWithDurationBuckets returns a histogram with the standard duration buckets. 
+func histogramWithDurationBuckets(scope tally.Scope, name string) tally.Histogram { + sub := scope.Tagged(map[string]string{ + histogramDurationBucketsVersionTag: histogramDurationBucketsVersion, + }) + return sub.Histogram(name, histogramDurationBuckets()) +} diff --git a/src/query/server/server.go b/src/query/server/server.go index 917476278a..5d60acac7f 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -225,23 +225,16 @@ func Run(runOpts RunOptions) { logger.Info("setup grpc backend") } else { - m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg, runOpts.DBClient, logger) + m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg, + runOpts.DBClient, instrumentOptions) if err != nil { logger.Fatal("unable to init clusters", zap.Error(err)) } var cleanup cleanupFn backendStorage, clusterClient, downsampler, cleanup, err = newM3DBStorage( - runOpts, - cfg, - tagOptions, - logger, - m3dbClusters, - m3dbPoolWrapper, - instrumentOptions, - readWorkerPool, - writeWorkerPool, - ) + runOpts, cfg, tagOptions, m3dbClusters, m3dbPoolWrapper, + readWorkerPool, writeWorkerPool, instrumentOptions) if err != nil { logger.Fatal("unable to setup m3db backend", zap.Error(err)) } @@ -351,14 +344,14 @@ func newM3DBStorage( runOpts RunOptions, cfg config.Configuration, tagOptions models.TagOptions, - logger *zap.Logger, clusters m3.Clusters, poolWrapper *pools.PoolWrapper, - instrumentOptions instrument.Options, readWorkerPool xsync.PooledWorkerPool, writeWorkerPool xsync.PooledWorkerPool, + instrumentOptions instrument.Options, ) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) { var ( + logger = instrumentOptions.ZapLogger() clusterClient clusterclient.Client clusterClientWaitCh <-chan struct{} ) @@ -395,15 +388,8 @@ func newM3DBStorage( } } - fanoutStorage, storageCleanup, err := newStorages( - logger, - clusters, - cfg, - tagOptions, - poolWrapper, - readWorkerPool, - writeWorkerPool, - ) + fanoutStorage, storageCleanup, err := newStorages(clusters, cfg, tagOptions, + poolWrapper, readWorkerPool, writeWorkerPool, instrumentOptions) if err != nil { return nil, nil, nil, nil, errors.Wrap(err, "unable to set up storages") } @@ -546,18 +532,22 @@ func newDownsamplerAutoMappingRules( func initClusters( cfg config.Configuration, dbClientCh <-chan client.Client, - logger *zap.Logger, + instrumentOpts instrument.Options, ) (m3.Clusters, *pools.PoolWrapper, error) { + instrumentOpts = instrumentOpts. 
+ SetMetricsScope(instrumentOpts.MetricsScope().SubScope("m3db-client")) + var ( + logger = instrumentOpts.ZapLogger() clusters m3.Clusters poolWrapper *pools.PoolWrapper err error ) - if len(cfg.Clusters) > 0 { - clusters, err = cfg.Clusters.NewClusters(m3.ClustersStaticConfigurationOptions{ - AsyncSessions: true, - }) + clusters, err = cfg.Clusters.NewClusters(instrumentOpts, + m3.ClustersStaticConfigurationOptions{ + AsyncSessions: true, + }) if err != nil { return nil, nil, errors.Wrap(err, "unable to connect to clusters") } @@ -582,9 +572,10 @@ func initClusters( }, } - clusters, err = clustersCfg.NewClusters(m3.ClustersStaticConfigurationOptions{ - ProvidedSession: session, - }) + clusters, err = clustersCfg.NewClusters(instrumentOpts, + m3.ClustersStaticConfigurationOptions{ + ProvidedSession: session, + }) if err != nil { return nil, nil, errors.Wrap(err, "unable to connect to clusters") } @@ -605,15 +596,18 @@ func initClusters( } func newStorages( - logger *zap.Logger, clusters m3.Clusters, cfg config.Configuration, tagOptions models.TagOptions, poolWrapper *pools.PoolWrapper, readWorkerPool xsync.PooledWorkerPool, writeWorkerPool xsync.PooledWorkerPool, + instrumentOpts instrument.Options, ) (storage.Storage, cleanupFn, error) { - cleanup := func() error { return nil } + var ( + logger = instrumentOpts.ZapLogger() + cleanup = func() error { return nil } + ) // Setup query conversion cache. conversionCacheConfig := cfg.Cache.QueryConversionCacheConfiguration() diff --git a/src/query/storage/m3/cluster_test.go b/src/query/storage/m3/cluster_test.go index 5ddde1f156..90ac70dee2 100644 --- a/src/query/storage/m3/cluster_test.go +++ b/src/query/storage/m3/cluster_test.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -97,7 +98,8 @@ func TestNewClustersFromConfig(t *testing.T) { }, } - clusters, err := cfg.NewClusters(ClustersStaticConfigurationOptions{}) + clusters, err := cfg.NewClusters(instrument.NewOptions(), + ClustersStaticConfigurationOptions{}) require.NoError(t, err) // Resolve expected clusters and check attributes diff --git a/src/query/storage/m3/config.go b/src/query/storage/m3/config.go index 4e76774ef8..447e7881bd 100644 --- a/src/query/storage/m3/config.go +++ b/src/query/storage/m3/config.go @@ -30,13 +30,12 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/stores/m3db" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" ) var ( errNotAggregatedClusterNamespace = goerrors.New("not an aggregated cluster namespace") errBothNamespaceTypeNewAndDeprecatedFieldsSet = goerrors.New("cannot specify both deprecated and non-deprecated fields for namespace type") - - defaultNewClientConfigurationParams = client.ConfigurationParameters{} ) // ClustersStaticConfiguration is a set of static cluster configurations. @@ -169,6 +168,7 @@ type ClustersStaticConfigurationOptions struct { // NewClusters instantiates a new Clusters instance. 
func (c ClustersStaticConfiguration) NewClusters( + instrumentOpts instrument.Options, opts ClustersStaticConfigurationOptions, ) (Clusters, error) { var ( @@ -181,20 +181,22 @@ func (c ClustersStaticConfiguration) NewClusters( ) for _, clusterCfg := range c { var ( - client client.Client + result client.Client err error ) if opts.ProvidedSession == nil { - // NB(r): If session is already provided, do not create a client - client, err = clusterCfg.newClient(defaultNewClientConfigurationParams) + // NB(r): Only create client session if not already provided. + result, err = clusterCfg.newClient(client.ConfigurationParameters{ + InstrumentOptions: instrumentOpts, + }) if err != nil { return nil, err } } aggregatedClusterNamespacesCfg := &aggregatedClusterNamespacesConfiguration{ - client: client, + client: result, } for _, n := range clusterCfg.Namespaces { @@ -211,7 +213,7 @@ func (c ClustersStaticConfiguration) NewClusters( "can be specified: specified %d", numUnaggregatedClusterNamespaces) } - unaggregatedClusterNamespaceCfg.client = client + unaggregatedClusterNamespaceCfg.client = result unaggregatedClusterNamespaceCfg.namespace = n case storage.AggregatedMetricsType:
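
For reference, below is a minimal standalone sketch (not part of the patch) of how the histogramWithDurationBuckets helper added to session.go behaves and how a latency is recorded against it. The package main wrapper, the "dbclient" test scope name, and the simulated 5ms write are assumptions for illustration only; the bucket layout (a zero bucket plus 60 exponential buckets starting at 1ms with a 1.25 growth factor) and the "schema"="v1" version tag mirror the diff above.

package main

import (
	"fmt"
	"time"

	"github.com/uber-go/tally"
)

// Mirrors the helper added in session.go: a zero bucket followed by 60
// exponentially growing duration buckets starting at 1ms with factor 1.25.
func histogramDurationBuckets() tally.DurationBuckets {
	return append(tally.DurationBuckets{0},
		tally.MustMakeExponentialDurationBuckets(time.Millisecond, 1.25, 60)...)
}

func main() {
	// Hypothetical scope for the sketch; in the DB client this is the
	// session's metrics scope.
	scope := tally.NewTestScope("dbclient", nil)

	// Tag the histogram with the bucket schema version, as the patch does, so
	// a future change to the buckets can be namespaced away from old series.
	sub := scope.Tagged(map[string]string{"schema": "v1"})
	writeLatency := sub.Histogram("write.latency", histogramDurationBuckets())

	// Record a simulated write latency, analogous to recordWriteMetrics
	// calling RecordDuration with the elapsed time since the attempt started.
	start := time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for the actual write attempt
	writeLatency.RecordDuration(time.Since(start))

	buckets := histogramDurationBuckets()
	fmt.Printf("bucket count: %d, smallest non-zero bucket: %v\n",
		len(buckets), buckets[1])
}

Tagging the bucket schema version rather than renaming the metric keeps the metric name stable while ensuring that old and new bucket layouts never show up as overlapping buckets in the same histogram query, which is the rationale given in the histogramDurationBucketsVersion comment in the patch.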