Skip to content

Commit

Permalink
feat: accumulator with mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
Derek Burgman committed Jan 25, 2022
1 parent 549466e commit 6afb408
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 22 deletions.
4 changes: 4 additions & 0 deletions packages/dbx-firebase/src/lib/firestore/iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ export class FirestoreItemPageIterator<T> {

export class FirestoreItemPageIteratorIterationInstance<T> extends AbstractMappedPageItemIteration<FirestoreItemPageQueryResult<T>, QueryDocumentSnapshot<T>[], InternalFirestoreItemPageIteratorIterationInstance<T>> implements PageItemIteration<QueryDocumentSnapshot<T>[]>, Destroyable {

get snapshotIteration(): InternalFirestoreItemPageIteratorIterationInstance<T> {
return this._instance;
}

protected _mapStateValue(input: FirestoreItemPageQueryResult<T>): QueryDocumentSnapshot<T>[] {
return input.docs;
}
Expand Down
77 changes: 56 additions & 21 deletions packages/rxjs/src/lib/iterator/iteration.accumulator.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,54 @@
import { SubscriptionObject } from '../subscription';
import { distinctUntilChanged, filter } from 'rxjs/operators';
import { distinctUntilArrayLengthChanges, scanBuildArray, filterMaybe, scanIntoArray } from "../rxjs";
import { lastValue, filterMaybeValues, Destroyable } from "@dereekb/util";
import { lastValue, filterMaybeValues, Destroyable, filterMaybeValuesFn, Maybe } from "@dereekb/util";
import { map, Observable, shareReplay, skipWhile } from "rxjs";
import { ItemIteration, PageItemIteration } from "./iteration";
import { LoadingState, loadingStateHasError } from '../loading';

/**
* An item iteration that exposes all accumulated values.
* An object that accumulates and exposes values from an ItemIteration.
*/
export interface ItemIterationAccumulator<V> {
export interface ItemIterationAccumulator<I, O, N extends ItemIteration<I> = ItemIteration<I>> {

/**
* Iteration being accumulated.
*/
readonly itemIteration: ItemIteration<V>;
readonly itemIteration: N;

/**
* Returns all items loaded so far in the iteration in a single array.
*/
readonly allItems$: Observable<V[]>;
readonly allItems$: Observable<O[]>;

}

/**
* An item iteration that exposes all accumulated values.
* An object that accumulates and exposes values from a PageItemIteration.
*/
export interface PageItemIterationAccumulator<V> extends ItemIterationAccumulator<V> {
export type PageItemIterationAccumulator<I, O, N extends PageItemIteration<I> = PageItemIteration<I>> = ItemIterationAccumulator<I, O, N>;

/**
* Iteration being accumulated.
*/
readonly itemIteration: PageItemIteration<V>;
/**
* An accumulator with no mapping.
*/
export type MonotypeItemIterationAccumulator<I, N extends ItemIteration<I> = ItemIteration<I>> = ItemIterationAccumulator<I, I, N>;

/**
* A page accumulator with no mapping.
*/
export type MonotypePageItemIterationAccumulator<I, N extends PageItemIteration<I> = PageItemIteration<I>> = ItemIterationAccumulator<I, I, N>;

}

export class ItemIterationAccumulatorInstance<V, I extends ItemIteration<V> = ItemIteration<V>> implements ItemIterationAccumulator<V>, Destroyable {
export type ItemIterationAccumulatorMapFunction<I, O> = ((item: I) => Maybe<O>) | ((item: I, state: LoadingState<I>) => Maybe<O>);

constructor(readonly itemIteration: I) { }
/**
* ItemIterationAccumulator implementation.
*/
export class ItemIterationAccumulatorInstance<I, O, N extends ItemIteration<I> = ItemIteration<I>> implements ItemIterationAccumulator<I, O, N>, Destroyable {

readonly latestSuccessfulState$: Observable<LoadingState<V>> = this.itemIteration.latestState$.pipe(
constructor(readonly itemIteration: N, readonly mapItem: ItemIterationAccumulatorMapFunction<I, O>) { }

readonly latestSuccessfulState$: Observable<LoadingState<I>> = this.itemIteration.latestState$.pipe(
filter(x => !loadingStateHasError(x)),
distinctUntilChanged(),
shareReplay(1)
Expand All @@ -48,7 +57,7 @@ export class ItemIterationAccumulatorInstance<V, I extends ItemIteration<V> = It
/**
* All successful page results in a single array.
*/
readonly allSuccessfulStates$: Observable<LoadingState<V>[]> = this.latestSuccessfulState$.pipe(
readonly allSuccessfulStates$: Observable<LoadingState<I>[]> = this.latestSuccessfulState$.pipe(
scanIntoArray({ immutable: false }),
distinctUntilArrayLengthChanges(),
shareReplay(1)
Expand All @@ -59,8 +68,19 @@ export class ItemIterationAccumulatorInstance<V, I extends ItemIteration<V> = It
shareReplay(1)
);

readonly allItems$: Observable<V[]> = this.allSuccessfulStates$.pipe(
// MARK: ItemIterationAccumulator
readonly allItems$: Observable<O[]> = this.allSuccessfulStates$.pipe(
scanBuildArray((allSuccessfulStates) => {
const mapStateToItem: (state: LoadingState<I>) => O = (state) => {
let result: Maybe<O>;

if (state.model != null) {
result = this.mapItem(state.model, state);
}

return result;
};

/*
We start with allSuccessfulPageResults$ since it contains all page results since the start of the iterator,
and subscription to allItems may not have started at the same time.
Expand All @@ -71,11 +91,11 @@ export class ItemIterationAccumulatorInstance<V, I extends ItemIteration<V> = It
*/
const allPageResultsUpToFirstSubscription = allSuccessfulStates;
const firstLatestState = lastValue(allPageResultsUpToFirstSubscription);
const seed: V[] = filterMaybeValues(allPageResultsUpToFirstSubscription.map(x => x.model));
const seed: O[] = filterMaybeValues(allPageResultsUpToFirstSubscription.map(mapStateToItem));

const accumulatorObs: Observable<V> = this.latestSuccessfulState$.pipe(
const accumulatorObs: Observable<O> = this.latestSuccessfulState$.pipe(
skipWhile(x => x === firstLatestState),
map(x => x.model),
map(mapStateToItem),
filterMaybe()
);

Expand All @@ -94,4 +114,19 @@ export class ItemIterationAccumulatorInstance<V, I extends ItemIteration<V> = It

}

export class PageItemIterationAccumulatorInstance<V, I extends PageItemIteration<V> = PageItemIteration<V>> extends ItemIterationAccumulatorInstance<V, I> implements PageItemIterationAccumulator<V>, Destroyable { }
/**
* Creates a new ItemAccumulator instance give the input ItemIteration.
*
* @param itemIteration
* @param mapItem
* @returns
*/
export function itemAccumulator<I, N extends ItemIteration<I>>(itemIteration: N): ItemIterationAccumulatorInstance<I, I, N>;
export function itemAccumulator<I, O, N extends ItemIteration<I>>(itemIteration: N, mapItem?: ItemIterationAccumulatorMapFunction<I, O>): ItemIterationAccumulatorInstance<I, O, N>;
export function itemAccumulator<I, O, N extends ItemIteration<I>>(itemIteration: N, mapItem?: ItemIterationAccumulatorMapFunction<I, O>): ItemIterationAccumulatorInstance<I, O, N> {
if (!mapItem) {
mapItem = (a) => a;
}

return new ItemIterationAccumulatorInstance<I, O, N>(itemIteration, mapItem);
}
2 changes: 1 addition & 1 deletion packages/rxjs/src/lib/iterator/iterator.mapped.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { ItemPageIteratorIterationInstance } from "./iterator.page";

export abstract class AbstractMappedPageItemIteration<I, O, M extends ItemPageIteratorIterationInstance<I, any, any>> implements PageItemIteration<O>, Destroyable {

constructor(private readonly _instance: M) { }
constructor(protected readonly _instance: M) { }

get maxPageLoadLimit() {
return this._instance.maxPageLoadLimit;
Expand Down

0 comments on commit 6afb408

Please sign in to comment.