Use pooling for writes to coordinator #942
Conversation
src/query/pools/query_pools.go
Outdated
}

writeWorkerPool := xsync.NewWorkerPool(writePoolSize)
return objectPool, writeWorkerPool, instrumentOptions
We're getting quite a few things returned here; consider bundling them? I foresee a lot more pools being added as we work on perf :P
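One way to bundle them (a sketch with a hypothetical type name, not something from this diff; the field types come from the signature being changed below):

// BuildWorkerPoolsResult bundles everything BuildWorkerPools returns, so
// adding more pools later widens the struct instead of the signature.
type BuildWorkerPoolsResult struct {
	ObjectPool        pool.ObjectPool
	WriteWorkerPool   xsync.WorkerPool
	InstrumentOptions instrument.Options
}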
Wanna use the new worker pool I wrote for the m3db ingesters? It's way more performant because it doesn't allocate goroutines all the time: https://github.com/m3db/m3x/blob/master/sync/pooled_worker_pool.go

That should help fix a lot of the runtime.morestack/runtime.growstack/runtime.copystack in the m3coordinator flame graphs.
I'm pretty sure it's API-compatible too, so I think you can literally just replace the constructors and be done.
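A minimal sketch of that swap, assuming the m3x sync API as it's used later in this diff (NewPooledWorkerPool returning a pool plus an error, and an Init call before submitting work):

package main

import (
	"fmt"
	gosync "sync"

	xsync "github.com/m3db/m3x/sync"
)

func main() {
	// A pooled worker pool keeps a fixed set of goroutines alive instead of
	// spawning one per task, which avoids the runtime.morestack/copystack
	// churn seen in the flame graphs.
	workerPool, err := xsync.NewPooledWorkerPool(10, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		panic(err)
	}
	workerPool.Init() // make the long-lived workers available before Go()

	var wg gosync.WaitGroup
	wg.Add(1)
	workerPool.Go(func() {
		defer wg.Done()
		fmt.Println("ran on a pooled worker")
	})
	wg.Wait()
}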
}
tagIterator := storage.TagsToIdentTagIterator(query.Tags)

var (
Considering how this is being broken up, it may be a good idea to use the pooled worker pools like read does? That way every incoming request gets a set of workers for all the datapoints in it, and we don't get situations where some requests are starved.

If we want to simplify it a bit, we can revisit my m3x PR at m3db/m3x#181 and make it a first-class citizen, then refactor the reader pools too.

Alternatively, just use the same pool as the reader pools here if you agree with taking this approach.
Can you explain the rationale for pooled worker pools? Kind of confused by that
General idea was to give each incoming request its own pool while having a static max size, so that the more expensive requests would not tie up smaller requests. If that's not the correct approach, happy to drop it and revert to your pooled worker pool.
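To make that rationale concrete, here's a rough self-contained sketch of the idea (hypothetical names, with plain channels standing in for the m3x pools): each request checks out its own small worker pool, so an expensive request can only saturate the workers it checked out.

package main

import (
	"fmt"
	"sync"
)

// workerPoolPool holds a fixed number of small worker pools, each checked
// out by one request at a time. Every inner channel is a semaphore whose
// capacity is that pool's worker count.
type workerPoolPool struct {
	pools chan chan struct{}
}

func newWorkerPoolPool(numPools, workersPerPool int) *workerPoolPool {
	p := &workerPoolPool{pools: make(chan chan struct{}, numPools)}
	for i := 0; i < numPools; i++ {
		p.pools <- make(chan struct{}, workersPerPool)
	}
	return p
}

// handleRequest checks out a pool, writes all datapoints with bounded
// parallelism, then returns the pool for the next request.
func (p *workerPoolPool) handleRequest(id int, datapoints []float64) {
	sem := <-p.pools                  // check a worker pool out
	defer func() { p.pools <- sem }() // return it when the request is done

	var wg sync.WaitGroup
	for _, dp := range datapoints {
		dp := dp // capture the loop variable for the closure
		wg.Add(1)
		sem <- struct{}{} // acquire a worker slot; blocks when the pool is busy
		go func() {
			defer func() { <-sem; wg.Done() }()
			fmt.Printf("request %d wrote %v\n", id, dp)
		}()
	}
	wg.Wait()
}

func main() {
	pp := newWorkerPoolPool(2, 4)
	pp.handleRequest(1, []float64{1, 2, 3})
	pp.handleRequest(2, []float64{4, 5})
}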
Sync'd up in person and settled on (pending discussion with @nikunjgit) replacing the existing WorkerPool with PooledWorkerPool and then getting rid of the generic ObjectPool on top of that
I'll use a PooledWorkerPool for writes and let @arnikola merge the read object pool in another diff. Sound good?
Sure
)

for _, datapoint := range query.Datapoints {
	s.writeWorkerPool.Go(func() {
I think you need to capture the datapoint variable here; surprised that there are no failing tests... might be good to add one that tries to write multiple datapoints and checks that the correct ones are being written?
good point.
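For reference, the standard Go fix is to rebind the loop variable so each closure captures its own copy — a minimal sketch against the loop quoted above:

for _, datapoint := range query.Datapoints {
	datapoint := datapoint // shadow the loop variable; each closure now sees this iteration's value
	s.writeWorkerPool.Go(func() {
		// ... write datapoint ...
	})
}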
src/query/storage/m3/storage.go
Outdated
for _, datapoint := range query.Datapoints {
	s.writeWorkerPool.Go(func() {
		wg.Add(1)
		tagIter := tagIterator.Duplicate()
Are these thread-safe? Also, since these all use the same iterator, can you define this outside of the loop?
yeah makes sense
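A sketch of the shape that lands on (assuming Duplicate yields an independent iterator, which is how it's used in this diff); it also moves wg.Add ahead of the dispatch, since calling Add inside the worker can race with Wait:

for _, datapoint := range query.Datapoints {
	datapoint := datapoint
	tagIter := tagIterator.Duplicate() // one independent iterator per write, created before the goroutine runs
	wg.Add(1)                          // Add before Go so Wait can't return early
	s.writeWorkerPool.Go(func() {
		defer wg.Done()
		// ... write datapoint using tagIter ...
	})
}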
src/query/storage/m3/storage.go
Outdated
@@ -257,20 +260,28 @@ func (s *m3storage) Write(
	id := query.Tags.ID()
Maybe we should consider renaming WriteQuery to WriteRequest or something, query implies a response to a question
It's used in a lot of places; I'd prefer to avoid that change in this diff.
src/query/storage/m3/storage.go
Outdated
	query *storage.WriteQuery,
	datapoint ts.Datapoint,
	identID ident.ID,
	iterator ident.TagIterator) error {
nit:
iterator ident.TagIterator,
) error {
src/query/storage/m3/storage.go
Outdated
 var (
 	namespace ClusterNamespace
 	err       error
 )
-switch common.attributes.MetricsType {
+switch query.Attributes.MetricsType {
nit: pull out query.Attributes and s.clusters into variables
I think s.clusters is fine.
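For the query.Attributes half, the suggestion amounts to something like (sketch):

attributes := query.Attributes // read the field once instead of repeating the selector
switch attributes.MetricsType {
	// ...
}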
src/query/pools/query_pools.go
Outdated
@@ -53,7 +53,7 @@ func BuildWorkerPools(
 	cfg config.Configuration,
 	logger *zap.Logger,
 	scope tally.Scope,
-) (pool.ObjectPool, instrument.Options) {
+) (pool.ObjectPool, xsync.WorkerPool, instrument.Options) {
Out of curiosity, why is there a pool of worker pools?
Addressed this question in the other comment.
src/query/pools/query_pools.go
Outdated
	writePoolSize = defaultWorkerPoolSize
}

writeWorkerPool := xsync.NewWorkerPool(writePoolSize)
I think you need to call Init here.
good point
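For context: as I recall, the m3x WorkerPool hands out the worker permits that Init sets up, so without the call, Go just blocks; the fix would be (sketch):

writeWorkerPool := xsync.NewWorkerPool(writePoolSize)
writeWorkerPool.Init() // required before Go(); makes the pool's worker slots available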
I'm happy to test this under load if it's at a point where that would make sense.
I'm happy to test too (with around 70k metrics/s).
Force-pushed "Duplicate iterator" from beb200b to d350d1a.
Codecov Report
@@ Coverage Diff @@
## master #942 +/- ##
==========================================
- Coverage 77.88% 77.88% -0.01%
==========================================
Files 410 410
Lines 34363 34373 +10
==========================================
+ Hits 26765 26770 +5
- Misses 5750 5755 +5
Partials 1848 1848
Continue to review full report at Codecov.
@@ -54,6 +55,9 @@ func NewStorageAndSession(
 		Retention: TestRetention,
 	})
 	require.NoError(t, err)
+	storage := m3.NewStorage(clusters, nil)
+	writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
Consider a test to make sure we're using the pools?
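Something along these lines might do it (a sketch with hypothetical names, combining this with the earlier suggestion to verify the correct datapoints get written; assumes the m3x sync and testify imports):

func TestWritesGoThroughPool(t *testing.T) {
	writePool, err := xsync.NewPooledWorkerPool(10, xsync.NewPooledWorkerPoolOptions())
	require.NoError(t, err)
	writePool.Init()

	datapoints := []float64{1, 2, 3, 4, 5}
	var (
		mu      sync.Mutex
		written = make(map[float64]int)
		wg      sync.WaitGroup
	)
	for _, dp := range datapoints {
		dp := dp // capture the per-iteration value, as flagged above
		wg.Add(1)
		writePool.Go(func() {
			defer wg.Done()
			mu.Lock()
			written[dp]++
			mu.Unlock()
		})
	}
	wg.Wait()

	// Every datapoint should have been written exactly once.
	for _, dp := range datapoints {
		require.Equal(t, 1, written[dp], "datapoint %v", dp)
	}
}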
		w.timestamp, w.value, common.unit, common.annotation)
}

type writeRequestCommon struct {
👍
No description provided.