-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathserver.js
172 lines (150 loc) · 5.24 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
const crypto = require('crypto');
const http = require('http');
const url = require('url');
const eetase = require('eetase');
const express = require('express');
const socketClusterClient = require('socketcluster-client');
const socketClusterServer = require('socketcluster-server');
const Action = require('socketcluster-server/action');
const uuid = require('uuid');
const packageVersion = require('./package.json').version;
// Port used when SCC_BROKER_SERVER_PORT is unset or does not convert to a
// non-zero number.
const DEFAULT_PORT = 8888;
const configuredPort = Number(process.env.SCC_BROKER_SERVER_PORT);
const PORT = configuredPort || DEFAULT_PORT;
// Identifier generated once per process start; reported to the state server
// as `instanceId` when joining the cluster.
const SCC_INSTANCE_ID = uuid.v4();
// Reads a numeric setting from the environment. Unset, non-numeric and zero
// values all yield the fallback.
const numberFromEnv = (variableName, fallback) => Number(process.env[variableName]) || fallback;

// Location of the cluster state server.
const { SCC_STATE_SERVER_HOST } = process.env;
const SCC_STATE_SERVER_PORT = numberFromEnv('SCC_STATE_SERVER_PORT', 7777);

// How this broker instance describes itself to the state server.
const SCC_INSTANCE_IP = process.env.SCC_INSTANCE_IP ? process.env.SCC_INSTANCE_IP : null;
const SCC_INSTANCE_IP_FAMILY = process.env.SCC_INSTANCE_IP_FAMILY
  ? process.env.SCC_INSTANCE_IP_FAMILY
  : 'IPv4';
const SCC_AUTH_KEY = process.env.SCC_AUTH_KEY ? process.env.SCC_AUTH_KEY : null;

// Timing settings, all in milliseconds.
const RETRY_DELAY = numberFromEnv('SCC_BROKER_SERVER_RETRY_DELAY', 2000);
const STATE_SERVER_CONNECT_TIMEOUT = numberFromEnv('SCC_STATE_SERVER_CONNECT_TIMEOUT', 3000);
const STATE_SERVER_ACK_TIMEOUT = numberFromEnv('SCC_STATE_SERVER_ACK_TIMEOUT', 2000);
const BROKER_SERVER_CONNECT_TIMEOUT = numberFromEnv('SCC_BROKER_SERVER_CONNECT_TIMEOUT', 10000);
const BROKER_SERVER_ACK_TIMEOUT = numberFromEnv('SCC_BROKER_SERVER_ACK_TIMEOUT', 10000);

const BROKER_SERVER_WS_ENGINE = process.env.SCC_BROKER_SERVER_WS_ENGINE
  ? process.env.SCC_BROKER_SERVER_WS_ENGINE
  : 'ws';
// Any non-empty value of SCC_BROKER_SERVER_SECURE turns this on.
const SECURE = Boolean(process.env.SCC_BROKER_SERVER_SECURE);
const RECONNECT_RANDOMNESS = 1000;
/**
 * Log levels:
 * 3 - log everything
 * 2 - warnings and errors
 * 1 - errors only
 * 0 - log nothing
 */
const DEFAULT_LOG_LEVEL = 1;

/**
 * Converts the raw SCC_BROKER_SERVER_LOG_LEVEL value into a numeric log level.
 *
 * @param {string|undefined} rawLevel - Value read from the environment.
 * @returns {number} The numeric level, or DEFAULT_LOG_LEVEL when the variable
 *   is unset or is not a number.
 */
function parseLogLevel(rawLevel) {
  if (typeof rawLevel === 'undefined') {
    return DEFAULT_LOG_LEVEL;
  }
  const level = Number(rawLevel);
  // NaN fails every `LOG_LEVEL >= n` check, which would silently turn off all
  // logging (errors included) on a typo such as "debug". Use the default instead.
  return Number.isNaN(level) ? DEFAULT_LOG_LEVEL : level;
}

let LOG_LEVEL = parseLogLevel(process.env.SCC_BROKER_SERVER_LOG_LEVEL);
// Fail fast at start-up: the broker cannot register with the cluster unless
// it knows where the state server is.
if (!SCC_STATE_SERVER_HOST) {
  const missingHostMessage =
    'No SCC_STATE_SERVER_HOST was specified - This should be provided ' +
    'through the SCC_STATE_SERVER_HOST environment variable';
  throw new Error(missingHostMessage);
}
// Optional JSON object in SOCKETCLUSTER_OPTIONS; its keys override the
// defaults below. Malformed JSON makes JSON.parse throw during start-up.
const socketClusterOverrides = process.env.SOCKETCLUSTER_OPTIONS;

// Options handed to the SocketCluster server when it is attached.
let agOptions = Object.assign(
  {
    wsEngine: BROKER_SERVER_WS_ENGINE,
    socketChannelLimit: null,
    connectTimeout: BROKER_SERVER_CONNECT_TIMEOUT,
    ackTimeout: BROKER_SERVER_ACK_TIMEOUT
  },
  socketClusterOverrides ? JSON.parse(socketClusterOverrides) : undefined
);
// The HTTP server is wrapped with eetase; the rest of this file consumes its
// events through `httpServer.listener(...)`.
const plainHttpServer = http.createServer();
let httpServer = eetase(plainHttpServer);
// The SocketCluster server shares that HTTP server and uses the options above.
let agServer = socketClusterServer.attach(httpServer, agOptions);
// When an auth key is configured, every WebSocket handshake must carry the
// same key in its `authKey` URL query argument; other handshakes are blocked.
if (SCC_AUTH_KEY) {
  const expectedAuthKey = Buffer.from(SCC_AUTH_KEY);

  /**
   * Checks a client-supplied auth key against the configured one without
   * leaking, through response timing, how many leading characters matched.
   *
   * @param {*} candidate - Value of the `authKey` query argument.
   * @returns {boolean} true only when candidate is a string equal to SCC_AUTH_KEY.
   */
  const isValidAuthKey = (candidate) => {
    // Anything other than a single string value (e.g. a missing argument)
    // can never match the configured key.
    if (typeof candidate !== 'string') {
      return false;
    }
    const suppliedAuthKey = Buffer.from(candidate);
    // timingSafeEqual throws when the byte lengths differ, so compare lengths first.
    return suppliedAuthKey.length === expectedAuthKey.length &&
      crypto.timingSafeEqual(suppliedAuthKey, expectedAuthKey);
  };

  agServer.setMiddleware(agServer.MIDDLEWARE_HANDSHAKE, async (middlewareStream) => {
    for await (let action of middlewareStream) {
      if (action.type === Action.HANDSHAKE_WS) {
        let urlParts = url.parse(action.request.url, true);
        let suppliedKey = urlParts.query ? urlParts.query.authKey : undefined;
        if (!isValidAuthKey(suppliedKey)) {
          let err = new Error('Cannot connect to the cluster broker server without providing a valid authKey as a URL query argument.');
          err.name = 'BadClusterAuthError';
          action.block(err);
          continue;
        }
      }
      // Valid WebSocket handshakes and every other handshake action type pass through.
      action.allow();
    }
  });
}
// Inbound logging middleware, installed only at log level 2 or above.
// Subscriptions are logged whenever it is installed; publishes only at level 3+.
// Every action is allowed regardless of its type.
if (LOG_LEVEL >= 2) {
  agServer.setMiddleware(agServer.MIDDLEWARE_INBOUND, async (middlewareStream) => {
    for await (const action of middlewareStream) {
      switch (action.type) {
        case Action.SUBSCRIBE:
          console.log(`${action.socket.remoteAddress} subscribed to ${action.channel}`);
          break;
        case Action.PUBLISH_IN:
          if (LOG_LEVEL >= 3) {
            console.log(`${action.socket.remoteAddress} published to ${action.channel}`);
          }
          break;
        default:
          break;
      }
      action.allow();
    }
  });
}
// Express application that serves plain HTTP requests for this broker.
let expressApp = express();

// GET /health-check always answers 200 "OK".
const respondHealthy = (req, res) => {
  res.status(200).send('OK');
};
expressApp.get('/health-check', respondHealthy);
// HTTP request handling loop: each 'request' event's argument list is passed
// on to the express app unchanged.
async function forwardHttpRequestsToExpress() {
  const requestStream = httpServer.listener('request');
  for await (const requestArgs of requestStream) {
    Reflect.apply(expressApp, null, requestArgs);
  }
}
forwardHttpRequestsToExpress();
/**
 * Opens a client socket to the cluster state server and registers this broker
 * with it by invoking `sccBrokerJoinCluster` after every successful connect.
 *
 * Socket errors are logged at log level 1+. A failed join is logged at log
 * level 2+ and retried every RETRY_DELAY milliseconds. Takes no arguments and
 * returns nothing; the listeners it starts run for the life of the process.
 */
let connectToClusterStateServer = function () {
  let agStateSocketOptions = {
    hostname: SCC_STATE_SERVER_HOST,
    port: SCC_STATE_SERVER_PORT,
    connectTimeout: STATE_SERVER_CONNECT_TIMEOUT,
    ackTimeout: STATE_SERVER_ACK_TIMEOUT,
    autoReconnectOptions: {
      initialDelay: RETRY_DELAY,
      randomness: RECONNECT_RANDOMNESS,
      // NOTE(review): a multiplier of 1 presumably keeps the reconnect delay
      // from growing between attempts - confirm against socketcluster-client.
      multiplier: 1,
      maxDelay: RETRY_DELAY + RECONNECT_RANDOMNESS
    },
    // Sent to the state server as URL query arguments when connecting.
    query: {
      authKey: SCC_AUTH_KEY,
      instancePort: PORT,
      instanceType: 'scc-broker',
      version: packageVersion
    }
  };
  let stateSocket = socketClusterClient.create(agStateSocketOptions);

  // Log socket-level errors.
  (async () => {
    for await (let {error} of stateSocket.listener('error')) {
      if (LOG_LEVEL >= 1) {
        console.error(error);
      }
    }
  })();

  // Payload describing this broker instance, sent with every join request.
  let stateSocketData = {
    instanceId: SCC_INSTANCE_ID,
    instanceIp: SCC_INSTANCE_IP,
    instanceIpFamily: SCC_INSTANCE_IP_FAMILY,
    instanceSecure: SECURE
  };

  // Each 'connect' event starts a new join attempt. The counter identifies the
  // newest one so that a failure from an older attempt does not keep
  // rescheduling itself alongside it.
  let latestJoinAttempt = 0;
  let joinRetryTimer = null;

  let emitJoinCluster = async (attemptId) => {
    try {
      await stateSocket.invoke('sccBrokerJoinCluster', stateSocketData);
    } catch (err) {
      if (attemptId !== latestJoinAttempt) {
        // A newer connection has taken over responsibility for joining.
        return;
      }
      if (LOG_LEVEL >= 2) {
        console.warn(`Failed to join the cluster - retrying in ${RETRY_DELAY}ms`, err);
      }
      joinRetryTimer = setTimeout(emitJoinCluster, RETRY_DELAY, attemptId);
    }
  };

  // (Re)join the cluster every time the socket connects.
  (async () => {
    for await (let _event of stateSocket.listener('connect')) {
      latestJoinAttempt++;
      // Drop any retry still pending from the previous connection.
      clearTimeout(joinRetryTimer);
      emitJoinCluster(latestJoinAttempt);
    }
  })();
};
// Start-up sequence: wait for the HTTP server's 'listening' event, optionally
// announce the port (log level 3+), then connect to the cluster state server.
async function joinClusterOnceListening() {
  const listening = httpServer.listener('listening').once();
  await listening;
  if (LOG_LEVEL >= 3) {
    console.log(`The scc-broker instance is listening on port ${PORT}`);
  }
  connectToClusterStateServer();
}

// Called before listen() so the wait for 'listening' is already in place.
joinClusterOnceListening();
httpServer.listen(PORT);