-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
96 lines (75 loc) · 2.12 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
const build = require('pino-abstract-transport');
const undici = require('undici');
function defaultParseLine (line) {
const obj = JSON.parse(line);
const d = new Date();
return {
...obj,
logId: Math.floor(Math.random() * 100000),
time: d.getTime(),
dt: d.toISOString(),
};
}
module.exports = async function (options) {
let batchDataToSend = [];
let autoFlushTimeout;
const debugLog = (msg) => {
if (!options.debug) {
return;
}
process._rawDebug(msg);
}
const sendLogsToLogtail = async () => {
const body = JSON.stringify(batchDataToSend);
batchDataToSend = [];
debugLog("Sending: " + body);
const res = await undici.fetch('https://in.logtail.com', {
body,
method: 'POST',
headers: {
['content-type']: 'application/json',
['authorization']: `Bearer ${options.logtailToken}`,
},
});
debugLog("Response: " + res.status)
}
const send = async (log) => {
batchDataToSend.push(log);
// If set, reset flush timeout, we're already trying to send
if (autoFlushTimeout) {
clearTimeout(autoFlushTimeout);
}
if (batchDataToSend.length >= 10) {
debugLog("Time to send: " + batchDataToSend.length);
return sendLogsToLogtail();
}
autoFlushTimeout = setTimeout(() => {
debugLog("Flushing after 1 sec from last send...");
if (batchDataToSend.length === 0) return;
const noOp = () => {};
sendLogsToLogtail().then(noOp);
}, 1000);
debugLog("Not time to send: " + batchDataToSend.length);
return;
}
const parseLine = typeof options.parseLine === 'function' ? options.parseLine : defaultParseLine;
if(!options.logtailToken) {
throw new Error("Missing Logtail Authorization Token!");
}
return build(async function (source) {
for await (let obj of source) {
await send(obj)
}
}, {
parseLine,
// Handle transport shutdown
close(err, cb) {
if(err) {
debugLog("Error during the write: " + err);
cb();
}
debugLog("Forcing flush before close");
sendLogsToLogtail().then(cb);
}
})
}