Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroupByUntil to use BufferUntilSubscriber #1177

Merged
merged 1 commit into from
May 9, 2014

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented May 9, 2014

Matches groupBy behavior. (I still think throwing an error for non-first subscribers is too harsh.)

@cloudbees-pull-request-builder

RxJava-pull-requests #1090 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

(I still think throwing an error for non-first subscribers is too harsh.)

What would you do differently that doesn't either require multicasting and re-create the problem of losing messages or unbounded buffering and memory bloat to not lose messages?

@akarnokd
Copy link
Member Author

akarnokd commented May 9, 2014

I would implement a replayFirst() and replayAll() methods on the GroupedObservable and let the programmer decide. The default behavior would be a simple publish. The problem with the BufferUntilSubscriber is that there is no way to request a different behavior.

@benjchristensen
Copy link
Member

I think that would still require buffering everything, as they could change their mind at any time and invoke replayAll().

Can you provide use cases where replayFirst() and replayAll() would be used?

Here is how I think of groupBy and the various challenges it has:

o.groupBy(function).skip(1).flatMap(group -> {
   if(something) {
      return group.map(t -> {
         // do stuff
      });
   } else {
     return group.replayAll().map(t -> {
         // do stuff
      });
   }
});

In this case, publish is done on some, replayAll() done on some, and 1 is skipped.

For the skipped one, there is no way for us to know it was skipped, or subscription is just being delayed.

BufferUntilSubscriber still doesn't have a good solution for this and will have a memory leak as noted in its comments:

 * This currently has temporary unbounded buffers. It needs to become bounded and then do one of two things:
 * 
 * 1) blow up and make the user do something about it
 * 2) work with the backpressure solution ... still to be implemented (such as co-routines)

For the ones that are subscribed to, under what circumstance would replayAll or replayFirst be used? If suggests the developer is capturing the GroupedObservable's, storing them and subscribing to them multiple times. That just doesn't make sense inside a groupBy.

A GroupedObservable inside a groupBy should be subscribed to once inside the flatMap. Like all other Observables, if it is intended to be shared the developer should use .replay(), .publish(), .multicast(s) or whatever fits their use case.

Also, if we put replayFirst() and replayAll() on GroupedObservable this means that every GroupedObservable has just been made into a ReplaySubject, which should not be a requirement of a GroupedObservable.

It would help me if you could provide concrete examples of why replayFirst() and replayAll() would be helpful and how they wouldn't cause other problems (like unbounded buffers that we also need to solve).

@akarnokd
Copy link
Member Author

akarnokd commented May 9, 2014

Unfortunately, I can't give concrete use cases for this. I have some expectation of correctness and behavior, but I don't have real requirements coming from real-world use. I think the current BufferUntilSubscriber does what most users would expect when they concat or merge groups so let's leave it this way and put emphasis on auto-connecting replay() (i.e., replayAll()) and publish() (i.e., the original publish behavior).

@benjchristensen
Copy link
Member

Okay. I fully agree that BufferUntilSubscriber feels like a hack, and we are definitely making trade-offs. My goal is to keep those hacks internal and not modify public APIs and allow us to improve over time as our ideas on this improve.

I didn't quite understand this part ->

put emphasis on auto-connecting replay() (i.e., replayAll()) and publish() (i.e., the original publish behavior).

Do you mean just in the documentation?

benjchristensen added a commit that referenced this pull request May 9, 2014
GroupByUntil to use BufferUntilSubscriber
@benjchristensen benjchristensen merged commit 461730f into ReactiveX:master May 9, 2014
@akarnokd
Copy link
Member Author

akarnokd commented May 9, 2014

Do you mean just in the documentation?
I meant a replay() and publish() overload that auto-connects instead of returning ConnectableObservable, if we want such overloads instead of the longer .publish().refCount().

@akarnokd akarnokd deleted the GroupByUntilTimeGap branch May 9, 2014 17:58
@benjchristensen benjchristensen mentioned this pull request Jun 1, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants