Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flattenConcurrentlyAtMost): add new extra #260

Merged
merged 1 commit into from
Jan 10, 2019

Conversation

xtianjohns
Copy link
Contributor

@xtianjohns xtianjohns commented Jul 24, 2018

Add flattenSequentiallyAtMost(n) extra.

Description

Add new extra operator file and unit tests. The operator extra started as a copy of flattenConcurrently wrapped in a factory function. From there, a few renames and some combined functionality from flattenSequentially to implement the inner buffer/queue named _seq.

There are some distinctions between both, obviously. The completion method (_c) needs to be able to determine if there are any currently connected streams, or any streams in the buffer. If the answer to both questions is no, then we send a completion event downstream.

Additionally, there may be a case when the last inner stream completes, there's no items left in the queue, and the meta stream has completed. This also means we should send a completion event downstream.

To track these fields, the operators gets some new properties, _l which tracks how many streams the operator is listening to, _seq which represents the buffer and _d which indicates whether the meta stream has completed.

Resolves #161.

Motivation and Context

flattenSequentiallyAtMost is designed to provide consumer-configurable concurrency to flattening operations. Two flattening extras exist which allow consumers to flatten a meta stream with maximum concurrency, or with no concurrency. This new operator supports a concurrency limit, representing the maximum amount of additional streams to connect to during flattening.

How Has This Been Tested?

I've added 2 new tests which verify that if an inner stream errors that we send an error downstream, and that we don't greedily connect to more than n streams at a time.

I've also duplicated both test suites from flattenSequentially and flattenConcurrently, and substituted those extras with flattenConcurrentlyAtMost(1) and flattenConcurrentlyAtMost(Infinity) respectively. This may be overkill, but was a powerful tool during development to know I wasn't violating semantics by adding the configurable buffer.

Checklist:

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have added tests to cover my changes.
  • All new and existing tests passed.

@staltz
Copy link
Owner

staltz commented Jul 24, 2018

Wow, thanks again @xtianjohns! Overall looks great, and I really appreciate that you write textual description as well. I've been trying to close the gate for new operators (even extra) into xstream, I myself have been publishing new operators as separate repos, but I'll keep my word in issue #161 since that was the deal.

Some things to note: it's flattenConcurrentlyAtMost, not flattenSequentiallyAtMost, double check that you don't have that typo. E.g. the commit message is wrong.

"At most" means "not more than", so we should change the n+1 behavior to be just n. flattenConcurrentlyAtMost(1) means not more than 1 at a time.

Plus a comment inline I'll make...

@xtianjohns xtianjohns changed the title feat(flattenSequentiallyAtMost): add new extra feat(flattenConcurrentlyAtMost): add new extra Jul 24, 2018
@xtianjohns
Copy link
Contributor Author

Thanks @staltz! I dunno what happened with the message (I got the branch name right), I've rebased the change and I've also grep'd the diff for the name and made sure I'm using the right language.

I've updated the limit to reflect the max for number of streams to listen to, so now flattenSequentially and flattenConcurrentlyAtMost(1) are equivalent.

@xtianjohns xtianjohns force-pushed the flattenConcurrentlyAtMost branch from c91ba77 to a66d30b Compare July 24, 2018 21:22
@staltz
Copy link
Owner

staltz commented Jul 27, 2018

Thanks @xtianjohns! I noticed a few other issues, mostly with docs/tests. Comments inline

Copy link
Owner

@staltz staltz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs

@xtianjohns
Copy link
Contributor Author

Yep, these make sense @staltz, will fix.

Add flattenConcurrentlyAtMost(n) extra.

flattenConcurrentlyAtMost is designed to provide consumer-configurable
concurrency to flattening operations. Two flattening extras exist which
allow consumers to flatten a meta stream with maximum concurrency, or
with no concurrency. This new operator supports a concurrency limit,
representing the maximum amount of _additional_ streams to connect to
during flattening.

Resolve staltz#161.
@xtianjohns xtianjohns force-pushed the flattenConcurrentlyAtMost branch from a66d30b to 4663fe1 Compare January 9, 2019 19:19
@xtianjohns
Copy link
Contributor Author

Sorry for the wait, @staltz. You may be even less likely to merge now since it's been so long, but hey - at least you can decline a complete PR. 😄

@staltz staltz closed this Jan 10, 2019
@staltz staltz reopened this Jan 10, 2019
@staltz
Copy link
Owner

staltz commented Jan 10, 2019

@xtianjohns Thanks for the reminder! No reason to decline this PR, I probably just lost it in the middle of many other notifications. :)

@staltz staltz merged commit ef8ed44 into staltz:master Jan 10, 2019
@staltz
Copy link
Owner

staltz commented Jan 10, 2019

Released 11.9.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants