-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathindex.js
190 lines (159 loc) · 5.64 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
'use strict';
module.exports = tileReduce;
var EventEmitter = require('events').EventEmitter;
var cpus = require('os').cpus().length;
var vm = require('vm');
var fs = require('fs');
var fork = require('child_process').fork;
var path = require('path');
var binarysplit = require('binary-split');
var cover = require('./cover');
var streamArray = require('stream-array');
var MBTiles = require('@mapbox/mbtiles');
var through = require('through2');
// Suppress max listener warnings. We need at least 1 listener per worker.
process.stderr.setMaxListeners(0);
function tileReduce(options) {
var ee = new EventEmitter();
var workers = ee.workers = [];
var workersReady = 0;
var maxWorkers = Math.min(cpus, options.maxWorkers || cpus);
var output = options.output || process.stdout;
var tileStream = null;
var tilesDone = 0;
var tilesSent = 0;
var pauseLimit = options.batch || 5000;
var start = Date.now();
var timer;
// Validate syntax in the map script to fail faster
try {
new vm.Script(fs.readFileSync(options.map), {filename: options.map}); // eslint-disable-line
} catch (e) {
if (e instanceof SyntaxError) {
e.message = 'tile-reduce found a syntax error in your map script: ' + options.map + '\n\n' + e.message;
throw e;
} else if (e instanceof Error) {
e.message = 'tile-reduce was unable to find or require your map script: ' + options.map + '\n\n' + e.message;
throw e;
}
}
if (options.tileStream) {
// Pass through a dummy pipe. This ensures the stream is in the proper mode.
// See last paragraph of the 'classic readable streams' section at
// https://github.com/substack/stream-handbook#classic-readable-streams
options.tileStream = options.tileStream.pipe(through.obj());
}
log('Starting up ' + maxWorkers + ' workers... ');
if (output) output.setMaxListeners(0);
var mapOptions = options.mapOptions || {};
for (var i = 0; i < maxWorkers; i++) {
var worker = fork(path.join(__dirname, 'worker.js'), [options.map, JSON.stringify(options.sources), JSON.stringify(mapOptions)], {silent: true});
worker.stdout.pipe(binarysplit('\x1e')).pipe(output);
worker.stderr.pipe(process.stderr);
worker.on('message', handleMessage);
workers.push(worker);
}
function handleMessage(message) {
if (message.reduce) reduce(message.value, message.tile);
else if (message.ready && ++workersReady === workers.length) run();
}
function run() {
log('Job started.\n');
ee.emit('start');
timer = setInterval(updateStatus, 64);
var tiles = cover(options);
if (tiles) {
// JS tile array, GeoJSON or bbox
log('Processing ' + tiles.length + ' tiles.\n');
tileStream = streamArray(tiles)
.on('data', handleTile)
.on('end', streamEnded);
} else if (options.tileStream) {
log('Processing tile coords from tile stream.\n');
tileStream = options.tileStream;
tileStream
.on('data', handleTileStreamLine)
.on('end', streamEnded)
.resume();
} else {
// try to get tiles from mbtiles (either specified by sourceCover or first encountered)
var source;
for (var i = 0; i < options.sources.length; i++) {
source = options.sources[i];
if (options.sources[i].mbtiles && (!options.sourceCover || options.sourceCover === source.name)) break;
source = null;
}
if (source) {
log('Processing tile coords from "' + source.name + '" source.\n');
var db = new MBTiles(source.mbtiles, function(err) {
if (err) throw err;
tileStream = db.createZXYStream()
.pipe(binarysplit('\n'))
.on('data', handleZXYLine)
.on('end', streamEnded);
});
} else {
throw new Error(options.sourceCover ?
'Specified source for cover not found.' :
'No area or tiles specified for the job.');
}
}
}
var paused = false;
var ended = false;
function streamEnded() {
ended = true;
if (tilesDone === tilesSent) shutdown();
}
function handleTile(tile) {
var workerId = tilesSent++ % workers.length;
ee.emit('map', tile, workerId);
workers[workerId].send(tile);
if (!paused && tilesSent - tilesDone > pauseLimit) {
paused = true;
tileStream.pause();
}
}
function handleTileStreamLine(line) {
var tile = line;
if (typeof line === 'string' || line instanceof Buffer) {
tile = line.toString().split(' ');
}
handleTile(tile.map(Number));
}
function handleZXYLine(line) {
var tile = line.toString().split('/');
handleTile([+tile[1], +tile[2], +tile[0]]);
}
function reduce(value, tile) {
if (value !== null && value !== undefined) ee.emit('reduce', value, tile);
if (paused && tilesSent - tilesDone < (pauseLimit / 2)) {
paused = false;
tileStream.resume();
}
if (++tilesDone === tilesSent && ended) shutdown();
}
function shutdown() {
while (workers.length) workers.pop().kill('SIGHUP');
clearTimeout(timer);
updateStatus();
log('.\n');
ee.emit('end');
}
/* istanbul ignore next */
function updateStatus() {
if (options.log === false || !process.stderr.cursorTo) return;
var s = Math.floor((Date.now() - start) / 1000);
var h = Math.floor(s / 3600);
var m = Math.floor((s - h * 3600) / 60);
var time = (h ? h + 'h ' : '') + (h || m ? m + 'm ' : '') + (s % 60) + 's';
process.stderr.cursorTo(0);
process.stderr.write(tilesDone + ' tiles processed in ' + time);
process.stderr.clearLine(1);
}
/* istanbul ignore next */
function log(str) {
if (options.log !== false) process.stderr.write(str);
}
return ee;
}