-
Notifications
You must be signed in to change notification settings - Fork 29
/
server.js
197 lines (160 loc) · 6.22 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
var Q = require('q');
var express = require('express');
var app = express();
var bodyParser = require('body-parser')
var server = require('http').createServer(app);
var io = require('socket.io').listen(server);
var redis = require('redis');
// var config = require('./libs/config');
var moment = require('moment');
var connections = 0;
var PORT = 8080;
var REDIS_PORT = 6379;
var REDIS_HOST = 'localhost';
var redisClient = redis.createClient(REDIS_PORT, REDIS_HOST)
var redisPublishClient = redis.createClient(REDIS_PORT, REDIS_HOST)
var channelWatchList = [];
// parse application/x-www-form-urlencoded
app.use(bodyParser.urlencoded())
// parse application/json
app.use(bodyParser.json())
// parse application/vnd.api+json as json
app.use(bodyParser.json({ type: 'application/vnd.api+json' }))
//Include the client files as well.
app.use(express.static(__dirname + '/client/RedisChat/www'));
server.listen(PORT, function() {
console.log('RedisChat now listening on port: ' + PORT + '\n');
});
var allowCrossDomain = function(req, res, next) {
res.header('Access-Control-Allow-Origin', '*');
res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
// intercept OPTIONS method
if ('OPTIONS' == req.method) {
res.send(200);
}
else {
next();
}
};
app.use(allowCrossDomain);
app.post('/login', function(req, resp) {
// console.log(req);
// console.log(req.param('name'));
var name = req.param('name');
var password = req.param('password');
if(!name || !password) {
resp.send('No username or password given. Please give correct credientials');
return;
}
console.log('Password attempt by: ' + name + ' at: ' + moment() );
if(password && password == 'letmein') {
//Add redis key for users
var userKey = 'user:' + name;
redisClient.set(userKey, moment(), redis.print);
}
connections += 1;
resp.send({success: true, name: name, id: connections});
})
function getMessages(channel) {
var deferred = Q.defer();
var messageChannel = 'messages:' + channel;
console.log('getMessages',messageChannel);
var args1 = [ messageChannel, 0, -1 ];
redisClient.zrange(args1, function (err, response) {
// if (err) throw err;
console.log('zrange messages: ' + channel + ' 0 -1', response);
var messageList = [];
for(var i = 0, j = response.length; i < j; i++) {
messageList.push(JSON.parse(response[i]));
}
deferred.resolve(messageList);
});
return deferred.promise;
}
function getChannels() {
return channelWatchList;
}
function removeKeys() {
console.log('We are removing old messages');
for(var channelIndex in channelWatchList) {
var channel = channelWatchList[channelIndex];
var messageChannel = 'messages:' + channel;
console.log('message channel', messageChannel)
var timeToRemove = moment().subtract('m', 1).unix();
redisClient.zrangebyscore(messageChannel, 0, timeToRemove, function(err, result) {
// console.log('Removing: ', result);
if(result && result.length > 0) {
for (var resultIndex in result) {
var message = JSON.parse(result[resultIndex]);
// console.log('emitting: ', message);
io.emit('message:remove:channel:' + channel, { message: message, channel: channel });
}
}
});
redisClient.zremrangebyscore(messageChannel, 0, timeToRemove, function(err, result) {
console.log('Removed ', result, ' messages');
});
}
}
function populateChannels() {
console.log('Populating channels from channels key');
redisClient.zrangebyscore('channels', 0, '+inf', function(err, result) {
console.log('Channels: ', result);
for(var channelIndex in result) {
channelWatchList.push(result[channelIndex]);
}
});
}
populateChannels();
var cleanUpMesssagesInterval = setInterval(removeKeys, 6000);
io.on('connection', function(socket){
var counter = 0;
console.log('RedisChat - user connected');
socket.on('disconnect', function(){
console.log('RedisChat - user disconnected');
});
socket.on('user:joined', function(user) {
var message = user.name + ' joined the room';
io.emit('user:joined', {message: message, time: moment(), expires: moment().add(10) })
})
socket.on('message:send', function(message){
console.log('message: ' + message);
console.log(JSON.stringify(message));
// var messageKey = 'message:' + message.name;
// console.log('Storing key: ' + messageKey);
var messageObj = { message: message.message, name: message.name, time: moment(), expires: moment().add('m', 2).unix() };
// console.log('this: ' + JSON.stringify(messageObj));
// redisClient.set(messageKey, JSON.stringify(messageObj), redis.print);
// redisClient.expire(messageKey, 600);
console.log('storing to set: messages:' + message.channel);
redisClient.zadd('messages:' + message.channel, moment().add('m', 2).unix(), JSON.stringify(messageObj));
//Relay the message out to all who are listening.
io.emit('message:channel:' + message.channel, messageObj);
console.log('emited: ' + messageObj);
});
socket.on('channel:join', function(channelInfo) {
console.log('Channel joined - ', channelInfo.channel);
console.log(channelInfo);
redisClient.zadd('channels', 100, channelInfo.channel, redis.print);
console.log('Added to channels: ', channelInfo.channel);
redisClient.zadd('users:' + channelInfo.channel, 100, channelInfo.name, redis.print);
// redisClient.zadd('messages:' + channelInfo.channel, 100, channelInfo.channel, redis.print);
console.log('messages:' + channelInfo.channel);
// socket.emit('messages:channel:' + channelInfo.channel, )
//Add to watch to remove list.
// for(var i = 0, j = channelWatchList.length; i < j; i++) {
// if()
// }
if(channelWatchList.indexOf(channelInfo.channel) == -1) {
channelWatchList.push(channelInfo.channel);
}
socket.emit('channels', channelWatchList);
//Emit back any messages that havent expired yet.
getMessages(channelInfo.channel).then(function(data){
console.log('got messages');
// console.log(data);
socket.emit('messages:channel:' + channelInfo.channel, data);
});
});
});