Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LDAP Channel Binding to GetUserSPNs.py #1652

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 182 additions & 68 deletions examples/GetUserSPNs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
from impacket.smbconnection import SMBConnection, SessionError
from impacket.ntlm import compute_lmhash, compute_nthash

import ldap3


class GetUserSPNs:
@staticmethod
Expand Down Expand Up @@ -86,10 +88,12 @@ def __init__(self, username, password, user_domain, target_domain, cmdLineOption
self.__requestTGS = cmdLineOptions.request
# [!] in this script the value of -dc-ip option is self.__kdcIP and the value of -dc-host option is self.__kdcHost
self.__kdcIP = cmdLineOptions.dc_ip
self.__root = ''
self.__kdcHost = cmdLineOptions.dc_host
self.__saveTGS = cmdLineOptions.save
self.__requestUser = cmdLineOptions.request_user
self.__stealth = cmdLineOptions.stealth
self.__ldap_channel_binding = cmdLineOptions.ldap_channel_binding
if cmdLineOptions.hashes is not None:
self.__lmhash, self.__nthash = cmdLineOptions.hashes.split(':')

Expand Down Expand Up @@ -248,6 +252,74 @@ def outputTGS(self, ticket, oldSessionKey, sessionKey, username, spn, fd=None):
except Exception as e:
logging.error(str(e))

def get_ldap_connection(self, protocol='ldap', valid=False):
if self.__doKerberos is True:
try:
ldapConnection = ldap.LDAPConnection('%s://%s' % (protocol, self.__target), self.baseDN, self.__kdcIP)
ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash,
self.__nthash,
self.__aesKey, kdcHost=self.__kdcIP)
return ldapConnection
except ldap.LDAPSessionError as e:
if str(e).find('strongerAuthRequired') >= 0 and protocol != 'ldaps':
# We need to try SSL
return self.get_ldap_connection(protocol='ldaps')
else:
if self.__kdcIP is not None and self.__kdcHost is not None:
logging.critical("If the credentials are valid, check the hostname and IP address of KDC. They "
"must match exactly each other")
raise

ldapUser = '%s\\%s' % (self.__targetDomain, self.__username)
if self.__ldap_channel_binding:
if not hasattr(ldap3, 'TLS_CHANNEL_BINDING'):
raise Exception("To use LDAP channel binding, install the patched ldap3 module: pip3 install git+https://github.com/ly4k/ldap3")
import ssl
version=ssl.PROTOCOL_TLSv1_2
tls = ldap3.Tls(validate=ssl.CERT_NONE, version=version, ciphers='ALL:@SECLEVEL=0')
server = ldap3.Server('ldaps://%s' % self.__target, use_ssl=True, get_info=ldap3.ALL, tls=tls)
else:
server = ldap3.Server('%s://%s' % (protocol, self.__target), get_info=ldap3.ALL)

if self.__ldap_channel_binding:
logging.debug('Authenticating to LDAP server with LDAPS + channel binding')
channel_binding = {"channel_binding": ldap3.TLS_CHANNEL_BINDING}
connection = ldap3.Connection(server,user=ldapUser,password=self.__password,
authentication=ldap3.NTLM,auto_referrals=False,
**channel_binding)
elif protocol == 'ldaps':
logging.debug('Authenticating to LDAP server with LDAPS')
connection = ldap3.Connection(server,user=ldapUser,password=self.__password,
authentication=ldap3.NTLM,auto_referrals=False)
else: # Basic LDAP
logging.debug('Authenticating to LDAP server with LDAP')
connection = ldap3.Connection(server, user=ldapUser, password=self.__password,
authentication=ldap3.NTLM)
if not connection.bind():
if connection.result['result'] == ldap3.core.results.RESULT_INVALID_CREDENTIALS and valid is True\
and protocol == 'ldaps' and self.__ldap_channel_binding is False:
logging.warning('Authentication failed with LDAPS, trying with channel binding')
self.__ldap_channel_binding = True
return self.get_ldap_connection(protocol='ldaps', valid=valid)
elif connection.result['result'] == ldap3.core.results.RESULT_STRONGER_AUTH_REQUIRED:
if protocol == 'ldaps' and self.__ldap_channel_binding is False:
logging.warning('Authentication failed with LDAPS, trying with channel binding')
self.__ldap_channel_binding = True
return self.get_ldap_connection(protocol='ldaps', valid=valid)
if protocol == 'ldap':
# setting this because we would not have received RESULT_STRONGER_AUTH_REQUIRED
# if the credentials were invalid. This is important because a lack of ldap channel binding
# in an environment that enforces it will often show up as a (misnomer) error RESULT_INVALID_CREDENTIALS
valid = True
logging.warning('Authentication failed with LDAP, trying LDAPS')
return self.get_ldap_connection(protocol='ldaps', valid=valid)
else:
raise Exception('Failed to authenticate. Error: %s'
% ldap3.core.results.RESULT_CODES[connection.result['result']])
logging.info('Successfully authenticated')
self.__root = server.info.other['defaultNamingContext'][0]
return connection

def run(self):
if self.__usersFile:
self.request_users_file_TGSs()
Expand All @@ -266,33 +338,9 @@ def run(self):
self.__target = self.getMachineName(self.__target)

# Connect to LDAP
try:
ldapConnection = ldap.LDAPConnection('ldap://%s' % self.__target, self.baseDN, self.__kdcIP)
if self.__doKerberos is not True:
ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
else:
ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash,
self.__nthash,
self.__aesKey, kdcHost=self.__kdcIP)
except ldap.LDAPSessionError as e:
if str(e).find('strongerAuthRequired') >= 0:
# We need to try SSL
ldapConnection = ldap.LDAPConnection('ldaps://%s' % self.__target, self.baseDN, self.__kdcIP)
if self.__doKerberos is not True:
ldapConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
else:
ldapConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash,
self.__nthash,
self.__aesKey, kdcHost=self.__kdcIP)
else:
if str(e).find('NTLMAuthNegotiate') >= 0:
logging.critical("NTLM negotiation failed. Probably NTLM is disabled. Try to use Kerberos "
"authentication instead.")
else:
if self.__kdcIP is not None and self.__kdcHost is not None:
logging.critical("If the credentials are valid, check the hostname and IP address of KDC. They "
"must match exactly each other")
raise
connection = self.get_ldap_connection()



# Building the search filter
filter_spn = "servicePrincipalName=*"
Expand All @@ -313,6 +361,58 @@ def run(self):

searchFilter += ')'


if isinstance(connection, ldap.LDAPConnection):
# python-ldap method, for Kerberos
answers = self.get_answers_ldap(connection, searchFilter)
else:
# ldap3 method, for everything else
answers = self.get_answers_ldap3(connection, searchFilter)


if len(answers) > 0:
self.printTable(answers, header=["ServicePrincipalName", "Name", "MemberOf", "PasswordLastSet", "LastLogon",
"Delegation"])
print('\n\n')

if self.__requestTGS is True or self.__requestUser is not None:
# Let's get unique user names and a SPN to request a TGS for
users = dict((vals[1], vals[0]) for vals in answers)

# Get a TGT for the current user
TGT = self.getTGT()

if self.__outputFileName is not None:
fd = open(self.__outputFileName, 'w+')
else:
fd = None

for user, SPN in users.items():
sAMAccountName = user
downLevelLogonName = self.__targetDomain + "\\" + sAMAccountName

try:
principalName = Principal()
principalName.type = constants.PrincipalNameType.NT_MS_PRINCIPAL.value
principalName.components = [downLevelLogonName]

tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(principalName, self.__domain,
self.__kdcIP,
TGT['KDC_REP'], TGT['cipher'],
TGT['sessionKey'])
self.outputTGS(tgs, oldSessionKey, sessionKey, sAMAccountName,
self.__targetDomain + "/" + sAMAccountName, fd)
except Exception as e:
logging.debug("Exception:", exc_info=True)
logging.error('Principal: %s - %s' % (downLevelLogonName, str(e)))

if fd is not None:
fd.close()

else:
print("No entries found!")

def get_answers_ldap(self, ldapConnection, searchFilter):
try:
# Microsoft Active Directory set an hard limit of 1000 entries returned by any search
paged_search_control = ldapasn1.SimplePagedResultsControl(criticality=True, size=1000)
Expand All @@ -327,7 +427,6 @@ def run(self):
# We should never reach this code as we use paged search now
logging.debug('sizeLimitExceeded exception caught, giving up and processing the data received')
resp = e.getAnswers()
pass
else:
raise

Expand Down Expand Up @@ -379,49 +478,63 @@ def run(self):
answers.append([spn, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation])
except Exception as e:
logging.error('Skipping item, cannot process due to error %s' % str(e))
pass
return answers

def get_answers_ldap3(self, connection, searchFilter):
# Microsoft Active Directory set an hard limit of 1000 entries returned by any search
resp = connection.extend.standard.paged_search(
search_base = self.__root,
search_filter = searchFilter,
attributes=['servicePrincipalName', 'sAMAccountName',
'pwdLastSet', 'MemberOf', 'userAccountControl', 'lastLogon'],
paged_size = 1000,
generator=False)

if len(answers) > 0:
self.printTable(answers, header=["ServicePrincipalName", "Name", "MemberOf", "PasswordLastSet", "LastLogon",
"Delegation"])
print('\n\n')

if self.__requestTGS is True or self.__requestUser is not None:
# Let's get unique user names and a SPN to request a TGS for
users = dict((vals[1], vals[0]) for vals in answers)

# Get a TGT for the current user
TGT = self.getTGT()

if self.__outputFileName is not None:
fd = open(self.__outputFileName, 'w+')
answers = []
# filter out the resRefs and keep the Entries
entries = [ item for item in resp if item['type'] == 'searchResEntry' ]
logging.debug('Total records returned %d' % len(entries))

for item in entries:
mustCommit = False
sAMAccountName = ''
memberOf = ''
SPNs = []
pwdLastSet = ''
userAccountControl = 0
lastLogon = 'N/A'
delegation = ''
try:
if item['attributes'].get('sAMAccountName', None) is None:
continue
else:
fd = None

for user, SPN in users.items():
sAMAccountName = user
downLevelLogonName = self.__targetDomain + "\\" + sAMAccountName

try:
principalName = Principal()
principalName.type = constants.PrincipalNameType.NT_MS_PRINCIPAL.value
principalName.components = [downLevelLogonName]

tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(principalName, self.__domain,
self.__kdcIP,
TGT['KDC_REP'], TGT['cipher'],
TGT['sessionKey'])
self.outputTGS(tgs, oldSessionKey, sessionKey, sAMAccountName,
self.__targetDomain + "/" + sAMAccountName, fd)
except Exception as e:
logging.debug("Exception:", exc_info=True)
logging.error('Principal: %s - %s' % (downLevelLogonName, str(e)))
sAMAccountName = item['attributes']['sAMAccountName']
mustCommit = True

userAccountControl = item['attributes']['userAccountControl']
if item['attributes']['userAccountControl'] & UF_TRUSTED_FOR_DELEGATION:
delegation = 'unconstrained'
elif int(userAccountControl) & UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION:
delegation = 'constrained'

# if the SPN is not a member of a group, this will throw an error
# so I just replace it with a string containing '(null)' because
# this does not get used in the SPN request anyways
memberOf = next(iter(item['attributes']['memberOf']), '(null)')
pwdLastSet = str(item['attributes']['pwdLastSet'])
lastLogon = str(item['attributes']['lastLogon'])
for spn in item['attributes']['servicePrincipalName']:
SPNs.append(spn)

if fd is not None:
fd.close()

else:
print("No entries found!")
if mustCommit is True:
if userAccountControl & UF_ACCOUNTDISABLE:
logging.debug('Bypassing disabled account %s ' % sAMAccountName)
else:
for spn in SPNs:
answers.append([spn, sAMAccountName, memberOf, pwdLastSet, lastLogon, delegation])
except Exception as e:
logging.error('Skipping item, cannot process due to error %s' % str(e))
return answers

def request_users_file_TGSs(self):

Expand Down Expand Up @@ -510,6 +623,7 @@ def request_multiple_TGSs(self, usernames):

group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH')
group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)')
group.add_argument('-ldap-channel-binding', action="store_true", help='Use LDAP channel binding')
group.add_argument('-k', action="store_true",
help='Use Kerberos authentication. Grabs credentials from ccache file '
'(KRB5CCNAME) based on target parameters. If valid credentials '
Expand Down
Loading