forked from lopsided98/mavlink_influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_schema.py
executable file
·56 lines (44 loc) · 1.62 KB
/
get_schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python3
import argparse
import datetime
import logging
import math
import os.path
from typing import Any, Dict, List, TextIO
from pymavlink.DFReader import DFMessage, DFReader_binary # type: ignore
# Module-level logger, registered under the fixed name 'mavlink_influxdb'
# rather than this module's own name.
_logger = logging.getLogger('mavlink_influxdb')
def main() -> None:
    """Print the message schema of a dataflash log.

    Parses the command line, reads every message from the log file named
    by the ``filename`` argument and records, for each message type, the
    Python type name of each of its fields. The collected schema is then
    written to standard output, one section per message type, in the
    order the message types were first encountered.

    If a field is read more than once, the type name of the most recently
    read value is the one that ends up in the schema.
    """
    parser = argparse.ArgumentParser(
        description="Print schema of dataflash logs.")
    parser.add_argument('filename', help="Log filename")
    # Still accepted so that existing invocations keep working; the value
    # has no effect on the printed schema.
    parser.add_argument('--vehicle',
                        help="Vehicle name (stored in 'vehicle' tag)")
    args = parser.parse_args()

    log = DFReader_binary(args.filename, False)

    # Maps message type name -> {field name -> Python type name}.
    schema: Dict[str, Dict[str, str]] = {}

    # Iterate through the logfile and collect schema information.
    while True:
        entry: DFMessage = log.recv_msg()
        if entry is None:
            _logger.debug("No more log entries, break from processing loop")
            break

        # setdefault creates the per-type mapping the first time a message
        # type is seen and returns the existing one afterwards.
        message_fields = schema.setdefault(entry.fmt.name, {})
        for field_name in entry.get_fieldnames():
            message_fields[field_name] = type(
                getattr(entry, field_name)).__name__

    # Print out the schema.
    for msg_type, fields in schema.items():
        print(f"Message Type: {msg_type}")
        for field_name, field_type in fields.items():
            print(f" {field_name}: {field_type}")
        print()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()