-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.js
85 lines (73 loc) · 1.53 KB
/
queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* Queue middleware
*
* Batches up sequential put operations.
*
* This can lead to a speedup of about 400%,
* sacrificing individual response time for put
* operations.
*
* $ node bench/queue.js
*
* put : 133,630 w/s in 898ms
*
* $ node bench/queue.js queue
*
* queue : 446,096 w/s in 269ms
*
*/
module.exports = function (leveled) {
if (!leveled) throw new Error('leveled required')
var queue = new Queue(leveled)
return function queue (req, res, next) {
if (req.method != 'put') return next();
queue.push({
key : req.key,
val : req.val,
cb : res.end
})
}
}
/**
 * Queue
 *
 * Holds pending operations for a store. A fresh instance starts with an
 * empty `queue` array and `processing` set to `false`.
 *
 * @param {Leveled} leveled  stored on the instance as `this.leveled`
 */
function Queue(leveled) {
  this.processing = false
  this.queue = []
  this.leveled = leveled
}
/**
 * Append an operation to the pending list and start processing unless a
 * run is already in progress.
 *
 * @param {Object} obj
 * @param {String} obj.key
 * @param {String} obj.val
 * @param {Function=} obj.cb
 */
Queue.prototype.push = function (obj) {
  var pending = this.queue
  pending.push(obj)

  // A run that is already in progress is left alone.
  if (this.processing) return
  this.process()
}
/**
 * Process all the items in the queue in a batch
 *
 * Takes everything currently queued, writes it with a single
 * `leveled.batch()` and then calls each item's `cb` (if present) with the
 * write error. Items pushed while the write is in flight are handled by a
 * follow-up run started from the write callback.
 *
 * A throwing `cb` does not prevent the remaining callbacks from running and
 * does not leave the queue stuck in the `processing` state; the first such
 * error is rethrown after the queue state has been updated.
 */
Queue.prototype.process = function () {
  var self = this

  // Create the batch before touching any state so that a failing `batch()`
  // leaves the queued items and the `processing` flag as they were.
  var batch = self.leveled.batch()
  var pending = self.queue
  var len = pending.length

  self.processing = true
  self.queue = []

  for (var i = 0; i < len; i++) {
    batch.put(pending[i].key, pending[i].val)
  }

  batch.write(function (err) {
    var threw = false
    var firstError

    for (var j = 0; j < len; j++) {
      if (!pending[j].cb) continue
      try {
        pending[j].cb(err)
      } catch (e) {
        // Remember only the first failure; keep notifying the others.
        if (!threw) {
          threw = true
          firstError = e
        }
      }
    }

    self.processing = false
    if (self.queue.length) self.process()

    // Surface the callback failure instead of swallowing it.
    if (threw) throw firstError
  })
}