-
-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(flattenConcurrentlyAtMost): add new extra #260
Conversation
Wow, thanks again @xtianjohns! Overall looks great, and I really appreciate that you write textual description as well. I've been trying to close the gate for new operators (even extra) into xstream, I myself have been publishing new operators as separate repos, but I'll keep my word in issue #161 since that was the deal. Some things to note: it's flattenConcurrentlyAtMost, not flattenSequentiallyAtMost, double check that you don't have that typo. E.g. the commit message is wrong. "At most" means "not more than", so we should change the n+1 behavior to be just n. flattenConcurrentlyAtMost(1) means not more than 1 at a time. Plus a comment inline I'll make... |
Thanks @staltz! I dunno what happened with the message (I got the branch name right), I've rebased the change and I've also grep'd the diff for the name and made sure I'm using the right language. I've updated the limit to reflect the max for number of streams to listen to, so now |
c91ba77
to
a66d30b
Compare
Thanks @xtianjohns! I noticed a few other issues, mostly with docs/tests. Comments inline |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docs
Yep, these make sense @staltz, will fix. |
Add flattenConcurrentlyAtMost(n) extra. flattenConcurrentlyAtMost is designed to provide consumer-configurable concurrency to flattening operations. Two flattening extras exist which allow consumers to flatten a meta stream with maximum concurrency, or with no concurrency. This new operator supports a concurrency limit, representing the maximum amount of _additional_ streams to connect to during flattening. Resolve staltz#161.
a66d30b
to
4663fe1
Compare
Sorry for the wait, @staltz. You may be even less likely to merge now since it's been so long, but hey - at least you can decline a complete PR. 😄 |
@xtianjohns Thanks for the reminder! No reason to decline this PR, I probably just lost it in the middle of many other notifications. :) |
Released 11.9.0 |
Add flattenSequentiallyAtMost(n) extra.
Description
Add new extra operator file and unit tests. The operator extra started as a copy of
flattenConcurrently
wrapped in a factory function. From there, a few renames and some combined functionality fromflattenSequentially
to implement the inner buffer/queue named_seq
.There are some distinctions between both, obviously. The completion method (
_c
) needs to be able to determine if there are any currently connected streams, or any streams in the buffer. If the answer to both questions is no, then we send a completion event downstream.Additionally, there may be a case when the last inner stream completes, there's no items left in the queue, and the meta stream has completed. This also means we should send a completion event downstream.
To track these fields, the operators gets some new properties,
_l
which tracks how many streams the operator is listening to,_seq
which represents the buffer and_d
which indicates whether the meta stream has completed.Resolves #161.
Motivation and Context
flattenSequentiallyAtMost is designed to provide consumer-configurable concurrency to flattening operations. Two flattening extras exist which allow consumers to flatten a meta stream with maximum concurrency, or with no concurrency. This new operator supports a concurrency limit, representing the maximum amount of additional streams to connect to during flattening.
How Has This Been Tested?
I've added 2 new tests which verify that if an inner stream errors that we send an error downstream, and that we don't greedily connect to more than
n
streams at a time.I've also duplicated both test suites from
flattenSequentially
andflattenConcurrently
, and substituted those extras withflattenConcurrentlyAtMost(1)
andflattenConcurrentlyAtMost(Infinity)
respectively. This may be overkill, but was a powerful tool during development to know I wasn't violating semantics by adding the configurable buffer.Checklist: