-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathelastic_link.py
36 lines (31 loc) · 1.21 KB
/
elastic_link.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from datetime import datetime, timezone

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk
class ElasticLink:
    """Small wrapper that pushes documents into one Elasticsearch index.

    Usage: build the link, call ``connect()`` once, then call
    ``push_to_server()`` for each payload. Until ``connect()`` has been
    called, ``es`` is ``None`` and pushes are reported as failed.
    """

    def __init__(self, ela_host, ela_index):
        """Store the target host and index; no client is created here.

        Args:
            ela_host: Passed unchanged to ``Elasticsearch(...)`` by
                ``connect()``.
            ela_index: Used as the ``_index`` of every pushed document.
        """
        self.ela_host = ela_host
        self.ela_index = ela_index
        # Defined up front so that pushing before connect() can be detected
        # explicitly instead of surfacing as an AttributeError.
        self.es = None

    def connect(self):
        """Create the Elasticsearch client for ``ela_host`` and keep it in ``es``."""
        self.es = Elasticsearch(self.ela_host)

    def push_to_server(self, info):
        """Send ``info`` to the configured index as a single 'create' action.

        The document carries an ``@timestamp`` field (current UTC time,
        microsecond precision, ``Z`` suffix) and the payload under ``info``.

        This is best effort: any failure is printed, never raised.

        Args:
            info: Payload stored under the document's ``info`` key.

        Returns:
            None.
        """
        if self.es is None:
            print('Unable to push to Elasticsearch server: ',
                  'not connected, call connect() first')
            return

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted string is identical.
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f') + 'Z'
        action = {
            '_op_type': 'create',
            '_index': self.ela_index,
            '@timestamp': timestamp,
            'info': info,
        }

        try:
            bulk(self.es, [action])
        except Exception as e:
            # Deliberately broad: a failed push must not take down the caller.
            print('Unable to push to Elasticsearch server: ', e)
#def push_all_to_server(self, docs):
# for success, info in parallel_bulk(self.es, self._pre_process_docs(docs)):
# if not success:
# print('Push failed: ', info)
#def _pre_process_docs(self, docs):
# for doc in docs:
# yield {
# '_index': self.ela_index,
# '_timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f') + 'Z',
# '_id': doc['name'],
# 'doc': doc
# }