-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathindex.js
119 lines (110 loc) · 3.42 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
const { pipeline } = require('node:stream/promises')
const { Transform } = require('node:stream')
const jsonpour = require('jsonpour')
const package = require('./package.json')
const ccurllib = require('ccurllib')
const couchimport = async (opts) => {
// mandatory parameters
if (!opts.url || !opts.database) {
throw new Error('must supply url and database')
}
// streams
opts.rs = opts.rs || process.stdin
opts.ws = opts.ws || process.stdout
// buffer of documents waiting to be written
const batch = []
opts.batch = opts.batch > 1 ? opts.batch : 500
// status - the progress of the insert
const status = {
batch: 0,
batchSize: 0,
docSuccessCount: 0,
docFailCount: 0,
statusCodes: { },
errors: {}
}
// a Node.js stream transformer that takes a stream of individual
// changes and groups them into batches of opts.buffer except the
// last batch which may be smaller.
const batcher = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform (obj, _, callback) {
// push the change into our batch array
batch.push(obj)
// if we have at least a full batch
if (batch.length >= opts.buffer) {
// send a full batch to the next thing in the pipeline
this.push(batch.splice(0, opts.buffer))
}
callback()
},
flush (callback) {
// handle the any remaining buffered data
if (batch.length > 0) {
// send anything left as a final batch
this.push(batch)
}
callback()
}
})
// a Node.js stream transformer that takes a stream of individual
// changes and groups them into batches of opts.buffer except the
// last batch which may be smaller.
const writer = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform (obj, _, callback) {
// push the change into our batch array
const req = {
method: 'post',
url: `${opts.url}/${opts.database}/_bulk_docs`,
body: JSON.stringify({ docs: obj }),
headers: {
'user-agent': `${package.name}/${package.version}`,
'content-type': 'application/json'
}
}
status.batch++
status.batchSize = obj.length
ccurllib.request(req).then((response) => {
if (!status.statusCodes[response.status]) {
status.statusCodes[response.status] = 0
}
status.statusCodes[response.status]++
if (response.status < 400) {
// the status codes doesn't tell the whole storry, we have
// to inspect each of the array of responses to see if a
// document actually got insterted or not.
for(const r of response.result) {
if (r.ok) {
status.docSuccessCount++
} else {
status.docFailCount++
if (!status.errors[r.error]) {
status.errors[r.error] = 0
}
status.errors[r.error]++
}
}
} else {
// if we got an HTTP code >= 400 then all the inserts failed
status.docFailCount += obj.length
}
this.push(`written ${JSON.stringify(status)}\n`)
callback()
})
}
})
// stream every object from the results array via a filter to stdout
await pipeline(
opts.rs,
jsonpour.parse(),
batcher,
writer,
opts.ws,
{ end: false }
)
return status
}
module.exports = couchimport