From d220a4a003458f30b70c29a8bba305cbe5ba4ca4 Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Fri, 22 Sep 2023 12:15:40 +0200 Subject: [PATCH] Separating definition of LdapConnectionInfo and LdapConnectionDict into a separate module --- lib/cr_vmware_tpl/__init__.py | 13 +- .../{config.py => config/__init__.py} | 351 +---------------- lib/cr_vmware_tpl/config/ldap.py | 369 ++++++++++++++++++ lib/cr_vmware_tpl/handler.py | 3 +- 4 files changed, 392 insertions(+), 344 deletions(-) rename lib/cr_vmware_tpl/{config.py => config/__init__.py} (81%) create mode 100644 lib/cr_vmware_tpl/config/ldap.py diff --git a/lib/cr_vmware_tpl/__init__.py b/lib/cr_vmware_tpl/__init__.py index 7e25320..735a8dc 100644 --- a/lib/cr_vmware_tpl/__init__.py +++ b/lib/cr_vmware_tpl/__init__.py @@ -3,11 +3,22 @@ import time -__version__ = '2.8.0' +__version__ = '2.8.1' DEFAULT_CONFIG_DIR = 'pixelpark' DEFAULT_DISTRO_ARCH = 'x86_64' MAX_PORT_NUMBER = (2 ** 16) - 1 +DEFAULT_PORT_LDAP = 389 +DEFAULT_PORT_LDAPS = 636 +DEFAULT_TIMEOUT = 20 +MAX_TIMEOUT = 3600 + +DEFAULT_LDAP_ADMIN_FILTER = ( + '(&(inetuserstatus=active)(mailuserstatus=active)(objectclass=pppixelaccount)(mail=*)' + '(sshPublicKey=*)' + '(memberOf=cn=Administratoren Pixelpark Berlin,ou=Groups,o=Pixelpark,o=isp))' +) + # ------------------------------------------------------------------------- def print_section_start(name, header=None, collapsed=False): diff --git a/lib/cr_vmware_tpl/config.py b/lib/cr_vmware_tpl/config/__init__.py similarity index 81% rename from lib/cr_vmware_tpl/config.py rename to lib/cr_vmware_tpl/config/__init__.py index 53d27d3..c195dee 100644 --- a/lib/cr_vmware_tpl/config.py +++ b/lib/cr_vmware_tpl/config/__init__.py @@ -19,9 +19,8 @@ import ipaddress # Third party modules from pathlib import Path -# Own modules from fb_tools.common import is_sequence, pp, to_bool -from fb_tools.obj import FbGenericBaseObject, FbBaseObject +from fb_tools.obj import FbGenericBaseObject from fb_tools.collections import CIStringSet from fb_tools.multi_config import BaseMultiConfig from fb_tools.multi_config import DEFAULT_ENCODING @@ -29,320 +28,22 @@ from fb_tools.xlate import format_list from fb_vmware.config import VSPhereConfigInfo -from . import DEFAULT_CONFIG_DIR, DEFAULT_DISTRO_ARCH, MAX_PORT_NUMBER +# Own modules +from .. import DEFAULT_CONFIG_DIR, DEFAULT_DISTRO_ARCH +from .. import DEFAULT_PORT_LDAPS, DEFAULT_TIMEOUT, MAX_TIMEOUT + +from ..errors import CrTplConfigError -from .errors import CrTplConfigError +from ..xlate import XLATOR -from .xlate import XLATOR +from .ldap import LdapConnectionInfo, LdapConnectionDict -__version__ = '2.3.3' +__version__ = '3.0.1' LOG = logging.getLogger(__name__) _ = XLATOR.gettext ngettext = XLATOR.ngettext -DEFAULT_PORT_LDAP = 389 -DEFAULT_PORT_LDAPS = 636 -DEFAULT_TIMEOUT = 20 -MAX_TIMEOUT = 3600 -DEFAULT_ADMIN_FILTER = ( - '(&(inetuserstatus=active)(mailuserstatus=active)(objectclass=pppixelaccount)(mail=*)' - '(sshPublicKey=*)' - '(memberOf=cn=Administratoren Pixelpark Berlin,ou=Groups,o=Pixelpark,o=isp))' -) - - -# ============================================================================= -class LdapConnectionInfo(FbBaseObject): - """Encapsulating all necessary data to connect to a LDAP server.""" - - re_host_key = re.compile(r'^\s*(?:host|server)\s*$', re.IGNORECASE) - re_ldaps_key = re.compile(r'^\s*(?:use[_-]?)?(?:ldaps|ssl)\s*$', re.IGNORECASE) - re_port_key = re.compile(r'^\s*port\s*$', re.IGNORECASE) - re_base_dn_key = re.compile(r'^\s*base[_-]*dn\s*$', re.IGNORECASE) - re_bind_dn_key = re.compile(r'^\s*bind[_-]*dn\s*$', re.IGNORECASE) - re_bind_pw_key = re.compile(r'^\s*bind[_-]*pw\s*$', re.IGNORECASE) - re_admin_filter_key = re.compile(r'^\s*(?:admin[_-]?)?filter\s*$', re.IGNORECASE) - - # ------------------------------------------------------------------------- - def __init__( - self, appname=None, verbose=0, version=__version__, base_dir=None, - host=None, use_ldaps=False, port=DEFAULT_PORT_LDAP, base_dn=None, - bind_dn=None, bind_pw=None, admin_filter=None, initialized=False): - - self._host = None - self._use_ldaps = False - self._port = DEFAULT_PORT_LDAP - self._base_dn = None - self._bind_dn = None - self._bind_pw = None - self._admin_filter = DEFAULT_ADMIN_FILTER - - super(LdapConnectionInfo, self).__init__( - appname=appname, verbose=verbose, version=version, base_dir=base_dir, - initialized=False) - - self.host = host - self.use_ldaps = use_ldaps - self.port = port - if base_dn: - self.base_dn = base_dn - self.bind_dn = bind_dn - self.bind_pw = bind_pw - if admin_filter: - self.admin_filter = admin_filter - - if initialized: - self.initialized = True - - # ------------------------------------------------------------------------- - def as_dict(self, short=True): - """ - Transforms the elements of the object into a dict - - @param short: don't include local properties in resulting dict. - @type short: bool - - @return: structure as dict - @rtype: dict - """ - - res = super(LdapConnectionInfo, self).as_dict(short=short) - - res['host'] = self.host - res['use_ldaps'] = self.use_ldaps - res['port'] = self.port - res['base_dn'] = self.base_dn - res['bind_dn'] = self.bind_dn - res['bind_pw'] = None - res['schema'] = self.schema - res['url'] = self.url - res['admin_filter'] = self.admin_filter - - if self.bind_pw: - if self.verbose > 4: - res['bind_pw'] = self.bind_pw - else: - res['bind_pw'] = '******' - - return res - - # ----------------------------------------------------------- - @property - def host(self): - """The host name (or IP address) of the LDAP server.""" - return self._host - - @host.setter - def host(self, value): - if value is None or str(value).strip() == '': - self._host = None - return - self._host = str(value).strip().lower() - - # ----------------------------------------------------------- - @property - def use_ldaps(self): - """Should there be used LDAPS for communicating with the LDAP server?""" - return self._use_ldaps - - @use_ldaps.setter - def use_ldaps(self, value): - self._use_ldaps = to_bool(value) - - # ----------------------------------------------------------- - @property - def port(self): - "The TCP port number of the LDAP server." - return self._port - - @port.setter - def port(self, value): - v = int(value) - if v < 1 or v > MAX_PORT_NUMBER: - raise CrTplConfigError(_("Invalid port {!r} for LDAP server given.").format(value)) - self._port = v - - # ----------------------------------------------------------- - @property - def base_dn(self): - """The DN used to connect to the LDAP server, anonymous bind is used, if - this DN is empty or None.""" - return self._base_dn - - @base_dn.setter - def base_dn(self, value): - if value is None or str(value).strip() == '': - msg = _("An empty Base DN for LDAP searches is not allowed.") - raise CrTplConfigError(msg) - self._base_dn = str(value).strip() - - # ----------------------------------------------------------- - @property - def bind_dn(self): - """The DN used to connect to the LDAP server, anonymous bind is used, if - this DN is empty or None.""" - return self._bind_dn - - @bind_dn.setter - def bind_dn(self, value): - if value is None or str(value).strip() == '': - self._bind_dn = None - return - self._bind_dn = str(value).strip() - - # ----------------------------------------------------------- - @property - def bind_pw(self): - """The password of the DN used to connect to the LDAP server.""" - return self._bind_pw - - @bind_pw.setter - def bind_pw(self, value): - if value is None or str(value).strip() == '': - self._bind_pw = None - return - self._bind_pw = str(value).strip() - - # ----------------------------------------------------------- - @property - def admin_filter(self): - """The LDAP filter to get the list of administrators from LDAP.""" - return self._admin_filter - - @admin_filter.setter - def admin_filter(self, value): - if value is None or str(value).strip() == '': - self._admin_filter = None - return - self._admin_filter = str(value).strip() - - # ----------------------------------------------------------- - @property - def schema(self): - """The schema as part of the URL to connect to the LDAP server.""" - if self.use_ldaps: - return 'ldaps' - return 'ldap' - - # ----------------------------------------------------------- - @property - def url(self): - """The URL, which ca be used to connect to the LDAP server.""" - if not self.host: - return None - - port = '' - if self.use_ldaps: - if self.port != DEFAULT_PORT_LDAPS: - port = ':{}'.format(self.port) - else: - if self.port != DEFAULT_PORT_LDAP: - port = ':{}'.format(self.port) - - return '{s}://{h}{p}'.format(s=self.schema, h=self.host, p=port) - - # ------------------------------------------------------------------------- - def __repr__(self): - """Typecasting into a string for reproduction.""" - - out = "<%s(" % (self.__class__.__name__) - - fields = [] - fields.append("appname={!r}".format(self.appname)) - fields.append("host={!r}".format(self.host)) - fields.append("use_ldaps={!r}".format(self.use_ldaps)) - fields.append("port={!r}".format(self.port)) - fields.append("base_dn={!r}".format(self.base_dn)) - fields.append("bind_dn={!r}".format(self.bind_dn)) - fields.append("bind_pw={!r}".format(self.bind_pw)) - fields.append("admin_filter={!r}".format(self.admin_filter)) - fields.append("initialized={!r}".format(self.initialized)) - - out += ", ".join(fields) + ")>" - return out - - # ------------------------------------------------------------------------- - def __copy__(self): - - new = self.__class__( - appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, host=self.host, - use_ldaps=self.use_ldaps, port=self.port, base_dn=self.base_dn, bind_dn=self.bind_dn, - bind_pw=self.bind_pw, admin_filter=self.admin_filter, initialized=self.initialized) - - return new - - # ------------------------------------------------------------------------- - @classmethod - def init_from_config(cls, name, data, appname=None, verbose=0, base_dir=None): - - new = cls(appname=appname, verbose=verbose, base_dir=base_dir) - - s_name = "ldap:" + name - msg_invalid = _("Invalid value {val!r} in section {sec!r} for a LDAP {what}.") - - for key in data.keys(): - value = data[key] - - if cls.re_host_key.match(key): - if value.strip(): - new.host = value - else: - msg = msg_invalid.format(val=value, sec=s_name, what='host') - LOG.error(msg) - continue - - if cls.re_ldaps_key.match(key): - new.use_ldaps = value - continue - - if cls.re_port_key.match(key): - port = DEFAULT_PORT_LDAP - try: - port = int(value) - except (ValueError, TypeError) as e: - msg = msg_invalid.format(val=value, sec=s_name, what='port') - msg += ' ' + str(e) - LOG.error(msg) - continue - if port <= 0 or port > MAX_PORT_NUMBER: - msg = msg_invalid.format(val=value, sec=s_name, what='port') - LOG.error(msg) - continue - new.port = port - continue - - if cls.re_base_dn_key.match(key): - if value.strip(): - new.base_dn = value - else: - msg = msg_invalid.format(val=value, sec=s_name, what='base_dn') - LOG.error(msg) - continue - - if cls.re_bind_dn_key.match(key): - new.bind_dn = value - continue - - if cls.re_bind_pw_key.match(key): - new.bind_pw = value - continue - - if cls.re_admin_filter_key.match(key): - new.admin_filter = value - continue - - if key.lower() in ['is_admin', 'readonly', 'tier', 'sync-source']: - continue - - msg = _("Unknown LDAP configuration key {key} found in section {sec!r}.").format( - key=key, sec=s_name) - LOG.error(msg) - - new.initialized = True - - return new - # ============================================================================= class CobblerDistroInfo(FbGenericBaseObject): @@ -719,40 +420,6 @@ class CobblerDistroInfo(FbGenericBaseObject): new.snippets.add(value.strip()) -# ============================================================================= -class LdapConnectionDict(dict, FbGenericBaseObject): - """A dictionary containing LdapConnectionInfo as values and their names as keys.""" - - # ------------------------------------------------------------------------- - def as_dict(self, short=True): - """ - Transforms the elements of the object into a dict - - @param short: don't include local properties in resulting dict. - @type short: bool - - @return: structure as dict - @rtype: dict - """ - - res = super(LdapConnectionDict, self).as_dict(short=short) - - for key in self.keys(): - res[key] = self[key].as_dict(short=short) - - return res - - # ------------------------------------------------------------------------- - def __copy__(self): - - new = self.__class__() - - for key in self.keys(): - new[key] = copy.copy(self[key]) - - return new - - # ============================================================================= class CrTplConfiguration(BaseMultiConfig): """ diff --git a/lib/cr_vmware_tpl/config/ldap.py b/lib/cr_vmware_tpl/config/ldap.py new file mode 100644 index 0000000..4b734b7 --- /dev/null +++ b/lib/cr_vmware_tpl/config/ldap.py @@ -0,0 +1,369 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2018 by Frank Brehm, Berlin +@summary: A module for providing LDAP connection infos. +""" +from __future__ import absolute_import + +# Standard module +import logging +import re +import copy + +# Third party modules +from fb_tools.common import to_bool +from fb_tools.obj import FbGenericBaseObject, FbBaseObject + +# Own modules +from .. import MAX_PORT_NUMBER +from .. import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS +from .. import DEFAULT_LDAP_ADMIN_FILTER + +from ..errors import CrTplConfigError + +from ..xlate import XLATOR + +__version__ = '3.0.0' +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class LdapConnectionInfo(FbBaseObject): + """Encapsulating all necessary data to connect to a LDAP server.""" + + re_host_key = re.compile(r'^\s*(?:host|server)\s*$', re.IGNORECASE) + re_ldaps_key = re.compile(r'^\s*(?:use[_-]?)?(?:ldaps|ssl)\s*$', re.IGNORECASE) + re_port_key = re.compile(r'^\s*port\s*$', re.IGNORECASE) + re_base_dn_key = re.compile(r'^\s*base[_-]*dn\s*$', re.IGNORECASE) + re_bind_dn_key = re.compile(r'^\s*bind[_-]*dn\s*$', re.IGNORECASE) + re_bind_pw_key = re.compile(r'^\s*bind[_-]*pw\s*$', re.IGNORECASE) + re_admin_filter_key = re.compile(r'^\s*(?:admin[_-]?)?filter\s*$', re.IGNORECASE) + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + host=None, use_ldaps=False, port=DEFAULT_PORT_LDAP, base_dn=None, + bind_dn=None, bind_pw=None, admin_filter=None, initialized=False): + + self._host = None + self._use_ldaps = False + self._port = DEFAULT_PORT_LDAP + self._base_dn = None + self._bind_dn = None + self._bind_pw = None + self._admin_filter = DEFAULT_LDAP_ADMIN_FILTER + + super(LdapConnectionInfo, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, + initialized=False) + + self.host = host + self.use_ldaps = use_ldaps + self.port = port + if base_dn: + self.base_dn = base_dn + self.bind_dn = bind_dn + self.bind_pw = bind_pw + if admin_filter: + self.admin_filter = admin_filter + + if initialized: + self.initialized = True + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + + res = super(LdapConnectionInfo, self).as_dict(short=short) + + res['host'] = self.host + res['use_ldaps'] = self.use_ldaps + res['port'] = self.port + res['base_dn'] = self.base_dn + res['bind_dn'] = self.bind_dn + res['bind_pw'] = None + res['schema'] = self.schema + res['url'] = self.url + res['admin_filter'] = self.admin_filter + + if self.bind_pw: + if self.verbose > 4: + res['bind_pw'] = self.bind_pw + else: + res['bind_pw'] = '******' + + return res + + # ----------------------------------------------------------- + @property + def host(self): + """The host name (or IP address) of the LDAP server.""" + return self._host + + @host.setter + def host(self, value): + if value is None or str(value).strip() == '': + self._host = None + return + self._host = str(value).strip().lower() + + # ----------------------------------------------------------- + @property + def use_ldaps(self): + """Should there be used LDAPS for communicating with the LDAP server?""" + return self._use_ldaps + + @use_ldaps.setter + def use_ldaps(self, value): + self._use_ldaps = to_bool(value) + + # ----------------------------------------------------------- + @property + def port(self): + "The TCP port number of the LDAP server." + return self._port + + @port.setter + def port(self, value): + v = int(value) + if v < 1 or v > MAX_PORT_NUMBER: + raise CrTplConfigError(_("Invalid port {!r} for LDAP server given.").format(value)) + self._port = v + + # ----------------------------------------------------------- + @property + def base_dn(self): + """The DN used to connect to the LDAP server, anonymous bind is used, if + this DN is empty or None.""" + return self._base_dn + + @base_dn.setter + def base_dn(self, value): + if value is None or str(value).strip() == '': + msg = _("An empty Base DN for LDAP searches is not allowed.") + raise CrTplConfigError(msg) + self._base_dn = str(value).strip() + + # ----------------------------------------------------------- + @property + def bind_dn(self): + """The DN used to connect to the LDAP server, anonymous bind is used, if + this DN is empty or None.""" + return self._bind_dn + + @bind_dn.setter + def bind_dn(self, value): + if value is None or str(value).strip() == '': + self._bind_dn = None + return + self._bind_dn = str(value).strip() + + # ----------------------------------------------------------- + @property + def bind_pw(self): + """The password of the DN used to connect to the LDAP server.""" + return self._bind_pw + + @bind_pw.setter + def bind_pw(self, value): + if value is None or str(value).strip() == '': + self._bind_pw = None + return + self._bind_pw = str(value).strip() + + # ----------------------------------------------------------- + @property + def admin_filter(self): + """The LDAP filter to get the list of administrators from LDAP.""" + return self._admin_filter + + @admin_filter.setter + def admin_filter(self, value): + if value is None or str(value).strip() == '': + self._admin_filter = None + return + self._admin_filter = str(value).strip() + + # ----------------------------------------------------------- + @property + def schema(self): + """The schema as part of the URL to connect to the LDAP server.""" + if self.use_ldaps: + return 'ldaps' + return 'ldap' + + # ----------------------------------------------------------- + @property + def url(self): + """The URL, which ca be used to connect to the LDAP server.""" + if not self.host: + return None + + port = '' + if self.use_ldaps: + if self.port != DEFAULT_PORT_LDAPS: + port = ':{}'.format(self.port) + else: + if self.port != DEFAULT_PORT_LDAP: + port = ':{}'.format(self.port) + + return '{s}://{h}{p}'.format(s=self.schema, h=self.host, p=port) + + # ------------------------------------------------------------------------- + def __repr__(self): + """Typecasting into a string for reproduction.""" + + out = "<%s(" % (self.__class__.__name__) + + fields = [] + fields.append("appname={!r}".format(self.appname)) + fields.append("host={!r}".format(self.host)) + fields.append("use_ldaps={!r}".format(self.use_ldaps)) + fields.append("port={!r}".format(self.port)) + fields.append("base_dn={!r}".format(self.base_dn)) + fields.append("bind_dn={!r}".format(self.bind_dn)) + fields.append("bind_pw={!r}".format(self.bind_pw)) + fields.append("admin_filter={!r}".format(self.admin_filter)) + fields.append("initialized={!r}".format(self.initialized)) + + out += ", ".join(fields) + ")>" + return out + + # ------------------------------------------------------------------------- + def __copy__(self): + + new = self.__class__( + appname=self.appname, verbose=self.verbose, base_dir=self.base_dir, host=self.host, + use_ldaps=self.use_ldaps, port=self.port, base_dn=self.base_dn, bind_dn=self.bind_dn, + bind_pw=self.bind_pw, admin_filter=self.admin_filter, initialized=self.initialized) + + return new + + # ------------------------------------------------------------------------- + @classmethod + def init_from_config(cls, name, data, appname=None, verbose=0, base_dir=None): + + new = cls(appname=appname, verbose=verbose, base_dir=base_dir) + + s_name = "ldap:" + name + msg_invalid = _("Invalid value {val!r} in section {sec!r} for a LDAP {what}.") + + for key in data.keys(): + value = data[key] + + if cls.re_host_key.match(key): + if value.strip(): + new.host = value + else: + msg = msg_invalid.format(val=value, sec=s_name, what='host') + LOG.error(msg) + continue + + if cls.re_ldaps_key.match(key): + new.use_ldaps = value + continue + + if cls.re_port_key.match(key): + port = DEFAULT_PORT_LDAP + try: + port = int(value) + except (ValueError, TypeError) as e: + msg = msg_invalid.format(val=value, sec=s_name, what='port') + msg += ' ' + str(e) + LOG.error(msg) + continue + if port <= 0 or port > MAX_PORT_NUMBER: + msg = msg_invalid.format(val=value, sec=s_name, what='port') + LOG.error(msg) + continue + new.port = port + continue + + if cls.re_base_dn_key.match(key): + if value.strip(): + new.base_dn = value + else: + msg = msg_invalid.format(val=value, sec=s_name, what='base_dn') + LOG.error(msg) + continue + + if cls.re_bind_dn_key.match(key): + new.bind_dn = value + continue + + if cls.re_bind_pw_key.match(key): + new.bind_pw = value + continue + + if cls.re_admin_filter_key.match(key): + new.admin_filter = value + continue + + if key.lower() in ['is_admin', 'readonly', 'tier', 'sync-source']: + continue + + msg = _("Unknown LDAP configuration key {key} found in section {sec!r}.").format( + key=key, sec=s_name) + LOG.error(msg) + + new.initialized = True + + return new + + +# ============================================================================= +class LdapConnectionDict(dict, FbGenericBaseObject): + """A dictionary containing LdapConnectionInfo as values and their names as keys.""" + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + + res = super(LdapConnectionDict, self).as_dict(short=short) + + for key in self.keys(): + res[key] = self[key].as_dict(short=short) + + return res + + # ------------------------------------------------------------------------- + def __copy__(self): + + new = self.__class__() + + for key in self.keys(): + new[key] = copy.copy(self[key]) + + return new + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list diff --git a/lib/cr_vmware_tpl/handler.py b/lib/cr_vmware_tpl/handler.py index daf0157..748374d 100644 --- a/lib/cr_vmware_tpl/handler.py +++ b/lib/cr_vmware_tpl/handler.py @@ -47,8 +47,9 @@ from fb_vmware.iface import VsphereVmInterface from fb_vmware.datastore import VsphereDatastore from . import print_section_start, print_section_end +from . import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS -from .config import CrTplConfiguration, DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS +from .config import CrTplConfiguration from .cobbler import Cobbler -- 2.39.5