API for Creating Safe Back Pressure Observables #3003
Comments
What's wrong with AbstractOnSubscribe? In addition, there is a PR #2813 which features a batching-capable producer-construction. Also you can read my blog about Advanced RxJava which explains the concepts and shows examples about how one can build producers and thus operators and |
@akarnokd The purpose of this issue is to initiate a discussion and refine the concepts in AbstractOnSubscribe and the AbstractProducer. Both of these facilities are currently annotated Some items that need some work on the AbstractOnSubscribe:
The AbstractProducer is clearly a better approach to me. For every |
@benjchristensen and I worked on the following examples, and we would like input on the following APIs. We have broken this issue into what we think are two primary use cases. First, there is the case where data is generated asynchronously from a non-blocking source. Second, there is an API for writing responsible observables from synchronous sources, which looks very similar to the AbstractOnSubscribe.
The async case is illustrated in the code below. We are considering a system which calls your function every time there is a call to
// to illustrate method signature in below example
class ObservableAsync {
    /**
     * Has an overload for handling request(Long.MAX_VALUE)
     * It is possible to chunk an infinite request and make a requestFiniteFunc
     * play nicely.
     *
     * Note: this assumes serialized request calls per subscriber. This means we
     * will have to do thread stealing (essentially merge).
     */
    public static <S, T> Observable<T> createAsynchronous(
            Func0<S> generatorFunc,
            Func2<S, Long, Observable<T>> requestFiniteFunc, // (state, requested) -> data
            Func1<S, Observable<T>> requestInfiniteFunc,
            Action1<S> onTerminate) { /* ... */ }
}
// Data state class used in the below example
public class RestClientState<T> {
    // Not private: the callbacks in the example read and update these fields.
    Client client = /* your favorite async client */ null;
    long lastIndex = 0; // long, because requested amounts are Longs
}
// ASYNC EXAMPLE
Observable.createAsynchronous(
    () -> new RestClientState<Foo>(url),
    (RestClientState<Foo> state, Long requested) -> {
        // Fetch the next window and advance the index for the following request.
        long i = state.lastIndex;
        state.lastIndex = i + requested;
        Observable<Foo> obs = state.client.getRange(i, i + requested);
        return obs;
    },
    (RestClientState<Foo> state) -> {
        // Unbounded request: fetch everything.
        Observable<Foo> allOfThem = state.client.getAll();
        return allOfThem;
    },
    (RestClientState<Foo> state) -> {
        state.client.free();
    });
The synchronous case (below) is a modification from the existing designs embodied in the
class ObservableSync {
    public static <S, T> Observable<T> createSynchronous(
            Func0<S> generatorFunc,
            Func2<S, Subscriber<T>, S> nextFunc,
            Action1<S> onTerminate) {/*...*/}
    // Functionally equivalent overload, but nextFunc does not need to return the state
    public static <S, T> Observable<T> createSynchronousStateful(
            Func0<S> generatorFunc,
            Action2<S, Subscriber<T>> nextFunc,
            Action1<S> onTerminate) {/*...*/}
}
List<Foo> myList = null; // ....
// SYNC Example
Observable.createSynchronous(
    () -> myList.iterator(),
    (Iterator<Foo> state, Subscriber<Foo> child) -> {
        if (!state.hasNext()) {
            child.onCompleted();
            return state; // nothing left to emit, so do not fall through to onNext
        }
        // cannot onNext more than once from func
        child.onNext(state.next());
        return state;
    },
    (it) -> { /* release resources held by the state, if any */ }
).subscribe(); |
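One way a framework could drive a nextFunc like the one in the sync example is to call it once per unit of outstanding demand. The following is a minimal sketch under that assumption. MiniSubscriber, SyncDriver and SyncDriverDemo are hypothetical stand-ins written for this illustration; they are not RxJava types and not the proposed implementation. Overflow capping for repeated very large requests is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical stand-in for rx.Subscriber; not the RxJava type.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
}

// Hypothetical driver: invokes nextFunc once for each unit of outstanding demand.
final class SyncDriver<S, T> {
    private final BiFunction<S, MiniSubscriber<T>, S> nextFunc;
    private final MiniSubscriber<T> child;
    private final AtomicLong requested = new AtomicLong();
    private S state;
    private boolean done;

    SyncDriver(Supplier<S> generatorFunc,
               BiFunction<S, MiniSubscriber<T>, S> nextFunc,
               MiniSubscriber<T> downstream) {
        this.state = generatorFunc.get();
        this.nextFunc = nextFunc;
        // Wrap the downstream so the driver notices the terminal event.
        this.child = new MiniSubscriber<T>() {
            @Override public void onNext(T value) { downstream.onNext(value); }
            @Override public void onCompleted() { done = true; downstream.onCompleted(); }
        };
    }

    // Plays the role of Producer.request(n).
    void request(long n) {
        if (n <= 0) return;
        // Only the caller that moves the counter away from 0 runs the loop;
        // re-entrant or concurrent callers just add demand and return.
        if (requested.getAndAdd(n) != 0) return;
        do {
            if (done) return;
            state = nextFunc.apply(state, child);
        } while (requested.decrementAndGet() > 0);
    }
}

final class SyncDriverDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Integer> source = Arrays.asList(1, 2, 3);
        SyncDriver<Iterator<Integer>, Integer> driver = new SyncDriver<>(
            source::iterator,
            (it, child) -> {
                if (!it.hasNext()) { child.onCompleted(); return it; }
                child.onNext(it.next()); // at most one onNext per call
                return it;
            },
            new MiniSubscriber<Integer>() {
                @Override public void onNext(Integer v) { log.add("next " + v); }
                @Override public void onCompleted() { log.add("completed"); }
            });
        driver.request(2); // emits two values
        driver.request(5); // emits the last value, then completes
        return log;
    }
}
```

Because each nextFunc call consumes exactly one unit of demand, the user code never has to count requests itself; the driver does the accounting.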
How will you make sure the observable contains exactly n elements? What kind of observable would you create and return?
Requested amounts are Longs. |
@akarnokd The return type would be a plain ole Do we have to ensure that the observable contains exactly n elements? If they write an irresponsible observable then they get a I have edited the comment to correct the types. |
I think that an implementation for #3017 should help serialize the output from multiple observables. |
@akarnokd Do you disagree with anything here in principle? If not, are you already working on refactoring the synchronous |
I'm not working on any of the cases and I'm not planning it either. |
I strongly agree that we need to improve how These "abstract" classes are the base, but I suggest we need to incorporate them into the Observable API such as this:
Non-backpressured Push Sources
This is for pure push sources that don't have backpressure.
Observable.createEventStream(..., DefaultBackpressureStrategy)
For example:
// mouse event listener with default backpressure strategy to DROP
Observable.createEventStream(o -> {
    mouseEventListener.addListener(l -> o.onNext(l));
}, OnBackpressure.DROP);
Backpressured Synchronous Pull Sources (Batched Stream)
This is for data sources that can be pulled synchronously, such as an
Observable.createSync(...)
Backpressured Asynchronous Pull Sources (Batched Stream)
This is for data sources that can be pulled asynchronously, such as network IO, async File IO, or retrieving from queues across thread-boundaries, etc. This is very common, and very hard to do right.
Observable.createAsync(...)
Backpressured Asynchronous Pull Sources (Single Item)
This is actually for returning a This is the same behavior as the
Single<T> Observable.createFetch(...)
Summary
If we can improve how these all work, it will greatly simplify creation and use of Observable. Are there use cases or variants I'm missing? What should we name them? What are the method signatures we should have for each? |
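The DROP strategy named in the mouse-listener example can be pictured as a gate between a push source and its subscriber: an event passes only while requested capacity remains and is discarded otherwise. This is a minimal sketch of that idea, assuming a hypothetical DropGate class written for illustration; it does not show how RxJava implements its own drop operator, and it assumes the source pushes from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical gate: forwards a pushed event only while downstream demand remains.
final class DropGate<T> {
    private final Consumer<T> downstream;
    private final AtomicLong capacity = new AtomicLong();
    private final AtomicLong dropped = new AtomicLong();

    DropGate(Consumer<T> downstream) { this.downstream = downstream; }

    // Called by the subscriber side, in the role of Producer.request(n).
    void request(long n) {
        if (n > 0) capacity.addAndGet(n);
    }

    // Called by the push source (for example a listener callback) for every event.
    void push(T event) {
        while (true) {
            long c = capacity.get();
            if (c == 0) {                    // no capacity: drop the event
                dropped.incrementAndGet();
                return;
            }
            if (capacity.compareAndSet(c, c - 1)) break; // claim one unit of capacity
        }
        downstream.accept(event);
    }

    long droppedCount() { return dropped.get(); }
}

final class DropGateDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        DropGate<String> gate = new DropGate<>(received::add);
        gate.push("a");   // dropped: nothing requested yet
        gate.request(2);
        gate.push("b");   // delivered
        gate.push("c");   // delivered
        gate.push("d");   // dropped: capacity used up
        return received;
    }
}
```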
We have |
I think we should come up with factory methods on Observable to expose these abilities alongside the default 'create'. What do you think the right names are? |
|
How about something starting with 'create' so that people find it alongside the 'create' method in Javadocs and IDE code-completion? We want to nudge people to using these instead of normal 'create'. Also, I don't want to confuse with the Rx.Net/RxJS 'generate', as they aren't exactly the same (though the sync one is similar). |
A quick note that I don't think the names |
It'd be a good idea if we fixed these inefficient Initial names that come to mind... But considering @davidmoten's input perhaps the existing names are not indicative of their usage. That said, would you agree with |
We have
|
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one. |
The Problem:
Creating back-pressure-enabled Observables that produce data from a non-blocking source is hard, and our current APIs for creating arbitrary observables do not lend themselves to creating observables that respect the back pressure rules. Users have a choice between calling one of the Observable static methods to generate standard-case observables using well-known patterns, or attempting to implement a responsible observable out of an OnSubscribe<T>. I will define a "responsible Observable" to be an observable that does the following:
- calls subscriber.onNext(t) only when subscriberCapacity > 0, where subscriberCapacity is the sum of the integer arguments for all invocations of producer.request(n) minus the number of calls made to subscriber.onNext(t).
- calls subscriber.onCompleted() or subscriber.onError(e) only once.
- does not call subscriber.onNext(t) concurrently.
The onSubscribe lifecycle of a responsible observable should fit within the following lifecycle.
1. The onSubscribe.call(Subscriber<T>) method is invoked by the Observable.subscribe(Subscriber<T>) method.
2. A Producer is set via the subscriber.setProducer(Producer<T>) method. At some point in the (potentially distant) future the subscriber may call producer.request(n). This signals to the observable that capacity is allocated in the subscriber.
3a. Values are produced via a call to subscriber.onNext(t).
3b. At some point a terminal event is emitted via onCompleted or onError.
This allows for many different variations of control flow, given that onNexts could be scheduled, data could be pre-fetched, excess data could be batched or dropped, etc.
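The subscriberCapacity definition and the single-terminal-event rule can be written down as a small checker. This is a minimal sketch, assuming a hypothetical ContractChecker class that an observable author might place in front of a subscriber while testing; it is not part of RxJava. The rule against concurrent onNext calls is deliberately left out, since detecting it reliably needs more than this sketch shows.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical checker for the "responsible Observable" rules; not an RxJava class.
final class ContractChecker {
    // subscriberCapacity = sum of request(n) arguments minus the number of onNext calls.
    private final AtomicLong subscriberCapacity = new AtomicLong();
    private final AtomicBoolean terminated = new AtomicBoolean();

    // Mirrors producer.request(n); the sum saturates at Long.MAX_VALUE instead of overflowing.
    void request(long n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive: " + n);
        subscriberCapacity.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
    }

    // Mirrors subscriber.onNext(t): legal only before termination and while capacity remains.
    void onNext() {
        if (terminated.get()) throw new IllegalStateException("onNext after terminal event");
        while (true) {
            long c = subscriberCapacity.get();
            if (c == 0) throw new IllegalStateException("onNext without capacity");
            if (subscriberCapacity.compareAndSet(c, c - 1)) return;
        }
    }

    // Mirrors onCompleted() / onError(e): at most one terminal event is allowed.
    void onTerminal() {
        if (!terminated.compareAndSet(false, true)) {
            throw new IllegalStateException("second terminal event");
        }
    }

    long capacity() { return subscriberCapacity.get(); }
}
```

For example, after request(2) two onNext() calls succeed and a third throws, which is the situation a responsible observable must never create.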
Proposal
With this groundwork in mind I would like to start a discussion about a general framework that correctly models the various use cases, particularly with respect to the following requirements:
Naive Implementation
The user provides callbacks that the framework will call between each of the steps above. That is:
1. onSubscribe.call(s)
2. S dataState = onSubscribeGenerator.call()
3. subscriber.setProducer(p), setting a framework Producer
4. p.request(n)
5. The framework keeps dataState for future requests: dataState = onRequestCallback.call(dataState, n, s)
Obvious shortcomings of this approach: order may not be preserved across multiple requests if the onRequestCallback schedules the onNext. Also, onNext calls may be concurrent, so the subscriber must be wrapped with a serializing subscriber. A case could also be made that many of the observables created should specify the backpressure strategy. Any thoughts are welcome.
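The naive flow can be sketched in a few lines to make the state threading concrete. RequestCallback, NaiveProducer and NaiveProducerDemo are hypothetical names chosen for this illustration, and a plain Consumer stands in for the subscriber. The synchronized keyword only protects the stored state; it does not address the ordering problem that appears when the callback schedules its onNext calls elsewhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical shape of onRequestCallback(dataState, n, s): emits data and returns the next state.
interface RequestCallback<S, T> {
    S call(S dataState, long n, Consumer<T> subscriber);
}

// Hypothetical framework producer that follows the naive flow.
final class NaiveProducer<S, T> {
    private final RequestCallback<S, T> onRequestCallback;
    private final Consumer<T> subscriber;
    private S dataState;

    NaiveProducer(Supplier<S> onSubscribeGenerator,
                  RequestCallback<S, T> onRequestCallback,
                  Consumer<T> subscriber) {
        this.dataState = onSubscribeGenerator.get(); // S dataState = onSubscribeGenerator.call()
        this.onRequestCallback = onRequestCallback;
        this.subscriber = subscriber;
    }

    // p.request(n): the returned state is kept for the next request.
    synchronized void request(long n) {
        dataState = onRequestCallback.call(dataState, n, subscriber);
    }
}

final class NaiveProducerDemo {
    static List<Long> run() {
        List<Long> received = new ArrayList<>();
        // The state is the next index to emit; each request emits n consecutive values.
        NaiveProducer<Long, Long> p = new NaiveProducer<>(
            () -> 0L,
            (start, n, out) -> {
                for (long i = 0; i < n; i++) out.accept(start + i);
                return start + n;
            },
            received::add);
        p.request(2);
        p.request(3);
        return received;
    }
}
```

In this synchronous form the second request continues where the first stopped, which is exactly the guarantee that is lost once onRequestCallback hands emission to another thread.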