]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Extending LDAP search methods
authorFrank Brehm <frank.brehm@pixelpark.com>
Wed, 7 Sep 2022 11:24:12 +0000 (13:24 +0200)
committerFrank Brehm <frank.brehm@pixelpark.com>
Wed, 7 Sep 2022 11:24:12 +0000 (13:24 +0200)
lib/pp_admintools/app/ldap.py
lib/pp_admintools/app/remove_ldap_user.py

index 289ee5d468f5cc3ef618607592133a525009301a..6be0bbb2c74b2bcd9355694e4818eb4e74fa75f6 100644 (file)
@@ -21,6 +21,7 @@ except ImportError:
 # Third party modules
 from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC
 # from ldap3 import ALL
+from ldap3 import SUBTREE
 # from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS
 # from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
 # from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
@@ -46,7 +47,7 @@ from ..config.ldap import LdapConnectionInfo, LdapConfiguration
 # rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
 from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT
 
-__version__ = '0.3.3'
+__version__ = '0.4.0'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -628,18 +629,18 @@ class BaseLdapApplication(FbConfigApplication):
         if MailAddress.valid_address(user, verbose=self.verbose):
             msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user)
             LOG.debug(msg)
-            return 'mail={u},{b}'.format(u=user, b=base_dn)
+            return self.get_user_dn_by_mail(user, inst)
 
         if self.re_ldap_dn.match(user):
             msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user)
             LOG.debug(msg)
-            return user
+            return [user]
 
         if self.re_uid.match(user):
             msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format(
                 u=user)
             LOG.debug(msg)
-            return 'uid={u},{b}'.format(u=user, b=base_dn)
+            return self.get_user_dn_by_uid(user, inst)
 
         usr = user.strip()
         if usr == '':
@@ -651,6 +652,112 @@ class BaseLdapApplication(FbConfigApplication):
         LOG.debug(msg)
         return 'cn={u},{b}'.format(u=usr, b=base_dn)
 
+    # -------------------------------------------------------------------------
+    def get_user_dn_by_mail(self, mail, inst):
+
+        connect_info = self.cfg.ldap_connection[inst]
+        base_dn = connect_info.base_dn
+        ldap = self.ldap_connection[inst]
+
+        result = []
+
+        attributes = ['dn']
+
+        ldap_filter = '(&'
+        ldap_filter += '(|'
+        ldap_filter += '(objectClass=inetLocalMailRecipient)'
+        ldap_filter += '(objectClass=inetOrgPerson)'
+        ldap_filter += '(objectClass=inetMailUser)'
+        ldap_filter += ')'
+        ldap_filter += '(|'
+        ldap_filter += '(mail={})'.format(mail)
+        ldap_filter += '(mailAlternateAddress={})'.format(mail)
+        ldap_filter += '(mailEquivalentAddress={})'.format(mail)
+        ldap_filter += ')'
+        ldap_filter += ')'
+
+        if self.verbose > 1:
+            msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+                uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+            LOG.debug(msg)
+
+        req_status, req_result, req_response, req_whatever = ldap.search(
+            search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+            get_operational_attributes=False, attributes=attributes,
+            time_limit=self.cfg.ldap_timeout)
+
+        if req_status:
+            if self.verbose > 4:
+                msg = _("Result of searching for mail address {m!r}:").format(m=mail)
+                LOG.debug(msg + ' ' + pp(req_result))
+            for entry in req_response:
+                if self.verbose > 4:
+                    LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+                result.append(entry['dn'])
+            if self.verbose > 3:
+                LOG.debug(_("Result:") + ' ' + pp(result))
+
+        else:
+            if self.verbose > 3:
+                msg = _("User with mail address {m!r} not found in {uri}/{bdn}.").format(
+                    m=mail, uri=connect_info.url, bdn=base_dn)
+                LOG.debug(msg)
+
+        return result
+
+    # -------------------------------------------------------------------------
+    def get_user_dn_by_uid(self, uid, inst):
+
+        connect_info = self.cfg.ldap_connection[inst]
+        base_dn = connect_info.base_dn
+        ldap = self.ldap_connection[inst]
+
+        result = []
+
+        attributes = ['dn']
+
+        ldap_filter = '(&'
+        ldap_filter += '(|'
+        ldap_filter += '(objectClass=account)'
+        ldap_filter += '(objectClass=inetOrgPerson)'
+        ldap_filter += '(objectClass=inetUser)'
+        ldap_filter += '(objectClass=mailRecipient)'
+        ldap_filter += '(objectClass=posixAccount)'
+        ldap_filter += '(objectClass=sambaSamAccount)'
+        ldap_filter += '(objectClass=uidObject)'
+        ldap_filter += ')'
+        ldap_filter += '(uid={})'.format(uid)
+        ldap_filter += ')'
+
+        if self.verbose > 1:
+            msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format(
+                uri=connect_info.url, bdn=base_dn, fltr=ldap_filter)
+            LOG.debug(msg)
+
+        req_status, req_result, req_response, req_whatever = ldap.search(
+            search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter,
+            get_operational_attributes=False, attributes=attributes,
+            time_limit=self.cfg.ldap_timeout)
+
+        if req_status:
+            if self.verbose > 4:
+                msg = _("Result of searching for uid {u!r}:").format(u=uid)
+                LOG.debug(msg + ' ' + pp(req_result))
+            for entry in req_response:
+                if self.verbose > 4:
+                    LOG.debug(_("Got a response entry:") + ' ' + pp(entry))
+                result.append(entry['dn'])
+            if self.verbose > 3:
+                LOG.debug(_("Result:") + ' ' + pp(result))
+
+        else:
+            if self.verbose > 3:
+                msg = _("User with uid {u!r} not found in {uri}/{bdn}.").format(
+                    u=uid, uri=connect_info.url, bdn=base_dn)
+                LOG.debug(msg)
+
+        return result
+
 
 # =============================================================================
 if __name__ == "__main__":
index 1a1a5687f6cdcfc9b1aa663a1e05ad15fac72dbf..f593e5015261258a5da9a2c33df9fb6969929efa 100644 (file)
@@ -20,7 +20,7 @@ from ..xlate import XLATOR
 from ..app.ldap import LdapAppError
 from ..app.ldap import BaseLdapApplication
 
-__version__ = '0.3.0'
+__version__ = '0.3.1'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -171,24 +171,27 @@ class RemoveLdapUserApplication(BaseLdapApplication):
 
         for inst in self.ldap_instances:
             connect_info = self.cfg.ldap_connection[inst]
-            dn = self.get_user_dn(usr, inst)
-            if dn:
+            dns = self.get_user_dn(usr, inst)
+            if dns:
+                if self.verbose > 2:
+                    msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format(
+                        dn=dns, user=usr, inst=connect_info.url)
+                    LOG.debug(msg)
                 if inst not in self.dns:
                     self.dns[inst] = []
-                if is_sequence(dn):
-                    self.dns[inst] += dn
-                    if len(dn) > 1:
+                if is_sequence(dns):
+                    if len(dns) > 1:
                         msg = _("Found {nr} entries for user {u!r} in LDAP instance {i}.").format(
-                            nr=len(dn), u=usr, i=connect_info.url)
+                            nr=len(dns), u=usr, i=connect_info.url)
                         LOG.warn(msg)
                         self.wrong_users = True
                         return
+                    for dn in dns:
+                        if dn not in self.dns[inst]:
+                            self.dns[inst].append(dn)
                 else:
-                    self.dns[inst].append(dn)
-                if self.verbose > 2:
-                    msg = _("Got DN {dn!r} for user {user!r} in LDAP instance {inst}.").format(
-                        dn=dn, user=usr, inst=connect_info.url)
-                    LOG.debug(msg)
+                    if dns not in self.dns[inst]:
+                        self.dns[inst].append(dns)
             else:
                 msg = _("Did not found user {user!r} in LDAP instance {inst}.").format(
                     user=usr, inst=connect_info.url)