
Use pooling for writes to coordinator #942

Merged · 3 commits · Sep 28, 2018

Conversation

nikunjgit (Contributor)

No description provided.

}

writeWorkerPool := xsync.NewWorkerPool(writePoolSize)
return objectPool, writeWorkerPool, instrumentOptions
Collaborator:

We're getting quite a few things returns here, consider bundling it? I foresee a lot more pools being added as we work on perf :P

Contributor:

Wanna use the new worker pool I wrote for the m3db ingesters? It's way more performant because it doesn't allocate goroutines all the time: https://github.com/m3db/m3x/blob/master/sync/pooled_worker_pool.go

That should help fix a lot of the runtime.morestack/runtime.growstack/runtime.copystack in the m3coordinator flame graphs

Contributor:

I'm pretty sure it's API-compatible too, so I think you can literally just replace the constructors and be done.

}
tagIterator := storage.TagsToIdentTagIterator(query.Tags)

var (
Collaborator:

Considering how this is being broken up, it may be a good idea to use the pooled worker pools like read does. That way every incoming request gets a set of workers for all the datapoints in it, and we don't end up in situations where some requests get starved.

If we want to simplify it a bit, can revisit my m3x PR at: m3db/m3x#181 and make it a first class citizen, then refactor the readerpools too?

Alternatively, just use the same pool as the readerpools here if you agree with taking this approach

Contributor:

Can you explain the rationale for pooled worker pools? Kind of confused by that

Collaborator:

General idea was to give each incoming request its own pool while keeping a static max size, so that the more expensive requests would not tie up smaller requests. If that's not the correct approach, happy to drop it and revert to your pooled worker pool.

Contributor:

Synced up in person and settled on (pending discussion with @nikunjgit) replacing the existing WorkerPool with PooledWorkerPool, then getting rid of the generic ObjectPool on top of that.

Contributor (author):

I'll use a PooledWorkerPool for writes and let @arnikola merge the read object pool in another diff. Sound good?

Collaborator:

Sure

)

for _, datapoint := range query.Datapoints {
s.writeWorkerPool.Go(func() {
Collaborator:

I think you need to capture the datapoint variable here; surprised that there are no failing tests. Might be good to add one that writes multiple datapoints and checks that the correct ones are being written?

Contributor (author):

good point.

for _, datapoint := range query.Datapoints {
s.writeWorkerPool.Go(func() {
wg.Add(1)
tagIter := tagIterator.Duplicate()
Collaborator:

Are these thread-safe? Also, since these all use the same iterator, can you define this outside of the loop?

Contributor (author):

yeah makes sense

@@ -257,20 +260,28 @@ func (s *m3storage) Write(
id := query.Tags.ID()
Collaborator:

Maybe we should consider renaming WriteQuery to WriteRequest or something, query implies a response to a question

Contributor (author):

It's used in a lot of places; I'd prefer to avoid that change in this diff.

query *storage.WriteQuery,
datapoint ts.Datapoint,
identID ident.ID,
iterator ident.TagIterator) error {
Collaborator:

nit:

 iterator ident.TagIterator,
) error {

var (
namespace ClusterNamespace
err error
)
switch common.attributes.MetricsType {
switch query.Attributes.MetricsType {
Collaborator:

nit: pull out query.Attributes and s.clusters into variables

Contributor (author):

I think s.clusters is fine.

@@ -53,7 +53,7 @@ func BuildWorkerPools(
cfg config.Configuration,
logger *zap.Logger,
scope tally.Scope,
) (pool.ObjectPool, instrument.Options) {
) (pool.ObjectPool, xsync.WorkerPool, instrument.Options) {
Contributor:

Out of curiosity, why is there a pool of worker pools?

Collaborator:

Addressed this question in the other comment.

writePoolSize = defaultWorkerPoolSize
}

writeWorkerPool := xsync.NewWorkerPool(writePoolSize)
Contributor:

I think you need to call Init here.

Contributor (author):

good point

@BertHartm (Contributor):

I'm happy to test this under load if it's at a point where that would make sense.

@matejzero (Contributor):

> I'm happy to test this under load if it's at a point where that would make sense.

I'm happy to test too (with around 70k metrics/s).

@nikunjgit nikunjgit force-pushed the nikunj/writeWorkerpool branch from beb200b to d350d1a Compare September 28, 2018 01:38
@codecov bot commented Sep 28, 2018

Codecov Report

Merging #942 into master will decrease coverage by <.01%.
The diff coverage is 72.34%.


@@            Coverage Diff             @@
##           master     #942      +/-   ##
==========================================
- Coverage   77.88%   77.88%   -0.01%     
==========================================
  Files         410      410              
  Lines       34363    34373      +10     
==========================================
+ Hits        26765    26770       +5     
- Misses       5750     5755       +5     
  Partials     1848     1848
Flag Coverage Δ
#dbnode 81.46% <ø> (+0.03%) ⬆️
#m3ninx 75.25% <ø> (-0.08%) ⬇️
#query 64.23% <72.34%> (-0.13%) ⬇️
#x 80.55% <ø> (ø) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@@ -54,6 +55,9 @@ func NewStorageAndSession(
Retention: TestRetention,
})
require.NoError(t, err)
storage := m3.NewStorage(clusters, nil)
writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
Collaborator:

Consider adding a test to make sure we're using the pools?

w.timestamp, w.value, common.unit, common.annotation)
}

type writeRequestCommon struct {
Collaborator:

👍

@nikunjgit nikunjgit merged commit 2ec02cb into master Sep 28, 2018
@nikunjgit nikunjgit deleted the nikunj/writeWorkerpool branch September 28, 2018 16:36