-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: internal API to get distinct Workers from some Schedulers #5741
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5741 +/- ##
============================================
- Coverage 96.29% 96.18% -0.12%
Complexity 5833 5833
============================================
Files 634 634
Lines 41615 41639 +24
Branches 5761 5766 +5
============================================
- Hits 40073 40050 -23
- Misses 611 640 +29
- Partials 931 949 +18
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename PR to "Internal API to …"?
|
||
@Override | ||
public void createWorkers(int number, WorkerCallback callback) { | ||
int c = cores; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cores
is final
in this context, why copy it to local var?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With potential volatiles around, this won't re-read the field every time it is needed in the loop below.
* The callback interface for the {@link SchedulerMultiWorkerSupport#createWorkers(int, WorkerCallback)} | ||
* method. | ||
*/ | ||
interface WorkerCallback { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WorkerCreatedCallback
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While still it is an internal API, I don't think we need a more complicated naming.
* @param index the worker index, zero-based | ||
* @param worker the worker instance | ||
*/ | ||
void onWorker(int index, @NonNull Scheduler.Worker worker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onCreated
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the use case for the index?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In ParallelRunOn
, it let's index into the parent
and subscribers
arrays without the need for additional state, such as counting how many times onWorker
was invoked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, potentially reduces allocations for the user, thanks
👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, will be useful.
* @param index the worker index, zero-based | ||
* @param worker the worker instance | ||
*/ | ||
void onWorker(int index, @NonNull Scheduler.Worker worker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the use case for the index?
The PR has already been merged but for the record I approve too |
Thanks for the reviews @artem-zinnatullin @vanniktech & @davidmoten ! |
This PR adds an internal interface
SchedulerMultiWorkerSupport
that allows retrieving multiple workers from aScheduler
that implements this interface.The standard
Scheduler.getWorker()
can be invoked as many times as necessary, but specific implementations such as thecomputation()
Scheduler
and theParallelScheduler
is not guaranteed to return workers that are backed by distinct single-threaded thread pools.This does not effect other scheduler types because:
single()
) or don't use threads at all (trampoline()
) andio()
,newThread()
).Such worker reuse can happen when in a highly concurrent application, typical tasks are mixed with parallel tasks and both pull out workers from these
Scheduler
s. If this is happens, it is possible there will be duplicate threads used by the parallel operations and thus not utilize the originally intended parallelism level.By implementing this suggested interface, a batch-retrieval can be supported by the
Scheduler
s and they can make sure the caller gets as many distinct thread-pool as possible. If more workers are requested than theScheduler
's parallelism, the workers are handed out in round-robin fashion similar to the standardcreateWorker()
.Why a callback instead of returning an array?