-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathHeadlessScannerDriver.py
142 lines (130 loc) · 6.48 KB
/
HeadlessScannerDriver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Copyright (c) 2013-2014 F-Secure
See LICENSE for details
This file is intended to be run as a Burp Suite Professional extension.
Burp and Burp Suite are trademarks of PortSwigger, Ltd.
"""
from burp import IBurpExtender
from burp import IBurpExtenderCallbacks
from burp import IHttpRequestResponse
from burp import IHttpService
from burp import IProxyListener
from burp import IScannerListener
from burp import IHttpListener
from burp import IScanQueueItem
from burp import IInterceptedProxyMessage
from java.io import PrintWriter
import json
import re
import copy
class BurpExtender(IBurpExtender, IScannerListener, IProxyListener, IHttpListener):
    """Burp extension that starts active scans from proxied requests.

    Each request seen by the proxy listener is handed to Burp's active
    scanner, at most once per URL. Requests whose target port is 1111,
    1112 or 1113 are dropped and interpreted as in-band control messages
    from the test driver; each is answered with one line of JSON written
    to the extension's stdout:

    * 1111 -- list of the status strings of all scans started so far
    * 1112 -- list of all issues found, then Burp is asked to exit
    * 1113 -- list of all issues found, then the scan list is cleared
    """

    # Target ports that are treated as control messages.
    _PORT_STATUS = 1111
    _PORT_DUMP_AND_QUIT = 1112
    _PORT_DUMP = 1113
    _CONTROL_PORTS = (_PORT_STATUS, _PORT_DUMP_AND_QUIT, _PORT_DUMP)

    # Captures the request target from the request line ("GET /x HTTP/1.1").
    _REQUEST_LINE = re.compile(r'^\w+ (.+) HTTP')

    def registerExtenderCallbacks(self, callbacks):
        """Store the callbacks object, register listeners, announce start-up.

        Writes ``{"running": 1}`` as JSON to stdout once registration is
        done so that whatever started Burp can tell the extension is up.
        """
        # keep a reference to our callbacks object
        self._callbacks = callbacks
        self._scanlist = []  # Holds scan items (Burp data structures)
        self._scantarget = []  # Holds list of URLs added to scan
        # set our extension name
        callbacks.setExtensionName("Headless Scanner Driver")
        # obtain our output streams
        self._stdout = PrintWriter(callbacks.getStdout(), True)
        self._stderr = PrintWriter(callbacks.getStderr(), True)
        # register ourselves as listeners
        callbacks.registerScannerListener(self)
        callbacks.registerProxyListener(self)
        self._emit(json.dumps({"running": 1}))  # Indicate we're up

    def processProxyMessage(self, messageIsRequest, message):
        """Handle one proxy message: run a control command or queue a scan.

        Responses are ignored. For requests, the target port decides
        whether the message is a control message (dropped and answered on
        stdout) or ordinary traffic (queued for an active scan).
        """
        # Inform Burp not to intercept the message.
        message.setInterceptAction(
            IInterceptedProxyMessage.ACTION_DONT_INTERCEPT)
        if not messageIsRequest:  # Booleans are integers
            return

        # Obtain message target & content
        requestresponse = message.getMessageInfo()
        request = requestresponse.getRequest()  # returns array.array
        target = requestresponse.getHttpService()
        host = target.getHost()
        port = target.getPort()
        protocol = target.getProtocol()

        # Interpret in-band signaling from the test driver (any request
        # to ports 1111, 1112 or 1113 is taken as an instruction to this
        # extension instead of being scanned).
        if port in self._CONTROL_PORTS:
            self._handle_control_message(port, message)
            return

        self._queue_scan(host, port, protocol, request)

    def _handle_control_message(self, port, message):
        """Drop a control request and write the requested JSON to stdout."""
        # Was a control message, do not process further
        message.setInterceptAction(IInterceptedProxyMessage.ACTION_DROP)

        if port == self._PORT_STATUS:  # Show scan status
            statuses = [scan.getStatus() for scan in self._scanlist]
            self._emit(json.dumps(statuses))
            return

        scanissues = self.get_issues()
        if port == self._PORT_DUMP:
            # Clear the scan list to avoid getting previous issues in
            # future dumps.
            # NOTE(review): _scantarget is not cleared here, so URLs that
            # were already scanned are still rejected as duplicates
            # afterwards -- confirm that this is intended.
            self._scanlist = []
        self._emit(json.dumps(scanissues, encoding="utf-8"))
        if port == self._PORT_DUMP_AND_QUIT:
            # The argument is Burp's "prompt user" flag; 0 (false) exits
            # without asking for confirmation.
            self._callbacks.exitSuite(0)

    def _queue_scan(self, host, port, protocol, request):
        """Start an active scan for the request unless its URL was seen."""
        urlpath = self._REQUEST_LINE.search(request.tostring())
        if urlpath is None:
            return
        # Duplicate scan rejection
        # NOTE(review): the key does not include the port, so the same
        # path on two ports of one host counts as a duplicate -- confirm.
        url = protocol + "://" + host + urlpath.group(1)
        if url in self._scantarget:
            return
        self._scantarget.append(url)
        # Start an active scan on the message
        https = 0
        if protocol == "https":
            https = 1
        scaninstance = self._callbacks.doActiveScan(host, port, https, request)
        self._scanlist.append(scaninstance)

    def _emit(self, line):
        """Write one line to the extension's stdout and flush it."""
        # This output may block due to output buffers being filled.
        # When running this extension, something should be reading
        # stdout.
        self._stdout.println(line)
        self._stdout.flush()

    @staticmethod
    def _to_text(raw):
        """Return raw HTTP message bytes as unicode text.

        ``raw`` is the byte array returned by getRequest()/getResponse(),
        or None when Burp has no such message; None yields an empty
        string. Bytes that are not valid UTF-8 are replaced instead of
        raising, so binary bodies cannot abort a result dump.
        """
        if raw is None:
            return u""
        return raw.tostring().decode("utf-8", "replace")

    def get_issues(self):
        """Collect the issues of every scan in the scan list.

        Returns a list of dicts, one per issue, with the keys ``url``,
        ``severity``, ``issuetype``, ``issuename``, ``issuedetail``,
        ``confidence``, ``host``, ``port``, ``protocol`` and ``messages``.
        ``messages`` is a list of ``(request, response)`` text pairs.
        """
        scanissues = []
        # We have a list of scans that contain scan findings. Extract
        # these into plain data structures that can be dumped as JSON.
        for scaninstance in self._scanlist:
            for scanissue in scaninstance.getIssues() or ():
                service = scanissue.getHttpService()
                messages = [
                    (self._to_text(httpmessage.getRequest()),
                     self._to_text(httpmessage.getResponse()))
                    for httpmessage in scanissue.getHttpMessages() or ()
                ]
                scanissues.append({
                    'url': scanissue.getUrl().toString(),
                    'severity': scanissue.getSeverity(),
                    'issuetype': scanissue.getIssueType(),
                    'issuename': scanissue.getIssueName(),
                    'issuedetail': scanissue.getIssueDetail(),
                    'confidence': scanissue.getConfidence(),
                    'host': service.getHost(),
                    'port': service.getPort(),
                    'protocol': service.getProtocol(),
                    'messages': messages,
                })
        return scanissues