-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Operation: buffer #281
Operation: buffer #281
Conversation
RxJava-pull-requests #153 SUCCESS |
Thank you @michaeldejong I will review and either merge or provide feedback. |
This is a great piece of code @michaeldejong ... I don't fully understand all the nuances yet but I'm going to start posting some questions inline on the code. The code looks good and is very well documented, and the unit tests are awesome, very key to me understanding the code. |
creator.stop(); | ||
buffers.emitAllBuffers(); | ||
observer.onError(e); | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it was accidentally left behind after doing some debugging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you're right. I'll remove this...
RxJava-pull-requests #166 FAILURE |
@michaeldejong Sorry for not responding to this yet ... I will but have become very busy on some other things for a bit. |
Anyone else interested and able to provide a code review? |
@michaeldejong at nearly 1500 concise lines of code and tests, this PR could be daunting for someone to start reviewing. I'd estimate review at somewhere around an hour minimum. Why don't you draw in folks in by adding usage examples into your PR description as a first step. Ask someone who uses javarx if they don't mind taking a pass at review? I can make time next week sometime, too. |
…s to rx.util package.
RxJava-pull-requests #169 FAILURE |
I've updated the PR description with an explanation of what each operator variation does, and how it does it. I hope this helps in understanding the concepts behind this operator. |
FYI: might want to bump this commit as I think the above failure might be transient |
@michaeldejong Can you respond to Ben's question about the test returning 3 empty events? |
RxJava-pull-requests #179 FAILURE |
RxJava-pull-requests #180 SUCCESS |
@@ -635,6 +638,222 @@ public Subscription call(Observer<T> observer) { | |||
} | |||
|
|||
/** | |||
<<<<<<< HEAD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a leftover from a failed merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed here: f651b7a
Operation: buffer
The buffer operation is an operation which allows you to buffer incoming events into one or more buffers. Depending on their purpose, these buffers can be emitted to an
Observer
when needed. In essence this means that events are collected and propagated to theObserver
in batches. The original issue, which this pull request addresses, mentions a total of 10 variations on this operator. This pull request contains code which supports all of them.The 10 variations which exist in Rx .Net, can be subdivided into two categories: single and multi buffers. With single buffers, only one buffer at any time is actively collecting incoming events, whereas with multi buffers, multiple buffers are actively collecting incoming events. In the first case the
Observer
will receive every original event only once in one buffer, whereas in the latter case, theObserver
will receive every original event zero or more times.Single buffers
buffer(bufferClosingSelector)
The bufferClosingSelector parameter is a
Func0<Observable<BufferClosing>>
. It uses toFunc0
object to construct anObservable
which produces aBufferClosing
object. Once this object has been produced by theObservable
the currently active buffer will be closed and emitted to theObserver
. At the same time a new buffer will be created which will start recording incoming events.buffer(count)
This operator closes and emits the current buffer after counting a certain amount of received events. At the same time it will create a new buffer which will start recording incoming events. One example would be
buffer(2)
, which with the following input:[0, 1, 2, 3, 4, 5]
will output the following buffers:[0, 1], [2, 3], [4, 5]
.buffer(timespan)
This operator closes and emits the current buffer after a certain amount of time has elapsed. At the same time it will create a new buffer which will start recording incoming events.
buffer(timespan, scheduler)
Same as previous operator, but now with a custom scheduler.
buffer(timespan, count)
This operator closes and emits the current buffer after counting a certain amount of received events or after a certain amount of time has elapsed. At the same time it will create a new buffer which will start recording incoming events.
buffer(timespan, count, scheduler)
Same as previous operator, but now with a custom scheduler.
Multiple buffers
buffer(count, skip)
This operator will create a new buffer after it has received 'skip' amount of events. Each buffer will be closed once it has reached a capacity of '[count'. One example would be
buffer(3, 1)
, which with the following input:0, 1, 2, 3, 4, 5]
, will output the following buffers:[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]
.buffer(bufferOpenings, bufferClosingSelector)
This operator takes two parameters: an
Observable<BufferOpening>
and aFunc1<BufferOpening, Observable<BufferClosing>>
. The first parameter determines when buffers are created. The second parameter determines when the buffers are closed. Every time aBufferOpening
object is received from theObservable
a new buffer is created. The receivedBufferOpening
object is fed into the second parameter which yields anObservable<BufferClosing>
object. When thisObservable
produces aBufferClosing
object, the associated buffer is closed and emitted.buffer(timespan, timeshift)
This operator is very similar to
buffer(count, skip)
, but in stead of counting events, it's based on time. Thetimeshift
period defines how often a new buffer will be created. Thetimespan
period defines the period between buffer construction and buffer emission.buffer(timespan, timeshift, scheduler)
Same as previous operator, but now with a custom scheduler.
As always, feedback is welcome!