-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
142 lines (139 loc) · 2.97 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
'use strict'
// CommonJS export table for this module.
// The `__esModule` flag is set so that consumers which check for it can
// distinguish the `default` property below from the module object itself.
Object.defineProperty(exports, '__esModule', {value: true})
// `makeDefer` is exposed as the default export; the two stream factories are
// named exports. All three are function declarations defined further down in
// this file, so they are hoisted and already usable at this point.
exports.default = makeDefer
exports.makeBroadcastStream = makeBroadcastStream
exports.makeSingleStream = makeSingleStream
/**
 * Builds a "deferred": a promise bundled with the two functions that settle it,
 * so the promise can be resolved or rejected from outside its executor.
 *
 * @returns {{resolve: Function, reject: Function, promise: Promise}} the
 *   settle functions and the promise they control.
 */
function makeDefer() {
  const deferred = {resolve: undefined, reject: undefined, promise: undefined}
  // The executor runs synchronously, so both settle functions are captured
  // before this function returns.
  deferred.promise = new Promise((fulfil, fail) => {
    deferred.resolve = fulfil
    deferred.reject = fail
  })
  return deferred
}
/**
 * Creates a broadcast stream: a push-style source whose events are delivered
 * to every subscribed callback listener and to every async iterator.
 *
 * Producer side:
 *   - `next(value)`  publishes a value
 *   - `throw(error)` fails the stream (terminal)
 *   - `done()`       completes the stream (terminal)
 * Calls made after the stream has ended are ignored.
 *
 * Consumer side:
 *   - `listen(onNext, {onError, onDone})` registers callbacks and returns a
 *     function that removes them again. Subscribing to a stream that has
 *     already ended only invokes `onDone` (if given).
 *   - `[Symbol.asyncIterator]()` returns an iterator that yields every value
 *     published after the iterator was created, in order, followed by the
 *     terminal event. A failure is thrown once from `next()`; afterwards the
 *     iterator reports `done`. An iterator created after the stream ended
 *     reports `done` immediately.
 *
 * Events are kept as a forward-linked chain of deferreds. The producer holds
 * only the tail and each iterator holds only its own read position, so events
 * that no iterator can still reach become garbage, while an iterator that is
 * kept alive but not read retains its own backlog.
 *
 * @returns {object} the stream object described above.
 */
function makeBroadcastStream() {
  const listeners = []
  let done = false
  // Deferred that the next published event will resolve.
  let tail = makeDefer()

  // Appends one event to the chain. The link is always *resolved*, never
  // rejected: failures are thrown from the iterator's own next(), so ending a
  // stream nobody iterates cannot produce an unhandled rejection.
  function publish(event) {
    const settled = tail
    tail = makeDefer()
    settled.resolve({event, next: tail})
  }

  function finished() {
    return {value: undefined, done: true}
  }

  return {
    [Symbol.asyncIterator]() {
      // Read position of this iterator; it starts at the first event published
      // after this point.
      let cursor = tail
      let closed = done
      function close() {
        closed = true
        // Drop the reference so an abandoned iterator does not retain events.
        cursor = undefined
      }
      return {
        // Note: overlapping next() calls on the same iterator observe the same
        // event; callers are expected to await each call (as for-await does).
        async next() {
          if (closed)
            return finished()
          const link = await cursor.promise
          const event = link.event
          if (event.end) {
            close()
            if (event.failed)
              throw event.error
            return finished()
          }
          if (!closed)
            cursor = link.next
          return {value: event.value, done: false}
        },
        async return(value) {
          close()
          return finished()
        },
        async throw(e) {
          close()
          return finished()
        }
      }
    },
    listen(onNext, {onError, onDone} = {}) {
      if (done) {
        if (onDone != null)
          onDone()
        return () => {
        }
      }
      const listener = {onNext, onError, onDone}
      listeners.push(listener)
      return function removeListener() {
        const idx = listeners.lastIndexOf(listener)
        if (idx >= 0)
          listeners.splice(idx, 1)
      }
    },
    next(value) {
      if (done)
        return
      publish({end: false, value})
      // Dispatch over a snapshot: a listener may unsubscribe (itself or
      // others) from inside its callback without disturbing this loop.
      for (const {onNext} of listeners.slice())
        onNext(value)
    },
    throw(error) {
      if (done)
        return
      done = true
      publish({end: true, failed: true, error})
      // splice(0) empties the registry and hands back its former contents, so
      // listener references are released once the stream has ended.
      for (const {onError} of listeners.splice(0)) {
        if (onError != null)
          onError(error)
      }
    },
    done() {
      if (done)
        return
      done = true
      publish({end: true, failed: false})
      for (const {onDone} of listeners.splice(0)) {
        if (onDone != null)
          onDone()
      }
    },
  }
}
/**
 * Creates a single-queue stream: values pushed by the producer are buffered
 * until an async-iterator consumer asks for them, and consumers that ask
 * before a value is available wait until one arrives. Each value is handed to
 * exactly one `next()` call; all iterators obtained from the stream share the
 * same queue.
 *
 * Producer side:
 *   - `next(value)` hands the value to the longest-waiting consumer, or
 *     buffers it when nobody is waiting
 *   - `throw(err)`  fails the stream: waiting consumers are rejected, and once
 *     any buffered values have been read every further `next()` rejects with
 *     `err`
 *   - `done()`      completes the stream: waiting consumers, and later reads
 *     once the buffer is empty, get `{value: undefined, done: true}`
 * Calls made after the stream has ended are ignored.
 *
 * @returns {object} the stream object described above.
 */
function makeSingleStream() {
  // Invariant: at least one of `chunk` or `defers` is empty at all times.
  const chunk = []
  const defers = []
  let done = false
  // Failure is tracked separately from the reason itself, so that a falsy
  // reason (undefined, 0, '', null) is still reported as a failure.
  let failed = false
  let error
  return {
    [Symbol.asyncIterator]() {
      return {
        async next() {
          // Buffered values are always delivered first, even after the end.
          if (chunk.length)
            return chunk.shift()
          if (done) {
            if (failed)
              throw error
            return {value: undefined, done: true}
          }
          // `chunk` is empty here, so it is safe to make `defers` non-empty.
          const defer = makeDefer()
          defers.push(defer)
          return defer.promise
        },
        async return(value) {
          return {value: undefined, done: true}
        },
        async throw(e) {
          return {value: undefined, done: true}
        }
      }
    },
    next(value) {
      if (done)
        return
      if (defers.length) {
        const defer = defers.shift()
        defer.resolve({value, done: false})
        return
      }
      // `defers` is empty here, so it is safe to make `chunk` non-empty.
      chunk.push({value, done: false})
    },
    throw(err) {
      if (done)
        return
      done = true
      failed = true
      error = err
      // splice(0) empties the waiter list and returns the former waiters.
      for (const defer of defers.splice(0))
        defer.reject(err)
    },
    done() {
      if (done)
        return
      done = true
      for (const defer of defers.splice(0))
        defer.resolve({value: undefined, done: true})
    },
  }
}