live_processing_app.py
import tweepy
import json
import sys
from sklearn.cluster import KMeans
import numpy as np
import random
import re
import gensim
import nltk
from keras.preprocessing import sequence
import pickle
from collections import defaultdict
import pandas as pd
import math
import time
import warnings
warnings.filterwarnings("ignore")
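# requires the NLTK stopwords corpus: run nltk.download('stopwords') once beforehand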
STOPWORDS = nltk.corpus.stopwords.words('english')
emoticons_str = r"""
(?:
[:=;] # Eyes
[oO\-]? # Nose (optional)
[D\)\]\(\]/\\OpP] # Mouth
)"""
regex_str = [
emoticons_str,
r'<[^>]+>', # HTML tags
r'(?:@[\w_]+)', # @-mentions
r"(?:\#+[\w_]+[\w\'_\-]*[\w_]+)", # hash-tags
r'http[s]?://(?:[a-z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-f][0-9a-f]))+', # URLs
r'(?:(?:\d+,?)+(?:\.?\d+)?)', # numbers
r"(?:[a-z][a-z'\-_]+[a-z])", # words with - and '
r'(?:[\w_]+)' # other words
# r'(?:\S)' # anything else
]
number_str = r'(?:(?:\d+,?)+(?:\.?\d+)?)'
tokens_re = re.compile(r'(' + '|'.join(regex_str) + ')', re.VERBOSE | re.IGNORECASE)
emoticon_re = re.compile(r'^' + emoticons_str + '$', re.VERBOSE | re.IGNORECASE)
number_re = re.compile(r'^' + number_str + '$', re.VERBOSE | re.IGNORECASE)
# Twitter API credentials (fill in your own keys)
CONSUMER_KEY = 'Enter your key'
CONSUMER_SECRET = 'Enter your key'
ACCESS_TOKEN = 'Enter your key'
ACCESS_SECRET = 'Enter your key'
BUFFER = 500
NUMBER_CLUSTERS = 3
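# per-cluster minimum weighted score (predicted score * retweet count) a tweet must reach before it is written out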
RANK_THRESHOLD = {0: 10000, 1: 1000, 2: 1000}
TWEETER_FEEDS = [
'nytimes', 'thesun', 'thetimes', 'ap', 'cnn',
'bbcnews', 'cnet', 'msnuk', 'telegraph']
def tokenize(s):
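    """Split tweet text into tokens (emoticons, mentions, hashtags, URLs, numbers, words) using tokens_re."""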
return tokens_re.findall(s)
def preprocess(s):
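    """Tokenize a tweet and return lowercased tokens, dropping emoticons, stopwords, URLs and numbers."""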
tokens = tokenize(s)
    # keep lowercased tokens that are not emoticons, stopwords, URLs or numbers
    tokens = [token.lower() for token in tokens
              if emoticon_re.search(token) is None
              and token not in STOPWORDS
              and token.find('http') == -1
              and number_re.search(token) is None]
    return tokens
def load_google_word2vec_model(path_google_word2vec):
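    """Load the pre-trained Google News word2vec embeddings from the binary file at path_google_word2vec."""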
google_model = gensim.models.KeyedVectors.load_word2vec_format(path_google_word2vec,
binary=True)
return google_model
def process_features(tweet_objects, google_model):
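    """Turn each tweet's text into a fixed-length feature vector by concatenating the word2vec
    vectors of its tokens and zero-padding/truncating to max_review_length values
    (7500 values; with the 300-dimensional Google News vectors that is room for 25 tokens)."""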
max_review_length = 7500
tweet_vectors = []
for data_dict in tweet_objects:
sentence = preprocess(data_dict['text'])
sentence_vector = []
for word in sentence:
try:
tmp_vec = google_model.word_vec(word).tolist() # np.array([1,1,1,1]).tolist()
            except KeyError:
                # word is not in the word2vec vocabulary; skip it
                tmp_vec = []
sentence_vector += tmp_vec
        # wrap the flat vector in a list so pad_sequences sees a batch of one sequence
        sentence_vector = [sentence_vector]
# print sentence_vector
sentence_vector = sequence.pad_sequences(sentence_vector,
maxlen=max_review_length,
padding='post', truncating='post',
dtype='float32')
tweet_vectors.append(sentence_vector[0])
return tweet_vectors
def _get_clusters(X, model):
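    """Assign each tweet feature vector in X to a cluster with the fitted k-means model."""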
clus = model.predict(X)
return clus
def _get_features_for_score(tweet_objects):
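    """Extract engagement features (favourite count, retweet flags, entity indicators, ...) into a
    DataFrame X, together with the target y = ln(retweet_count + 1) used by the ranking model."""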
fields_dict = defaultdict(list)
for data_dict in tweet_objects:
# print '\n'.join(np.unique(data_dict.keys()))
fields_dict['favorite_count'].append(
int(data_dict['favorite_count']) if data_dict.get('favorite_count') is not None else 0)
fields_dict['favorited'].append(
bool(data_dict['favorited']) if data_dict.get('favorited') is not None else False)
        fields_dict['possibly_sensitive'].append(
            bool(data_dict['possibly_sensitive']) if data_dict.get('possibly_sensitive') is not None else False)
fields_dict['retweet_count'].append(int(data_dict['retweet_count']))
fields_dict['retweet_count_ln'].append(math.log(int(data_dict['retweet_count']) + 1))
fields_dict['retweeted'].append(bool(data_dict['retweeted']))
fields_dict['retweeted_status'].append(1 if data_dict.get('retweeted_status') is not None else 0)
entities_encode = [0, 0, 0]
if len(data_dict['entities']['hashtags']) > 0:
entities_encode[0] = 1
if len(data_dict['entities']['urls']) > 0:
entities_encode[1] = 1
if len(data_dict['entities']['user_mentions']) > 0:
entities_encode[2] = 1
fields_dict['entities'].append(entities_encode)
X = pd.DataFrame()
X['favorited'] = fields_dict['favorited']
X['retweeted_status'] = fields_dict['retweeted_status']
X['retweeted'] = fields_dict['retweeted']
    X['entities_h'] = map(lambda x: x[0], fields_dict['entities'])
    X['entities_u'] = map(lambda x: x[1], fields_dict['entities'])
    X['entities_m'] = map(lambda x: x[2], fields_dict['entities'])
X['favorite_count'] = fields_dict['favorite_count']
X['possibly_sensitive'] = fields_dict['possibly_sensitive']
y = pd.DataFrame()
y['target'] = fields_dict['retweet_count_ln']
    # raw retweet counts, kept for reference but not returned
    y_true = pd.DataFrame()
    y_true['count'] = fields_dict['retweet_count']
return X, y
def _get_tweet_scores(tweet_objects, model):
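    """Predict a popularity score for each tweet using the pre-trained ranking model."""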
X, y = _get_features_for_score(tweet_objects)
pred = model.predict(X)
return pred
class rank_object:
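    """Holds the current best tweet for a cluster: text, ids, timestamps, scores, and whether it has been written out."""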
def __init__(self):
self.posted = True
self.score = -1
self.text = ''
self.id = -1
self.created_at = ''
self.url = ''
self.retweet_count = -1
self.weighted_score = -1
self.retweet_id = None
def find_top_tweets(clusters, scores, tweet_objects):
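    """Return, for each cluster id, a rank_object holding the tweet from this batch with the
    highest weighted score (predicted score * retweet count)."""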
rank_dict = {}
for i in range(NUMBER_CLUSTERS):
rank_dict[i] = rank_object()
for i, clus in enumerate(clusters):
if rank_dict[clus].weighted_score < (scores[i] * tweet_objects[i]['retweet_count']):
rank_dict[clus].score = scores[i]
rank_dict[clus].weighted_score = (scores[i] * tweet_objects[i]['retweet_count'])
rank_dict[clus].index = i
rank_dict[clus].text = tweet_objects[i]['text']
rank_dict[clus].id = tweet_objects[i]['id']
rank_dict[clus].posted = False
rank_dict[clus].created_at = tweet_objects[i]['created_at']
rank_dict[clus].retweet_count = tweet_objects[i]['retweet_count']
rank_dict[clus].retweet_id = tweet_objects[i]['retweeted_status']['id'] if tweet_objects[i].get(
'retweeted_status') is not None else None
return rank_dict
def write_tweets(top_tweets, post_file):
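    """Write every not-yet-posted top tweet whose weighted score meets its cluster's RANK_THRESHOLD
    to that cluster's CSV file and mark it as posted."""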
for i in range(NUMBER_CLUSTERS):
# print i, len(top_tweets[i])
for tweet in top_tweets[i]:
            if not tweet.posted and tweet.weighted_score >= RANK_THRESHOLD[i]:
line = str(i) + ',' + str(tweet.id) + ',' + str(tweet.created_at) + ',' + \
str(tweet.weighted_score) + ',' + str(tweet.score) + ',' + str(tweet.retweet_count) + ',' \
+ tweet.text.encode('utf8') + '\n'
post_file[i].write(line)
tweet.posted = True
post_file[i].flush()
def process_tweets(path_google_word2vec):
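    """Authenticate with Twitter, load the word2vec, k-means and ranking models, then poll the
    configured feeds, cluster and score new tweets, and log the top tweet per cluster."""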
    # get Twitter API access
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
# load google word2vec
google_model = load_google_word2vec_model(path_google_word2vec)
# load clustering model
kmeans = pickle.load(open('kmeans_model.sav', 'rb'))
# load ranking model
rf = pickle.load(open('rf_model.sav', 'rb'))
# all top tweets from each download
total_top_tweets = defaultdict(list)
# posting tweet file
tweet_file = []
for i in range(NUMBER_CLUSTERS):
tweet_file.append(open('posted_tweets_clus_' + str(i + 1) + '.csv', 'w'))
        line = 'cluster_id,tweet_id,created_at,weighted_score,score,retweet_count,text\n'
tweet_file[i].write(line)
# tweet dump file
tweet_dump = open('tweet_dump.json', 'w')
api = tweepy.API(auth)
num_tweets_download = 0
more_tweets = True
last_id = {}
# for feed in TWEETER_FEEDS:
# last_id[feed] = 0 #850000000000000000
last_id['nytimes'] = 853319033424314368 # 852854216603435008
last_id['thesun'] = 853320020012806144 # 852945509279047682
last_id['thetimes'] = 853262389231456257 # 852421095965773824
last_id['ap'] = 853320221683306496 # 852640846763655170
last_id['cnn'] = 853319517975576576 # 852780968947490816
last_id['bbcnews'] = 853315775528030208 # 852306647867420672
last_id['cnet'] = 853318276713197570 # 852650669161017344
last_id['msnuk'] = 852972496064311297 # 850373113925783557
last_id['telegraph'] = 853317000726274048 # 852551974021079040
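    # main loop: every 10 minutes pull the latest tweets from each feed, embed, cluster, rank,
    # and log the top tweet per cluster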
while True:
total_tweet_objects = []
more_tweets = True
for feed in TWEETER_FEEDS:
more_tweets = True
user = api.get_user(feed)
recent_id = last_id[feed]
print(user.screen_name)
try:
tweet_objects = [status._json for status in
tweepy.Cursor(api.user_timeline, id=feed).items(BUFFER)]
            except Exception as e:
                print "Error: %s" % e
                tweet_objects = []
if len(tweet_objects) > 0:
recent_id = int(tweet_objects[0]['id'])
else:
more_tweets = False
while more_tweets:
# print len(tweet_objects)
# print tweet_objects[0]['id']
for tweet in tweet_objects:
if last_id[feed] >= int(tweet['id']):
more_tweets = False
break
total_tweet_objects.append(tweet)
                # re-query the timeline; the cursor is expected to return only tweets not yet seen
                try:
                    tweet_objects = [status._json for status in
                                     tweepy.Cursor(api.user_timeline, id=feed).items(BUFFER)]
                except Exception:
                    # fetching failed; stop collecting from this feed for now
                    tweet_objects = []
                    more_tweets = False
last_id[feed] = recent_id
# print feed, last_id[feed]
print "new len total_tweets: ", len(total_tweet_objects)
if len(total_tweet_objects) != 0:
print "final len total tweets: ", len(total_tweet_objects)
tweet_vectors = process_features(total_tweet_objects, google_model)
clusters = _get_clusters(list(tweet_vectors), kmeans)
predicted_scores = _get_tweet_scores(total_tweet_objects, rf)
top_tweets = find_top_tweets(clusters, predicted_scores, total_tweet_objects)
for i in range(NUMBER_CLUSTERS):
tweet_added = False
for j, item in enumerate(total_top_tweets[i]):
if tweet_added == False and (
item.id == top_tweets[i].retweet_id or item.retweet_id == top_tweets[i].retweet_id):
top_tweets[i].posted = total_top_tweets[i][j].posted
total_top_tweets[i][j] = top_tweets[i]
tweet_added = True
break
if tweet_added == False:
total_top_tweets[i].append(top_tweets[i])
write_tweets(total_top_tweets, tweet_file)
# writing all tweets to dump file
for index, tweet in enumerate(total_tweet_objects):
                tweet['predicted_score'] = float(predicted_scores[index])
                tweet['weight_score'] = tweet['retweet_count'] * float(predicted_scores[index])
                tweet['cluster'] = int(clusters[index]) + 1
                tweet_dump.write(json.dumps(tweet, encoding="utf-8") + '\n')
tweet_dump.flush()
print "going to sleep .... %s" % time.ctime()
time.sleep(600)
print "wake up .... %s" % time.ctime()
if __name__ == '__main__':
if len(sys.argv) >= 2:
process_tweets(sys.argv[1])
else:
print "Error: Google word2vec path not specified"
print "Please download word2vec from here https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit"
print "Give the path to extraced GoogleNews-vectors-negative300.bin"