From: Frank Brehm Date: Wed, 17 May 2023 14:14:19 +0000 (+0200) Subject: Make the linter so what happy X-Git-Tag: 0.9.0~1^2~49 X-Git-Url: https://git.uhu-banane.de/?a=commitdiff_plain;h=089532991e76887ee655ee8bafe2261a33cba010;p=pixelpark%2Fpp-admin-tools.git Make the linter so what happy --- diff --git a/lib/pp_admintools/app/ldap.py b/lib/pp_admintools/app/ldap.py index 1898190..3201748 100644 --- a/lib/pp_admintools/app/ldap.py +++ b/lib/pp_admintools/app/ldap.py @@ -1,60 +1,51 @@ # -*- coding: utf-8 -*- """ +@summary: A base module for application classes with LDAP support. + @author: Frank Brehm @contact: frank.brehm@pixelpark.com @copyright: © 2023 by Frank Brehm, Berlin -@summary: A base module for application classes with LDAP support """ from __future__ import absolute_import # Standard modules +import argparse import logging import os -import argparse import re - try: from pathlib import Path except ImportError: from pathlib2 import Path - from functools import cmp_to_key # Third party modules -from ldap3 import Server, Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC +from fb_tools.argparse_actions import TimeoutOptionAction +from fb_tools.collections import CIDict, CIStringSet, FrozenCIStringSet +from fb_tools.common import is_sequence, to_bool +from fb_tools.mailaddress import MailAddress + # from ldap3 import ALL -from ldap3 import BASE, SUBTREE +# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES # from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS +# from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError +# from ldap3.core.exceptions import LDAPException, LDAPBindError from ldap3 import ALL_ATTRIBUTES -# from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES +from ldap3 import BASE, SUBTREE +from ldap3 import Connection, DSA, IP_V4_PREFERRED, SAFE_SYNC, Server from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE -# from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError from ldap3.core.exceptions import LDAPException -# from ldap3.core.exceptions import LDAPException, LDAPBindError - -from fb_tools.common import is_sequence, to_bool -from fb_tools.argparse_actions import TimeoutOptionAction -from fb_tools.mailaddress import MailAddress -from fb_tools.collections import FrozenCIStringSet, CIStringSet, CIDict # Own modules +from . import BaseDPXApplication, DPXAppError +from .. import DEFAULT_CONFIG_DIR, MAX_PORT_NUMBER from .. import __version__ as GLOBAL_VERSION from .. import pp - -from ..xlate import XLATOR, format_list - -from .. import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR - -# from ..argparse_actions import PortOptionAction - -from . import DPXAppError, BaseDPXApplication - -# from ..config.ldap import LdapConfigError -from ..config.ldap import LdapConnectionInfo, LdapConfiguration -# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS from ..config.ldap import DEFAULT_TIMEOUT +from ..config.ldap import LdapConfiguration, LdapConnectionInfo +from ..xlate import XLATOR, format_list -__version__ = '0.11.4' +__version__ = '0.11.5' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -63,45 +54,52 @@ ngettext = XLATOR.ngettext # ============================================================================= class LdapAppError(DPXAppError): - """ Base exception class for all exceptions in all LDAP using application classes.""" + """Base exception class for all exceptions in all LDAP using application classes.""" + pass # ============================================================================= class FatalLDAPError(LdapAppError): """Fatal errors leading to interrupt the current application.""" + pass # ============================================================================= class LDAPExecutionError(FatalLDAPError): """Error class in case, a LDAP operation was not successful.""" + pass # ============================================================================= class WriteLDAPItemError(FatalLDAPError): """Error class in case, a LDAP item could not be written.""" + pass # ============================================================================= class DeleteLDAPItemError(FatalLDAPError): """Error class in case, a LDAP item could not be deleted.""" + pass # ============================================================================= class LDAPParseError(FatalLDAPError): """Error on parsing LDAP stuff.""" + pass # ============================================================================= class PasswordFileOptionAction(argparse.Action): + """Argparse action for a password file.""" # ------------------------------------------------------------------------- def __init__(self, option_strings, must_exists=True, must_absolute=True, *args, **kwargs): - + """Construct the action object.""" self.must_exists = bool(must_exists) self.must_absolute = bool(must_absolute) @@ -110,25 +108,25 @@ class PasswordFileOptionAction(argparse.Action): # ------------------------------------------------------------------------- def __call__(self, parser, namespace, given_path, option_string=None): - + """Call the option action.""" path = Path(given_path) if self.must_absolute: if not path.is_absolute(): - msg = _("The path {!r} must be an absolute path.").format(given_path) + msg = _('The path {!r} must be an absolute path.').format(given_path) raise argparse.ArgumentError(self, msg) if self.must_exists: if not path.exists(): - msg = _("The file {!r} does not exists.").format(str(path)) + msg = _('The file {!r} does not exists.').format(str(path)) raise argparse.ArgumentError(self, msg) if not path.is_file(): - msg = _("The given path {!r} exists, but is not a regular file.").format(str(path)) + msg = _('The given path {!r} exists, but is not a regular file.').format(str(path)) raise argparse.ArgumentError(self, msg) if not os.access(str(path), os.R_OK): - msg = _("The given file {!r} is not readable.").format(str(path)) + msg = _('The given file {!r} is not readable.').format(str(path)) raise argparse.ArgumentError(self, msg) setattr(namespace, self.dest, path) @@ -136,25 +134,26 @@ class PasswordFileOptionAction(argparse.Action): # ============================================================================= class LdapPortOptionAction(argparse.Action): + """Argparse action for the LDAP TCP (UDP?) port.""" # ------------------------------------------------------------------------- def __init__(self, option_strings, *args, **kwargs): - + """Construct the action object.""" super(LdapPortOptionAction, self).__init__( option_strings=option_strings, *args, **kwargs) # ------------------------------------------------------------------------- def __call__(self, parser, namespace, given_port, option_string=None): - + """Call the option action.""" try: port = int(given_port) if port <= 0 or port > MAX_PORT_NUMBER: msg = _( - "a port number must be greater than zero and less " - "or equal to {}.").format(MAX_PORT_NUMBER) + 'a port number must be greater than zero and less ' + 'or equal to {}.').format(MAX_PORT_NUMBER) raise ValueError(msg) except (ValueError, TypeError) as e: - msg = _("Wrong port number {!r}:").format(given_port) + msg = _('Wrong port number {!r}:').format(given_port) msg += ' ' + str(e) raise argparse.ArgumentError(self, msg) @@ -163,9 +162,7 @@ class LdapPortOptionAction(argparse.Action): # ============================================================================= class BaseLdapApplication(BaseDPXApplication): - """ - Base class for all application classes using LDAP. - """ + """Base class for all application classes using LDAP.""" use_default_ldap_connection = True use_multiple_ldap_connections = False @@ -198,7 +195,12 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- @classmethod def compare_ldap_dns(cls, x, y): + """ + Compare two LDAP DN strings. + It will be done by splitting it into the particular tokens + and comparing the from right to left. + """ # Check for empty DNs if x is None and y is None: return 0 @@ -244,8 +246,8 @@ class BaseLdapApplication(BaseDPXApplication): return -1 # Should never come to here ... - LOG.warn("Reached a point, where I never should come.") - LOG.debug("Compared x: {x!r} <=> y: {y!r}".format(x=x, y=y)) + LOG.warn('Reached a point, where I never should come.') + LOG.debug('Compared x: {x!r} <=> y: {y!r}'.format(x=x, y=y)) return 0 # ------------------------------------------------------------------------- @@ -254,7 +256,7 @@ class BaseLdapApplication(BaseDPXApplication): cfg_class=LdapConfiguration, initialized=False, usage=None, description=None, argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None, config_dir=DEFAULT_CONFIG_DIR): - + """Contrict the application object.""" self._password_file = None self.ldap_instances = [] self.ldap_server = {} @@ -270,7 +272,7 @@ class BaseLdapApplication(BaseDPXApplication): # ----------------------------------------------------------- @property def password_file(self): - """The file containing the password of the Bind DN of the default LDAP connection.""" + """Return file containing the password of the Bind DN of the default LDAP connection.""" return self._password_file @password_file.setter @@ -278,19 +280,19 @@ class BaseLdapApplication(BaseDPXApplication): path = Path(value) if not path.is_absolute(): - msg = _("The path {!r} must be an absolute path.").format(value) + msg = _('The path {!r} must be an absolute path.').format(value) raise LdapAppError(msg) if not path.exists(): - msg = _("The file {!r} does not exists.").format(str(path)) + msg = _('The file {!r} does not exists.').format(str(path)) raise LdapAppError(msg) if not path.is_file(): - msg = _("The given path {!r} exists, but is not a regular file.").format(str(path)) + msg = _('The given path {!r} exists, but is not a regular file.').format(str(path)) raise LdapAppError(msg) if not os.access(str(path), os.R_OK): - msg = _("The given file {!r} is not readable.").format(str(path)) + msg = _('The given file {!r} is not readable.').format(str(path)) raise LdapAppError(msg) self._password_file = path @@ -298,7 +300,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def as_dict(self, short=True): """ - Transforms the elements of the object into a dict + Transform the elements of the object into a dict. @param short: don't include local properties in resulting dict. @type short: bool @@ -306,7 +308,6 @@ class BaseLdapApplication(BaseDPXApplication): @return: structure as dict @rtype: dict """ - res = super(BaseLdapApplication, self).as_dict(short=short) res['pattern_re_ldap_dn'] = self.pattern_re_ldap_dn @@ -319,10 +320,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def init_arg_parser(self): - """ - Public available method to initiate the argument parser. - """ - + """Public available method to initiate the argument parser.""" group_title = _('LDAP options') if self.use_default_ldap_connection: group_title = _('Options for the default LDAP connection') @@ -342,80 +340,81 @@ class BaseLdapApplication(BaseDPXApplication): ldap_bind_dn = LdapConfiguration.default_bind_dn ldap_group.add_argument( - '-H', '--ldap-host', metavar=_("HOST"), dest="ldap_host", + '-H', '--ldap-host', metavar=_('HOST'), dest='ldap_host', help=_( - "Hostname or address of the LDAP server to use. Default: {!r}").format( + 'Hostname or address of the LDAP server to use. Default: {!r}').format( ldap_host), ) ldap_group.add_argument( - '--ssl', '--ldaps', '--ldap-ssl', dest="ldap_ssl", action="store_true", - help=_("Use ldaps to connect to the LDAP server. Default: {}").format( + '--ssl', '--ldaps', '--ldap-ssl', dest='ldap_ssl', action='store_true', + help=_('Use ldaps to connect to the LDAP server. Default: {}').format( ldap_ssl_str), ) ldap_group.add_argument( - '-p', '--ldap-port', metavar=_("PORT"), type=int, dest="ldap_port", + '-p', '--ldap-port', metavar=_('PORT'), type=int, dest='ldap_port', action=LdapPortOptionAction, - help=_("The port number to connect to the LDAP server. Default: {}").format( + help=_('The port number to connect to the LDAP server. Default: {}').format( ldap_port), ) ldap_group.add_argument( - '-b', '--base-dn', metavar="DN", dest="ldap_base_dn", + '-b', '--base-dn', metavar='DN', dest='ldap_base_dn', help=_( - "The base DN used as the root for the LDAP searches. " - "Default: {!r}").format(ldap_base_dn), + 'The base DN used as the root for the LDAP searches. ' + 'Default: {!r}').format(ldap_base_dn), ) ldap_group.add_argument( - '-D', '--bind-dn', metavar="DN", dest="ldap_bind_dn", + '-D', '--bind-dn', metavar='DN', dest='ldap_bind_dn', help=_( - "The Bind DN to use to connect to the LDAP server. Default: {!r}").format( + 'The Bind DN to use to connect to the LDAP server. Default: {!r}').format( ldap_bind_dn), ) pw_group = ldap_group.add_mutually_exclusive_group() pw_group.add_argument( - '-w', '--bind-pw', '--password', metavar=_("PASSWORD"), dest="ldap_bind_pw", - help=_("Use PASSWORD as the password for simple LDAP authentication."), + '-w', '--bind-pw', '--password', metavar=_('PASSWORD'), dest='ldap_bind_pw', + help=_('Use PASSWORD as the password for simple LDAP authentication.'), ) pw_group.add_argument( - '-W', '--password-prompt', action="store_true", dest="ldap_pw_prompt", + '-W', '--password-prompt', action='store_true', dest='ldap_pw_prompt', help=_( - "Prompt for simple LDAP authentication. This is used instead of " - "specifying the password on the command line."), + 'Prompt for simple LDAP authentication. This is used instead of ' + 'specifying the password on the command line.'), ) pw_group.add_argument( - '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="ldap_pw_file", + '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest='ldap_pw_file', action=PasswordFileOptionAction, - help=_("Use contents of PASSWORD_FILE as the password for simple authentication."), + help=_('Use contents of PASSWORD_FILE as the password for simple authentication.'), ) else: arg_params = { - 'dest': "instance", + 'dest': 'instance', 'type': str, 'metavar': _('INSTANCE'), } help_default_inst = _( - "If not given, then the instance {!r} will be used.").format(self.default_default_ldap_instance) + 'If not given, then the instance {!r} will be used.').format( + self.default_default_ldap_instance) help_single = _( - "The LDAP instance (LDAP cluster) from configuration, " - "where to execute this script.") + 'The LDAP instance (LDAP cluster) from configuration, ' + 'where to execute this script.') help_multi = _( - "The LDAP instance (LDAP cluster) from configuration, where to execute " - "this script. It is possible to give here the value {val_all!r}, " - "then all found LDAP instances except {default!r} are used. " - "It is alo possible to give the value {val_list!r}, then all configured " - "LDAP instances are shown, and the application is exiting.").format( + 'The LDAP instance (LDAP cluster) from configuration, where to execute ' + 'this script. It is possible to give here the value {val_all!r}, ' + 'then all found LDAP instances except {default!r} are used. ' + 'It is alo possible to give the value {val_list!r}, then all configured ' + 'LDAP instances are shown, and the application is exiting.').format( val_all='all', val_list='list', default='default') if self.use_multiple_ldap_connections: @@ -436,11 +435,11 @@ class BaseLdapApplication(BaseDPXApplication): if self.show_cmdline_ldap_timeout: ldap_group.add_argument( - '-T', '--timeout', metavar=_('SECONDS'), dest="ldap_timeout", + '-T', '--timeout', metavar=_('SECONDS'), dest='ldap_timeout', action=TimeoutOptionAction, help=_( - "Using the given timeout in seconds for all LDAP operations. " - "Default: {}").format(DEFAULT_TIMEOUT), + 'Using the given timeout in seconds for all LDAP operations. ' + 'Default: {}').format(DEFAULT_TIMEOUT), ) super(BaseLdapApplication, self).init_arg_parser() @@ -448,16 +447,14 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def post_init(self): """ - Method to execute before calling run(). Here could be done some - finishing actions after reading in commandline parameters, - configuration a.s.o. + Call this method before calling run(). - This method could be overwritten by descendant classes, these - methhods should allways include a call to post_init() of the - parent class. + Here could be done some finishing actions after reading in commandline parameters, + configuration a.s.o. + This method could be overwritten by descendant classes, these methods should allways + include a call to post_init() of the parent class. """ - self.initialized = False super(BaseLdapApplication, self).post_init() @@ -472,11 +469,11 @@ class BaseLdapApplication(BaseDPXApplication): self.ldap_instances = [self.default_default_ldap_instance] return - LOG.debug(_("Checking given instances.")) + LOG.debug(_('Checking given instances.')) insts = getattr(self.args, 'instance', None) if self.verbose > 1: - LOG.debug(_("Given insts:") + ' ' + pp(insts)) + LOG.debug(_('Given insts:') + ' ' + pp(insts)) if insts is None: insts = [] @@ -527,11 +524,11 @@ class BaseLdapApplication(BaseDPXApplication): max_key_len += 1 - title = _("Configured LDAP instances:") + title = _('Configured LDAP instances:') print(title) print('-' * len(title)) print() - tpl = "{inst:<{width}} {url:<{url_l}} {bind_dn:<{bind_dn_l}} {tier}" + tpl = '{inst:<{width}} {url:<{url_l}} {bind_dn:<{bind_dn_l}} {tier}' for inst in instances: cfg = self.cfg.ldap_connection[inst] url = '{url}/{base}'.format(url=cfg.url, base=cfg.base_dn) @@ -544,7 +541,7 @@ class BaseLdapApplication(BaseDPXApplication): def _verify_instances(self, is_admin=None, readonly=None, tier=None, has_sync_source=False): if self.verbose > 1: - LOG.debug(_("Verifying given instances ...")) + LOG.debug(_('Verifying given instances ...')) if self.verbose > 2: show_filter = [] @@ -557,7 +554,7 @@ class BaseLdapApplication(BaseDPXApplication): show_filter.append('tier = {!r}'.format(tier)) if has_sync_source: show_filter.append('sync_source = *') - msg = _("Used filter:") + ' ' + format_list(show_filter) + msg = _('Used filter:') + ' ' + format_list(show_filter) LOG.debug(msg) filtered_instances = [] @@ -580,12 +577,12 @@ class BaseLdapApplication(BaseDPXApplication): filtered_instances.append(inst.lower()) if self.verbose > 2: - LOG.debug(_("Filtered instances:") + ' ' + pp(filtered_instances)) + LOG.debug(_('Filtered instances:') + ' ' + pp(filtered_instances)) self._validate_given_instances(filtered_instances) if self.verbose > 1: - LOG.debug(_("LDAP instances to use:") + ' ' + pp(self.ldap_instances)) + LOG.debug(_('LDAP instances to use:') + ' ' + pp(self.ldap_instances)) # ------------------------------------------------------------------------- def _validate_given_instances(self, filtered_instances): @@ -601,7 +598,7 @@ class BaseLdapApplication(BaseDPXApplication): all_ok = True for given_inst in self.ldap_instances: if given_inst not in filtered_instances: - msg = _("LDAP instance {!r} not found in configuration or is not usable.").format( + msg = _('LDAP instance {!r} not found in configuration or is not usable.').format( given_inst) LOG.error(msg) all_ok = False @@ -650,30 +647,30 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def __del__(self): - + """Destruct the object.""" self.disconnect_all() # ------------------------------------------------------------------------- def pre_run(self): - - LOG.debug(_("Preparations ...")) + """Execute it immediately before running the underlaying payload.""" + LOG.debug(_('Preparations ...')) super(BaseLdapApplication, self).pre_run() - LOG.debug(_("Open all necessary LDAP connections ...")) + LOG.debug(_('Open all necessary LDAP connections ...')) for inst in self.ldap_instances: self.connect_instance(inst) # ------------------------------------------------------------------------- def connect_instance(self, inst): - + """Connect to the given LDAP instance.""" connect_info = self.cfg.ldap_connection[inst] ldap_server = self.get_ldap_server_obj(inst) self.ldap_server[inst] = ldap_server if not connect_info.bind_pw: - first_prompt = _("Password of user {usr} on LDAP instance {inst}:").format( + first_prompt = _('Password of user {usr} on LDAP instance {inst}:').format( usr=self.colored(connect_info.bind_dn, 'CYAN'), inst=self.colored(connect_info.url, 'CYAN')) + ' ' connect_info.bind_pw = self.get_password(first_prompt, may_empty=False, repeat=False) @@ -682,17 +679,17 @@ class BaseLdapApplication(BaseDPXApplication): self.ldap_connection[inst] = ldap_connection if self.verbose > 2: - msg = _("Info about LDAP server {}:").format(connect_info.url) + msg = _('Info about LDAP server {}:').format(connect_info.url) msg += ' ' + repr(ldap_connection) LOG.debug(msg) # ------------------------------------------------------------------------- def get_ldap_server_obj(self, inst): - + """Return the ldap3-Server object for the given instance.""" connect_info = self.cfg.ldap_connection[inst] if self.verbose > 2: - msg = _("Trying to get LDAP server object for {} ...").format(connect_info.url) + msg = _('Trying to get LDAP server object for {} ...').format(connect_info.url) LOG.debug(msg) server_opts = {} @@ -708,20 +705,20 @@ class BaseLdapApplication(BaseDPXApplication): server_opts['mode'] = IP_V4_PREFERRED server_opts['connect_timeout'] = self.cfg.ldap_timeout if self.verbose > 2: - msg = _("Connect options to server {!r}:").format(connect_info.url) + msg = _('Connect options to server {!r}:').format(connect_info.url) msg += ' ' + pp(server_opts) LOG.debug(msg) ldap_server = Server(connect_info.host, **server_opts) if self.verbose > 2: - LOG.debug(_("LDAP server {s}: {re}").format(s=ldap_server, re=repr(ldap_server))) + LOG.debug(_('LDAP server {s}: {re}').format(s=ldap_server, re=repr(ldap_server))) return ldap_server # ------------------------------------------------------------------------- def connect_to_ldap_server(self, ldap_server, inst, bind_dn=None, bind_pw=None): - + """Connect to the given LDAP server.""" connect_info = self.cfg.ldap_connection[inst] if not bind_dn: bind_dn = connect_info.bind_dn @@ -729,7 +726,7 @@ class BaseLdapApplication(BaseDPXApplication): bind_pw = connect_info.bind_pw if self.verbose > 1: - msg = _("Connecting to LDAP server {url} as {dn!r} ...").format( + msg = _('Connecting to LDAP server {url} as {dn!r} ...').format( url=connect_info.url, dn=bind_dn) LOG.debug(msg) @@ -740,43 +737,45 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def post_run(self): - - LOG.debug(_("Finishing ...")) + """Execute after the the payload action of the application.""" + LOG.debug(_('Finishing ...')) super(BaseLdapApplication, self).post_run() self.disconnect_all() # ------------------------------------------------------------------------- def disconnect_all(self): - + """Disconnect from all connected LDAP instances.""" if hasattr(self, 'ldap_connection'): if len(self.ldap_connection) or len(self.ldap_server): - LOG.debug(_("Disconnecting from all remaining LDAP instances ...")) + LOG.debug(_('Disconnecting from all remaining LDAP instances ...')) for inst in self.ldap_instances: self.disconnect_instance(inst) # ------------------------------------------------------------------------- def disconnect_instance(self, inst): - + """Disconnect from the given instance.""" connect_info = self.cfg.ldap_connection[inst] if inst in self.ldap_connection: ldap_connection = self.ldap_connection[inst] if self.verbose > 1: - LOG.debug(_("Unbinding from LDAP server {!r} ...").format(connect_info.url)) + LOG.debug(_('Unbinding from LDAP server {!r} ...').format(connect_info.url)) ldap_connection.unbind() ldap_connection = None del self.ldap_connection[inst] if inst in self.ldap_server: if self.verbose > 1: - LOG.debug(_("Disconnecting from LDAP server {!r} ...").format(connect_info.url)) + LOG.debug(_('Disconnecting from LDAP server {!r} ...').format(connect_info.url)) del self.ldap_server[inst] # ------------------------------------------------------------------------- def get_all_entries(self, inst, base_dn=None, ldap_filter=None, attributes=None): - """Get all LDAP entries bellow the given BaseDN and the given LDAP filter. + """ + Get all LDAP entries bellow the given BaseDN and the given LDAP filter. + If no attributes are given, all attributes are given back. The result is a hash with the DNs if the resulting entries as keys, and a hash with the resulting attributes as values. @@ -795,8 +794,8 @@ class BaseLdapApplication(BaseDPXApplication): if self.verbose > 2: msg = _( - "Searching in {uri}/{bdn} for all entries with filter {fltr!r}, " - "giving attributes:").format(uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) + 'Searching in {uri}/{bdn} for all entries with filter {fltr!r}, ' + 'giving attributes:').format(uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) msg += ' ' + format_list(attributes, do_repr=True) LOG.debug(msg) @@ -806,27 +805,27 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 4: - LOG.debug(_("Result of searching:") + '\n' + pp(req_result)) + LOG.debug(_('Result of searching:') + '\n' + pp(req_result)) for entry in req_response: dn = entry['dn'] if self.verbose > 3: - LOG.debug(_("Found entry {!r}.").format(dn)) + LOG.debug(_('Found entry {!r}.').format(dn)) result[dn] = self.normalized_attributes(entry) if self.verbose > 2: msg = ngettext( - "Found one entry with filter {fltr!r} in {uri}/{bdn}.", - "Found {nr} enries with filter {fltr!r} in {uri}/{bdn}.", + 'Found one entry with filter {fltr!r} in {uri}/{bdn}.', + 'Found {nr} enries with filter {fltr!r} in {uri}/{bdn}.', len(result)).format( nr=len(result), uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) if self.verbose > 4: - LOG.debug(_("Got response entries:") + '\n' + pp(result)) + LOG.debug(_('Got response entries:') + '\n' + pp(result)) else: if self.verbose > 3: - msg = _("No entry found with filter {fltr!r} in {uri}/{bdn}.").format( + msg = _('No entry found with filter {fltr!r} in {uri}/{bdn}.').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -835,7 +834,6 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def get_all_entry_dns(self, inst, ldap_filter=None): """Get DNs of all entries in the given LDAP instance and sort them.""" - connect_info = self.cfg.ldap_connection[inst] base_dn = connect_info.base_dn ldap = self.ldap_connection[inst] @@ -847,7 +845,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter = '(objectClass=*)' if self.verbose > 1: - LOG.debug(_("Using LDAP filter: {!r}").format(ldap_filter)) + LOG.debug(_('Using LDAP filter: {!r}').format(ldap_filter)) req_status, req_result, req_response, req_whatever = ldap.search( search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter, @@ -856,27 +854,26 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 5: - msg = _("Result of searching for DNs of all entries:") + msg = _('Result of searching for DNs of all entries:') LOG.debug(msg + '\n' + pp(req_result)) for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) else: - LOG.warn("Got no entry DNs.") + LOG.warn('Got no entry DNs.') if result: result = sorted(result, key=cmp_to_key(self.compare_ldap_dns)) if self.verbose > 3 and result: - LOG.debug(_("Result:") + '\n' + pp(result)) + LOG.debug(_('Result:') + '\n' + pp(result)) return result # ------------------------------------------------------------------------- def get_all_entry_dns_hash(self, inst, ldap_filter=None): """Get Object classes and DNs of all entries in the given LDAP instance.""" - connect_info = self.cfg.ldap_connection[inst] base_dn = connect_info.base_dn ldap = self.ldap_connection[inst] @@ -886,11 +883,11 @@ class BaseLdapApplication(BaseDPXApplication): if not ldap_filter: ldap_filter = '(objectClass=*)' - LOG.debug(_("Getting all Entry DNs of LDAP instance {i!r} below {b!r}.").format( + LOG.debug(_('Getting all Entry DNs of LDAP instance {i!r} below {b!r}.').format( i=inst, b=base_dn)) if self.verbose > 1: - LOG.debug(_("Using LDAP filter: {!r}").format(ldap_filter)) + LOG.debug(_('Using LDAP filter: {!r}').format(ldap_filter)) req_status, req_result, req_response, req_whatever = ldap.search( search_base=base_dn, search_scope=SUBTREE, search_filter=ldap_filter, @@ -899,11 +896,11 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 5: - msg = _("Result of searching for DNs of all entries:") + msg = _('Result of searching for DNs of all entries:') LOG.debug(msg + '\n' + pp(req_result)) for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) dn = entry['dn'] object_classes = FrozenCIStringSet(entry['attributes']['objectClass']) @@ -915,36 +912,36 @@ class BaseLdapApplication(BaseDPXApplication): } else: - LOG.warn("Got no entry DNs.") + LOG.warn('Got no entry DNs.') return result # ------------------------------------------------------------------------- def get_user_dn(self, user, inst): - + """Get the DN of the given user in the given LDAP instance somehow.""" connect_info = self.cfg.ldap_connection[inst] if self.verbose > 1: - msg = _("Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...").format( + msg = _('Trying to evaluate DN of user {user!r} in LDAP instance {inst} ...').format( user=user, inst=connect_info.url) LOG.debug(msg) if MailAddress.valid_address(user, verbose=self.verbose): - msg = _("Trying to evaluate DN of user {u!r} as a mail address ...").format(u=user) + msg = _('Trying to evaluate DN of user {u!r} as a mail address ...').format(u=user) LOG.debug(msg) dns = self.get_user_dn_by_mail(user, inst) if dns: return dns if self.re_ldap_dn.match(user): - msg = _("Trying to evaluate DN of user {u!r} as a LDAP DN ...").format(u=user) + msg = _('Trying to evaluate DN of user {u!r} as a LDAP DN ...').format(u=user) LOG.debug(msg) dns = self.get_user_dn_by_dn(user, inst) if dns: return dns if self.re_uid.match(user): - msg = _("Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...").format( + msg = _('Trying to evaluate DN of user {u!r} as a UID (Posix user name) ...').format( u=user) LOG.debug(msg) dns = self.get_user_dn_by_uid(user, inst) @@ -953,17 +950,17 @@ class BaseLdapApplication(BaseDPXApplication): usr = user.strip() if usr == '': - msg = _("Empty user given.") + msg = _('Empty user given.') LOG.warn(msg) return None - msg = _("Trying to evaluate DN of user {u!r} as a CN ({c}) ...").format( + msg = _('Trying to evaluate DN of user {u!r} as a CN ({c}) ...').format( u=usr, c='common name') LOG.debug(msg) return self.get_user_dn_by_cn(user, inst) # ------------------------------------------------------------------------- def get_user_dn_by_mail(self, mail, inst): - + """Get the DN of the user with the given mail address in the given LDAP instance.""" connect_info = self.cfg.ldap_connection[inst] base_dn = connect_info.base_dn ldap = self.ldap_connection[inst] @@ -986,7 +983,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter += ')' if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -997,18 +994,18 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 4: - msg = _("Result of searching for mail address {m!r}:").format(m=mail) + msg = _('Result of searching for mail address {m!r}:').format(m=mail) LOG.debug(msg + ' ' + pp(req_result)) for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) else: if self.verbose > 3: - msg = _("User with mail address {m!r} not found in {uri}/{bdn}.").format( + msg = _('User with mail address {m!r} not found in {uri}/{bdn}.').format( m=mail, uri=connect_info.url, bdn=base_dn) LOG.debug(msg) @@ -1016,7 +1013,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def get_user_dn_by_uid(self, uid, inst): - + """Get the DN of the user with the given uid (POSIX name) in the given LDAP instance.""" connect_info = self.cfg.ldap_connection[inst] base_dn = connect_info.base_dn ldap = self.ldap_connection[inst] @@ -1039,7 +1036,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter += ')' if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -1050,18 +1047,18 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 4: - msg = _("Result of searching for uid {u!r}:").format(u=uid) + msg = _('Result of searching for uid {u!r}:').format(u=uid) LOG.debug(msg + ' ' + pp(req_result)) for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) else: if self.verbose > 3: - msg = _("User with uid {u!r} not found in {uri}/{bdn}.").format( + msg = _('User with uid {u!r} not found in {uri}/{bdn}.').format( u=uid, uri=connect_info.url, bdn=base_dn) LOG.debug(msg) @@ -1069,7 +1066,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def get_user_dn_by_cn(self, cn, inst): - + """Get the DN of the user with the given cn (common name) in the given LDAP instance.""" connect_info = self.cfg.ldap_connection[inst] base_dn = connect_info.base_dn ldap = self.ldap_connection[inst] @@ -1092,7 +1089,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter += ')' if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -1103,18 +1100,18 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 4: - msg = _("Result of searching for CN {cn!r}:").format(cn=cn) + msg = _('Result of searching for CN {cn!r}:').format(cn=cn) LOG.debug(msg + ' ' + pp(req_result)) for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) else: if self.verbose > 3: - msg = _("User with cn {cn!r} not found in {uri}/{bdn}.").format( + msg = _('User with cn {cn!r} not found in {uri}/{bdn}.').format( cn=cn, uri=connect_info.url, bdn=base_dn) LOG.debug(msg) @@ -1122,7 +1119,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def get_user_dn_by_dn(self, dn, inst, strict=True): - + """Search for an entry with the given DN in the given LDAP instance.""" connect_info = self.cfg.ldap_connection[inst] attributes = ['objectClass'] @@ -1131,7 +1128,7 @@ class BaseLdapApplication(BaseDPXApplication): if not entry: if self.verbose > 3: - msg = _("User with DN {dn!r} not found in {uri}.").format( + msg = _('User with DN {dn!r} not found in {uri}.').format( dn=dn, uri=connect_info.url) LOG.debug(msg) return None @@ -1149,7 +1146,7 @@ class BaseLdapApplication(BaseDPXApplication): object_classes.add(values) if self.verbose > 3: - msg = _("ObjectClasses of {dn!r}:").format(dn=found_dn) + msg = _('ObjectClasses of {dn!r}:').format(dn=found_dn) LOG.debug(msg + '\n' + pp(object_classes.as_list())) is_person = False @@ -1159,7 +1156,7 @@ class BaseLdapApplication(BaseDPXApplication): break if not is_person: - msg = _("Entry {dn!r} in {uri} seems not to be an account.").format( + msg = _('Entry {dn!r} in {uri} seems not to be an account.').format( dn=found_dn, uri=connect_info.url) LOG.warn(msg) if strict: @@ -1169,7 +1166,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def get_entry(self, dn, inst, attributes=None, operational_attributes=False): - + """Get an complete LDAP entry by the given DN in the given instance.""" connect_info = self.cfg.ldap_connection[inst] ldap = self.ldap_connection[inst] @@ -1181,7 +1178,7 @@ class BaseLdapApplication(BaseDPXApplication): result = None if self.verbose > 1: - msg = _("Searching DN {dn!r} in {uri}.").format(dn=dn, uri=connect_info.url) + msg = _('Searching DN {dn!r} in {uri}.').format(dn=dn, uri=connect_info.url) LOG.debug(msg) req_status, req_result, req_response, req_whatever = ldap.search( @@ -1191,15 +1188,15 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: if self.verbose > 4: - msg = _("Result of searching for DN {dn!r}:").format(dn=dn) + msg = _('Result of searching for DN {dn!r}:').format(dn=dn) LOG.debug(msg + ' ' + pp(req_result)) result = req_response[0] if self.verbose > 3: - LOG.debug(_("Got a response entry:") + '\n' + pp(result)) + LOG.debug(_('Got a response entry:') + '\n' + pp(result)) else: if self.verbose > 3: - msg = _("Entry with DN {dn!r} not found in {uri}.").format( + msg = _('Entry with DN {dn!r} not found in {uri}.').format( dn=dn, uri=connect_info.url) LOG.debug(msg) @@ -1207,7 +1204,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def normalized_attributes(self, entry, omit_members=False, omit_memberof=False): - + """Transform an LDAP entry into a simple dict.""" attribs = CIDict() for attrib in entry['attributes']: @@ -1238,29 +1235,29 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def add_entry(self, inst, dn, object_classes, target_entry, ldap=None): - """Creating a LDAP entry.""" + """Create a LDAP entry.""" connect_info = self.cfg.ldap_connection[inst] if not ldap: ldap = self.ldap_connection[inst] if self.verbose > 2: - msg = _("Creating entry {dn!r} on {uri}:").format( + msg = _('Creating entry {dn!r} on {uri}:').format( uri=connect_info.url, dn=dn) msg += '\nobjectClasses:\n' + pp(object_classes) - msg += "\nAttributes:\n" + pp(target_entry) + msg += '\nAttributes:\n' + pp(target_entry) if self.simulate: - LOG.info(_("Simulation mode - entry will not be created.")) + LOG.info(_('Simulation mode - entry will not be created.')) return True try: req_status, req_result, req_response, req_whatever = ldap.add( dn, object_class=object_classes, attributes=target_entry) except LDAPException as e: - msg = _("Creation of entry {dn!r} was NOT successfull - {c}: {e}").format( + msg = _('Creation of entry {dn!r} was NOT successfull - {c}: {e}').format( dn=dn, c=e.__class__.__name__, e=e) msg += '\nobjectClasses:\n' + pp(object_classes) - msg += "\nAttributes:\n" + pp(target_entry) + msg += '\nAttributes:\n' + pp(target_entry) raise WriteLDAPItemError(msg) # Example result on a not successful modification: @@ -1272,15 +1269,15 @@ class BaseLdapApplication(BaseDPXApplication): # 'type': 'modifyResponse'} if self.verbose > 1: - LOG.debug(_("Creation status: {!r}.").format(req_status)) + LOG.debug(_('Creation status: {!r}.').format(req_status)) if self.verbose > 2: - LOG.debug(_("Result of creating:") + '\n' + pp(req_result)) + LOG.debug(_('Result of creating:') + '\n' + pp(req_result)) if not req_status: - msg = _("Creation of entry {dn!r} was NOT successful: {desc} - {msg}").format( + msg = _('Creation of entry {dn!r} was NOT successful: {desc} - {msg}').format( dn=dn, desc=req_result['description'], msg=req_result['message'].strip()) msg += '\nobjectClasses:\n' + pp(object_classes) - msg += "\nAttributes:\n" + pp(target_entry) + msg += '\nAttributes:\n' + pp(target_entry) raise WriteLDAPItemError(msg) LOG.debug(_('Creation successful.')) @@ -1288,26 +1285,26 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def modify_entry(self, inst, dn, changes, ldap=None): - """Mofifying an existing LDAP entry.""" + """Mofify an existing LDAP entry.""" connect_info = self.cfg.ldap_connection[inst] if not ldap: ldap = self.ldap_connection[inst] if self.verbose > 1: - msg = _("Applying changes on {uri} to DN {dn!r}:").format( + msg = _('Applying changes on {uri} to DN {dn!r}:').format( uri=connect_info.url, dn=dn) LOG.debug(msg + '\n' + pp(changes)) if self.simulate: - LOG.info(_("Simulation mode - changes are not applied.")) + LOG.info(_('Simulation mode - changes are not applied.')) return True try: req_status, req_result, req_response, req_whatever = ldap.modify(dn, changes) except LDAPException as e: - msg = _("Modification of {dn!r} was NOT successfull - {c}: {e}").format( + msg = _('Modification of {dn!r} was NOT successfull - {c}: {e}').format( dn=dn, c=e.__class__.__name__, e=e) - msg += '\n' + _("Changes:") + '\n' + pp(changes) + msg += '\n' + _('Changes:') + '\n' + pp(changes) raise WriteLDAPItemError(msg) # Example result on a not successful modification: @@ -1319,14 +1316,14 @@ class BaseLdapApplication(BaseDPXApplication): # 'type': 'modifyResponse'} if self.verbose > 1: - LOG.debug(_("Modification status: {!r}.").format(req_status)) + LOG.debug(_('Modification status: {!r}.').format(req_status)) if self.verbose > 2: - LOG.debug(_("Result of modifying:") + '\n' + pp(req_result)) + LOG.debug(_('Result of modifying:') + '\n' + pp(req_result)) if not req_status: - msg = _("Modification of {dn!r} was NOT successful: {desc} - {msg}").format( + msg = _('Modification of {dn!r} was NOT successful: {desc} - {msg}').format( dn=dn, desc=req_result['description'], msg=req_result['message'].strip()) - msg += '\n' + _("Changes:") + '\n' + pp(changes) + msg += '\n' + _('Changes:') + '\n' + pp(changes) raise WriteLDAPItemError(msg) LOG.debug(_('Modification successful.')) @@ -1334,33 +1331,33 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def delete_entry(self, inst, dn, ldap=None): - + """Delete a LDAP entry.""" connect_info = self.cfg.ldap_connection[inst] if not ldap: ldap = self.ldap_connection[inst] - msg = _("Deleting LDAP entry {dn!r} on {uri} ...").format( + msg = _('Deleting LDAP entry {dn!r} on {uri} ...').format( uri=connect_info.url, dn=dn) LOG.info(msg) if self.simulate: - LOG.info(_("Simulation mode - deletion will not be executed.")) + LOG.info(_('Simulation mode - deletion will not be executed.')) return True try: req_status, req_result, req_response, req_whatever = ldap.delete(dn) except LDAPException as e: - msg = _("Deletion of {dn!r} was NOT successfull - {c}: {e}").format( + msg = _('Deletion of {dn!r} was NOT successfull - {c}: {e}').format( c=e.__class__.__name__, e=e) raise DeleteLDAPItemError(msg) if self.verbose > 1: - LOG.debug(_("Deletion status: {!r}.").format(req_status)) + LOG.debug(_('Deletion status: {!r}.').format(req_status)) if self.verbose > 2: - LOG.debug(_("Result of deletion:") + '\n' + pp(req_result)) + LOG.debug(_('Result of deletion:') + '\n' + pp(req_result)) if not req_status: - msg = _("Deletion of {dn!r} was NOT successful: {desc} - {msg}").format( + msg = _('Deletion of {dn!r} was NOT successful: {desc} - {msg}').format( dn=dn, desc=req_result['description'], msg=req_result['message'].strip()) raise DeleteLDAPItemError(msg) @@ -1370,7 +1367,7 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def get_group_memberships(self, inst, dn, base_dn=None): - + """Return a list with DNs of all groups containing the given DN as a member.""" connect_info = self.cfg.ldap_connection[inst] ldap = self.ldap_connection[inst] @@ -1383,7 +1380,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter = '(member={})'.format(dn) if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -1395,16 +1392,16 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) return result # ------------------------------------------------------------------------- def get_unique_group_memberships(self, inst, dn, base_dn=None): - + """Return a list with DNs of all groups containing the given DN as a unique member.""" connect_info = self.cfg.ldap_connection[inst] ldap = self.ldap_connection[inst] @@ -1417,7 +1414,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter = '(uniqueMember={})'.format(dn) if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -1429,16 +1426,16 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) return result # ------------------------------------------------------------------------- def get_posix_group_memberships(self, inst, uid, base_dn=None): - + """Return a list with DNs of all POSIX groups containing the given UID as a member.""" connect_info = self.cfg.ldap_connection[inst] ldap = self.ldap_connection[inst] @@ -1451,7 +1448,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter = '(memberUid={})'.format(uid) if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -1463,16 +1460,16 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) return result # ------------------------------------------------------------------------- def get_sudo_group_memberships(self, inst, uid, base_dn=None): - + """Return a list with DNs of all sudo roles containing the given UID as a member.""" connect_info = self.cfg.ldap_connection[inst] ldap = self.ldap_connection[inst] @@ -1485,7 +1482,7 @@ class BaseLdapApplication(BaseDPXApplication): ldap_filter = '(sudoUser={})'.format(uid) if self.verbose > 1: - msg = _("Searching in {uri}/{bdn} with filter: {fltr}").format( + msg = _('Searching in {uri}/{bdn} with filter: {fltr}').format( uri=connect_info.url, bdn=base_dn, fltr=ldap_filter) LOG.debug(msg) @@ -1497,34 +1494,36 @@ class BaseLdapApplication(BaseDPXApplication): if req_status: for entry in req_response: if self.verbose > 4: - LOG.debug(_("Got a response entry:") + ' ' + pp(entry)) + LOG.debug(_('Got a response entry:') + ' ' + pp(entry)) result.append(entry['dn']) if self.verbose > 3: - LOG.debug(_("Result:") + ' ' + pp(result)) + LOG.debug(_('Result:') + ' ' + pp(result)) return result # ------------------------------------------------------------------------- def read_password_file(self, pw_file): - """Reading a password from the given password file. It returns the first - stripped non empty line.""" + """ + Read a password from the given password file. + It returns the first stripped non empty line. + """ if self.verbose > 1: abs_path = pw_file.resolve() - LOG.debug(_("Reading password file {!r} ...").format(str(abs_path))) + LOG.debug(_('Reading password file {!r} ...').format(str(abs_path))) if not pw_file.exists(): - msg = _("The file {!r} does not exists.").format(str(pw_file)) + msg = _('The file {!r} does not exists.').format(str(pw_file)) LOG.warn(msg) return None if not pw_file.is_file(): - msg = _("The given path {!r} exists, but is not a regular file.").format(str(pw_file)) + msg = _('The given path {!r} exists, but is not a regular file.').format(str(pw_file)) LOG.warn(msg) return None if not os.access(str(pw_file), os.R_OK): - msg = _("The given file {!r} is not readable.").format(str(pw_file)) + msg = _('The given file {!r} is not readable.').format(str(pw_file)) LOG.warn(msg) return None @@ -1536,29 +1535,29 @@ class BaseLdapApplication(BaseDPXApplication): # ------------------------------------------------------------------------- def generate_modify_data(self, dn, src_attribs, tgt_attribs): - + """Generate a dict for using my LDAP-modify by the given source and target attributes.""" changes = {} first_dn_token = self.re_dn_separator.split(dn)[0] match = self.re_dntoken.match(first_dn_token) if not match: - msg = _("Could not detect RDN from DN {!r}.").format(dn) + msg = _('Could not detect RDN from DN {!r}.').format(dn) raise LDAPParseError(msg) rdn = match.group(1) if self.verbose > 2: - msg = _("Found RDN attribute {!r}.").format(rdn) + msg = _('Found RDN attribute {!r}.').format(rdn) LOG.debug(msg) for attrib_name in src_attribs: if attrib_name.lower() == rdn.lower(): if self.verbose > 2: - msg = _("RDN attribute {!r} will not be touched.").format(rdn) + msg = _('RDN attribute {!r} will not be touched.').format(rdn) LOG.debug(msg) continue if attrib_name.lower() == 'memberof': if self.verbose > 2: - msg = _("Attribute {!r} will not be touched.").format(attrib_name) + msg = _('Attribute {!r} will not be touched.').format(attrib_name) LOG.debug(msg) continue @@ -1571,13 +1570,13 @@ class BaseLdapApplication(BaseDPXApplication): continue if attrib_name.lower() == rdn.lower(): - msg = "RDN attribute {!r} will not be touched.".format(rdn) + msg = 'RDN attribute {!r} will not be touched.'.format(rdn) LOG.warn(msg) continue if attrib_name.lower() == 'memberof': if self.verbose > 2: - msg = _("Attribute {!r} will not be touched.").format(attrib_name) + msg = _('Attribute {!r} will not be touched.').format(attrib_name) LOG.debug(msg) continue @@ -1611,11 +1610,11 @@ class BaseLdapApplication(BaseDPXApplication): values_del.append(tgt_val) if self.verbose > 2 and values_add: - msg = _("Values to add to attribute {!r}:").format(attrib_name) + msg = _('Values to add to attribute {!r}:').format(attrib_name) LOG.debug(msg + '\n' + pp(values_add)) if self.verbose > 2 and values_del: - msg = _("Values to removed from attribute {!r}:").format(attrib_name) + msg = _('Values to removed from attribute {!r}:').format(attrib_name) LOG.debug(msg + '\n' + pp(values_del)) if len(values_add) == len(src_attrib_values) and len(values_del) == 0: @@ -1632,17 +1631,22 @@ class BaseLdapApplication(BaseDPXApplication): if self.verbose > 3: if attr_changes: - msg = _("Changes for attribute {!r}:").format(attrib_name) + msg = _('Changes for attribute {!r}:').format(attrib_name) msg += '\n' + pp(attr_changes) else: - msg = _("No changes to attribute {!r}.").format(attrib_name) + msg = _('No changes to attribute {!r}.').format(attrib_name) LOG.debug(msg) return attr_changes # ------------------------------------------------------------------------- def generate_create_entry(self, src_attribs): + """ + Create data for cretating a new LDAP entry. + It returns a tuple, consisting of a list of the objec classes, + and a dict with all other attributes. + """ object_classes = [] target_entry = {} @@ -1650,7 +1654,7 @@ class BaseLdapApplication(BaseDPXApplication): if attrib_name.lower() == 'memberof': if self.verbose > 2: - msg = _("Attribute {!r} will not be touched.").format(attrib_name) + msg = _('Attribute {!r} will not be touched.').format(attrib_name) LOG.debug(msg) continue @@ -1670,7 +1674,7 @@ class BaseLdapApplication(BaseDPXApplication): # ============================================================================= -if __name__ == "__main__": +if __name__ == '__main__': pass