From: Frank Brehm Date: Wed, 7 Sep 2022 11:24:12 +0000 (+0200) Subject: Extending LDAP search methods X-Git-Tag: 0.5.0^2~2^2~12 X-Git-Url: https://git.uhu-banane.de/?a=commitdiff_plain;h=38656e205d6c5dc4eea6901042042463bd330cee;p=pixelpark%2Fpp-admin-tools.git Extending LDAP search methods --- diff --git a/lib/pp_admintools/app/ldap.py b/lib/pp_admintools/app/ldap.py index 289ee5d..6be0bbb 100644 --- a/lib/pp_admintools/app/ldap.py +++ b/lib/pp_admintools/app/ldap.py @@ -21,6 +21,7 @@ except ImportError: # Third party modules from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC # from ldap3 import ALL +from ldap3 import SUBTREE # from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS # from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES # from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE @@ -46,7 +47,7 @@ from ..config.ldap import LdapConnectionInfo, LdapConfiguration # rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT -__version__ = '0.3.3' +__version__ = '0.4.0' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -628,18 +629,18 @@ class BaseLdapApplication(FbConfigApplication): if MailAddress.valid_address(user, verbose=self.verbose): msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user) LOG.debug(msg) - return 'mail={u},{b}'.format(u=user, b=base_dn) + return self.get_user_dn_by_mail(user, inst) if self.re_ldap_dn.match(user): msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user) LOG.debug(msg) - return user + return [user] if self.re_uid.match(user): msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format( u=user) LOG.debug(msg) - return 'uid={u},{b}'.format(u=user, b=base_dn) + return self.get_user_dn_by_uid(user, inst) usr = user.strip() if usr == '': @@ -651,6 +652,112 @@ class BaseLdapApplication(FbConfigApplication): LOG.debug(msg) return 'cn={u},{b}'.format(u=usr, b=base_dn) + # ------------------------------------------------------------------------- + def get_user_dn_by_mail(self, mail, inst): + + connect_info = self.cfg.ldap_connection[inst] + base_dn = connect_info.base_dn + ldap = self.ldap_connection[inst] + + result = [] + + attributes = ['dn'] + + ldap_filter = '(&' + ldap_filter += '(|' + ldap_filter += '(objectClass=inetLocalMailRecipient)' + ldap_filter += '(objectClass=inetOrgPerson)' + ldap_filter += '(objectClass=inetMailUser)' + ldap_filter += ')' + ldap_filter += '(|' + ldap_filter += '(mail={})'.format(mail) + ldap_filter += '(mailAlternateAddress={})'.format(mail) + ldap_filter += '(mailEquivalentAddress={})'.format(mail) + ldap_filter += ')' + ldap_filter += ')' + + if self.verbose > 1: + msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) + LOG.debug(msg) + + req_status, req_result, req_response, req_whatever = ldap.search( + search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter, + get_operational_attributes=False, attributes=attributes, + time_limit=self.cfg.ldap_timeout) + + if req_status: + if self.verbose > 4: + msg = _("Result of searching for mail address {m!r}:").format(m=mail) + LOG.debug(msg + ' ' + pp(req_result)) + for entry in req_response: + if self.verbose > 4: + LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + result.append(entry['dn']) + if self.verbose > 3: + LOG.debug(_("Result:") + ' ' + pp(result)) + + else: + if self.verbose > 3: + msg = _("User with mail address {m!r} not found in {uri}/{bdn}.").format( + m=mail, uri=connect_info.url, bdn=base_dn) + LOG.debug(msg) + + return result + + # ------------------------------------------------------------------------- + def get_user_dn_by_uid(self, uid, inst): + + connect_info = self.cfg.ldap_connection[inst] + base_dn = connect_info.base_dn + ldap = self.ldap_connection[inst] + + result = [] + + attributes = ['dn'] + + ldap_filter = '(&' + ldap_filter += '(|' + ldap_filter += '(objectClass=account)' + ldap_filter += '(objectClass=inetOrgPerson)' + ldap_filter += '(objectClass=inetUser)' + ldap_filter += '(objectClass=mailRecipient)' + ldap_filter += '(objectClass=posixAccount)' + ldap_filter += '(objectClass=sambaSamAccount)' + ldap_filter += '(objectClass=uidObject)' + ldap_filter += ')' + ldap_filter += '(uid={})'.format(uid) + ldap_filter += ')' + + if self.verbose > 1: + msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) + LOG.debug(msg) + + req_status, req_result, req_response, req_whatever = ldap.search( + search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter, + get_operational_attributes=False, attributes=attributes, + time_limit=self.cfg.ldap_timeout) + + if req_status: + if self.verbose > 4: + msg = _("Result of searching for uid {u!r}:").format(u=uid) + LOG.debug(msg + ' ' + pp(req_result)) + for entry in req_response: + if self.verbose > 4: + LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + result.append(entry['dn']) + if self.verbose > 3: + LOG.debug(_("Result:") + ' ' + pp(result)) + + else: + if self.verbose > 3: + msg = _("User with uid {u!r} not found in {uri}/{bdn}.").format( + u=uid, uri=connect_info.url, bdn=base_dn) + LOG.debug(msg) + + return result + # ============================================================================= if __name__ == "__main__": diff --git a/lib/pp_admintools/app/remove_ldap_user.py b/lib/pp_admintools/app/remove_ldap_user.py index 1a1a568..f593e50 100644 --- a/lib/pp_admintools/app/remove_ldap_user.py +++ b/lib/pp_admintools/app/remove_ldap_user.py @@ -20,7 +20,7 @@ from ..xlate import XLATOR from ..app.ldap import LdapAppError from ..app.ldap import BaseLdapApplication -__version__ = '0.3.0' +__version__ = '0.3.1' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -171,24 +171,27 @@ class RemoveLdapUserApplication(BaseLdapApplication): for inst in self.ldap_instances: connect_info = self.cfg.ldap_connection[inst] - dn = self.get_user_dn(usr, inst) - if dn: + dns = self.get_user_dn(usr, inst) + if dns: + if self.verbose > 2: + msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format( + dn=dns, user=usr, inst=connect_info.url) + LOG.debug(msg) if inst not in self.dns: self.dns[inst] = [] - if is_sequence(dn): - self.dns[inst] += dn - if len(dn) > 1: + if is_sequence(dns): + if len(dns) > 1: msg = _("Found {nr} entries for user {u!r} in LDAP instance {i}.").format( - nr=len(dn), u=usr, i=connect_info.url) + nr=len(dns), u=usr, i=connect_info.url) LOG.warn(msg) self.wrong_users = True return + for dn in dns: + if dn not in self.dns[inst]: + self.dns[inst].append(dn) else: - self.dns[inst].append(dn) - if self.verbose > 2: - msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format( - dn=dn, user=usr, inst=connect_info.url) - LOG.debug(msg) + if dns not in self.dns[inst]: + self.dns[inst].append(dns) else: msg = _("Did not found user {user!r} in LDAP instance {inst}.").format( user=usr, inst=connect_info.url)