Use pooling for writes to coordinator #942

Merged 3 commits on Sep 28, 2018
5 changes: 4 additions & 1 deletion src/cmd/services/m3query/config/config.go
@@ -70,7 +70,10 @@ type Configuration struct {

// DecompressWorkerPoolSize is the size of the worker pool given to each
// fetch request.
DecompressWorkerPoolSize int `yaml:"workerPoolSize"`
DecompressWorkerPoolSize int `yaml:"decompressWorkerPoolSize"`

// WriteWorkerPoolSize is the size of the worker pool for write requests.
WriteWorkerPoolSize int `yaml:"writeWorkerPoolSize"`
}

// LocalConfiguration is the local embedded configuration if running
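For operators, the practical effect of this hunk is a renamed YAML key (`workerPoolSize` becomes `decompressWorkerPoolSize`) plus a new `writeWorkerPoolSize` key. Below is a minimal, standalone sketch of how those keys map onto the struct; it unmarshals with gopkg.in/yaml.v2 directly rather than m3query's own config loader, and trims the struct to the two fields touched here.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Configuration mirrors only the two fields changed in this diff.
type Configuration struct {
	DecompressWorkerPoolSize int `yaml:"decompressWorkerPoolSize"`
	WriteWorkerPoolSize      int `yaml:"writeWorkerPoolSize"`
}

func main() {
	raw := []byte("decompressWorkerPoolSize: 4096\nwriteWorkerPoolSize: 4096\n")

	var cfg Configuration
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// The decompression and write pool sizes are now configured independently.
	fmt.Println(cfg.DecompressWorkerPoolSize, cfg.WriteWorkerPoolSize)
}
```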
15 changes: 13 additions & 2 deletions src/query/pools/query_pools.go
@@ -53,7 +53,7 @@ func BuildWorkerPools(
cfg config.Configuration,
logger *zap.Logger,
scope tally.Scope,
) (pool.ObjectPool, instrument.Options) {
) (pool.ObjectPool, xsync.PooledWorkerPool, instrument.Options, error) {
workerPoolCount := cfg.DecompressWorkerPoolCount
if workerPoolCount == 0 {
workerPoolCount = defaultWorkerPoolCount
@@ -79,7 +79,18 @@
return workerPool
})

return objectPool, instrumentOptions
writePoolSize := cfg.WriteWorkerPoolSize
if writePoolSize == 0 {
writePoolSize = defaultWorkerPoolSize
}

writeWorkerPool, err := xsync.NewPooledWorkerPool(writePoolSize, xsync.NewPooledWorkerPoolOptions())
if err != nil {
return nil, nil, instrumentOptions, err
}

writeWorkerPool.Init()
return objectPool, writeWorkerPool, instrumentOptions, nil
}

type sessionPools struct {
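BuildWorkerPools now hands back two pools: the existing object pool of decompression worker pools and a shared xsync.PooledWorkerPool for writes. The sketch below shows how the write pool is built and used on its own, assuming only the API surface visible in this diff (NewPooledWorkerPool, NewPooledWorkerPoolOptions, Init, Go) and the upstream import path github.com/m3db/m3x/sync; the pool size of 4096 is an arbitrary stand-in.

```go
package main

import (
	"fmt"
	"sync"

	xsync "github.com/m3db/m3x/sync"
)

func main() {
	// Build a fixed-size pool of reusable goroutines for write requests,
	// mirroring the construction in BuildWorkerPools above.
	writeWorkerPool, err := xsync.NewPooledWorkerPool(4096, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		panic(err)
	}
	writeWorkerPool.Init()

	// Schedule one task per unit of work; a WaitGroup tracks completion,
	// as the storage Write path in this PR does.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		i := i // capture a per-iteration copy
		wg.Add(1)
		writeWorkerPool.Go(func() {
			defer wg.Done()
			fmt.Println("write", i)
		})
	}
	wg.Wait()
}
```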
17 changes: 12 additions & 5 deletions src/query/server/server.go
@@ -56,6 +56,7 @@ import (
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/pool"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/pkg/errors"
@@ -135,7 +136,10 @@ func Run(runOpts RunOptions) {
enabled bool
)

workerPool, instrumentOptions := pools.BuildWorkerPools(cfg, logger, scope)
workerPool, writeWorkerPool, instrumentOptions, err := pools.BuildWorkerPools(cfg, logger, scope)
if err != nil {
logger.Fatal("could not create worker pools", zap.Any("error", err))
}

// For grpc backend, we need to setup only the grpc client and a storage accompanying that client.
// For m3db backend, we need to make connections to the m3db cluster which generates a session and use the storage with the session.
@@ -157,6 +161,7 @@
cfg,
logger,
workerPool,
writeWorkerPool,
instrumentOptions,
)
if err != nil {
@@ -228,6 +233,7 @@ func newM3DBStorage(
cfg config.Configuration,
logger *zap.Logger,
workerPool pool.ObjectPool,
writeWorkerPool xsync.PooledWorkerPool,
instrumentOptions instrument.Options,
) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) {
var clusterClientCh <-chan clusterclient.Client
@@ -269,7 +275,7 @@
return nil, nil, nil, nil, err
}

fanoutStorage, storageCleanup, err := newStorages(logger, clusters, cfg, poolWrapper, workerPool)
fanoutStorage, storageCleanup, err := newStorages(logger, clusters, cfg, poolWrapper, workerPool, writeWorkerPool)
if err != nil {
return nil, nil, nil, nil, errors.Wrap(err, "unable to set up storages")
}
@@ -400,8 +406,8 @@ func newDownsamplerAutoMappingRules(
func initClusters(
cfg config.Configuration,
dbClientCh <-chan client.Client,
logger *zap.Logger,
) (m3.Clusters, *pools.PoolWrapper, error) {
logger *zap.Logger,
) (m3.Clusters, *pools.PoolWrapper, error) {
var (
clusters m3.Clusters
poolWrapper *pools.PoolWrapper
@@ -462,10 +468,11 @@ func newStorages(
cfg config.Configuration,
poolWrapper *pools.PoolWrapper,
workerPool pool.ObjectPool,
writeWorkerPool xsync.PooledWorkerPool,
) (storage.Storage, cleanupFn, error) {
cleanup := func() error { return nil }

localStorage := m3.NewStorage(clusters, workerPool)
localStorage := m3.NewStorage(clusters, workerPool, writeWorkerPool)
stores := []storage.Storage{localStorage}
remoteEnabled := false
if cfg.RPC != nil && cfg.RPC.Enabled {
6 changes: 6 additions & 0 deletions src/query/storage/fanout/storage.go
@@ -129,7 +129,13 @@ func (s *fanoutStorage) FetchTags(ctx context.Context, query *storage.FetchQuery
}

func (s *fanoutStorage) Write(ctx context.Context, query *storage.WriteQuery) error {
// TODO: Consider removing this lookup on every write by maintaining different read/write lists
stores := filterStores(s.stores, s.writeFilter, query)
// short circuit writes
if len(stores) == 1 {
return stores[0].Write(ctx, query)
}

requests := make([]execution.Request, len(stores))
for idx, store := range stores {
requests[idx] = newWriteRequest(store, query)
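The single-store short circuit above skips the execution.Request machinery for the common case of exactly one matching store. The following self-contained sketch shows the same shape with stand-in types; the store interface, printStore, and the error handling here are simplified assumptions, not the fanout package's actual types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// store is a stand-in for the storage dependency the fanout layer writes to.
type store interface {
	Write(ctx context.Context, value float64) error
}

type printStore struct{ name string }

func (p printStore) Write(_ context.Context, value float64) error {
	fmt.Println(p.name, "wrote", value)
	return nil
}

// fanoutWrite mirrors the shape of the diff above: one store short-circuits,
// multiple stores write in parallel with the first error collected.
func fanoutWrite(ctx context.Context, stores []store, value float64) error {
	if len(stores) == 1 {
		return stores[0].Write(ctx, value) // short circuit: no fan-out overhead
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, st := range stores {
		st := st // capture a per-iteration copy
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := st.Write(ctx, value); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return firstErr
}

func main() {
	ctx := context.Background()
	_ = fanoutWrite(ctx, []store{printStore{"local"}}, 42)                      // single-store path
	_ = fanoutWrite(ctx, []store{printStore{"local"}, printStore{"remote"}}, 7) // parallel path
}
```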
107 changes: 48 additions & 59 deletions src/query/storage/m3/storage.go
@@ -33,24 +33,26 @@ import (
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/execution"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/pool"
xtime "github.com/m3db/m3x/time"
xsync "github.com/m3db/m3x/sync"
)

var (
errNoLocalClustersFulfillsQuery = goerrors.New("no clusters can fulfill query")
)

type m3storage struct {
clusters Clusters
workerPool pool.ObjectPool
clusters Clusters
readWorkerPool pool.ObjectPool
writeWorkerPool xsync.PooledWorkerPool
}

// NewStorage creates a new local m3storage instance.
func NewStorage(clusters Clusters, workerPool pool.ObjectPool) Storage {
return &m3storage{clusters: clusters, workerPool: workerPool}
// TODO: Consider combining readWorkerPool and writeWorkerPool
func NewStorage(clusters Clusters, workerPool pool.ObjectPool, writeWorkerPool xsync.PooledWorkerPool) Storage {
return &m3storage{clusters: clusters, readWorkerPool: workerPool, writeWorkerPool: writeWorkerPool}
}

func (s *m3storage) Fetch(
@@ -64,7 +66,7 @@ func (s *m3storage) Fetch(
return nil, err
}

return storage.SeriesIteratorsToFetchResult(raw, s.workerPool)
return storage.SeriesIteratorsToFetchResult(raw, s.readWorkerPool)
}

func (s *m3storage) FetchBlocks(
@@ -259,20 +261,30 @@ func (s *m3storage) Write(
identID := ident.StringID(id)
// Set id to NoFinalize to avoid cloning it in write operations
identID.NoFinalize()
common := &writeRequestCommon{
store: s,
annotation: query.Annotation,
unit: query.Unit,
id: identID,
tagIterator: storage.TagsToIdentTagIterator(query.Tags),
attributes: query.Attributes,
}
tagIterator := storage.TagsToIdentTagIterator(query.Tags)

var (
wg sync.WaitGroup
multiErr syncMultiErrs
)

Review thread:
Collaborator: Considering how this is being broken up, it may be a good idea to use the pooled worker pools like the read path does. That way every incoming request gets a set of workers for all of the datapoints in it, and we don't have situations where some requests get starved. If we want to simplify it a bit, we can revisit my m3x PR at m3db/m3x#181 and make it a first-class citizen, then refactor the reader pools too. Alternatively, just use the same pool as the reader pools here if you agree with taking this approach.
Contributor: Can you explain the rationale for pooled worker pools? Kind of confused by that.
Collaborator: The general idea was to give each incoming request its own pool while keeping a static max size, so that more expensive requests would not tie up smaller requests. If that's not the correct approach, happy to drop it and revert to your pooled worker pool.
Contributor: Synced up in person and settled on (pending discussion with @nikunjgit) replacing the existing WorkerPool with PooledWorkerPool and then getting rid of the generic ObjectPool on top of that.
Contributor (author): I'll use a PooledWorkerPool for writes and let @arnikola merge the read object pool in another diff. Sound good?
Collaborator: Sure.

for _, datapoint := range query.Datapoints {
tagIter := tagIterator.Duplicate()
// capture var
datapoint := datapoint
wg.Add(1)
s.writeWorkerPool.Go(func() {
Review thread:
Collaborator: I think you need to capture the datapoint variable here; surprised that there are no failing tests... might be good to add one that tries to write multiple datapoints and checks that the correct ones are being written?
Contributor (author): Good point.
(See the loop-variable capture sketch after this file's diff.)
if err := s.writeSingle(ctx, query, datapoint, identID, tagIter); err != nil {
multiErr.add(err)
}

requests := make([]execution.Request, len(query.Datapoints))
for idx, datapoint := range query.Datapoints {
requests[idx] = newWriteRequest(common, datapoint.Timestamp, datapoint.Value)
tagIter.Close()
wg.Done()
})
}
return execution.ExecuteParallel(ctx, requests)

wg.Wait()
return multiErr.finalError()
}

func (s *m3storage) Type() storage.Type {
@@ -283,31 +295,35 @@ func (s *m3storage) Close() error {
return nil
}

func (w *writeRequest) Process(ctx context.Context) error {
common := w.writeRequestCommon
store := common.store
id := common.id

func (s *m3storage) writeSingle(
ctx context.Context,
query *storage.WriteQuery,
datapoint ts.Datapoint,
identID ident.ID,
iterator ident.TagIterator,
) error {
var (
namespace ClusterNamespace
err error
)
switch common.attributes.MetricsType {

attributes := query.Attributes
switch attributes.MetricsType {
case storage.UnaggregatedMetricsType:
namespace = store.clusters.UnaggregatedClusterNamespace()
namespace = s.clusters.UnaggregatedClusterNamespace()
case storage.AggregatedMetricsType:
attrs := RetentionResolution{
Retention: common.attributes.Retention,
Resolution: common.attributes.Resolution,
Retention: attributes.Retention,
Resolution: attributes.Resolution,
}
var exists bool
namespace, exists = store.clusters.AggregatedClusterNamespace(attrs)
namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
if !exists {
err = fmt.Errorf("no configured cluster namespace for: retention=%s, resolution=%s",
attrs.Retention.String(), attrs.Resolution.String())
}
default:
metricsType := common.attributes.MetricsType
metricsType := attributes.MetricsType
err = fmt.Errorf("invalid write request metrics type: %s (%d)",
metricsType.String(), uint(metricsType))
}
@@ -317,33 +333,6 @@ func (w *writeRequest) Process(ctx context.Context) error {

namespaceID := namespace.NamespaceID()
session := namespace.Session()
return session.WriteTagged(namespaceID, id, common.tagIterator,
w.timestamp, w.value, common.unit, common.annotation)
}

type writeRequestCommon struct {
Review thread:
Collaborator: 👍
store *m3storage
annotation []byte
unit xtime.Unit
id ident.ID
tagIterator ident.TagIterator
attributes storage.Attributes
}

type writeRequest struct {
writeRequestCommon *writeRequestCommon
timestamp time.Time
value float64
}

func newWriteRequest(
writeRequestCommon *writeRequestCommon,
timestamp time.Time,
value float64,
) execution.Request {
return &writeRequest{
writeRequestCommon: writeRequestCommon,
timestamp: timestamp,
value: value,
}
return session.WriteTagged(namespaceID, identID, iterator,
datapoint.Timestamp, datapoint.Value, query.Unit, query.Annotation)
}
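The `// capture var` line and the review thread above hinge on a Go scoping detail: before Go 1.22, the range variable is reused across iterations, so closures handed to the worker pool would all observe the final datapoint unless a per-iteration copy is taken. A standalone illustration (not the PR's code) of the difference:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	datapoints := []float64{1, 2, 3}

	var (
		mu      sync.Mutex
		written []float64
		wg      sync.WaitGroup
	)
	for _, dp := range datapoints {
		dp := dp // capture a per-iteration copy before the closure escapes
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			written = append(written, dp)
			mu.Unlock()
		}()
	}
	wg.Wait()

	sort.Float64s(written)
	fmt.Println(written) // [1 2 3]; without the capture (pre-Go 1.22) likely [3 3 3]
}
```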
6 changes: 5 additions & 1 deletion src/query/storage/m3/storage_test.go
@@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/golang/mock/gomock"
@@ -78,7 +79,10 @@ func setup(
Resolution: time.Minute,
})
require.NoError(t, err)
storage := NewStorage(clusters, nil)
writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
writePool.Init()
storage := NewStorage(clusters, nil, writePool)
return storage, testSessions{
unaggregated1MonthRetention: unaggregated1MonthRetention,
aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution,
6 changes: 5 additions & 1 deletion src/query/test/m3/storage.go
@@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/sync"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
@@ -54,6 +55,9 @@ func NewStorageAndSession(
Retention: TestRetention,
})
require.NoError(t, err)
storage := m3.NewStorage(clusters, nil)
writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
Review thread:
Collaborator: Consider a test to make sure we're using the pools?
(See the pooled-write test sketch at the end of this diff.)
require.NoError(t, err)
writePool.Init()
storage := m3.NewStorage(clusters, nil, writePool)
return storage, session
}
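Following up on the reviewer's suggestion above, one way to check that writes actually flow through the pool is to hide the pool behind a small seam and count scheduled tasks. This is a hedged, self-contained sketch: the goPool interface, countingPool, and writeDatapoints helper are hypothetical stand-ins, not m3's actual xsync.PooledWorkerPool interface or storage wiring.

```go
package m3write_test

import (
	"sync"
	"sync/atomic"
	"testing"
)

// goPool is a hypothetical stand-in for the subset of the worker pool used by writes.
type goPool interface {
	Go(fn func())
}

// countingPool records how many tasks were scheduled through it.
type countingPool struct {
	scheduled int64
	wg        sync.WaitGroup
}

func (p *countingPool) Go(fn func()) {
	atomic.AddInt64(&p.scheduled, 1)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		fn()
	}()
}

// writeDatapoints mimics the storage Write loop in this PR: one pooled task per datapoint.
func writeDatapoints(pool goPool, datapoints []float64, write func(float64)) {
	for _, dp := range datapoints {
		dp := dp // capture a per-iteration copy
		pool.Go(func() { write(dp) })
	}
}

func TestWritesGoThroughPool(t *testing.T) {
	p := &countingPool{}
	var written int64

	writeDatapoints(p, []float64{1, 2, 3}, func(float64) { atomic.AddInt64(&written, 1) })
	p.wg.Wait()

	if got := atomic.LoadInt64(&p.scheduled); got != 3 {
		t.Fatalf("expected 3 pooled tasks, got %d", got)
	}
	if got := atomic.LoadInt64(&written); got != 3 {
		t.Fatalf("expected 3 writes, got %d", got)
	}
}
```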