forked from RedisTimeSeries/RedisTimeSeries
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGraphiteServer.py
executable file
·78 lines (65 loc) · 2.86 KB
/
GraphiteServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python
from __future__ import print_function
import re
import argparse
import redis
from gevent.server import StreamServer
# Shared redis connection pool; stays None until main() creates it from the CLI arguments.
REDIS_POOL = None

# One line of the Graphite plaintext protocol: "<path> <value> <timestamp>".
#   group 1 - metric path (lazy, so it stops at the first whitespace run that
#             is followed by a valid value and timestamp)
#   group 2 - numeric value: optional sign, digits/dots, optional exponent,
#             so negative samples ("-3.5") and "1.5e3" are accepted
#   group 3 - integer timestamp
# The exponent uses a non-capturing group so findall() still yields 3-tuples.
GRAPHITE_PROTO_RE = re.compile(r"(.*?)\s+([-+]?[\d.]+(?:[eE][-+]?\d+)?)\s+(\d+)")
def process_connection(socket, _):
    """
    Per-connection handler: read Graphite plaintext lines and send them to redis.

    Every line is matched against GRAPHITE_PROTO_RE and the resulting
    (path, value, timestamp) sample is written with ``ts.add``. When redis
    answers that the key is missing, the series is created with ``ts.create``
    (using the module-level MAX_RETENTION and SAMPLES_PER_CHUNK) and the add
    is retried once. The loop stops when the client disconnects or sends a
    line that cannot be parsed.

    :param socket: the accepted client socket passed in by the StreamServer
    :param _: second argument supplied by the StreamServer (unused here)
    :raises redis.ResponseError: for any redis error other than a missing key
    """
    # using a makefile because we want to use readline()
    rfileobj = socket.makefile(mode='rb')
    try:
        redis_client = redis.Redis(connection_pool=REDIS_POOL)
        while True:
            raw_line = rfileobj.readline()
            if not raw_line:
                # client disconnect
                break

            # The file object is binary. On Python 3 that yields bytes, which
            # cannot be matched against a text pattern, so decode first. On
            # Python 2 the line is already a str and is used unchanged.
            if isinstance(raw_line, str):
                line = raw_line
            else:
                line = raw_line.decode("utf-8", "replace")

            data = GRAPHITE_PROTO_RE.findall(line)
            if not data:
                print("line is not in graphite format: %s" % line)
                break

            # the line is in graphite format
            try:
                path, value, timestamp = data[0]
                value = float(value)
                timestamp = int(timestamp)
            except ValueError as ex:
                # e.g. a value such as "1.2.3" passes the regex but is not a float
                print("could not parse an element: %s" % ex)
                break

            try:
                redis_client.execute_command("ts.add", path, timestamp, value)
            except redis.ResponseError as ex:
                # small hack, for performance reasons its better to first try to add an metric
                # instead of checking per metric if it exists or not
                #
                # str(ex) instead of ex.message: exceptions have no .message
                # attribute on Python 3. The substring also matches the
                # "does not exists" spelling the original check looked for.
                if 'the key does not exist' not in str(ex):
                    raise
                redis_client.execute_command("ts.create",
                                             path,
                                             MAX_RETENTION,
                                             SAMPLES_PER_CHUNK)
                redis_client.execute_command("ts.add", path, timestamp, value)
    finally:
        # always release the file object, even when a redis error propagates
        rfileobj.close()
def main():
    """
    Parse the command line, create the shared redis pool and serve forever.

    Sets the module-level REDIS_POOL, MAX_RETENTION and SAMPLES_PER_CHUNK
    globals that process_connection() reads, then starts a gevent
    StreamServer on the requested host/port. Does not return under normal
    operation because serve_forever() blocks.
    """
    global REDIS_POOL, MAX_RETENTION, SAMPLES_PER_CHUNK

    parser = argparse.ArgumentParser()
    parser.add_argument("--host", help="server address to listen to", default="127.0.0.1")
    parser.add_argument("--port", help="port number to listen to", default=2003, type=int)
    # Default to localhost so the pool never receives host=None when the
    # flag is omitted.
    parser.add_argument("--redis-server", help="redis server address", default="localhost")
    parser.add_argument("--redis-port", help="redis server port", default=6379, type=int)
    parser.add_argument("--max-retention", help="default retention time (in seconds)", default=3600, type=int)
    parser.add_argument("--samples-per-chunk", help="default samples per memory chunk", default=360, type=int)
    args = parser.parse_args()

    # Values passed to ts.create when a series has to be created on first write.
    MAX_RETENTION = args.max_retention
    SAMPLES_PER_CHUNK = args.samples_per_chunk
    REDIS_POOL = redis.ConnectionPool(host=args.redis_server, port=args.redis_port)

    server = StreamServer((args.host, args.port), process_connection)
    print('Starting Graphite server on %s:%s' % (args.host, args.port))
    server.serve_forever()
# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()