-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: Add efficient mergeWith(Single|Maybe|Completable) overloads. #5847
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5847 +/- ##
==========================================
+ Coverage 96.4% 96.5% +0.09%
- Complexity 5834 5852 +18
==========================================
Files 640 646 +6
Lines 41944 42608 +664
Branches 5804 5906 +102
==========================================
+ Hits 40438 41119 +681
+ Misses 582 576 -6
+ Partials 924 913 -11
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like the overloads 👍
void otherSuccess(T value) { | ||
if (compareAndSet(0, 1)) { | ||
actual.onNext(value); | ||
otherState = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we go and use some constants here to improve readability? It's really hard to follow.
|
||
@Test | ||
public void normal() { | ||
final TestSubscriber<Integer> ts = new TestSubscriber<Integer>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be inlined here by using test()
|
||
@Test | ||
public void normal() { | ||
final TestObserver<Integer> ts = new TestObserver<Integer>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it's an to
and not ts
|
||
@Test | ||
public void cancel() { | ||
final PublishSubject<Integer> pp = PublishSubject.create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: ps
not pp
|
||
@Test | ||
public void normal() { | ||
final TestObserver<Integer> ts = new TestObserver<Integer>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same
|
||
@Test | ||
public void normal() { | ||
final TestObserver<Integer> ts = new TestObserver<Integer>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same
|
||
emitted = e; | ||
consumed = c; | ||
missed = addAndGet(-missed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see requested
not being reduced by emitted amount and emitted
is now non-volatile field. A bit different to older patterns I am familiar with. Looks handy to eliminate compareAndSet
calls on requested
from this spot. Nice.
if (compareAndSet(0, 1)) { | ||
long e = emitted; | ||
if (requested.get() != e) { | ||
SimplePlainQueue<T> q = queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure don't need to call getOrCreateQueue()
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. If there is no queue yet or is empty, that means T can be emitted immediately and events from the main source won't be reordered. Otherwise the queue exists and the item should be queued.
return; | ||
} | ||
} | ||
drainLoop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised to see this call drainLoop()
here rather than drain
(and in otherSuccess()
) . Are you sure access to drainLoop
will be serialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the execution reaches this, the work-in-progress indicator is known to be non-zero as we are in the drain mode already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, thanks
Conflict resolved. There was an expected merge conflict with the |
This PR adds specialized overloads to the
mergeWith
operator inFlowable
andObservable
.If accepted, the marbles will be updated in a separate PR.
Related: #5350.