Skip to content

Commit

Permalink
Update scripts to include posixGroups
Browse files Browse the repository at this point in the history
  • Loading branch information
waTeim committed Oct 28, 2024
1 parent 07b5e38 commit 3cc076c
Show file tree
Hide file tree
Showing 3 changed files with 436 additions and 211 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"makefile.configureOnOpen": false
}
242 changes: 168 additions & 74 deletions scripts/get_ldap_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,97 +23,191 @@ def load_ldap_config(config_file="helx_ldap_config.yaml"):
return yaml.safe_load(file)
return None

def fetch_user_details(ldap_server_url, bind_dn, bind_password, search_base, group_base):
def connect_to_ldap(ldap_server_url, bind_dn, bind_password):
"""
Establishes a connection to the LDAP server.
Args:
ldap_server_url (str): LDAP server URL.
bind_dn (str): Bind DN for LDAP authentication.
bind_password (str): Password for Bind DN.
Returns:
ldap3.Connection: An active LDAP connection.
"""
parsed_url = urlparse(ldap_server_url)
host = parsed_url.hostname
port = parsed_url.port if parsed_url.port else (636 if parsed_url.scheme == 'ldaps' else 389)
use_ssl = parsed_url.scheme == 'ldaps'

# Initialize and bind to the LDAP server
server = Server(host, port=port, use_ssl=use_ssl, get_info=ALL)
conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)
return conn

def fetch_all_users(conn, search_base):
"""
Fetches all user entries from the LDAP directory.
Args:
conn (ldap3.Connection): An active LDAP connection.
search_base (str): The base DN for searching user entries.
Returns:
list: A list of ldap3.Entry objects representing user entries.
"""
search_filter = '(|(objectClass=inetOrgPerson)(objectClass=posixAccount))'
retrieve_attributes = [
'uid', 'cn', 'sn', 'mail', 'telephoneNumber',
'givenName', 'displayName', 'o', 'ou',
'runAsUser', 'runAsGroup', 'fsGroup', 'supplementalGroups',
'uidNumber', 'gidNumber', 'homeDirectory', 'loginShell'
]

conn.search(search_base, search_filter, search_scope=SUBTREE, attributes=retrieve_attributes)
return conn.entries

def fetch_posix_groups(conn, group_base):
"""
Fetches all posixGroup entries and builds a mapping of uid to posixGroups.
Args:
conn (ldap3.Connection): An active LDAP connection.
group_base (str): The base DN for searching group entries.
Returns:
dict: A mapping of user uid to a list of posixGroup names.
"""
Fetches user details from the LDAP server and processes their attributes.
conn.search(
search_base=group_base,
search_filter='(objectClass=posixGroup)',
search_scope=SUBTREE,
attributes=['cn', 'memberUid']
)
posix_group_entries = conn.entries

The function binds to an LDAP server and retrieves user entries based on
the specified search base and filter. It also checks group memberships for
each user and processes attributes like `cn`, `mail`, `telephoneNumber`, etc.,
including posixAccount attributes.
uid_to_posix_groups = {}
for group_entry in posix_group_entries:
group_name = group_entry.cn.value
member_uids = group_entry.memberUid.values if 'memberUid' in group_entry else []
for uid in member_uids:
uid_to_posix_groups.setdefault(uid, []).append(group_name)
return uid_to_posix_groups

def fetch_group_of_names(conn, group_base):
"""
Fetches all groupOfNames entries and builds a mapping of user_dn to groups.
Args:
ldap_server_url (str): The URL of the LDAP server (e.g., ldap://localhost).
bind_dn (str): The distinguished name (DN) used for binding to the LDAP server.
conn (ldap3.Connection): An active LDAP connection.
group_base (str): The base DN for searching group entries.
Returns:
dict: A mapping of user DN to a list of group names.
"""
conn.search(
search_base=group_base,
search_filter='(objectClass=groupOfNames)',
search_scope=SUBTREE,
attributes=['cn', 'member']
)
group_of_names_entries = conn.entries

dn_to_groups = {}
for group_entry in group_of_names_entries:
group_name = group_entry.cn.value
members = group_entry.member.values if 'member' in group_entry else []
for dn in members:
dn_to_groups.setdefault(dn, []).append(group_name)
return dn_to_groups

def process_user_entries(user_entries, uid_to_posix_groups, dn_to_groups):
"""
Processes user entries and assigns group memberships.
Args:
user_entries (list): A list of ldap3.Entry objects representing user entries.
uid_to_posix_groups (dict): Mapping of uid to posixGroup names.
dn_to_groups (dict): Mapping of user DN to group names.
Returns:
list: A list of dictionaries containing processed user details.
"""
retrieve_attributes = [
'uid', 'cn', 'sn', 'mail', 'telephoneNumber',
'givenName', 'displayName', 'o', 'ou',
'runAsUser', 'runAsGroup', 'fsGroup', 'supplementalGroups',
'uidNumber', 'gidNumber', 'homeDirectory', 'loginShell'
]

result_set = []
for entry in user_entries:
entry_dict = entry.entry_attributes_as_dict
processed_entry = {}

# Process each attribute and handle missing attributes
for attr in retrieve_attributes:
if attr in entry_dict and entry_dict[attr]:
if attr in ['runAsUser', 'runAsGroup', 'fsGroup', 'uidNumber', 'gidNumber']:
processed_entry[attr] = int(entry_dict[attr][0])
elif attr == 'supplementalGroups':
processed_entry[attr] = [int(x) for x in entry_dict[attr]]
else:
processed_entry[attr] = entry_dict[attr][0]
else:
# Assign default values for certain attributes
if attr in ['uidNumber', 'gidNumber']:
processed_entry[attr] = None
elif attr == 'supplementalGroups':
processed_entry[attr] = []
else:
processed_entry[attr] = ""

# Assign posixGroup memberships
uid = processed_entry['uid']
posix_groups = uid_to_posix_groups.get(uid, [])
processed_entry['posixGroups'] = posix_groups

# Assign groupOfNames memberships
user_dn = entry.entry_dn
groups = dn_to_groups.get(user_dn, [])
processed_entry['groups'] = groups

result_set.append(processed_entry)
return result_set

def fetch_user_details(ldap_server_url, bind_dn, bind_password, search_base, group_base):
"""
Orchestrates the fetching and processing of user details.
Args:
ldap_server_url (str): The URL of the LDAP server.
bind_dn (str): The distinguished name used for binding to the LDAP server.
bind_password (str): The password used for the bind DN.
search_base (str): The base DN for searching user entries.
group_base (str): The base DN for searching group memberships.
group_base (str): The base DN for searching group entries.
Returns:
list: A list of dictionaries, each containing user details and their group memberships.
"""
conn = None
try:
parsed_url = urlparse(ldap_server_url)
host = parsed_url.hostname
port = parsed_url.port if parsed_url.port else (636 if parsed_url.scheme == 'ldaps' else 389)
use_ssl = parsed_url.scheme == 'ldaps'

# Initialize and bind to the LDAP server
server = Server(host, port=port, use_ssl=use_ssl, get_info=ALL)
conn = Connection(server, user=bind_dn, password=bind_password, auto_bind=True)

# Check if the search base exists
base_check = conn.search(search_base, '(objectClass=*)', search_scope=SUBTREE, attributes=[])
if not base_check:
print(f"Search base '{search_base}' does not exist.")
return []

# Search for user entries
search_filter = '(|(objectClass=inetOrgPerson)(objectClass=posixAccount))'
retrieve_attributes = [
'uid', 'cn', 'sn', 'mail', 'telephoneNumber',
'givenName', 'displayName', 'o', 'ou',
'runAsUser', 'runAsGroup', 'fsGroup', 'supplementalGroups',
'uidNumber', 'gidNumber', 'homeDirectory', 'loginShell'
]
conn = connect_to_ldap(ldap_server_url, bind_dn, bind_password)

conn.search(search_base, search_filter, search_scope=SUBTREE, attributes=retrieve_attributes)

# Return an empty list if no users are found
if len(conn.entries) == 0:
# Fetch user entries
user_entries = fetch_all_users(conn, search_base)
if not user_entries:
print(f"No users found in search base: {search_base}")
return []

result_set = []
for entry in conn.entries:
entry_dict = entry.entry_attributes_as_dict
processed_entry = {}

# Process each attribute and handle missing attributes
for attr in retrieve_attributes:
if attr in entry_dict and entry_dict[attr]:
if attr in ['runAsUser', 'runAsGroup', 'fsGroup', 'uidNumber', 'gidNumber']:
processed_entry[attr] = int(entry_dict[attr][0])
elif attr == 'supplementalGroups':
processed_entry[attr] = [int(x) for x in entry_dict[attr]]
else:
processed_entry[attr] = entry_dict[attr][0]
else:
# Assign default values for certain attributes
if attr in ['uidNumber', 'gidNumber']:
processed_entry[attr] = None
elif attr == 'supplementalGroups':
processed_entry[attr] = []
else:
processed_entry[attr] = ""

# Fetch group memberships for the user
user_dn = entry.entry_dn
conn.search(
search_base=group_base,
search_filter=f'(&(objectClass=groupOfNames)(member={user_dn}))',
search_scope=SUBTREE,
attributes=['cn']
)

# Process group memberships
group_names = [g_entry.cn.value for g_entry in conn.entries] if conn.entries else []
processed_entry['groups'] = group_names

result_set.append(processed_entry)
# Fetch group memberships
uid_to_posix_groups = fetch_posix_groups(conn, group_base)
dn_to_groups = fetch_group_of_names(conn, group_base)

# Process user entries
result_set = process_user_entries(user_entries, uid_to_posix_groups, dn_to_groups)
return result_set

except Exception as e:
print(f"LDAP error: {e}")
traceback.print_exc()
Expand Down Expand Up @@ -178,7 +272,7 @@ def main():
for user in users:
print("User Details:")
for key, value in user.items():
if key == 'groups':
if key in ['groups', 'posixGroups']:
print(f"{key}: {', '.join(value)}")
else:
print(f"{key}: {value}")
Expand Down
Loading

0 comments on commit 3cc076c

Please sign in to comment.