From: Frank Brehm Date: Wed, 7 Sep 2022 12:34:13 +0000 (+0200) Subject: Extending LDAP search methods X-Git-Tag: 0.5.0^2~2^2~11 X-Git-Url: https://git.uhu-banane.de/?a=commitdiff_plain;h=afdc8eb559f530d8f6c72e2eb8e26b2243e11be9;p=pixelpark%2Fpp-admin-tools.git Extending LDAP search methods --- diff --git a/lib/pp_admintools/app/ldap.py b/lib/pp_admintools/app/ldap.py index 6be0bbb..12c3219 100644 --- a/lib/pp_admintools/app/ldap.py +++ b/lib/pp_admintools/app/ldap.py @@ -21,17 +21,19 @@ except ImportError: # Third party modules from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC # from ldap3 import ALL -from ldap3 import SUBTREE +from ldap3 import BASE, SUBTREE # from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS +from ldap3 import ALL_ATTRIBUTES # from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES # from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE # from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError # from ldap3.core.exceptions import LDAPException, LDAPBindError -from fb_tools.common import pp, to_bool +from fb_tools.common import pp, to_bool, is_sequence from fb_tools.cfg_app import FbConfigApplication from fb_tools.errors import FbAppError from fb_tools.mailaddress import MailAddress +from fb_tools.collections import FrozenCIStringSet, CIStringSet # Own modules from .. import __version__ as GLOBAL_VERSION @@ -47,7 +49,7 @@ from ..config.ldap import LdapConnectionInfo, LdapConfiguration # rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS from ..config.ldap import DEFAULT_TIMEOUT, MAX_TIMEOUT -__version__ = '0.4.0' +__version__ = '0.4.1' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -173,6 +175,9 @@ class BaseLdapApplication(FbConfigApplication): pattern_re_uid = r'^[a-z](?:[a-z0-9\-_.]*[a-z0-9])?$' re_uid = re.compile(pattern_re_uid, re.IGNORECASE) + person_object_classes = FrozenCIStringSet([ + 'account', 'inetOrgPerson', 'inetUser', 'posixAccount', 'sambaSamAccount']) + # ------------------------------------------------------------------------- def __init__( self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None, @@ -619,7 +624,6 @@ class BaseLdapApplication(FbConfigApplication): def get_user_dn(self, user, inst): connect_info = self.cfg.ldap_connection[inst] - base_dn = connect_info.base_dn if self.verbose > 1: msg = _("Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...").format( @@ -629,18 +633,24 @@ class BaseLdapApplication(FbConfigApplication): if MailAddress.valid_address(user, verbose=self.verbose): msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user) LOG.debug(msg) - return self.get_user_dn_by_mail(user, inst) + dns = self.get_user_dn_by_mail(user, inst) + if dns: + return dns if self.re_ldap_dn.match(user): msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user) LOG.debug(msg) - return [user] + dns = self.get_user_dn_by_dn(user, inst) + if dns: + return dns if self.re_uid.match(user): msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format( u=user) LOG.debug(msg) - return self.get_user_dn_by_uid(user, inst) + dns = self.get_user_dn_by_uid(user, inst) + if dns: + return dns usr = user.strip() if usr == '': @@ -650,7 +660,7 @@ class BaseLdapApplication(FbConfigApplication): msg = _("Trying to evaluate DN of user {u!r} as a CN ({c}) ...").format( u=usr, c='common name') LOG.debug(msg) - return 'cn={u},{b}'.format(u=usr, b=base_dn) + return self.get_user_dn_by_cn(user, inst) # ------------------------------------------------------------------------- def get_user_dn_by_mail(self, mail, inst): @@ -758,6 +768,144 @@ class BaseLdapApplication(FbConfigApplication): return result + # ------------------------------------------------------------------------- + def get_user_dn_by_cn(self, cn, inst): + + connect_info = self.cfg.ldap_connection[inst] + base_dn = connect_info.base_dn + ldap = self.ldap_connection[inst] + + result = [] + + attributes = ['dn'] + + ldap_filter = '(&' + ldap_filter += '(|' + ldap_filter += '(objectClass=account)' + ldap_filter += '(objectClass=inetOrgPerson)' + ldap_filter += '(objectClass=inetUser)' + ldap_filter += '(objectClass=mailRecipient)' + ldap_filter += '(objectClass=posixAccount)' + ldap_filter += '(objectClass=sambaSamAccount)' + ldap_filter += '(objectClass=uidObject)' + ldap_filter += ')' + ldap_filter += '(cn={})'.format(cn) + ldap_filter += ')' + + if self.verbose > 1: + msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) + LOG.debug(msg) + + req_status, req_result, req_response, req_whatever = ldap.search( + search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter, + get_operational_attributes=False, attributes=attributes, + time_limit=self.cfg.ldap_timeout) + + if req_status: + if self.verbose > 4: + msg = _("Result of searching for CN {cn!r}:").format(cn=cn) + LOG.debug(msg + ' ' + pp(req_result)) + for entry in req_response: + if self.verbose > 4: + LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + result.append(entry['dn']) + if self.verbose > 3: + LOG.debug(_("Result:") + ' ' + pp(result)) + + else: + if self.verbose > 3: + msg = _("User with cn {cn!r} not found in {uri}/{bdn}.").format( + cn=cn, uri=connect_info.url, bdn=base_dn) + LOG.debug(msg) + + return result + + # ------------------------------------------------------------------------- + def get_user_dn_by_dn(self, dn, inst, strict=True): + + connect_info = self.cfg.ldap_connection[inst] + + attributes = ['objectClass'] + + entry = self.get_entry(dn, inst, attributes=attributes) + + if not entry: + if self.verbose > 3: + msg = _("User with DN {dn!r} not found in {uri}.").format( + dn=dn, uri=connect_info.url) + LOG.debug(msg) + return None + + found_dn = entry['dn'] + + object_classes = CIStringSet() + for attr in entry['attributes']: + if attr.lower() == 'objectclass': + values = entry['attributes'][attr] + if is_sequence(values): + for val in values: + object_classes.add(val) + else: + object_classes.add(values) + + if self.verbose > 3: + msg = _("ObjectClasses of {dn!r}:").format(dn=found_dn) + LOG.debug(msg + '\n' + pp(object_classes.as_list())) + + is_person = False + for oclass in object_classes: + if oclass in self.person_object_classes: + is_person = True + break + + if not is_person: + msg = _("Entry {dn!r} in {uri} seems not to be an account.").format( + dn=found_dn, uri=connect_info.url) + LOG.warn(msg) + if strict: + return None + + return [found_dn] + + # ------------------------------------------------------------------------- + def get_entry(self, dn, inst, attributes=None, operational_attributes=False): + + connect_info = self.cfg.ldap_connection[inst] + ldap = self.ldap_connection[inst] + + if attributes is None: + attributes = [ALL_ATTRIBUTES] + + sfilter = '(objectClass=*)' + + result = None + + if self.verbose > 1: + msg = _("Searching DN {dn!r} in {uri}.").format(dn=dn, uri=connect_info.url) + LOG.debug(msg) + + req_status, req_result, req_response, req_whatever = ldap.search( + search_base=dn, search_scope=BASE, attributes=attributes, + get_operational_attributes=operational_attributes, search_filter=sfilter, + time_limit=self.cfg.ldap_timeout) + + if req_status: + if self.verbose > 4: + msg = _("Result of searching for DN {dn!r}:").format(dn=dn) + LOG.debug(msg + ' ' + pp(req_result)) + result = req_response[0] + if self.verbose > 3: + LOG.debug(_("Got a response entry:") + ' ' + pp(result)) + + else: + if self.verbose > 3: + msg = _("Entry with DN {dn!r} not found in {uri}.").format( + dn=dn, uri=connect_info.url) + LOG.debug(msg) + + return result + # ============================================================================= if __name__ == "__main__":