-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Copy pathsubscribeToResult.ts
84 lines (81 loc) · 3.07 KB
/
subscribeToResult.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import { root } from './root';
import { isArrayLike } from './isArrayLike';
import { isPromise } from './isPromise';
import { isObject } from './isObject';
import { Subscriber } from '../Subscriber';
import { Observable, ObservableInput } from '../Observable';
import { iterator as Symbol_iterator } from '../symbol/iterator';
import { Subscription } from '../Subscription';
import { InnerSubscriber } from '../InnerSubscriber';
import { OuterSubscriber } from '../OuterSubscriber';
import { observable as Symbol_observable } from '../symbol/observable';
/**
 * Feeds the values produced by `result` into a fresh `InnerSubscriber` built
 * from `outerSubscriber`, `outerValue` and `outerIndex`.
 *
 * `result` is inspected in this order and handled accordingly:
 *  1. `Observable` instance - scalar observables are emitted synchronously,
 *     anything else is subscribed to.
 *  2. Array-like - every element is emitted synchronously, then `complete`.
 *  3. Promise - the resolved value is emitted followed by `complete`; a
 *     rejection is forwarded to `error`.
 *  4. Iterable (`Symbol.iterator`) - drained synchronously until done or until
 *     the inner subscriber is closed.
 *  5. Interop observable (`Symbol.observable`) - its `subscribe` is invoked.
 *  6. Anything else - a `TypeError` is sent to the inner subscriber.
 *
 * @param outerSubscriber The subscriber handed to the `InnerSubscriber`
 * constructor.
 * @param result The value to subscribe to.
 * @param outerValue Passed through to the `InnerSubscriber` constructor.
 * @param outerIndex Passed through to the `InnerSubscriber` constructor.
 * @returns The result of `subscribe` for (interop) observables, the inner
 * subscriber itself for promises, and `null` in every other case (sources
 * delivered synchronously, invalid input, or an inner subscriber that is
 * already closed right after construction).
 */
export function subscribeToResult<T, R>(outerSubscriber: OuterSubscriber<T, R>,
                                        result: any,
                                        outerValue?: T,
                                        outerIndex?: number): Subscription;
export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
                                     result: ObservableInput<T>,
                                     outerValue?: T,
                                     outerIndex?: number): Subscription {
  const destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);

  if (destination.closed) {
    return null;
  }

  if (result instanceof Observable) {
    if (result._isScalar) {
      // Scalar observables expose their single value directly, so it can be
      // delivered without going through `subscribe`.
      destination.next((<any>result).value);
      destination.complete();
      return null;
    } else {
      return result.subscribe(destination);
    }
  } else if (isArrayLike(result)) {
    // Stop emitting as soon as the subscriber closes itself.
    for (let i = 0, len = result.length; i < len && !destination.closed; i++) {
      destination.next(result[i]);
    }
    if (!destination.closed) {
      destination.complete();
    }
  } else if (isPromise(result)) {
    result.then(
      (value) => {
        if (!destination.closed) {
          destination.next(<any>value);
          destination.complete();
        }
      },
      (err: any) => destination.error(err)
    )
    .then(null, (err: any) => {
      // Escaping the Promise trap: globally throw unhandled errors
      root.setTimeout(() => { throw err; });
    });
    return destination;
  } else if (result && typeof result[Symbol_iterator] === 'function') {
    const iterator = <any>result[Symbol_iterator]();
    do {
      const item = iterator.next();
      if (item.done) {
        destination.complete();
        break;
      }
      destination.next(item.value);
      if (destination.closed) {
        // The consumer stopped before the iterator was exhausted: give the
        // iterator a chance to release resources (e.g. run a generator's
        // `finally` block), mirroring what `for...of` does on early exit.
        if (typeof iterator.return === 'function') {
          iterator.return();
        }
        break;
      }
    } while (true);
  } else if (result && typeof result[Symbol_observable] === 'function') {
    const obs = result[Symbol_observable]();
    // Guard against a nullish return value as well, so that a broken
    // implementation is reported through `error` instead of throwing a raw
    // property-access error from here.
    if (!obs || typeof obs.subscribe !== 'function') {
      destination.error(new TypeError('Provided object does not correctly implement Symbol.observable'));
    } else {
      // Reuse the inner subscriber created above rather than allocating a
      // second one with the same arguments.
      return obs.subscribe(destination);
    }
  } else {
    const value = isObject(result) ? 'an invalid object' : `'${result}'`;
    const msg = `You provided ${value} where a stream was expected.`
      + ' You can provide an Observable, Promise, Array, or Iterable.';
    destination.error(new TypeError(msg));
  }
  return null;
}