"""usb_antimalware.py -- USB anti-malware scanner with a tkinter GUI.

Scans drives for suspicious files using extension checks, ClamAV and
pre-trained machine-learning models, and provides helpers for querying
VirusTotal.
"""
import os
import hashlib
import tkinter as tk
from tkinter import messagebox
import psutil
import clamd
from threading import Thread
import time
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
import tensorflow as tf
from sklearn.ensemble import IsolationForest
import requests
import joblib
# NOTE(review): this loads the same pickle as `nlp_model` further down, and
# `model` is not referenced by any function visible in this file -- confirm
# whether it can be dropped to avoid loading the file twice.
# NOTE(review): all model paths below are relative, so they are resolved
# against the current working directory at import time.
model = joblib.load('nlp_malware_model.pkl')
# Set your VirusTotal API key here
# (placeholder value; sent as the "x-apikey" header by the VirusTotal helpers)
API_KEY = 'YOUR_VIRUSTOTAL_API_KEY'
# VirusTotal API URLs
# File report by hash: the hash is appended directly, hence the trailing slash.
VIRUSTOTAL_API_URL = "https://www.virustotal.com/api/v3/files/"
# File upload endpoint (POST target).
VIRUSTOTAL_UPLOAD_URL = "https://www.virustotal.com/api/v3/files"
# Analysis report by scan ID: the ID is appended directly.
VIRUSTOTAL_ANALYSIS_URL = "https://www.virustotal.com/api/v3/analyses/"
# List of known bad file extensions often used by malware
SUSPICIOUS_EXTENSIONS = ['.exe', '.vbs', '.bat', '.lnk', '.pif', '.scr', '.com']
# Define autorun file to look for
AUTORUN_FILENAME = 'autorun.inf'
# Set up ClamAV client
# NOTE(review): this is a Unix-socket client, while scan_system() enumerates
# Windows-style drive letters -- confirm the intended target platform.
cd = clamd.ClamdUnixSocket()
# Load pre-trained models
# NOTE(review): joblib.load unpickles its input; only load model files from a
# trusted location.
nlp_model = joblib.load('nlp_malware_model.pkl')  # used by analyze_script_file
vectorizer = joblib.load('tfidf_vectorizer.pkl')  # used by analyze_script_file
malware_model = tf.keras.models.load_model('malware_cnn.h5')  # used by classify_file_with_dl
# NOTE(review): anomaly_model is not used by any function visible in this file.
anomaly_model = joblib.load('anomaly_detection_model.pkl')
ai_model = joblib.load('malware_detection_model.pkl')  # used by scan_file_with_ai
# Function to calculate MD5 hash of a file
def calculate_md5(file_path):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is consumed in 8192-byte blocks so that large files are never
    held in memory in one piece.
    """
    digest = hashlib.md5()
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
# AI-based file classification using deep learning
def classify_file_with_dl(file_path):
    """Classify a file with the deep-learning model.

    Reads the whole file as bytes, converts it with
    ``preprocess_file_for_model`` and feeds the result to
    ``malware_model.predict``.

    Returns:
        True when the prediction is greater than 0.5 (malware detected),
        otherwise False.
    """
    with open(file_path, 'rb') as handle:
        raw_bytes = handle.read()
    # NOTE(review): preprocess_file_for_model is not defined in the visible
    # part of this file -- confirm it exists, otherwise this raises NameError.
    model_input = preprocess_file_for_model(raw_bytes)
    score = malware_model.predict(model_input)
    return bool(score > 0.5)
# AI-based file scanning using a custom machine learning model
def scan_file_with_ai(file_path):
    """Classify a file with the feature-based machine-learning model.

    The feature list from ``extract_features`` is passed to
    ``ai_model.predict`` as a single-sample batch.

    Returns:
        True when the predicted label equals 1 (malware detected),
        otherwise False.
    """
    feature_vector = extract_features(file_path)
    label = ai_model.predict([feature_vector])
    return bool(label == 1)
# Extract file features: size in bytes and byte entropy
def extract_features(file_path):
    """Build the feature list for the file at *file_path*.

    Returns:
        A two-element list ``[size, entropy]``: the file size reported by
        ``os.path.getsize`` and the value of ``calculate_entropy`` over the
        file's bytes.
    """
    byte_count = os.path.getsize(file_path)
    with open(file_path, 'rb') as handle:
        contents = handle.read()
    return [byte_count, calculate_entropy(contents)]
# Example feature extraction method (entropy calculation)
def calculate_entropy(data):
    """Return the Shannon entropy of *data* in bits per symbol.

    The previous implementation called ``.bit_length()`` on a float, which
    raises AttributeError for any non-empty input; the standard formula
    ``-sum(p * log2(p))`` over the symbol frequencies is used instead.

    Args:
        data: A sized iterable of hashable symbols, typically ``bytes``.

    Returns:
        A float between 0.0 (empty input or a single repeated symbol) and
        ``log2(number of distinct symbols)``; at most 8.0 for bytes.
    """
    from collections import Counter
    from math import log2

    total = len(data)
    if total == 0:
        # No symbols means no uncertainty; this also avoids dividing by zero.
        return 0.0
    return sum(
        -(count / total) * log2(count / total)
        for count in Counter(data).values()
    )
# AI-based script file analysis using NLP
def analyze_script_file(file_path):
    """Classify a file's text content with the NLP model.

    The file is read as text, vectorised with ``vectorizer`` and passed to
    ``nlp_model.predict``.

    Undecodable bytes are replaced instead of raising: scan_usb() calls this
    for arbitrary files (including binaries), and a strict decode would raise
    UnicodeDecodeError and abort the scan.

    Returns:
        True when the predicted label equals 1 (malicious content detected),
        otherwise False.
    """
    # The platform default encoding is kept so that valid text files are
    # decoded exactly as before; only the error policy changes.
    with open(file_path, 'r', errors='replace') as f:
        script_content = f.read()
    features = vectorizer.transform([script_content])
    prediction = nlp_model.predict(features)
    return bool(prediction == 1)
# Real-time detection of USB insertion (requires psutil)
def monitor_usb(gui_output, auto_run=False):
    """Poll the partition list and report newly appeared drives.

    Runs forever, comparing ``psutil.disk_partitions()`` against the previous
    snapshot every 5 seconds. Each new partition is announced in
    *gui_output*; when *auto_run* is truthy a full ``scan_system`` pass is
    started as well.

    Args:
        gui_output: Widget-like object with an ``insert(index, text)`` method.
        auto_run: Truth value checked for every detected drive.
    """
    # NOTE(review): the original indentation was lost; the auto-run check is
    # assumed to sit inside the per-drive loop -- confirm.
    known_partitions = set(psutil.disk_partitions())
    while True:
        snapshot = set(psutil.disk_partitions())
        # The set difference is empty when nothing was added, so the loop
        # body is skipped in that case.
        for partition in snapshot - known_partitions:
            gui_output.insert(tk.END, f"USB drive {partition.device} detected!")
            if auto_run:
                gui_output.insert(tk.END, "Auto-scanning enabled. Scanning now...")
                scan_system(gui_output)
        known_partitions = snapshot
        time.sleep(5)
# VirusTotal: Get report of a file by hash
def get_file_report(file_hash):
    """Fetch the VirusTotal report for a file hash and print it.

    On HTTP 200 the JSON body is handed to ``process_threat_data``; any other
    status code is reported on stdout.

    Args:
        file_hash: Hash string appended to ``VIRUSTOTAL_API_URL``.

    Returns:
        None in every case; the report is only printed.

    Raises:
        requests.RequestException: On connection problems or when the
            request exceeds the timeout.
    """
    headers = {"x-apikey": API_KEY}
    url = VIRUSTOTAL_API_URL + file_hash
    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        process_threat_data(response.json())
    else:
        print(f"Failed to retrieve report for file hash {file_hash}. Status Code: {response.status_code}")
    return None
def process_threat_data(threat_data):
    """Print per-engine results and the malicious count from a VirusTotal response.

    Two response shapes are understood: file reports, which use the
    ``last_analysis_results`` / ``last_analysis_stats`` attributes, and
    analysis reports (as passed by get_scan_report), which use ``results`` /
    ``stats``. Previously only the first shape was read, so analysis reports
    printed no engine results and a count of 0.

    Args:
        threat_data: Decoded JSON response (a dict).
    """
    if 'data' in threat_data and 'attributes' in threat_data['data']:
        attributes = threat_data['data']['attributes']
        engine_results = (
            attributes.get('last_analysis_results')
            or attributes.get('results')
            or {}
        )
        stats = (
            attributes.get('last_analysis_stats')
            or attributes.get('stats')
            or {}
        )
        # Display results from different antivirus engines
        print("Analysis Results:")
        for engine, result in engine_results.items():
            # .get() keeps one incomplete engine entry from aborting the report.
            print(f"{engine}: {result.get('category')} ({result.get('result')})")
        # Print how many engines flagged the file as malicious
        malicious_count = stats.get('malicious', 0)
        print(f"Malicious Detections: {malicious_count}")
    else:
        print("No threat data available.")
# VirusTotal: Upload file and scan
def upload_and_scan_file(file_path):
    """Upload a file to VirusTotal for scanning.

    Args:
        file_path: Path of the file to upload.

    Returns:
        The scan ID (``result['data']['id']``) on HTTP 200, otherwise None.

    Raises:
        OSError: If the file cannot be opened.
        requests.RequestException: On connection problems or when the
            upload exceeds the timeout.
    """
    headers = {"x-apikey": API_KEY}
    with open(file_path, 'rb') as file:
        # Send only the base name so the local directory layout is not
        # disclosed to the remote service.
        files = {'file': (os.path.basename(file_path), file)}
        # The request must be issued while the file is still open. The
        # timeout keeps a stalled upload from blocking forever.
        response = requests.post(
            VIRUSTOTAL_UPLOAD_URL, headers=headers, files=files, timeout=120
        )
    if response.status_code == 200:
        result = response.json()
        scan_id = result['data']['id']
        print(f"File uploaded successfully. Scan ID: {scan_id}")
        return scan_id
    print(f"Failed to upload file. Status Code: {response.status_code}")
    return None
# VirusTotal: Get report based on scan ID
def get_scan_report(scan_id):
    """Fetch the VirusTotal analysis report for a scan ID and print it.

    On HTTP 200 the JSON body is handed to ``process_threat_data``; any other
    status code is reported on stdout.

    Args:
        scan_id: Identifier appended to ``VIRUSTOTAL_ANALYSIS_URL``, e.g. the
            value returned by upload_and_scan_file().

    Raises:
        requests.RequestException: On connection problems or when the
            request exceeds the timeout.
    """
    url = VIRUSTOTAL_ANALYSIS_URL + scan_id
    headers = {"x-apikey": API_KEY}
    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        process_threat_data(response.json())
    else:
        print(f"Failed to retrieve scan report. Status Code: {response.status_code}")
# Function to scan for suspicious files in the USB drive
def scan_usb(drive, gui_output):
    """Walk *drive* and check every file for signs of malware.

    Each file is first matched against ``SUSPICIOUS_EXTENSIONS``; the match
    is now case-insensitive, so ``SETUP.EXE`` is caught as well as
    ``setup.exe``. The file is then passed through ClamAV, the deep-learning
    model, the feature-based model and the NLP model, in that order; the
    first detector that fires reports the file and deletes it via
    ``remove_files``.

    Args:
        drive: Root directory to walk.
        gui_output: Widget-like object with an ``insert(index, text)`` method.

    Returns:
        List of paths whose extension is in ``SUSPICIOUS_EXTENSIONS``. Files
        removed by one of the detectors are not added to this list unless
        their extension also matched.
    """
    # str.endswith accepts a tuple; lower-casing both sides makes the match
    # case-insensitive.
    extensions = tuple(ext.lower() for ext in SUSPICIOUS_EXTENSIONS)
    suspicious_files = []
    for root, _dirs, files in os.walk(drive):
        for file in files:
            full_path = os.path.join(root, file)
            # Check for known malicious file extensions
            if file.lower().endswith(extensions):
                gui_output.insert(tk.END, f"Suspicious file found: {full_path}")
                suspicious_files.append(full_path)
            # NOTE(review): the original indentation was lost; the detector
            # chain below is assumed to run for every file, not only for
            # extension matches -- confirm.
            # Scan the file with ClamAV
            scan_result = cd.scan(full_path)
            if scan_result and scan_result[full_path][0] == 'FOUND':
                gui_output.insert(tk.END, f"Malicious file detected by ClamAV: {full_path}")
                remove_files([full_path])
            # AI-based analysis with deep learning and machine learning
            elif classify_file_with_dl(full_path):
                gui_output.insert(tk.END, f"AI (Deep Learning) detected malware in: {full_path}")
                remove_files([full_path])
            elif scan_file_with_ai(full_path):
                gui_output.insert(tk.END, f"AI (Machine Learning) detected malware in: {full_path}")
                remove_files([full_path])
            elif analyze_script_file(full_path):
                gui_output.insert(tk.END, f"AI (NLP) detected malicious script content in: {full_path}")
                remove_files([full_path])
    return suspicious_files
# Function to check for autorun file, which is a common sign of USB malware
def check_for_autorun(drive, gui_output):
    """Look for an autorun file in the root of *drive*.

    Args:
        drive: Directory that is joined with ``AUTORUN_FILENAME``.
        gui_output: Widget-like object with an ``insert(index, text)`` method;
            it receives a message when the file exists.

    Returns:
        The path of the autorun file if it exists, otherwise None.
    """
    candidate = os.path.join(drive, AUTORUN_FILENAME)
    if not os.path.exists(candidate):
        return None
    gui_output.insert(tk.END, f"Autorun file found: {candidate}")
    return candidate
# Function to remove suspicious files
def remove_files(file_list):
    """Delete every path in *file_list*, reporting each outcome on stdout.

    A failure on one path is printed and does not stop the remaining
    deletions.
    """
    for target in file_list:
        try:
            os.remove(target)
            print(f"Removed: {target}")
        except Exception as err:
            print(f"Failed to remove {target}: {err!s}")
# Function to scan the system for suspicious files and autorun
def scan_system(gui_output):
    """Scan every existing drive letter and delete what is flagged.

    For each drive ``A:\\`` .. ``Z:\\`` that exists, ``scan_usb`` and
    ``check_for_autorun`` are run; every path returned by ``scan_usb`` and
    any autorun file found are then deleted.

    A failure to delete the autorun file is now reported in *gui_output*
    instead of raising and aborting the scan of the remaining drives.

    NOTE(review): all existing drive letters are scanned, including fixed
    disks, and every file with a suspicious extension is deleted -- confirm
    that this is intended and not meant to be limited to removable media.

    Args:
        gui_output: Widget-like object with an ``insert(index, text)`` method.
    """
    drives = [
        f"{letter}:\\"
        for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        if os.path.exists(f"{letter}:\\")
    ]
    for drive in drives:
        gui_output.insert(tk.END, f"Scanning drive {drive}")
        suspicious_files = scan_usb(drive, gui_output)
        autorun_file = check_for_autorun(drive, gui_output)
        if not (suspicious_files or autorun_file):
            gui_output.insert(tk.END, f"No suspicious files found on {drive}")
            continue
        # If malicious files or autorun are detected, remove them
        gui_output.insert(tk.END, "Malicious items found! Removing...")
        remove_files(suspicious_files)
        if autorun_file:
            try:
                os.remove(autorun_file)
            except OSError as exc:
                # Report the failure and carry on with the next drive.
                gui_output.insert(tk.END, f"Failed to remove Autorun {autorun_file}: {exc}")
            else:
                gui_output.insert(tk.END, f"Removed Autorun: {autorun_file}")
# GUI Creation with tkinter
class AntiMalwareApp:
    """Tkinter front end for the USB anti-malware scanner.

    Shows a scrolling output list, a manual-scan button and an auto-scan
    checkbox, and starts a background thread running ``monitor_usb``.

    Previously the checkbox had no effect: it was never connected to
    ``toggle_auto_scan`` and the monitor thread was started with a fixed
    ``False``. The checkbox state is now mirrored into a small flag object
    that is handed to ``monitor_usb`` as its ``auto_run`` argument.
    """

    class _AutoScanFlag:
        """Mutable truth value shared with the monitor thread.

        ``monitor_usb`` tests ``if auto_run:`` for each detected drive, so
        flipping ``enabled`` takes effect without restarting the thread.
        """

        def __init__(self):
            self.enabled = False

        def __bool__(self):
            return self.enabled

    def __init__(self, root):
        """Build the widgets inside *root* and start USB monitoring."""
        self.root = root
        self.root.title("USB Anti-Malware Scanner")
        self.root.geometry("500x400")
        # Create a frame for the output box and its scrollbar
        frame = tk.Frame(self.root)
        frame.pack(pady=20)
        # Text output box
        self.output = tk.Listbox(frame, width=60, height=15)
        self.output.pack(side=tk.LEFT)
        # Scrollbar for output box
        scrollbar = tk.Scrollbar(frame, orient=tk.VERTICAL)
        scrollbar.config(command=self.output.yview)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.output.config(yscrollcommand=scrollbar.set)
        # Scan button for manual scan
        self.scan_button = tk.Button(self.root, text="Manual Scan USB", command=self.manual_scan)
        self.scan_button.pack(pady=10)
        # Auto-scan checkbox; the command keeps the shared flag in sync
        self.auto_scan_var = tk.BooleanVar()
        self._auto_scan_flag = self._AutoScanFlag()
        self.auto_scan_check = tk.Checkbutton(
            self.root,
            text="Enable Auto-Scan",
            variable=self.auto_scan_var,
            command=self.toggle_auto_scan,
        )
        self.auto_scan_check.pack(pady=5)
        # Start real-time monitoring in a separate thread
        self.monitor_thread = None
        self.start_monitoring()
        self.output.insert(tk.END, "Monitoring USB connections...")

    def manual_scan(self):
        """Triggers manual scan of all drives."""
        # NOTE(review): scan_system runs on the Tk event thread here, so the
        # window does not repaint until the scan has finished.
        self.output.insert(tk.END, "Manual scan started.")
        scan_system(self.output)

    def toggle_auto_scan(self):
        """Enable/Disable auto scanning to match the checkbox state."""
        self._auto_scan_flag.enabled = bool(self.auto_scan_var.get())
        if self._auto_scan_flag.enabled:
            self.output.insert(tk.END, "Auto-scan enabled.")
        else:
            self.output.insert(tk.END, "Auto-scan disabled.")

    def start_monitoring(self):
        """Start the monitoring thread with the current auto-scan option.

        If a monitor thread is already running, only the auto-scan flag is
        refreshed; a second thread would report every drive twice.
        """
        self._auto_scan_flag.enabled = bool(self.auto_scan_var.get())
        running = getattr(self, 'monitor_thread', None)
        if running is not None and running.is_alive():
            return
        self.monitor_thread = Thread(
            target=monitor_usb, args=(self.output, self._auto_scan_flag)
        )
        # Daemon thread: it must not keep the process alive after the window
        # closes.
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
if __name__ == '__main__':
    # Build the main window, attach the scanner UI and hand control to Tk.
    main_window = tk.Tk()
    scanner_app = AntiMalwareApp(main_window)
    main_window.mainloop()