Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New LDAP Flag Find Delegation #381

Merged
merged 19 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions nxc/parsers/ldap_results.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from impacket.ldap import ldapasn1 as ldapasn1_impacket


def parse_result_attributes(ldap_response):
parsed_response = []
for entry in ldap_response:
Expand All @@ -8,7 +9,15 @@ def parse_result_attributes(ldap_response):
continue
attribute_map = {}
for attribute in entry["attributes"]:
val = [str(val).encode(val.encoding).decode("utf-8") for val in attribute["vals"].components]
attribute_map[str(attribute["type"])] = val if len(val) > 1 else val[0]
val_list = []
for val in attribute["vals"].components:
try:
encoding = val.encoding
val_decoded = str(val).encode(encoding).decode("utf-8")
except UnicodeDecodeError:
# If we can't decode the value, we'll just return the bytes
val_decoded = val.__bytes__()
val_list.append(val_decoded)
attribute_map[str(attribute["type"])] = val_list if len(val_list) > 1 else val_list[0]
parsed_response.append(attribute_map)
return parsed_response
return parsed_response
116 changes: 116 additions & 0 deletions nxc/protocols/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from impacket.krb5.kerberosv5 import getKerberosTGS, SessionKeyDecryptionError
from impacket.krb5.types import Principal, KerberosException
from impacket.ldap import ldap as ldap_impacket
from impacket.ldap import ldaptypes
from impacket.ldap import ldapasn1 as ldapasn1_impacket
from impacket.ldap.ldap import LDAPFilterSyntaxError
from impacket.smb import SMB_DIALECT
Expand Down Expand Up @@ -1085,6 +1086,121 @@ def query(self):
vals = vals.replace("SetOf: ", "")
self.logger.highlight(f"{attr:<20} {vals}")

def find_delegation(self):
# Constants for delegation types
UF_TRUSTED_FOR_DELEGATION = 0x80000
UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION = 0x1000000
UF_ACCOUNTDISABLE = 0x2
SERVER_TRUST_ACCOUNT = 0x2000

def printTable(items, header):
colLen = []

# Calculating maximum lenght before parsing CN.
for i, col in enumerate(header):
rowMaxLen = max(len(row[1].split(",")[0].split("CN=")[-1]) for row in items) if i == 1 else max(len(str(row[i])) for row in items)
colLen.append(max(rowMaxLen, len(col)))

# Create the format string for each row
outputFormat = " ".join([f"{{{num}:{width}s}}" for num, width in enumerate(colLen)])

# Print header
self.logger.highlight(outputFormat.format(*header))
self.logger.highlight(" ".join(["-" * itemLen for itemLen in colLen]))

# Print rows
for row in items:
# Get first CN value.
if "CN=" in row[1]:
row[1] = row[1].split(",")[0].split("CN=")[-1]

# Added join for DelegationRightsTo
row[3] = ", ".join(str(x) for x in row[3]) if isinstance(row[3], list) else row[3]

self.logger.highlight(outputFormat.format(*row))

# Building the search filter
search_filter = (f"(&(|(UserAccountControl:1.2.840.113556.1.4.803:={UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION})"
f"(UserAccountControl:1.2.840.113556.1.4.803:={UF_TRUSTED_FOR_DELEGATION})"
"(msDS-AllowedToDelegateTo=*)(msDS-AllowedToActOnBehalfOfOtherIdentity=*))"
f"(!(UserAccountControl:1.2.840.113556.1.4.803:={UF_ACCOUNTDISABLE}))"
f"(!(UserAccountControl:1.2.840.113556.1.4.803:={SERVER_TRUST_ACCOUNT})))")
termanix marked this conversation as resolved.
Show resolved Hide resolved

attributes = ["sAMAccountName", "pwdLastSet", "userAccountControl", "objectCategory",
"msDS-AllowedToActOnBehalfOfOtherIdentity", "msDS-AllowedToDelegateTo"]

resp = self.search(search_filter, attributes, 0)
answers = []
self.logger.debug(f"Total of records returned {len(resp):d}")
resp_parse = parse_result_attributes(resp)

for item in resp_parse:
mustCommit = False
sAMAccountName = ""
userAccountControl = 0
delegation = ""
objectType = ""
rightsTo = []
protocolTransition = 0

try:
sAMAccountName = item.get("sAMAccountName")
mustCommit = sAMAccountName is not None
termanix marked this conversation as resolved.
Show resolved Hide resolved

userAccountControl = int(item.get("userAccountControl", 0))
objectType = item.get("objectCategory")

if userAccountControl & UF_TRUSTED_FOR_DELEGATION:
delegation = "Unconstrained"
rightsTo.append("N/A")
elif userAccountControl & UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION:
delegation = "Constrained w/ Protocol Transition"
protocolTransition = 1

if item.get("msDS-AllowedToDelegateTo") is not None:
if protocolTransition == 0:
delegation = "Constrained"
rightsTo = item.get("msDS-AllowedToDelegateTo")

# Not an elif as an object could both have RBCD and another type of delegation
if item.get("msDS-AllowedToActOnBehalfOfOtherIdentity") is not None:
databyte = item.get("msDS-AllowedToActOnBehalfOfOtherIdentity")
rbcdRights = []
rbcdObjType = []
sd = ldaptypes.SR_SECURITY_DESCRIPTOR(data=bytes(databyte))
if len(sd["Dacl"].aces) > 0:
search_filter = "(&(|"
for ace in sd["Dacl"].aces:
search_filter += "(objectSid=" + ace["Ace"]["Sid"].formatCanonical() + ")"
search_filter += ")(!(UserAccountControl:1.2.840.113556.1.4.803:=2)))"
delegUserResp = self.search(search_filter, attributes=["sAMAccountName", "objectCategory"], sizeLimit=999)
delegUserResp_parse = parse_result_attributes(delegUserResp)

for rbcd in delegUserResp_parse:
rbcdRights.append(str(rbcd.get("sAMAccountName")))
rbcdObjType.append(str(rbcd.get("objectCategory")))

if mustCommit:
if int(userAccountControl) & UF_ACCOUNTDISABLE:
self.logger.debug(f"Bypassing disabled account {sAMAccountName}")
else:
for rights, objType in zip(rbcdRights, rbcdObjType):
answers.append([rights, objType, "Resource-Based Constrained", sAMAccountName])

if delegation in ["Unconstrained", "Constrained", "Constrained w/ Protocol Transition"] and mustCommit:
if int(userAccountControl) & UF_ACCOUNTDISABLE:
self.logger.debug(f"Bypassing disabled account {sAMAccountName}")
else:
answers.append([sAMAccountName, objectType, delegation, rightsTo])

except Exception as e:
self.logger.error(f"Skipping item, cannot process due to error {e}")

if answers:
printTable(answers, header=["AccountName", "AccountType", "DelegationType", "DelegationRightsTo"])
else:
self.logger.fail("No entries found!")

def trusted_for_delegation(self):
# Building the search filter
searchFilter = "(userAccountControl:1.2.840.113556.1.4.803:=524288)"
Expand Down
1 change: 1 addition & 0 deletions nxc/protocols/ldap/proto_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def proto_args(parser, parents):

vgroup = ldap_parser.add_argument_group("Retrieve useful information on the domain", "Options to to play with Kerberos")
vgroup.add_argument("--query", nargs=2, help="Query LDAP with a custom filter and attributes")
vgroup.add_argument("--find-delegation", action="store_true", help="Finds delegation relationships within an Active Directory domain.")
vgroup.add_argument("--trusted-for-delegation", action="store_true", help="Get the list of users and computers with flag TRUSTED_FOR_DELEGATION")
vgroup.add_argument("--password-not-required", action="store_true", help="Get the list of users with flag PASSWD_NOTREQD")
vgroup.add_argument("--admin-count", action="store_true", help="Get objets that had the value adminCount=1")
Expand Down