forked from mludvig/aws-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdecode-awslogs
executable file
·102 lines (85 loc) · 2.54 KB
/
decode-awslogs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python3
# Docode SNS emails: {"awslogs":{"data":"H4sIAAAAAAA..."}}
import sys
import base64
import json
import gzip
import re
import termios
base64match = "[A-Za-z0-9\+/]+=*"
class BytesEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, bytes):
return {
"_type": "str",
"value": base64.b64encode(o).decode('ascii')
}
return super(BytesEncoder, self).default(o)
def decode(data):
# Is it JSON string?
try:
data = json.loads(data)
return data
except:
pass
# Nope, try to decode it from Base64 and/or GZIP
try:
if re.match(f"^{base64match}$", data):
data = base64.b64decode(data)
except:
pass
try:
data = gzip.decompress(data)
data = data.decode('ascii')
except:
pass
return data
def traverse(data):
#print(f"\n=== {str(data)[:100]}... ({type(data)}) ===")
if isinstance(data, str):
data_new = decode(data)
if data_new == data:
return data
data = data_new
if isinstance(data, dict):
for key in data:
data[key] = traverse(data[key])
try:
data[key] = json.loads(data[key])
except:
pass
if isinstance(data[key], dict):
data[key] = traverse(data[key])
elif isinstance(data[key], str):
# Is there JSON embedded in the string?
json_start = data[key].find("{\"")
if json_start >= 0:
subtree = traverse(data[key][json_start:])
data[key] = [data[key][:json_start], subtree]
elif isinstance(data, list):
data_new = []
for item in data:
data_new.append(traverse(item))
data = data_new
elif isinstance(data, str):
data = traverse(data)
return data
if __name__ == "__main__":
# Switch terminal to non-canonical mode
# to avoid 4096 chars clipboard limit
if sys.stdin.isatty():
fd = sys.stdin.fileno()
new = old = termios.tcgetattr(fd)
new[3] &= ~termios.ICANON
termios.tcsetattr(fd, termios.TCSAFLUSH, new)
# Read in the awslog message
print("Copy & paste 'awslogs' json:")
data_in = ""
while True:
buf = sys.stdin.read(1).strip()
data_in += buf
if not buf:
break
# Process all b64 recursively
data = traverse(data_in)
print(json.dumps(data, indent=2, cls=BytesEncoder))