import re
import copy
import crypt
-import os
-import ipaddress
# Third party modules
from pathlib import Path
# Own modules
from .. import DEFAULT_CONFIG_DIR
-from .. import DEFAULT_PORT_LDAPS, DEFAULT_TIMEOUT, MAX_TIMEOUT
+from .. import DEFAULT_PORT_LDAPS, DEFAULT_TIMEOUT
from ..errors import CrTplConfigError
from ..xlate import XLATOR
from .cobbler import ConfigCobbler
+from .eval import ConfigEval
from .ldap import LdapConnectionInfo, LdapConnectionDict
from .timeouts import ConfigTimeouts
-__version__ = '3.1.1'
+__version__ = '3.1.2'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
# =============================================================================
-class CrTplConfiguration(BaseMultiConfig, ConfigCobbler, ConfigTimeouts):
+class CrTplConfiguration(BaseMultiConfig, ConfigCobbler, ConfigTimeouts, ConfigEval):
"""
A class for providing a configuration for the CrTplApplication class
and methods to read it from configuration files.
return res
+ # -------------------------------------------------------------------------
+ def super_eval(self):
+
+ super(CrTplConfiguration, self).eval()
+
+ # -------------------------------------------------------------------------
+ def super_eval_section(self, section_name):
+
+ super(CrTplConfiguration, self).eval_section(section_name)
+
# -------------------------------------------------------------------------
def eval(self):
"""Evaluating read configuration and storing them in object properties."""
- super(CrTplConfiguration, self).eval()
+ if self.verbose > 1:
+ LOG.debug(_("Evaluating configuration ..."))
+
+ self.super_eval()
if self.verbose > 1:
LOG.debug(_("Checking for unconfigured options ..."))
if not self.template_name_given:
self.template_name = self.current_distro.shortname + '-template'
- # -------------------------------------------------------------------------
- @classmethod
- def eval_resolv_conf(cls):
-
- if cls.evaluated_resolv_conf:
- return True
-
- if not cls.resolv_conf.exists():
- LOG.error(_("File {!r} not found on current host.").format(str(cls.resolv_conf)))
- cls.evaluated_resolv_conf = True
- return False
-
- if not cls.resolv_conf.is_file():
- LOG.error(_("Path {!r} is not a regular file.").format(str(cls.resolv_conf)))
- cls.evaluated_resolv_conf = True
- return False
-
- if not os.access(cls.resolv_conf, os.R_OK):
- LOG.error(_("File {!r} is not readable.").format(str(cls.resolv_conf)))
- cls.evaluated_resolv_conf = True
- return False
-
- LOG.info(_("Evaluating {!r} for nameservers.").format(str(cls.resolv_conf)))
-
- nameservers = []
- file_content = cls.resolv_conf.read_text()
- lines = file_content.splitlines()
-
- for line in lines:
- match = cls.re_resolv_ns_entry.match(line)
- if match:
- try:
- ns_address = ipaddress.ip_address(match.group(1))
- nameservers.append(str(ns_address))
- except ValueError as e:
- msg = _("Found invalid IP address {addr!r} as a nameserver in {file!r}:")
- msg = msg.format(addr=match.group(1), file=str(cls.resolv_conf))
- msg += ' ' + str(e)
- LOG.warn(msg)
-
- cls.evaluated_resolv_conf = True
- msg = _("Found nameservers in {!r}:").format(str(cls.resolv_conf))
- msg += ' {}'.format(pp(nameservers))
- LOG.debug(msg)
-
- if len(nameservers):
- cls.default_cobbler_nameservers = nameservers
- return True
-
- return False
-
# -------------------------------------------------------------------------
def eval_section(self, section_name):
if self.verbose > 2:
LOG.debug(_("Content of section:") + '\n' + pp(section))
- super(CrTplConfiguration, self).eval_section(section_name)
+ self.super_eval_section(section_name)
if sn == 'vsphere':
self._eval_config_vsphere(section_name, section)
if self.verbose > 1:
LOG.debug(_("Unhandled configuration section {!r}.").format(section_name))
- # -------------------------------------------------------------------------
- def _eval_ldap_timeout(self, value):
-
- timeout = DEFAULT_TIMEOUT
- msg_invalid = _("Value {!r} for a timeout is invalid.")
-
- try:
- timeout = int(value)
- except (ValueError, TypeError) as e:
- msg = msg_invalid.format(value)
- msg += ': ' + str(e)
- LOG.error(msg)
- return
- if timeout <= 0 or timeout > MAX_TIMEOUT:
- msg = msg_invalid.format(value)
- LOG.error(msg)
- return
-
- self.ldap_timeout = timeout
-
- # -------------------------------------------------------------------------
- def _eval_ldap_connection(self, connection_name, section):
-
- connection = LdapConnectionInfo.init_from_config(
- connection_name, section,
- appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
- )
-
- self.ldap_connection[connection_name] = connection
-
- # -------------------------------------------------------------------------
- def _eval_config_vsphere(self, section_name, section):
-
- if self.verbose > 1:
- LOG.debug(_("Checking config section {!r} ...").format(section_name))
-
- re_excl_ds = re.compile(r'^\s*excluded?[-_]datastores?\s*$', re.IGNORECASE)
- re_split_ds = re.compile(r'[,;\s]+')
- re_storage_cluster = re.compile(r'^\s*storage[-_]?cluster\s*$', re.IGNORECASE)
-
- for key in section.keys():
- value = section[key]
-
- if key.lower() == 'host':
- self.vsphere_info.host = value
- continue
- elif key.lower() == 'port':
- self.vsphere_info.port = value
- continue
- elif key.lower() == 'user':
- self.vsphere_info.user = value
- continue
- elif key.lower() == 'password':
- self.vsphere_info.password = value
- continue
- elif key.lower() == 'cluster':
- self.vsphere_cluster = value
- continue
- elif key.lower() == 'folder':
- self.folder = value
- elif key.lower() == 'dc':
- self.vsphere_info.dc = value
-
- elif key.lower() == 'max_nr_templates_stay':
- v = int(value)
- if v < 1:
- LOG.error(_(
- "Value {val} for {p} is less than {minval}, using {default}.").format(
- val=v, minval=1, p='max_nr_templates_stay',
- default=self.default_max_nr_templates_stay))
- elif v >= 100:
- LOG.error(_(
- "Value {val} for {p} is greater than {maxval}, using {default}.").format(
- val=v, maxval=100, p='max_nr_templates_stay',
- default=self.default_max_nr_templates_stay))
- else:
- self.max_nr_templates_stay = v
-
- elif re_excl_ds.search(key):
- datastores = re_split_ds.split(value.strip())
- self.excluded_datastores = datastores
-
- elif re_storage_cluster.search(key):
- cl_name = value.strip()
- if cl_name:
- self.storage_cluster = cl_name
- else:
- self.storage_cluster = None
-
- return
-
- # -------------------------------------------------------------------------
- def _eval_config_template(self, section_name, section):
-
- if self.verbose > 1:
- LOG.debug(_("Checking config section {!r} ...").format(section_name))
-
- re_os_id = re.compile(r'^\s*os[-_]?id\s*$', re.IGNORECASE)
- re_os_id_subst = re.compile(r'[^a-z0-9_-]+', re.IGNORECASE)
- re_vm_domain = re.compile(r'^\s*(?:vm[-_]?)?domain\s*$', re.IGNORECASE)
- re_root_pwd = re.compile(r'^\s*root[-_]?password\s*$', re.IGNORECASE)
- re_swap_space = re.compile(r'^\s*swap[-_]?space(?:[-_]?mb)?\s*$', re.IGNORECASE)
-
- for key in section.keys():
- value = section[key]
-
- if re_os_id.match(key):
- v = value.strip().lower()
- v = re_os_id_subst.sub('', v)
- if v:
- self.os_id = v
- continue
- if key.lower() == 'name':
- self.template_name = value
- self.template_name_given = True
- continue
- if re_vm_domain.match(key) and value.strip():
- self.tpl_vm_domain = value.strip().lower()
- continue
- if key.lower() == 'data_size_gb':
- self.data_size_gb = float(value)
- continue
- if key.lower() == 'data_size_mb':
- self.data_size_gb = float(value) / 1024.0
- continue
- if key.lower() == 'data_size_kb':
- self.data_size_gb = float(value) / 1024.0 / 1024.0
- continue
- if key.lower() == 'data_size':
- self.data_size_gb = float(value) / 1024.0 / 1024.0 / 1024.0
- continue
- if key.lower() == 'num_cpus':
- self.num_cpus = int(value)
- continue
- if key.lower() == 'ram_gb':
- self.ram_mb = int(float(value) * 1024.0)
- continue
- if key.lower() == 'ram_mb':
- self.ram_mb = int(value)
- continue
- if key.lower() == 'network':
- self.network = value.strip()
- continue
- if key.lower() == 'vmware_cfg_version':
- self.vmware_cfg_version = value.strip()
- continue
- if key.lower() == 'os_version':
- self.os_version = value.strip()
- continue
- if re_root_pwd.match(key) and value.strip():
- self._root_password = value.strip()
- continue
- if re_swap_space.match(key) and value.strip():
- self.swap_size_mb = int(value)
- continue
-
- return
-
# -------------------------------------------------------------------------
def get_root_pwd_hash(self, method='sha256'):
"""Generates a password hash based on the root password."""
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2023 by Frank Brehm, Berlin
+@summary: A mixin module for the CrTplConfiguration class for evaluation methods.
+"""
+from __future__ import absolute_import, print_function
+
+# Standard modules
+import logging
+import re
+import os
+import ipaddress
+
+# Third party modules
+from fb_tools.common import pp
+
+# Own modules
+from .. import DEFAULT_TIMEOUT, MAX_TIMEOUT
+
+from ..xlate import XLATOR
+
+from .ldap import LdapConnectionInfo
+
+__version__ = '0.1.0'
+
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
+# =============================================================================
+class ConfigEval():
+ """
+ A mixin class for extending the CrTplConfiguration class for evaluation methods.
+ """
+
+ # -------------------------------------------------------------------------
+ @classmethod
+ def eval_resolv_conf(cls):
+
+ if cls.evaluated_resolv_conf:
+ return True
+
+ if not cls.resolv_conf.exists():
+ LOG.error(_("File {!r} not found on current host.").format(str(cls.resolv_conf)))
+ cls.evaluated_resolv_conf = True
+ return False
+
+ if not cls.resolv_conf.is_file():
+ LOG.error(_("Path {!r} is not a regular file.").format(str(cls.resolv_conf)))
+ cls.evaluated_resolv_conf = True
+ return False
+
+ if not os.access(cls.resolv_conf, os.R_OK):
+ LOG.error(_("File {!r} is not readable.").format(str(cls.resolv_conf)))
+ cls.evaluated_resolv_conf = True
+ return False
+
+ LOG.info(_("Evaluating {!r} for nameservers.").format(str(cls.resolv_conf)))
+
+ nameservers = []
+ file_content = cls.resolv_conf.read_text()
+ lines = file_content.splitlines()
+
+ for line in lines:
+ match = cls.re_resolv_ns_entry.match(line)
+ if match:
+ try:
+ ns_address = ipaddress.ip_address(match.group(1))
+ nameservers.append(str(ns_address))
+ except ValueError as e:
+ msg = _("Found invalid IP address {addr!r} as a nameserver in {file!r}:")
+ msg = msg.format(addr=match.group(1), file=str(cls.resolv_conf))
+ msg += ' ' + str(e)
+ LOG.warn(msg)
+
+ cls.evaluated_resolv_conf = True
+ msg = _("Found nameservers in {!r}:").format(str(cls.resolv_conf))
+ msg += ' {}'.format(pp(nameservers))
+ LOG.debug(msg)
+
+ if len(nameservers):
+ cls.default_cobbler_nameservers = nameservers
+ return True
+
+ return False
+
+ # -------------------------------------------------------------------------
+ def _eval_ldap_timeout(self, value):
+
+ timeout = DEFAULT_TIMEOUT
+ msg_invalid = _("Value {!r} for a timeout is invalid.")
+
+ try:
+ timeout = int(value)
+ except (ValueError, TypeError) as e:
+ msg = msg_invalid.format(value)
+ msg += ': ' + str(e)
+ LOG.error(msg)
+ return
+ if timeout <= 0 or timeout > MAX_TIMEOUT:
+ msg = msg_invalid.format(value)
+ LOG.error(msg)
+ return
+
+ self.ldap_timeout = timeout
+
+ # -------------------------------------------------------------------------
+ def _eval_ldap_connection(self, connection_name, section):
+
+ connection = LdapConnectionInfo.init_from_config(
+ connection_name, section,
+ appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
+ )
+
+ self.ldap_connection[connection_name] = connection
+
+ # -------------------------------------------------------------------------
+ def _eval_config_vsphere(self, section_name, section):
+
+ if self.verbose > 1:
+ LOG.debug(_("Checking config section {!r} ...").format(section_name))
+
+ re_excl_ds = re.compile(r'^\s*excluded?[-_]datastores?\s*$', re.IGNORECASE)
+ re_split_ds = re.compile(r'[,;\s]+')
+ re_storage_cluster = re.compile(r'^\s*storage[-_]?cluster\s*$', re.IGNORECASE)
+
+ for key in section.keys():
+ value = section[key]
+
+ if key.lower() == 'host':
+ self.vsphere_info.host = value
+ continue
+ elif key.lower() == 'port':
+ self.vsphere_info.port = value
+ continue
+ elif key.lower() == 'user':
+ self.vsphere_info.user = value
+ continue
+ elif key.lower() == 'password':
+ self.vsphere_info.password = value
+ continue
+ elif key.lower() == 'cluster':
+ self.vsphere_cluster = value
+ continue
+ elif key.lower() == 'folder':
+ self.folder = value
+ elif key.lower() == 'dc':
+ self.vsphere_info.dc = value
+
+ elif key.lower() == 'max_nr_templates_stay':
+ v = int(value)
+ if v < 1:
+ LOG.error(_(
+ "Value {val} for {p} is less than {minval}, using {default}.").format(
+ val=v, minval=1, p='max_nr_templates_stay',
+ default=self.default_max_nr_templates_stay))
+ elif v >= 100:
+ LOG.error(_(
+ "Value {val} for {p} is greater than {maxval}, using {default}.").format(
+ val=v, maxval=100, p='max_nr_templates_stay',
+ default=self.default_max_nr_templates_stay))
+ else:
+ self.max_nr_templates_stay = v
+
+ elif re_excl_ds.search(key):
+ datastores = re_split_ds.split(value.strip())
+ self.excluded_datastores = datastores
+
+ elif re_storage_cluster.search(key):
+ cl_name = value.strip()
+ if cl_name:
+ self.storage_cluster = cl_name
+ else:
+ self.storage_cluster = None
+
+ return
+
+ # -------------------------------------------------------------------------
+ def _eval_config_template(self, section_name, section):
+
+ if self.verbose > 1:
+ LOG.debug(_("Checking config section {!r} ...").format(section_name))
+
+ re_os_id = re.compile(r'^\s*os[-_]?id\s*$', re.IGNORECASE)
+ re_os_id_subst = re.compile(r'[^a-z0-9_-]+', re.IGNORECASE)
+ re_vm_domain = re.compile(r'^\s*(?:vm[-_]?)?domain\s*$', re.IGNORECASE)
+ re_root_pwd = re.compile(r'^\s*root[-_]?password\s*$', re.IGNORECASE)
+ re_swap_space = re.compile(r'^\s*swap[-_]?space(?:[-_]?mb)?\s*$', re.IGNORECASE)
+
+ for key in section.keys():
+ value = section[key]
+
+ if re_os_id.match(key):
+ v = value.strip().lower()
+ v = re_os_id_subst.sub('', v)
+ if v:
+ self.os_id = v
+ continue
+ if key.lower() == 'name':
+ self.template_name = value
+ self.template_name_given = True
+ continue
+ if re_vm_domain.match(key) and value.strip():
+ self.tpl_vm_domain = value.strip().lower()
+ continue
+ if key.lower() == 'data_size_gb':
+ self.data_size_gb = float(value)
+ continue
+ if key.lower() == 'data_size_mb':
+ self.data_size_gb = float(value) / 1024.0
+ continue
+ if key.lower() == 'data_size_kb':
+ self.data_size_gb = float(value) / 1024.0 / 1024.0
+ continue
+ if key.lower() == 'data_size':
+ self.data_size_gb = float(value) / 1024.0 / 1024.0 / 1024.0
+ continue
+ if key.lower() == 'num_cpus':
+ self.num_cpus = int(value)
+ continue
+ if key.lower() == 'ram_gb':
+ self.ram_mb = int(float(value) * 1024.0)
+ continue
+ if key.lower() == 'ram_mb':
+ self.ram_mb = int(value)
+ continue
+ if key.lower() == 'network':
+ self.network = value.strip()
+ continue
+ if key.lower() == 'vmware_cfg_version':
+ self.vmware_cfg_version = value.strip()
+ continue
+ if key.lower() == 'os_version':
+ self.os_version = value.strip()
+ continue
+ if re_root_pwd.match(key) and value.strip():
+ self._root_password = value.strip()
+ continue
+ if re_swap_space.match(key) and value.strip():
+ self.swap_size_mb = int(value)
+ continue
+
+ return
+
+
+# =============================================================================
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list