]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Starting with module pp_admintools.ldap_app
authorFrank Brehm <frank@brehm-online.com>
Tue, 17 May 2022 16:35:21 +0000 (18:35 +0200)
committerFrank Brehm <frank@brehm-online.com>
Tue, 17 May 2022 16:35:21 +0000 (18:35 +0200)
lib/pp_admintools/ldap_app.py [new file with mode: 0644]

diff --git a/lib/pp_admintools/ldap_app.py b/lib/pp_admintools/ldap_app.py
new file mode 100644 (file)
index 0000000..a84bf6f
--- /dev/null
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2022 by Frank Brehm, Berlin
+@summary: A base module for application classes with LDAP support
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import logging
+import copy
+import pipes
+import os
+import argparse
+
+try:
+    from pathlib import Path
+except ImportError:
+    from pathlib2 import Path
+
+## Third party modules
+from fb_tools.common import pp
+
+from fb_tools.cfg_app import FbConfigApplication
+
+from fb_tools.errors import FbAppError
+
+# Own modules
+from . import __version__ as GLOBAL_VERSION
+
+from .xlate import XLATOR
+
+from . import MAX_PORT_NUMBER, DEFAULT_CONFIG_DIR
+
+from .xlate import XLATOR
+
+from .argparse_actions import PortOptionAction
+
+from .ldap_config import LdapConfigError, LdapConnectionInfo, LdapConfiguration
+from .ldap_config import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS, DEFAULT_TIMEOUT, MAX_TIMEOUT
+
+__version__ = '0.1.0'
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+
+# =============================================================================
+class LdapAppError(FbAppError):
+    """ Base exception class for all exceptions in all LDAP using application classes."""
+    pass
+
+
+# =============================================================================
+class PasswordFileOptionAction(argparse.Action):
+
+    # -------------------------------------------------------------------------
+    def __init__(self, option_strings, must_exists=True,  *args, **kwargs):
+
+        self.must_exists = bool(must_exists)
+
+        super(PasswordFileOptionAction, self).__init__(
+            option_strings=option_strings, *args, **kwargs)
+
+    # -------------------------------------------------------------------------
+    def __call__(self, parser, namespace, given_path, option_string=None):
+
+        path = Path(given_path)
+        if not path.is_absolute():
+            msg = _("The path {!r} must be an absolute path.").format(given_path)
+            raise argparse.ArgumentError(self, msg)
+
+        if self.must_exists:
+
+            if not path.exists():
+                msg = _("The file {!r} does not exists.").format(str(path))
+                raise argparse.ArgumentError(self, msg)
+
+            if not path.is_file():
+                msg = _("The given path {!r} exists, but is not a regular file.").format(str(path))
+                raise argparse.ArgumentError(self, msg)
+
+            if not os.access(str(path), os.R_OK):
+                msg = _("The given file {!r} is not readable.").format(str(path))
+                raise argparse.ArgumentError(self, msg)
+
+        setattr(namespace, self.dest, path)
+
+
+# =============================================================================
+class LdapPortOptionAction(argparse.Action):
+
+    # -------------------------------------------------------------------------
+    def __init__(self, option_strings, *args, **kwargs):
+
+        super(LdapPortOptionAction, self).__init__(
+            option_strings=option_strings, *args, **kwargs)
+
+    # -------------------------------------------------------------------------
+    def __call__(self, parser, namespace, given_port, option_string=None):
+
+        try:
+            port = int(given_port):
+            if port <= 0 or port > MAX_PORT_NUMBER:
+                msg = _(
+                    "a port number must be greater than zero and less "
+                    "or equal to {}.").format(MAX_PORT_NUMBER)
+                raise ValueError(msg)
+        except (ValueError, TypeError) as e:
+            msg = _("Wrong port number {!r}:").format(given_port)
+            msg += ' ' + str(e)
+            raise argparse.ArgumentError(self, msg)
+
+        setattr(namespace, self.dest, port)
+
+
+# =============================================================================
+class TimeoutOptionAction(argparse.Action):
+
+    # -------------------------------------------------------------------------
+    def __init__(self, option_strings, *args, **kwargs):
+
+        super(TimeoutOptionAction, self).__init__(
+            option_strings=option_strings, *args, **kwargs)
+
+    # -------------------------------------------------------------------------
+    def __call__(self, parser, namespace, given_timeout, option_string=None):
+
+        try:
+            timeout = int(given_timeout):
+            if timeout <= 0 or timeout > MAX_TIMEOUT:
+                msg = _(
+                    "a timeout must be greater than zero and less "
+                    "or equal to {}.").format(MAX_TIMEOUT)
+                raise ValueError(msg)
+        except (ValueError, TypeError) as e:
+            msg = _("Wrong timeout {!r}:").format(given_timeout)
+            msg += ' ' + str(e)
+            raise argparse.ArgumentError(self, msg)
+
+        setattr(namespace, self.dest, timeout)
+
+
+# =============================================================================
+class BaseLdapApplication(FbConfigApplication):
+    """
+    Base class for all application classes using LDAP.
+    """
+
+    # -------------------------------------------------------------------------
+    def __init__(
+        self, appname=None, verbose=0, version=GLOBAL_VERSION, base_dir=None,
+            cfg_class=LdapConfiguration, initialized=False, usage=None, description=None,
+            argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None,
+            use_default_ldap_connection=True, config_dir=DEFAULT_CONFIG_DIR):
+
+        self._password_file = None
+        self._use_default_ldap_connection = bool(use_default_ldap_connection)
+
+        super(BaseLdapApplication, self).__init__(
+            appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+            description=description, cfg_class=cfg_class, initialized=False,
+            argparse_epilog=argparse_epilog, argparse_prefix_chars=argparse_prefix_chars,
+            env_prefix=env_prefix, config_dir=config_dir
+        )
+
+    # -----------------------------------------------------------
+    @property
+    def use_default_ldap_connection(self):
+        """Should there be command line parameters for the default LDAP connection."""
+        return self._use_default_ldap_connection
+
+    # -----------------------------------------------------------
+    @property
+    def password_file(self):
+        """The file containing the password of the Bind DN of the default LDAP connection."""
+        return self._password_file
+
+    @password_file.setter
+    def password_file(self, value):
+
+        path = Path(value)
+        if not path.is_absolute():
+            msg = _("The path {!r} must be an absolute path.").format(value)
+            raise LdapAppError(msg)
+
+        if not path.exists():
+            msg = _("The file {!r} does not exists.").format(str(path))
+            raise LdapAppError(msg)
+
+        if not path.is_file():
+            msg = _("The given path {!r} exists, but is not a regular file.").format(str(path))
+            raise LdapAppError(msg)
+
+        if not os.access(str(path), os.R_OK):
+            msg = _("The given file {!r} is not readable.").format(str(path))
+            raise LdapAppError(msg)
+
+        self._password_file = path
+
+    # -------------------------------------------------------------------------
+    def as_dict(self, short=True):
+        """
+        Transforms the elements of the object into a dict
+
+        @param short: don't include local properties in resulting dict.
+        @type short: bool
+
+        @return: structure as dict
+        @rtype:  dict
+        """
+
+        res = super(BaseLdapApplication, self).as_dict(short=short)
+
+        res['password_file'] = self.password_file
+        res['use_default_ldap_connection'] = self.use_default_ldap_connection
+
+        return res
+
+
+    # -------------------------------------------------------------------------
+    def init_arg_parser(self):
+        """
+        Public available method to initiate the argument parser.
+        """
+
+        super(BaseLdapApplication, self).init_arg_parser()
+
+        ldap_group = self.arg_parser.add_argument_group(_(
+            'Options for the default LDAP connection'))
+
+        if self.use_default_ldap_connection:
+
+            ldap_host = LdapConfiguration.default_ldap_server
+            ldap_ssl = LdapConfiguration.use_ssl_on_default
+            ldap_ssl_str = _('No')
+            if ldap_ssl:
+                ldap_ssl_str = _('Yes')
+            ldap_port = LdapConfiguration.default_ldap_port
+            ldap_base_dn = LdapConfiguration.default_base_dn
+            ldap_bind_dn = LdapConfiguration.default_bind_dn
+
+            ldap_group.add_argument(
+                '-h', '--ldap-host', metavar=_("HOST"), dest="ldap_host",
+                help=_(
+                    "Hostname or address of the LDAP server to use. Default: {!r}").format(
+                    ldap_host),
+            )
+
+            ldap_group.add_argument(
+                '--ssl', '--ldaps', '--ldap-ssl', dest="ldap_ssl", action="store_true",
+                help=_("Use ldaps to connect to the LDAP server. Default: {}").format(ldap_ssl_str),
+            )
+
+            ldap_group.add_argument(
+                '-p', '--ldap-port', metavar=_("PORT"), type=int, dest="ldap_port",
+                action=LdapPortOptionAction,
+                help=_("The port number to connect to the LDAP server. Default: {}").format(
+                    ldap_port),
+            )
+
+            ldap_group.add_argument(
+                '-b', '--base-dn', metavar="DN", dest="ldap_base_dn",
+                help=_(
+                    "The base DN used as the root for the LDAP searches. "
+                    "Default: {!r}").format(ldap_base_dn),
+            )
+
+            ldap_group.add_argument(
+                '-D', '--bind-dn', metavar="DN", dest="ldap_bind_dn",
+                help=_(
+                    "The Bind DN to use to connect to the LDAP server. Default: {}").format(
+                    ldap_bind_dn),
+            )
+
+            pw_group = ldap_group.add_mutually_exclusive_group()
+
+            pw_group.add_argument(
+                '-w', '--bind-pw', '--password', metavar=_("PASSWORD"), dest="ldap_bind_pw",
+                help=_("Use PASSWORD as the password for simple LDAP authentication."),
+            )
+
+            pw_group.add_argument(
+                '-W', '--password-prompt', action="store_true", dest="ldap_pw_prompt",
+                help=_(
+                    "Prompt for simple LDAP authentication. This is used instead of "
+                    "specifying the password on the command line."),
+            )
+
+            pw_group.add_argument(
+                '-y', '--password-file', metavar=_('PASSWORD_FILE'), dest="ldap_pw_file",
+                action=PasswordFileOptionAction,
+                help=_("Use contents of PASSWORD_FILE as the password for simple authentication."),
+            )
+
+        self.arg_parser.add_argument(
+            '-T', '--timeout', metavar=_('SECONDS'), dest="ldap_timeout",
+            action=TimeoutOptionAction,
+            help=_(
+                "Using the given timeout in seconds for all LDAP operations. "
+                "Default: {}").format(DEFAULT_TIMEOUT),
+        )
+
+    # -------------------------------------------------------------------------
+    def post_init(self):
+        """
+        Method to execute before calling run(). Here could be done some
+        finishing actions after reading in commandline parameters,
+        configuration a.s.o.
+
+        This method could be overwritten by descendant classes, these
+        methhods should allways include a call to post_init() of the
+        parent class.
+
+        """
+
+        self.initialized = False
+
+        super(BaseLdapApplication, self).post_init()
+
+        if not self.use_default_ldap_connection:
+            return
+
+        if 'default' in self.cfg.connection:
+            default_connection = self.cfg.connection['default']
+        else:
+            default_connection = LdapConnectionInfo(
+                appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+                host=LdapConfiguration.default_ldap_server,
+                use_ldaps=LdapConfiguration.use_ssl_on_default,
+                port=LdapConfiguration.default_ldap_port,
+                base_dn=LdapConfiguration.default_base_dn,
+                bind_dn=LdapConfiguration.default_bind_dn,
+                initialized=False)
+            self.cfg.connection['default'] = default_connection
+
+        v = getattr(self.args, 'ldap_host', None)
+        if v:
+            default_connection.host = v
+
+        if getattr(self.args, 'ldap_ssl', False):
+            default_connection.use_ldaps = True
+
+        v = getattr(self.args, 'ldap_port', None)
+        if v is not None:
+            default_connection.port = v
+
+        v = getattr(self.args, 'ldap_base_dn', None)
+        if v:
+            default_connection.base_dn = v
+
+        v = getattr(self.args, 'ldap_bind_dn', None)
+        if v:
+            default_connection.bind_dn = v
+
+        v = getattr(self.args, 'ldap_bind_pw', None)
+        if v:
+            default_connection.bind_pw = v
+
+        v = getattr(self.args, 'ldap_timeout', None)
+        if v:
+            self.cfg.timeout = v
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+    pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list