Skip to content

Commit

Permalink
feat: added onMatchDelta
Browse files Browse the repository at this point in the history
- added onTrueToFalse, onFalseToTrue
  • Loading branch information
dereekb committed Mar 24, 2022
1 parent b4d4b6b commit e36fb4c
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 2 deletions.
65 changes: 65 additions & 0 deletions packages/rxjs/src/lib/rxjs/boolean.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { skipFirstMaybe } from './value';
import { Maybe } from '@dereekb/util';
import { BehaviorSubject, Subject } from 'rxjs';
import { first, tap } from 'rxjs/operators';
import { SubscriptionObject } from '../subscription';
import { onFalseToTrue, onTrueToFalse } from './boolean';

describe('boolean operators', () => {

let subject: Subject<boolean>;
let sub: SubscriptionObject;

beforeEach(() => {
subject = new Subject<boolean>();
sub = new SubscriptionObject();
})

afterEach(() => {
sub.destroy();
subject.complete();
});

describe('onTrueToFalse', () => {

const from = true;
const to = false;

it('should emit when "true" becomes "false"', (done) => {

sub.subscription = subject.pipe(
onTrueToFalse(),
first()
).subscribe((value) => {
expect(value).toBe(to);
done();
});

subject.next(from);
subject.next(to);
});

});

describe('onFalseToTrue', () => {

const from = false;
const to = true;

it('should emit when "false" becomes "true"', (done) => {

sub.subscription = subject.pipe(
onFalseToTrue(),
first()
).subscribe((value) => {
expect(value).toBe(to);
done();
});

subject.next(from);
subject.next(to);
});

});

})
34 changes: 32 additions & 2 deletions packages/rxjs/src/lib/rxjs/boolean.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,38 @@
import { OperatorFunction, identity } from "rxjs";
import { OperatorFunction, identity, MonoTypeOperatorFunction, map } from "rxjs";
import { onMatchDelta } from "./delta";

/**
* Returns the pipe if usePipe is true, otherwise returns the identity.
*/
export function pipeIf<A>(usePipe: boolean, pipe: OperatorFunction<A, A>): OperatorFunction<A, A> {
export function pipeIf<A>(usePipe: boolean, pipe: OperatorFunction<A, A>): OperatorFunction<A, A> {
return (usePipe) ? pipe : identity;
}

/**
* Maps the opposite value of the input boolean.
*/
export function isNot(): MonoTypeOperatorFunction<boolean> {
return map(x => !x);
}

/**
* Emits a value when moving from a true value to a false value.
*/
export function onTrueToFalse(): MonoTypeOperatorFunction<boolean> {
return onMatchDelta({
from: true,
to: false,
requireConsecutive: true
});
}

/**
* Emits a value when moving from a false value to a true value.
*/
export function onFalseToTrue(): MonoTypeOperatorFunction<boolean> {
return onMatchDelta({
from: false,
to: true,
requireConsecutive: true
});
}
99 changes: 99 additions & 0 deletions packages/rxjs/src/lib/rxjs/delta.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { filterMaybe } from '@dereekb/rxjs';
import { SubscriptionObject } from './../subscription';
import { of, timeout } from 'rxjs';
import { Subject } from 'rxjs';
import { first, tap } from 'rxjs/operators';
import { onMatchDelta } from './delta';

describe('onMatchDelta', () => {

const from = 0;
const to = 1;

let subject: Subject<number>;
let sub: SubscriptionObject;

beforeEach(() => {
subject = new Subject<number>();
sub = new SubscriptionObject();
})

afterEach(() => {
sub.destroy();
subject.complete();
});

describe('requireConsecutive=true', () => {

it('should emit if the prevous value is equal to "from" and the current value is equal to "to".', (done) => {
sub.subscription = subject.pipe(
onMatchDelta({
from,
to,
requireConsecutive: true
}),
first()
).subscribe((value) => {
expect(value).toBe(to);
done();
});

subject.next(from);
subject.next(to);
});

it('should not emit if the prevous value is not equal to "from" and the current value is equal to "to".', (done) => {
sub.subscription = subject.pipe(
onMatchDelta({
from,
to,
requireConsecutive: true
}),
first(),
timeout({
first: 1000,
with: () => of(null as any as number).pipe(
tap(() => done()),
filterMaybe()
)
})
).subscribe(() => {
fail();
});

subject.next(from);
subject.next(2);
subject.next(to);
});

});

describe('requireConsecutive=false', () => {

it('should should emit once the target "from" value has been seen once.', () => {
sub.subscription = subject.pipe(
onMatchDelta({
from,
to,
requireConsecutive: true
}),
first(),
timeout({
first: 1000,
with: () => of(null as any as number).pipe(
tap(() => fail()),
filterMaybe()
)
})
).subscribe((value) => {
expect(value).toBe(to);
});

subject.next(from);
subject.next(2);
subject.next(to);
});

});

});
105 changes: 105 additions & 0 deletions packages/rxjs/src/lib/rxjs/delta.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { filter, first, map, MonoTypeOperatorFunction, Observable, scan, skipUntil } from 'rxjs';

/**
* onDelta function configuration.
*/
export interface OnMatchDeltaConfig<T> {
/**
* The first from value.
*/
from: T;
/**
* The target value to recieve after the first.
*/
to: T;
/**
* Comparison function to compare equality between the emission and the target values.
*
* isMatch is checked for each value, and at the time a match is found, allowing a double check to occur on the from target value.
*/
isMatch?: (a: T, b: T) => boolean;
/**
* Whether or not the two values must be emitted consencutively.
*
* For example, if requiredConsecutive=true and we are waiting for 1 -> 2, and the emissions are 1,0,2, the observable function will not emit 2.
*/
requireConsecutive?: boolean;
}

interface OnMatchDeltaScan<T> {
/**
* Whether or not to emit.
*/
emit: boolean; // null for the initial value.
/**
* Whether or not a from match has been hit.
*
* In cases of requireConsecutive=false, this value retains true until emit occurs.
*/
fromMatch: boolean;
/**
* The current fromMatch value.
*/
value: T;
}

/**
* Emits a value when going from one matching value to a target value.
*
* The first value must be determined first before the second is raised.
*/
export function onMatchDelta<T>(config: OnMatchDeltaConfig<T>): MonoTypeOperatorFunction<T> {
const { isMatch: inputIsSame, from, to, requireConsecutive } = config;
const isMatch = inputIsSame ?? ((a: T, b: T) => a === b);

return (obs: Observable<T>) => {
return obs.pipe(
scan((acc: OnMatchDeltaScan<T>, next: T, index: number) => {
let emit: boolean = false;
let fromMatch: boolean = acc.fromMatch;
let value!: T;

// If we do have a match check the next value is a match for delta emission.
if (acc.fromMatch) {
const toMatch = isMatch(to, next);

if (toMatch) {

// if the two value matches, check fromMatch once more
fromMatch = isMatch(from, acc.value);

// emit if both are in agreement
emit = fromMatch && toMatch;

if (emit) {

// set the emit value
value = next;

// set fromMatch for the followup emission
fromMatch = isMatch(from, next);
}
}
}

// If we aren't emitting, update fromMatch/value depending on current state.
if (!emit) {

// if we don't have a from match yet or we require consecutive successes, check next as the from value.
if (!acc.fromMatch || requireConsecutive) {
fromMatch = isMatch(from, next);
value = next;
}
}

return {
emit,
value,
fromMatch
};
}, { emit: false, fromMatch: false, value: 0 as any }),
filter(({ emit }) => Boolean(emit)),
map(({ value }) => value)
);
};
}
1 change: 1 addition & 0 deletions packages/rxjs/src/lib/rxjs/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './array';
export * from './boolean';
export * from './delta';
export * from './getter';
export * from './loading';
export * from './misc';
Expand Down

0 comments on commit e36fb4c

Please sign in to comment.