From f0a37b00761634b856722cf0e4184103d0c7c2b2 Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Tue, 17 May 2022 18:35:21 +0200 Subject: [PATCH] Starting with module pp_admintools.ldap_app --- lib/pp_admintools/ldap_app.py | 374 ++++++++++++++++++++++++++++++++++ 1 file changed, 374 insertions(+) create mode 100644 lib/pp_admintools/ldap_app.py diff --git a/lib/pp_admintools/ldap_app.py b/lib/pp_admintools/ldap_app.py new file mode 100644 index 0000000..a84bf6f --- /dev/null +++ b/lib/pp_admintools/ldap_app.py @@ -0,0 +1,374 @@ +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2022 by Frank Brehm, Berlin +@summary: A base module for application classes with LDAP support +""" +from __future__ import absolute_import + +# Standard modules +import logging +import copy +import pipes +import os +import argparse + +try: + from pathlib import Path +except ImportError: + from pathlib2 import Path + +## Third party modules +from fb_tools.common import pp + +from fb_tools.cfg_app import FbConfigApplication + +from fb_tools.errors import FbAppError + +# Own modules +from . import __version__ as GLOBAL_VERSION + +from .xlate import XLATOR + +from . import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR + +from .xlate import XLATOR + +from .argparse_actions import PortOptionAction + +from .ldap_config import LdapConfigError, LdapConnectionInfo, LdapConfiguration +from .ldap_config import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS, DEFAULT_TIMEOUT, MAX_TIMEOUT + +__version__ = '0.1.0' +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class LdapAppError(FbAppError): + """ Base exception class for all exceptions in all LDAP using application classes.""" + pass + + +# ============================================================================= +class PasswordFileOptionAction(argparse.Action): + + # ------------------------------------------------------------------------- + def __init__(self, option_strings, must_exists=True, *args, **kwargs): + + self.must_exists = bool(must_exists) + + super(PasswordFileOptionAction, self).__init__( + option_strings=option_strings, *args, **kwargs) + + # ------------------------------------------------------------------------- + def __call__(self, parser, namespace, given_path, option_string=None): + + path = Path(given_path) + if not path.is_absolute(): + msg = _("The path {!r} must be an absolute path.").format(given_path) + raise argparse.ArgumentError(self, msg) + + if self.must_exists: + + if not path.exists(): + msg = _("The file {!r} does not exists.").format(str(path)) + raise argparse.ArgumentError(self, msg) + + if not path.is_file(): + msg = _("The given path {!r} exists, but is not a regular file.").format(str(path)) + raise argparse.ArgumentError(self, msg) + + if not os.access(str(path), os.R_OK): + msg = _("The given file {!r} is not readable.").format(str(path)) + raise argparse.ArgumentError(self, msg) + + setattr(namespace, self.dest, path) + + +# ============================================================================= +class LdapPortOptionAction(argparse.Action): + + # ------------------------------------------------------------------------- + def __init__(self, option_strings, *args, **kwargs): + + super(LdapPortOptionAction, self).__init__( + option_strings=option_strings, *args, **kwargs) + + # ------------------------------------------------------------------------- + def __call__(self, parser, namespace, given_port, option_string=None): + + try: + port = int(given_port): + if port <= 0 or port > MAX_PORT_NUMBER: + msg = _( + "a port number must be greater than zero and less " + "or equal to {}.").format(MAX_PORT_NUMBER) + raise ValueError(msg) + except (ValueError, TypeError) as e: + msg = _("Wrong port number {!r}:").format(given_port) + msg += ' ' + str(e) + raise argparse.ArgumentError(self, msg) + + setattr(namespace, self.dest, port) + + +# ============================================================================= +class TimeoutOptionAction(argparse.Action): + + # ------------------------------------------------------------------------- + def __init__(self, option_strings, *args, **kwargs): + + super(TimeoutOptionAction, self).__init__( + option_strings=option_strings, *args, **kwargs) + + # ------------------------------------------------------------------------- + def __call__(self, parser, namespace, given_timeout, option_string=None): + + try: + timeout = int(given_timeout): + if timeout <= 0 or timeout > MAX_TIMEOUT: + msg = _( + "a timeout must be greater than zero and less " + "or equal to {}.").format(MAX_TIMEOUT) + raise ValueError(msg) + except (ValueError, TypeError) as e: + msg = _("Wrong timeout {!r}:").format(given_timeout) + msg += ' ' + str(e) + raise argparse.ArgumentError(self, msg) + + setattr(namespace, self.dest, timeout) + + +# ============================================================================= +class BaseLdapApplication(FbConfigApplication): + """ + Base class for all application classes using LDAP. + """ + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None, + cfg_class=LdapConfiguration, initialized=False, usage=None, description=None, + argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None, + use_default_ldap_connection=True, config_dir=DEFAULT_CONFIG_DIR): + + self._password_file = None + self._use_default_ldap_connection = bool(use_default_ldap_connection) + + super(BaseLdapApplication, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, + description=description, cfg_class=cfg_class, initialized=False, + argparse_epilog=argparse_epilog, argparse_prefix_chars=argparse_prefix_chars, + env_prefix=env_prefix, config_dir=config_dir + ) + + # ----------------------------------------------------------- + @property + def use_default_ldap_connection(self): + """Should there be command line parameters for the default LDAP connection.""" + return self._use_default_ldap_connection + + # ----------------------------------------------------------- + @property + def password_file(self): + """The file containing the password of the Bind DN of the default LDAP connection.""" + return self._password_file + + @password_file.setter + def password_file(self, value): + + path = Path(value) + if not path.is_absolute(): + msg = _("The path {!r} must be an absolute path.").format(value) + raise LdapAppError(msg) + + if not path.exists(): + msg = _("The file {!r} does not exists.").format(str(path)) + raise LdapAppError(msg) + + if not path.is_file(): + msg = _("The given path {!r} exists, but is not a regular file.").format(str(path)) + raise LdapAppError(msg) + + if not os.access(str(path), os.R_OK): + msg = _("The given file {!r} is not readable.").format(str(path)) + raise LdapAppError(msg) + + self._password_file = path + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + + res = super(BaseLdapApplication, self).as_dict(short=short) + + res['password_file'] = self.password_file + res['use_default_ldap_connection'] = self.use_default_ldap_connection + + return res + + + # ------------------------------------------------------------------------- + def init_arg_parser(self): + """ + Public available method to initiate the argument parser. + """ + + super(BaseLdapApplication, self).init_arg_parser() + + ldap_group = self.arg_parser.add_argument_group(_( + 'Options for the default LDAP connection')) + + if self.use_default_ldap_connection: + + ldap_host = LdapConfiguration.default_ldap_server + ldap_ssl = LdapConfiguration.use_ssl_on_default + ldap_ssl_str = _('No') + if ldap_ssl: + ldap_ssl_str = _('Yes') + ldap_port = LdapConfiguration.default_ldap_port + ldap_base_dn = LdapConfiguration.default_base_dn + ldap_bind_dn = LdapConfiguration.default_bind_dn + + ldap_group.add_argument( + '-h', '--ldap-host', metavar=_("HOST"), dest="ldap_host", + help=_( + "Hostname or address of the LDAP server to use. Default: {!r}").format( + ldap_host), + ) + + ldap_group.add_argument( + '--ssl', '--ldaps', '--ldap-ssl', dest="ldap_ssl", action="store_true", + help=_("Use ldaps to connect to the LDAP server. Default: {}").format(ldap_ssl_str), + ) + + ldap_group.add_argument( + '-p', '--ldap-port', metavar=_("PORT"), type=int, dest="ldap_port", + action=LdapPortOptionAction, + help=_("The port number to connect to the LDAP server. Default: {}").format( + ldap_port), + ) + + ldap_group.add_argument( + '-b', '--base-dn', metavar="DN", dest="ldap_base_dn", + help=_( + "The base DN used as the root for the LDAP searches. " + "Default: {!r}").format(ldap_base_dn), + ) + + ldap_group.add_argument( + '-D', '--bind-dn', metavar="DN", dest="ldap_bind_dn", + help=_( + "The Bind DN to use to connect to the LDAP server. Default: {}").format( + ldap_bind_dn), + ) + + pw_group = ldap_group.add_mutually_exclusive_group() + + pw_group.add_argument( + '-w', '--bind-pw', '--password', metavar=_("PASSWORD"), dest="ldap_bind_pw", + help=_("Use PASSWORD as the password for simple LDAP authentication."), + ) + + pw_group.add_argument( + '-W', '--password-prompt', action="store_true", dest="ldap_pw_prompt", + help=_( + "Prompt for simple LDAP authentication. This is used instead of " + "specifying the password on the command line."), + ) + + pw_group.add_argument( + '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="ldap_pw_file", + action=PasswordFileOptionAction, + help=_("Use contents of PASSWORD_FILE as the password for simple authentication."), + ) + + self.arg_parser.add_argument( + '-T', '--timeout', metavar=_('SECONDS'), dest="ldap_timeout", + action=TimeoutOptionAction, + help=_( + "Using the given timeout in seconds for all LDAP operations. " + "Default: {}").format(DEFAULT_TIMEOUT), + ) + + # ------------------------------------------------------------------------- + def post_init(self): + """ + Method to execute before calling run(). Here could be done some + finishing actions after reading in commandline parameters, + configuration a.s.o. + + This method could be overwritten by descendant classes, these + methhods should allways include a call to post_init() of the + parent class. + + """ + + self.initialized = False + + super(BaseLdapApplication, self).post_init() + + if not self.use_default_ldap_connection: + return + + if 'default' in self.cfg.connection: + default_connection = self.cfg.connection['default'] + else: + default_connection = LdapConnectionInfo( + appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, + host=LdapConfiguration.default_ldap_server, + use_ldaps=LdapConfiguration.use_ssl_on_default, + port=LdapConfiguration.default_ldap_port, + base_dn=LdapConfiguration.default_base_dn, + bind_dn=LdapConfiguration.default_bind_dn, + initialized=False) + self.cfg.connection['default'] = default_connection + + v = getattr(self.args, 'ldap_host', None) + if v: + default_connection.host = v + + if getattr(self.args, 'ldap_ssl', False): + default_connection.use_ldaps = True + + v = getattr(self.args, 'ldap_port', None) + if v is not None: + default_connection.port = v + + v = getattr(self.args, 'ldap_base_dn', None) + if v: + default_connection.base_dn = v + + v = getattr(self.args, 'ldap_bind_dn', None) + if v: + default_connection.bind_dn = v + + v = getattr(self.args, 'ldap_bind_pw', None) + if v: + default_connection.bind_pw = v + + v = getattr(self.args, 'ldap_timeout', None) + if v: + self.cfg.timeout = v + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list -- 2.39.5