-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathlog-helper.py
executable file
·107 lines (84 loc) · 3.65 KB
/
log-helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python3
import sys
from pathlib import Path
import subprocess
import datetime
import json
from typing import Optional
class Tool(object):
def run(self):
args = ['log', 'show'] + self.process_log_command_args()
print(' '.join(args))
subprocess.run(args)
@classmethod
def process_log_command_args(cls):
args_out = []
args_in = sys.argv[1:]
timestamp_argument_indexes = []
archive_path = None
state = 'start'
while args_in:
arg = args_in.pop(0)
print('state', state, arg)
if state == 'reading_archive_path':
archive_path = Path(arg)
assert archive_path.exists()
state = 'start'
elif state == 'reading_timestamp':
timestamp_argument_indexes.append((len(args_out)))
state = 'start'
else:
if arg.endswith('.logarchive'):
args_in.insert(0, arg)
state = 'reading_archive_path'
continue
elif arg == '--archive':
state = 'reading_archive_path'
elif arg in ['--start', '--end']:
state = 'reading_timestamp'
args_out.append(arg)
timezone_delta = None
if archive_path:
timezone_delta = cls.timezone_delta_for_log_archive_path(archive_path)
if timezone_delta:
print(f'Timezone delta: {timezone_delta}')
if timestamp_argument_indexes and timezone_delta:
cls.update_timestamp_args(args_out, timestamp_argument_indexes, timezone_delta)
return args_out
@classmethod
def update_timestamp_args(cls, args_out, timestamp_argument_indexes, timezone_delta):
format = '%Y-%m-%d %H:%M:%S'
for index in timestamp_argument_indexes:
timestamp = datetime.datetime.strptime(args_out[index], format)
adjusted_timestamp = timestamp - timezone_delta
args_out[index] = adjusted_timestamp.strftime(format)
@classmethod
def timezone_delta_for_log_archive_path(cls, path: Path) -> Optional[datetime.timedelta]:
first_line_timestamp_original = cls.timestamp_for_first_log_line(path, None)
first_line_timestamp_local = cls.timestamp_for_first_log_line(path, 'local')
if not (first_line_timestamp_original and first_line_timestamp_local):
print('Unable to get timestamp from first log line', file=sys.stderr)
return None
# print('originator:', first_line_timestamp_original, first_line_timestamp_original.utcoffset())
# print('local:', first_line_timestamp_local, first_line_timestamp_local.utcoffset())
delta = first_line_timestamp_original.utcoffset() - first_line_timestamp_local.utcoffset()
return delta
@classmethod
def timestamp_for_first_log_line(cls, archive_path: str, timezone: str) -> Optional[datetime.datetime]:
cmd = ['log', 'show', '--archive', archive_path, '--style', 'ndjson']
if timezone:
cmd.extend(['--timezone', timezone])
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
for line in process.stdout:
record = json.loads(line)
return cls.datetime_for_log_timestamp(record['timestamp'])
return None
@staticmethod
def datetime_for_log_timestamp(log_timestamp):
timestamp = log_timestamp[:29] + ':' + log_timestamp[29:]
return datetime.datetime.fromisoformat(timestamp)
@classmethod
def main(cls):
return cls().run()
if __name__ == "__main__":
sys.exit(Tool.main())