Skip to content

ralusek/streamie

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

81 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Streamie: It's ex-streamie cool!

GitHub license npm version PRs Welcome

What is a streamie?

A streamie is an alternative to promises, streams, async iterators, arrays, and reactive observables like rxJS or Highland. It provides a wide array of features like pagination, mapping, filtering, batching, flattening, and concurrency control.

Why should I use a streamie?

Because it's the simplest and most familiar interface for common but complex behaviors on indefinite data.

1.) A streamie has useful iterator methods like .map, .filter, and .push on an infinite, asynchronous collection. All handler functions in these iterators are themselves asynchronous, so promises returned in them will be awaited for the item to have been considered processed and the queue to progress.

2.) A streamie offers an extremely simple interface for modifying control flow through various asynchronous activities, notably:

  • concurrency: for any iterative method, a concurrency can be specified to parallelize that asynchronous action
  • batching/flattening: for any iterative method, a batchSize can be specified to allow a batching of inputs up to this count before executing the iterator method. Likewise a flatten: true may be specified to flatten the input of the an iterator method.
  • backpressure: backpressure is automatically handled so that asynchronous tasks at different points in the pipeline cannot iterate beyond what its outputs are capable of handling.

3.) Fully typed in TypeScript with inference in most cases.

Seriously, check out this interface.

Installation

npm install --save streamie

Examples

Pagination

const paginator = streamie(async (page: number, { push }) => {
  // Fetch data from an API or other source.
  const data = await fetchData(page);
  // If there's more data, push a new item into the streamie queue for handling.
  if (data.hasMore) {
    push(page + 1);
  }
  // Return the data to be processed by downstream functions.
  return data.items;
}, {});

// Start the streamie with the first page.
paginator.push(0);

Note, data sources like this which are producing their own inputs can be self-seeded like this:

const paginator = streamie(async (page: number, { push }) => {
  // Fetch data from an API or other source.
  const data = await fetchData(page);
  // If there's more data, push a new item into the streamie queue for handling.
  if (data.hasMore) {
    push(page + 1);
  }
  // Return the data to be processed by downstream functions.
  return data.items;
}, { seed: 0 }); // Automatically push this in as a starting value to begin processing

Flattening

Well, imagine that this fetchData returns 50 items at a time, and we want to handle them individually for some purpose.

We can begin by flattening the output of this streamie, so rather than streaming out in chunks of 50, they stream out as individual items.

const paginator = streamie(async (page: number, { push }) => {
  const data = await fetchData(page);
  if (data.hasMore) {
    push(page + 1);
  }
  return data.items;
}, { seed: 0, flatten: true });

Now's let's do an example of handling them individually

paginator
.map((item: Item) => {
  return doSomethingIndividually(item);
}, {});

Batching

Or we can go in the opposite direction, where we're then going to take these individual items streaming out, and we have an api we can use that can accept 10 of them in a single request. By using batchSize, anything > 1 will group up the inputs into a batch of that size prior to calling the handler.

paginator
.map((items: Item[]) => {
  return upload10AtATime(items);
}, { batchSize: 10 });

Concurrency

Well what if this API let us do that, and said we could upload 10 in a single request, and perform 5 of those requests simultaneously?

paginator
.map((items: Item[]) => {
  return upload10AtATime(items);
}, { batchSize: 10, concurrency: 5 });

Draining/Completion/Promises

And what happens when we're done? Well, you can call streamie.drain() to drain the remainder of the items. If a batchSize is in effect, the final batch during draining is allowed to be less than that number in order to fully drain. Upstream streamies will signal to downstream ones to drain when all of their inputs have drained.

Every streamie returns a promise that will be resolved upon being fully drained. A streamie's handler can also drain itself, such as the case with our paginator:

const paginator = streamie(async (page: number, { push, drain }) => {
  const data = await fetchData(page);
  if (data.hasMore) {
    push(page + 1);
  } else drain();
  return data.items;
}, { seed: 0, flatten: true });

And to mark the process complete with the promise, here's the whole thing:

await paginator
.map((items: Item[]) => {
  return upload10AtATime(items);
}, { batchSize: 10, concurrency: 5 })
.promise;

// Here, the process is complete.

Typescript

In most cases, the types can be inferred, however, if you have certain combinations of modifiers, Typescript struggles to infer correctly. The generics have a simple interface for specifying the input queue item type and the output item queue type, as well as the config object.

The generics are specified like this:

streamie<number, number[], { batchSize: 2}>(
  (inputs: number[]) => inputs.map(input => input * 2),
  { batchSize: 2 }
);

Contributing

We welcome contributions! Please see our contributing guidelines for more information.

License

MIT

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published