-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathspoofy.py
executable file
·153 lines (129 loc) · 4.18 KB
/
spoofy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#! /usr/bin/env python3
# spoofy.py
import argparse
import threading
from queue import Queue
from modules.dns import DNS
from modules.spf import SPF
from modules.dmarc import DMARC
from modules.bimi import BIMI
from modules.spoofing import Spoofing
from modules import report
print_lock = threading.Lock()
def process_domain(domain):
"""Process a domain to gather DNS, SPF, DMARC, and BIMI records, and evaluate spoofing potential."""
dns_info = DNS(domain)
spf = SPF(domain, dns_info.dns_server)
dmarc = DMARC(domain, dns_info.dns_server)
bimi_info = BIMI(domain, dns_info.dns_server)
spf_record = spf.spf_record
spf_all = spf.all_mechanism
spf_dns_query_count = spf.spf_dns_query_count
spf_too_many_dns_queries = spf.too_many_dns_queries
dmarc_record = dmarc.dmarc_record
dmarc_p = dmarc.policy
dmarc_pct = dmarc.pct
dmarc_aspf = dmarc.aspf
dmarc_sp = dmarc.sp
dmarc_fo = dmarc.fo
dmarc_rua = dmarc.rua
bimi_record = bimi_info.bimi_record
bimi_version = bimi_info.version
bimi_location = bimi_info.location
bimi_authority = bimi_info.authority
spoofing_info = Spoofing(
domain,
dmarc_record,
dmarc_p,
dmarc_aspf,
spf_record,
spf_all,
spf_dns_query_count,
dmarc_sp,
dmarc_pct,
)
domain_type = spoofing_info.domain_type
spoofing_possible = spoofing_info.spoofing_possible
spoofing_type = spoofing_info.spoofing_type
result = {
"DOMAIN": domain,
"DOMAIN_TYPE": domain_type,
"DNS_SERVER": dns_info.dns_server,
"SPF": spf_record,
"SPF_MULTIPLE_ALLS": spf_all,
"SPF_NUM_DNS_QUERIES": spf_dns_query_count,
"SPF_TOO_MANY_DNS_QUERIES": spf_too_many_dns_queries,
"DMARC": dmarc_record,
"DMARC_POLICY": dmarc_p,
"DMARC_PCT": dmarc_pct,
"DMARC_ASPF": dmarc_aspf,
"DMARC_SP": dmarc_sp,
"DMARC_FORENSIC_REPORT": dmarc_fo,
"DMARC_AGGREGATE_REPORT": dmarc_rua,
"BIMI_RECORD": bimi_record,
"BIMI_VERSION": bimi_version,
"BIMI_LOCATION": bimi_location,
"BIMI_AUTHORITY": bimi_authority,
"SPOOFING_POSSIBLE": spoofing_possible,
"SPOOFING_TYPE": spoofing_type,
}
return result
def worker(domain_queue, print_lock, output, results):
"""Worker function to process domains and output results."""
while True:
domain = domain_queue.get()
if domain is None:
break
result = process_domain(domain)
with print_lock:
if output == "stdout":
report.printer(**result)
else:
results.append(result)
domain_queue.task_done()
def main():
parser = argparse.ArgumentParser(
description="Process domains to gather DNS, SPF, DMARC, and BIMI records."
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-d", type=str, help="Single domain to process.")
group.add_argument(
"-iL", type=str, help="File containing a list of domains to process."
)
parser.add_argument(
"-o",
type=str,
choices=["stdout", "xls"],
default="stdout",
help="Output format: stdout or xls (default: stdout).",
)
parser.add_argument(
"-t", type=int, default=4, help="Number of threads to use (default: 4)"
)
args = parser.parse_args()
if args.d:
domains = [args.d]
elif args.iL:
with open(args.iL, "r") as file:
domains = [line.strip() for line in file]
domain_queue = Queue()
results = []
for domain in domains:
domain_queue.put(domain)
threads = []
for _ in range(min(args.t, len(domains))):
thread = threading.Thread(
target=worker, args=(domain_queue, print_lock, args.o, results)
)
thread.start()
threads.append(thread)
domain_queue.join()
if args.o == "xls" and results:
report.write_to_excel(results)
print("Results written to output.xlsx")
for _ in range(len(threads)):
domain_queue.put(None)
for thread in threads:
thread.join()
if __name__ == "__main__":
main()