Use pooling for writes to coordinator #942
Changes from all commits:
@@ -33,24 +33,26 @@ import (
 	"github.com/m3db/m3/src/query/errors"
 	"github.com/m3db/m3/src/query/models"
 	"github.com/m3db/m3/src/query/storage"
-	"github.com/m3db/m3/src/query/util/execution"
+	"github.com/m3db/m3/src/query/ts"
 	"github.com/m3db/m3x/ident"
 	"github.com/m3db/m3x/pool"
-	xtime "github.com/m3db/m3x/time"
+	xsync "github.com/m3db/m3x/sync"
 )

 var (
 	errNoLocalClustersFulfillsQuery = goerrors.New("no clusters can fulfill query")
 )

 type m3storage struct {
-	clusters   Clusters
-	workerPool pool.ObjectPool
+	clusters        Clusters
+	readWorkerPool  pool.ObjectPool
+	writeWorkerPool xsync.PooledWorkerPool
 }

 // NewStorage creates a new local m3storage instance.
-func NewStorage(clusters Clusters, workerPool pool.ObjectPool) Storage {
-	return &m3storage{clusters: clusters, workerPool: workerPool}
+// TODO: Consider combining readWorkerPool and writeWorkerPool
+func NewStorage(clusters Clusters, workerPool pool.ObjectPool, writeWorkerPool xsync.PooledWorkerPool) Storage {
+	return &m3storage{clusters: clusters, readWorkerPool: workerPool, writeWorkerPool: writeWorkerPool}
 }

 func (s *m3storage) Fetch(
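Since the constructor now takes a write worker pool, here is a minimal sketch of how a caller could wire it up, mirroring the test change further down. The pool size of 10 and the nil read pool are placeholders taken from that test, and the helper name newM3Storage is made up for illustration:

```go
package example

import (
	m3 "github.com/m3db/m3/src/query/storage/m3"
	xsync "github.com/m3db/m3x/sync"
)

// newM3Storage is a hypothetical helper showing the new NewStorage signature:
// the read path keeps its pool.ObjectPool (nil here, as in the test helper),
// while writes get an xsync.PooledWorkerPool that must be Init()'d before use.
func newM3Storage(clusters m3.Clusters) (m3.Storage, error) {
	writePool, err := xsync.NewPooledWorkerPool(10, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		return nil, err
	}
	writePool.Init()

	return m3.NewStorage(clusters, nil, writePool), nil
}
```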
@@ -64,7 +66,7 @@ func (s *m3storage) Fetch(
 		return nil, err
 	}

-	return storage.SeriesIteratorsToFetchResult(raw, s.workerPool)
+	return storage.SeriesIteratorsToFetchResult(raw, s.readWorkerPool)
 }

 func (s *m3storage) FetchBlocks(
@@ -259,20 +261,30 @@ func (s *m3storage) Write(
 	identID := ident.StringID(id)
 	// Set id to NoFinalize to avoid cloning it in write operations
 	identID.NoFinalize()
-	common := &writeRequestCommon{
-		store:       s,
-		annotation:  query.Annotation,
-		unit:        query.Unit,
-		id:          identID,
-		tagIterator: storage.TagsToIdentTagIterator(query.Tags),
-		attributes:  query.Attributes,
-	}
+	tagIterator := storage.TagsToIdentTagIterator(query.Tags)

-	requests := make([]execution.Request, len(query.Datapoints))
-	for idx, datapoint := range query.Datapoints {
-		requests[idx] = newWriteRequest(common, datapoint.Timestamp, datapoint.Value)
+	var (
+		wg       sync.WaitGroup
+		multiErr syncMultiErrs
+	)
+
+	for _, datapoint := range query.Datapoints {
+		tagIter := tagIterator.Duplicate()
+		// capture var
+		datapoint := datapoint
+		wg.Add(1)
+		s.writeWorkerPool.Go(func() {
+			if err := s.writeSingle(ctx, query, datapoint, identID, tagIter); err != nil {
+				multiErr.add(err)
+			}
+
+			tagIter.Close()
+			wg.Done()
+		})
 	}
-	return execution.ExecuteParallel(ctx, requests)
+
+	wg.Wait()
+	return multiErr.finalError()
 }

 func (s *m3storage) Type() storage.Type {

Inline review comment on the `s.writeWorkerPool.Go(func() {` line: I think you need to capture the datapoint variable here; surprised that there are no failing tests... might be good to add one that tries to write multiple datapoints and checks that the correct ones are being written?

Reply: good point.
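The capture question above is the classic Go loop-variable pitfall: every closure handed to writeWorkerPool.Go would otherwise share the single `datapoint` loop variable, and under the pre-Go-1.22 semantics in use here each goroutine could end up observing only the last datapoint. A standalone sketch (plain Go, not m3 code) of the failure mode and the fix:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	datapoints := []float64{1, 2, 3}

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		written []float64
	)
	for _, dp := range datapoints {
		dp := dp // the "capture var" line; delete it and every goroutine may see 3
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			written = append(written, dp)
			mu.Unlock()
		}()
	}
	wg.Wait()

	sort.Float64s(written)
	fmt.Println(written) // [1 2 3] with the capture; typically [3 3 3] without it (pre-Go 1.22)
}
```

The regression test suggested above would surface this by writing several datapoints through a mock session and asserting one WriteTagged call per distinct timestamp/value pair.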
@@ -283,31 +295,35 @@ func (s *m3storage) Close() error {
 	return nil
 }

-func (w *writeRequest) Process(ctx context.Context) error {
-	common := w.writeRequestCommon
-	store := common.store
-	id := common.id
-
+func (s *m3storage) writeSingle(
+	ctx context.Context,
+	query *storage.WriteQuery,
+	datapoint ts.Datapoint,
+	identID ident.ID,
+	iterator ident.TagIterator,
+) error {
 	var (
 		namespace ClusterNamespace
 		err       error
 	)
-	switch common.attributes.MetricsType {
+
+	attributes := query.Attributes
+	switch attributes.MetricsType {
 	case storage.UnaggregatedMetricsType:
-		namespace = store.clusters.UnaggregatedClusterNamespace()
+		namespace = s.clusters.UnaggregatedClusterNamespace()
 	case storage.AggregatedMetricsType:
 		attrs := RetentionResolution{
-			Retention:  common.attributes.Retention,
-			Resolution: common.attributes.Resolution,
+			Retention:  attributes.Retention,
+			Resolution: attributes.Resolution,
 		}
 		var exists bool
-		namespace, exists = store.clusters.AggregatedClusterNamespace(attrs)
+		namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
 		if !exists {
 			err = fmt.Errorf("no configured cluster namespace for: retention=%s, resolution=%s",
 				attrs.Retention.String(), attrs.Resolution.String())
 		}
 	default:
-		metricsType := common.attributes.MetricsType
+		metricsType := attributes.MetricsType
 		err = fmt.Errorf("invalid write request metrics type: %s (%d)",
 			metricsType.String(), uint(metricsType))
 	}
@@ -317,33 +333,6 @@ func (w *writeRequest) Process(ctx context.Context) error {

 	namespaceID := namespace.NamespaceID()
 	session := namespace.Session()
-	return session.WriteTagged(namespaceID, id, common.tagIterator,
-		w.timestamp, w.value, common.unit, common.annotation)
-}
-
-type writeRequestCommon struct {
-	store       *m3storage
-	annotation  []byte
-	unit        xtime.Unit
-	id          ident.ID
-	tagIterator ident.TagIterator
-	attributes  storage.Attributes
-}
-
-type writeRequest struct {
-	writeRequestCommon *writeRequestCommon
-	timestamp          time.Time
-	value              float64
-}
-
-func newWriteRequest(
-	writeRequestCommon *writeRequestCommon,
-	timestamp time.Time,
-	value float64,
-) execution.Request {
-	return &writeRequest{
-		writeRequestCommon: writeRequestCommon,
-		timestamp:          timestamp,
-		value:              value,
-	}
+	return session.WriteTagged(namespaceID, identID, iterator,
+		datapoint.Timestamp, datapoint.Value, query.Unit, query.Annotation)
 }

Inline review comment on the removed writeRequestCommon type: 👍
The second file in the diff is the test helper that builds the storage and session used in tests:
@@ -28,6 +28,7 @@ import (
 	"github.com/m3db/m3/src/query/storage"
 	"github.com/m3db/m3/src/query/storage/m3"
 	"github.com/m3db/m3x/ident"
+	"github.com/m3db/m3x/sync"

 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/require"
@@ -54,6 +55,9 @@ func NewStorageAndSession(
 		Retention: TestRetention,
 	})
 	require.NoError(t, err)
-	storage := m3.NewStorage(clusters, nil)
+	writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
+	require.NoError(t, err)
+	writePool.Init()
+	storage := m3.NewStorage(clusters, nil, writePool)
 	return storage, session
 }

Inline review comment on the sync.NewPooledWorkerPool line: Consider a test to make sure we're using the pools?
Review discussion on the pooling approach:

Considering how this is being broken up, it may be a good idea to use the pooled worker pools like read does? That way every incoming request will get a set of workers for all datapoints in it and we don't have situations where some requests get starved. If we want to simplify it a bit, we can revisit my m3x PR at m3db/m3x#181 and make it a first-class citizen, then refactor the reader pools too. Alternatively, just use the same pool as the reader pools here if you agree with taking this approach.

Reply: Can you explain the rationale for pooled worker pools? Kind of confused by that.

Reply: The general idea was to give each incoming request its own pool while having a static max size, so that the more expensive requests would not tie up smaller requests. If that's not the correct approach, happy to drop it and revert to your pooled worker pool.

Reply: Synced up in person and settled on (pending discussion with @nikunjgit) replacing the existing WorkerPool with PooledWorkerPool and then getting rid of the generic ObjectPool on top of that.

Reply: I'll use a PooledWorkerPool for writes and let @arnikola merge the read object pool in another diff. Sound good?

Reply: Sure.
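For readers following the thread, the distinction being debated is between one worker pool shared by every request (what this PR does for writes) and giving each request its own bounded set of workers so that one large write cannot starve smaller ones. Below is a plain-Go sketch of the per-request idea; it is an illustration only, not the m3x PooledWorkerPool API or the m3db/m3x#181 proposal, and all names in it are made up:

```go
package example

import "sync"

// writeWithPerRequestCap illustrates the "per-request pool" idea: each
// incoming request gets its own concurrency cap (a buffered-channel
// semaphore), so a request with many datapoints can only occupy its own
// maxPerRequest slots rather than every worker in a shared pool.
func writeWithPerRequestCap(datapoints []float64, maxPerRequest int, write func(float64) error) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	sem := make(chan struct{}, maxPerRequest)

	for _, dp := range datapoints {
		dp := dp // capture var, same pitfall as discussed above
		wg.Add(1)
		sem <- struct{}{} // acquire a slot scoped to this request only
		go func() {
			defer func() {
				<-sem // release the slot
				wg.Done()
			}()
			if err := write(dp); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	return firstErr
}
```

The approach settled on above keeps a single shared xsync.PooledWorkerPool for writes and defers the read-pool refactor to a separate change.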