You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Currently the zip operator has an unbounded queue for each Observable it is combining together. This makes it async and susceptible to buffer-bloat when fast-producer/slow-consumer issues occur, but even worse with zip it can occur when it has fast-producer/slow-producer. Consider you can have n producers, all it takes is one producer to cause all the rest to buffer.
Similar to recent changes to observeOn (#835) we want zip to default to blocking onNext ("non-blocking on the outside, blocking on the inside") so that each Observable can only onNext when the previous onNext was consumed.
Overloads should exist to allow buffering if desired. The goal is for normal use of Rx to provide natural back-pressure that only gets lost when consciously choosing to do so or using explicit operators that do queueing (such as buffer and window).
On the current zipimplementation, making the internal queues block at size 1 will not work, as it deadlocks anytime 2 synchronous Observables are zipped together (common).
An alternative approach is to start treating zip as an operator that requires concurrency to behave correctly (similar to window, buffer, interval). With concurrency it can subscribe to each Observable and block them as needed. The problem with this is that it could result in many blocking threads. This could potentially be reduced by conditionally removing the concurrency if it is seen that the Observable is already running on another thread. The other problem is likely performance impact.
Yet another proposal (unknown if it would work) is a scheme that would require changes to Subscriber (highly unwanted) to allow pausing/resuming of Observables that zip could control.
It may not be possible to achieve the goal of zip with back-pressure with reasonable trade-offs but we'd like to pursue it.
The text was updated successfully, but these errors were encountered:
Currently the
zip
operator has an unbounded queue for eachObservable
it is combining together. This makes it async and susceptible to buffer-bloat when fast-producer/slow-consumer issues occur, but even worse withzip
it can occur when it has fast-producer/slow-producer. Consider you can have n producers, all it takes is one producer to cause all the rest to buffer.Similar to recent changes to
observeOn
(#835) we wantzip
to default to blockingonNext
("non-blocking on the outside, blocking on the inside") so that eachObservable
can onlyonNext
when the previousonNext
was consumed.Overloads should exist to allow buffering if desired. The goal is for normal use of Rx to provide natural back-pressure that only gets lost when consciously choosing to do so or using explicit operators that do queueing (such as
buffer
andwindow
).On the current
zip
implementation, making the internal queues block at size 1 will not work, as it deadlocks anytime 2 synchronousObservable
s are zipped together (common).An alternative approach is to start treating
zip
as an operator that requires concurrency to behave correctly (similar towindow
,buffer
,interval
). With concurrency it cansubscribe
to eachObservable
and block them as needed. The problem with this is that it could result in many blocking threads. This could potentially be reduced by conditionally removing the concurrency if it is seen that theObservable
is already running on another thread. The other problem is likely performance impact.Yet another proposal (unknown if it would work) is a scheme that would require changes to
Subscriber
(highly unwanted) to allow pausing/resuming ofObservable
s thatzip
could control.It may not be possible to achieve the goal of
zip
with back-pressure with reasonable trade-offs but we'd like to pursue it.The text was updated successfully, but these errors were encountered: