forked from byteark/mmutils
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmerge_geoip_city_csv
executable file
·95 lines (78 loc) · 2.59 KB
/
merge_geoip_city_csv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
from argparse import ArgumentParser
import sys, os
from csv import reader, writer, Sniffer, QUOTE_ALL
import socket, struct
from ipaddress import IPv6Address
# Command-line interface: two positional input files, a mandatory location
# file, and an optional output path.
arg_parser = ArgumentParser()
arg_parser.add_argument(
    '-o', '--output',
    default=None,
    help='Output to the specified file name (default is stdout)',
)
arg_parser.add_argument(
    '-l', '--location-file',
    required=True,
    default=None,
    help='Specify location file for GeoIP City v4',
)
arg_parser.add_argument(
    'ipv4_csv',
    help="Maxmind's GeoIP IPv4 city BLOCK file to merge",
)
arg_parser.add_argument(
    'ipv6_csv',
    help="Maxmind's GeoIP IPv6 city file to merge",
)

# Parsed once at start-up; the rest of the script reads its options from here.
args = arg_parser.parse_args()
def parse_locations(location_file):
    """Load a GeoIP City location CSV into a dict keyed by location id.

    The first two lines of the file are discarded without being parsed
    (presumably a copyright notice and the column header -- confirm against
    the actual data file). Every following non-empty row must contain exactly
    nine columns, in this order:

        id, country, region, city, postal_code, lat, lon,
        metro_code, area_code

    Args:
        location_file: Path of the location CSV file to read.

    Returns:
        A dict mapping the integer id to the list
        ``[country, region, city, lat, lon, postal_code, metro_code,
        area_code]``. Note that ``postal_code`` is moved behind ``lat`` and
        ``lon`` relative to the input column order. All values are kept as
        the strings read from the file.

    Raises:
        ValueError: If an id is not an integer or a row does not have
            exactly nine columns.
    """
    locations = {}
    with open(location_file, 'r') as f:
        # Skip the first 2 lines.
        f.readline()
        f.readline()
        for row in reader(f):
            if not row:
                # csv.reader yields [] for a blank line (e.g. stray trailing
                # newlines); there is nothing to record for it.
                continue
            location_id = int(row[0])
            (country, region, city, postal_code,
             lat, lon, metro_code, area_code) = row[1:]
            locations[location_id] = [
                country, region, city,
                lat, lon,
                postal_code, metro_code, area_code,
            ]
    return locations
def int2ip(addr):
    """Return the dotted-quad IPv4 string for an unsigned 32-bit integer.

    ``addr`` is packed as a big-endian (network order) 32-bit value, so it
    must be in the range 0..2**32-1; ``struct.error`` is raised otherwise.
    """
    packed = struct.pack("!I", addr)
    return socket.inet_ntoa(packed)
# Fixed output row covering 100.64.0.1 - 100.127.255.254 expressed as
# IPv4-mapped IPv6 addresses. The fields follow the same column order as the
# rows written for translated IPv4 blocks: start, end, start as integer,
# end as integer, then the eight location fields.
# NOTE(review): the name suggests this is the large-scale NAT range and the
# location appears to be a generic point in Thailand -- confirm with the owner.
lsnat_ip = (
    "0:0:0:0:0:ffff:100.64.0.1",        # range start (text)
    "0:0:0:0:0:ffff:100.127.255.254",   # range end (text)
    281472363659265,                    # range start as an integer
    281472367853566,                    # range end as an integer
    "TH",                               # country
    "",                                 # region
    "",                                 # city
    "15.0000",                          # latitude
    "100.0000",                         # longitude
    "",                                 # postal code
    0,                                  # metro code
    0,                                  # area code
)
# ---------------------------------------------------------------------------
# Merge step.
#
# Every IPv4 block row is rewritten as an IPv4-mapped IPv6 row
# (start, end, start as integer, end as integer, followed by the location
# fields looked up by the row's location id). The fixed LSNAT row is spliced
# in ahead of the first translated row that starts above it, and the IPv6
# rows are then appended unchanged.
# ---------------------------------------------------------------------------

# Textual and integer forms of the IPv4-mapped IPv6 prefix. Adding the 32-bit
# IPv4 value to the integer base yields exactly the integer of
# '0:0:0:0:0:ffff:a.b.c.d', so no text round-trip (and no Python 2-only
# ``unicode`` call) is needed.
V4_MAPPED_TEXT_PREFIX = '0:0:0:0:0:ffff:'
V4_MAPPED_INT_BASE = 0xFFFF << 32

# Open both inputs and parse the location table before opening the output, so
# a missing or malformed input does not truncate an existing output file.
with open(args.ipv4_csv, 'r') as ipv4_csv, \
        open(args.ipv6_csv, 'r') as ipv6_csv:
    ipv4_locations = parse_locations(args.location_file)

    output = sys.stdout if args.output is None else open(args.output, 'w')
    try:
        output_csv = writer(output)

        # For V4, we need to translate it to V6.
        # Skip the first 2 lines of the IPv4 block file.
        ipv4_csv.readline()
        ipv4_csv.readline()

        lsnat_inserted = False
        for row in reader(ipv4_csv):
            if not row:
                # Blank line in the input; nothing to translate.
                continue

            start_v4 = int(row[0])
            end_v4 = int(row[1])
            # int2ip() rejects values outside 0..2**32-1, which keeps the
            # integer arithmetic below within the mapped range.
            v6_start = V4_MAPPED_TEXT_PREFIX + int2ip(start_v4)
            v6_end = V4_MAPPED_TEXT_PREFIX + int2ip(end_v4)
            v6_start_int = V4_MAPPED_INT_BASE + start_v4
            v6_end_int = V4_MAPPED_INT_BASE + end_v4

            # An unknown location id raises KeyError on purpose: the block
            # file and the location file do not belong together then.
            location = ipv4_locations[int(row[2])]

            # Emit the LSNAT row once, just before the first block that
            # starts above the LSNAT range start.
            if not lsnat_inserted and v6_start_int > lsnat_ip[2]:
                output_csv.writerow(lsnat_ip)
                lsnat_inserted = True

            output_csv.writerow(
                [v6_start, v6_end, v6_start_int, v6_end_int] + location
            )

        # No IPv4 block started above the LSNAT range (or the file had no
        # data rows): the LSNAT row still belongs after the IPv4 rows.
        if not lsnat_inserted:
            output_csv.writerow(lsnat_ip)

        # IPv6 rows are copied through as they are.
        for row in reader(ipv6_csv):
            output_csv.writerow(row)
    finally:
        # Never close stdout; only close a file this script opened.
        if output is not sys.stdout:
            output.close()