-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdestroy-old-instances.ts
56 lines (47 loc) · 1.52 KB
/
destroy-old-instances.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import { BehaviorSubject, Observable, of } from "rxjs";
import { map, scan, endWith, filter } from "rxjs/operators";
import { createSubscriber } from "./subscriber";
/**
 * An element that owns a resource: a `BehaviorSubject` seeded with the
 * element's id. Call `destroy()` to complete that subject once the element is
 * no longer needed.
 */
class ElementWithResources {
  /** Resource stream owned by this element; assigned exactly once. */
  private readonly _flow$: BehaviorSubject<number>;

  /**
   * @param id Identifier of the element; also the initial value of `flow$`.
   */
  constructor(public readonly id: number) {
    // The subject is created only here. A field initializer in addition to
    // this assignment would allocate a second subject per instance that is
    // immediately discarded and never completed.
    this._flow$ = new BehaviorSubject(id);
  }

  /** The subject owned by this element (same instance on every access). */
  public get flow$(): BehaviorSubject<number> {
    return this._flow$;
  }

  /** Completes the owned subject. */
  public destroy(): void {
    this._flow$.complete();
  }
}
/**
 * Builds a stream that yields one newly constructed `ElementWithResources`
 * for each given id.
 *
 * @param ids Ids to wrap; they are handed to `of` in the order given.
 * @returns Observable of the constructed elements.
 */
function createElementFlow(...ids: number[]): Observable<ElementWithResources> {
  // Elements are constructed inside `map`, one per id flowing through.
  const toElement = (id: number): ElementWithResources => new ElementWithResources(id);
  const ids$ = of(...ids);
  return ids$.pipe(map(toElement));
}
// Demo flow of elements with ids 1 and 2. No operator is applied here, so
// nothing in this flow ever calls destroy() on the emitted elements.
const flowOfElements$ = createElementFlow(1, 2);
/**
 * Subscribes to a stream of elements and, for every element received,
 * subscribes to that element's `flow$` with a subscriber labelled
 * `Resource subscriber <id>`.
 *
 * The subscriptions are not returned or stored by this function.
 *
 * @param elements$ Stream of elements whose resource streams should be observed.
 */
function consumeElements(elements$: Observable<ElementWithResources>) {
  elements$.subscribe(element => {
    const resource$ = element.flow$;
    const resourceSubscriber = createSubscriber(`Resource subscriber ${element.id}`);
    resource$.subscribe(resourceSubscriber);
  });
}
// Consume the plain flow: each emitted element's flow$ gets its own subscriber.
consumeElements(flowOfElements$);
// Demo flow of elements 3 and 4 in which the arrival of a new element destroys
// the one before it. The only destroy() call is on the previous element, so
// nothing in this pipeline destroys the final element.
const flowWhichClearResources$ = createElementFlow(3, 4).pipe(
  scan(function releasePrevious(previous: ElementWithResources, next: ElementWithResources) {
    if (!previous) {
      return next;
    }
    previous.destroy();
    return next;
  })
);
consumeElements(flowWhichClearResources$);
// Demo flow of elements 5 and 6 in which every element, including the last
// one, is destroyed: a trailing `undefined` sentinel gives the reducer one more
// call after the final real element, and the sentinel is then filtered out so
// consumers only ever see real elements.
const flowWhichClearAllResources$ = createElementFlow(5, 6).pipe(
  endWith(undefined),
  scan(
    (
      // Both sides may be the `undefined` sentinel added by `endWith`, so the
      // types say so instead of claiming a plain element.
      old: ElementWithResources | undefined,
      current: ElementWithResources | undefined
    ) => {
      if (old) {
        old.destroy();
      }
      return current;
    }
  ),
  // Type guard: drops the sentinel and narrows the stream back to elements.
  filter((element): element is ElementWithResources => !!element)
);
consumeElements(flowWhichClearAllResources$);