From d350d1aedf590624754d993cf57786decf1c4f0f Mon Sep 17 00:00:00 2001 From: Nikunj Aggarwal Date: Tue, 25 Sep 2018 15:01:32 -0400 Subject: [PATCH 1/3] Use pooling for writes Duplicate iterator Comments --- src/cmd/services/m3query/config/config.go | 5 +- src/query/pools/query_pools.go | 15 ++- src/query/server/server.go | 17 +++- src/query/storage/fanout/storage.go | 6 ++ src/query/storage/m3/storage.go | 108 ++++++++++------------ 5 files changed, 84 insertions(+), 67 deletions(-) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index df8e16f6bb..f20db3ac20 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -70,7 +70,10 @@ type Configuration struct { // DecompressWorkerPoolSize is the size of the worker pool given to each // fetch request. - DecompressWorkerPoolSize int `yaml:"workerPoolSize"` + DecompressWorkerPoolSize int `yaml:"decompressWorkerPoolSize"` + + // WriteWorkerPoolSize is the size of the worker pool write requests. + WriteWorkerPoolSize int `yaml:"writeWorkerPoolSize"` } // LocalConfiguration is the local embedded configuration if running diff --git a/src/query/pools/query_pools.go b/src/query/pools/query_pools.go index 74ed57e637..3f4bede6d0 100644 --- a/src/query/pools/query_pools.go +++ b/src/query/pools/query_pools.go @@ -53,7 +53,7 @@ func BuildWorkerPools( cfg config.Configuration, logger *zap.Logger, scope tally.Scope, -) (pool.ObjectPool, instrument.Options) { +) (pool.ObjectPool, xsync.PooledWorkerPool, instrument.Options, error) { workerPoolCount := cfg.DecompressWorkerPoolCount if workerPoolCount == 0 { workerPoolCount = defaultWorkerPoolCount @@ -79,7 +79,18 @@ func BuildWorkerPools( return workerPool }) - return objectPool, instrumentOptions + writePoolSize := cfg.WriteWorkerPoolSize + if writePoolSize == 0 { + writePoolSize = defaultWorkerPoolSize + } + + writeWorkerPool, err := xsync.NewPooledWorkerPool(writePoolSize, xsync.NewPooledWorkerPoolOptions()) + if err != nil { + return nil, nil, instrumentOptions, err + } + + writeWorkerPool.Init() + return objectPool, writeWorkerPool, instrumentOptions, nil } type sessionPools struct { diff --git a/src/query/server/server.go b/src/query/server/server.go index ebfa5e9867..f74f517ae0 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -56,6 +56,7 @@ import ( "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" + xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" "github.com/pkg/errors" @@ -135,7 +136,10 @@ func Run(runOpts RunOptions) { enabled bool ) - workerPool, instrumentOptions := pools.BuildWorkerPools(cfg, logger, scope) + workerPool, writeWorkerPool, instrumentOptions, err := pools.BuildWorkerPools(cfg, logger, scope) + if err != nil { + logger.Fatal("could not create worker pools", zap.Any("error", err)) + } // For grpc backend, we need to setup only the grpc client and a storage accompanying that client. // For m3db backend, we need to make connections to the m3db cluster which generates a session and use the storage with the session. @@ -157,6 +161,7 @@ func Run(runOpts RunOptions) { cfg, logger, workerPool, + writeWorkerPool, instrumentOptions, ) if err != nil { @@ -228,6 +233,7 @@ func newM3DBStorage( cfg config.Configuration, logger *zap.Logger, workerPool pool.ObjectPool, + writeWorkerPool xsync.PooledWorkerPool, instrumentOptions instrument.Options, ) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) { var clusterClientCh <-chan clusterclient.Client @@ -269,7 +275,7 @@ func newM3DBStorage( return nil, nil, nil, nil, err } - fanoutStorage, storageCleanup, err := newStorages(logger, clusters, cfg, poolWrapper, workerPool) + fanoutStorage, storageCleanup, err := newStorages(logger, clusters, cfg, poolWrapper, workerPool, writeWorkerPool) if err != nil { return nil, nil, nil, nil, errors.Wrap(err, "unable to set up storages") } @@ -400,8 +406,8 @@ func newDownsamplerAutoMappingRules( func initClusters( cfg config.Configuration, dbClientCh <-chan client.Client, - logger *zap.Logger, -) (m3.Clusters, *pools.PoolWrapper, error) { + logger *zap.Logger, +) (m3.Clusters, *pools.PoolWrapper, error) { var ( clusters m3.Clusters poolWrapper *pools.PoolWrapper @@ -462,10 +468,11 @@ func newStorages( cfg config.Configuration, poolWrapper *pools.PoolWrapper, workerPool pool.ObjectPool, + writeWorkerPool xsync.PooledWorkerPool, ) (storage.Storage, cleanupFn, error) { cleanup := func() error { return nil } - localStorage := m3.NewStorage(clusters, workerPool) + localStorage := m3.NewStorage(clusters, workerPool, writeWorkerPool) stores := []storage.Storage{localStorage} remoteEnabled := false if cfg.RPC != nil && cfg.RPC.Enabled { diff --git a/src/query/storage/fanout/storage.go b/src/query/storage/fanout/storage.go index 868c7bdc8a..f17905ef02 100644 --- a/src/query/storage/fanout/storage.go +++ b/src/query/storage/fanout/storage.go @@ -129,7 +129,13 @@ func (s *fanoutStorage) FetchTags(ctx context.Context, query *storage.FetchQuery } func (s *fanoutStorage) Write(ctx context.Context, query *storage.WriteQuery) error { + // TODO: Consider removing this lookup on every write by maintaining different read/write lists stores := filterStores(s.stores, s.writeFilter, query) + // short circuit writes + if len(stores) == 1 { + return stores[0].Write(ctx, query) + } + requests := make([]execution.Request, len(stores)) for idx, store := range stores { requests[idx] = newWriteRequest(store, query) diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index 4d04b0a6d1..e20b025f97 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -33,10 +33,11 @@ import ( "github.com/m3db/m3/src/query/errors" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/query/util/execution" + "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/pool" - xtime "github.com/m3db/m3x/time" + xsync "github.com/m3db/m3x/sync" ) var ( @@ -44,13 +45,15 @@ var ( ) type m3storage struct { - clusters Clusters - workerPool pool.ObjectPool + clusters Clusters + readWorkerPool pool.ObjectPool + writeWorkerPool xsync.PooledWorkerPool } // NewStorage creates a new local m3storage instance. -func NewStorage(clusters Clusters, workerPool pool.ObjectPool) Storage { - return &m3storage{clusters: clusters, workerPool: workerPool} +// TODO: Consider combining readWorkerPool and writeWorkerPool +func NewStorage(clusters Clusters, workerPool pool.ObjectPool, writeWorkerPool xsync.PooledWorkerPool) Storage { + return &m3storage{clusters: clusters, readWorkerPool: workerPool, writeWorkerPool: writeWorkerPool} } func (s *m3storage) Fetch( @@ -64,7 +67,7 @@ func (s *m3storage) Fetch( return nil, err } - return storage.SeriesIteratorsToFetchResult(raw, s.workerPool) + return storage.SeriesIteratorsToFetchResult(raw, s.readWorkerPool) } func (s *m3storage) FetchBlocks( @@ -259,20 +262,30 @@ func (s *m3storage) Write( identID := ident.StringID(id) // Set id to NoFinalize to avoid cloning it in write operations identID.NoFinalize() - common := &writeRequestCommon{ - store: s, - annotation: query.Annotation, - unit: query.Unit, - id: identID, - tagIterator: storage.TagsToIdentTagIterator(query.Tags), - attributes: query.Attributes, - } + tagIterator := storage.TagsToIdentTagIterator(query.Tags) - requests := make([]execution.Request, len(query.Datapoints)) - for idx, datapoint := range query.Datapoints { - requests[idx] = newWriteRequest(common, datapoint.Timestamp, datapoint.Value) + var ( + wg sync.WaitGroup + multiErr xerrors.MultiError + ) + + for _, datapoint := range query.Datapoints { + tagIter := tagIterator.Duplicate() + // capture var + datapoint := datapoint + s.writeWorkerPool.Go(func() { + wg.Add(1) + if err := s.writeSingle(ctx, query, datapoint, identID, tagIter); err != nil { + multiErr = multiErr.Add(err) + } + + tagIter.Close() + wg.Done() + }) } - return execution.ExecuteParallel(ctx, requests) + + wg.Wait() + return multiErr.FinalError() } func (s *m3storage) Type() storage.Type { @@ -283,31 +296,35 @@ func (s *m3storage) Close() error { return nil } -func (w *writeRequest) Process(ctx context.Context) error { - common := w.writeRequestCommon - store := common.store - id := common.id - +func (s *m3storage) writeSingle( + ctx context.Context, + query *storage.WriteQuery, + datapoint ts.Datapoint, + identID ident.ID, + iterator ident.TagIterator, +) error { var ( namespace ClusterNamespace err error ) - switch common.attributes.MetricsType { + + attributes := query.Attributes + switch attributes.MetricsType { case storage.UnaggregatedMetricsType: - namespace = store.clusters.UnaggregatedClusterNamespace() + namespace = s.clusters.UnaggregatedClusterNamespace() case storage.AggregatedMetricsType: attrs := RetentionResolution{ - Retention: common.attributes.Retention, - Resolution: common.attributes.Resolution, + Retention: attributes.Retention, + Resolution: attributes.Resolution, } var exists bool - namespace, exists = store.clusters.AggregatedClusterNamespace(attrs) + namespace, exists = s.clusters.AggregatedClusterNamespace(attrs) if !exists { err = fmt.Errorf("no configured cluster namespace for: retention=%s, resolution=%s", attrs.Retention.String(), attrs.Resolution.String()) } default: - metricsType := common.attributes.MetricsType + metricsType := attributes.MetricsType err = fmt.Errorf("invalid write request metrics type: %s (%d)", metricsType.String(), uint(metricsType)) } @@ -317,33 +334,6 @@ func (w *writeRequest) Process(ctx context.Context) error { namespaceID := namespace.NamespaceID() session := namespace.Session() - return session.WriteTagged(namespaceID, id, common.tagIterator, - w.timestamp, w.value, common.unit, common.annotation) -} - -type writeRequestCommon struct { - store *m3storage - annotation []byte - unit xtime.Unit - id ident.ID - tagIterator ident.TagIterator - attributes storage.Attributes -} - -type writeRequest struct { - writeRequestCommon *writeRequestCommon - timestamp time.Time - value float64 -} - -func newWriteRequest( - writeRequestCommon *writeRequestCommon, - timestamp time.Time, - value float64, -) execution.Request { - return &writeRequest{ - writeRequestCommon: writeRequestCommon, - timestamp: timestamp, - value: value, - } + return session.WriteTagged(namespaceID, identID, iterator, + datapoint.Timestamp, datapoint.Value, query.Unit, query.Annotation) } From d9344bb58b2653295ea64ca425da085a6f5e649d Mon Sep 17 00:00:00 2001 From: Nikunj Aggarwal Date: Thu, 27 Sep 2018 22:09:39 -0400 Subject: [PATCH 2/3] Fix test --- src/query/storage/m3/storage.go | 9 ++++----- src/query/storage/m3/storage_test.go | 6 +++++- src/query/test/m3/storage.go | 6 +++++- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index e20b025f97..ff42ffe0b0 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -34,7 +34,6 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" - xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/pool" xsync "github.com/m3db/m3x/sync" @@ -266,17 +265,17 @@ func (s *m3storage) Write( var ( wg sync.WaitGroup - multiErr xerrors.MultiError + multiErr syncMultiErrs ) for _, datapoint := range query.Datapoints { tagIter := tagIterator.Duplicate() // capture var datapoint := datapoint + wg.Add(1) s.writeWorkerPool.Go(func() { - wg.Add(1) if err := s.writeSingle(ctx, query, datapoint, identID, tagIter); err != nil { - multiErr = multiErr.Add(err) + multiErr.add(err) } tagIter.Close() @@ -285,7 +284,7 @@ func (s *m3storage) Write( } wg.Wait() - return multiErr.FinalError() + return multiErr.finalError() } func (s *m3storage) Type() storage.Type { diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index 6001a1407a..d37459dbfe 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -38,6 +38,7 @@ import ( xtime "github.com/m3db/m3x/time" "github.com/golang/mock/gomock" + "github.com/m3db/m3x/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -78,7 +79,10 @@ func setup( Resolution: time.Minute, }) require.NoError(t, err) - storage := NewStorage(clusters, nil) + writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions()) + require.NoError(t, err) + writePool.Init() + storage := NewStorage(clusters, nil, writePool) return storage, testSessions{ unaggregated1MonthRetention: unaggregated1MonthRetention, aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution, diff --git a/src/query/test/m3/storage.go b/src/query/test/m3/storage.go index fe0141a2d7..d84397801e 100644 --- a/src/query/test/m3/storage.go +++ b/src/query/test/m3/storage.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/sync" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -54,6 +55,9 @@ func NewStorageAndSession( Retention: TestRetention, }) require.NoError(t, err) - storage := m3.NewStorage(clusters, nil) + writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions()) + require.NoError(t, err) + writePool.Init() + storage := m3.NewStorage(clusters, nil, writePool) return storage, session } From db4f6d4fe253860f94a1c1935e1a001c31255d5e Mon Sep 17 00:00:00 2001 From: Nikunj Aggarwal Date: Thu, 27 Sep 2018 22:17:49 -0400 Subject: [PATCH 3/3] Fix lint --- src/query/storage/m3/storage_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index d37459dbfe..27a5c3bdf0 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -35,10 +35,10 @@ import ( "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" "github.com/golang/mock/gomock" - "github.com/m3db/m3x/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" )