From f5c601078f6ed05936a1d539f502c80266833a17 Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Wed, 30 Mar 2022 15:30:06 +0200 Subject: [PATCH] Adding lib/pp_admintools/dns_deploy_zones_config.py --- lib/pp_admintools/dns_deploy_zones_config.py | 541 +++++++++++++++++++ 1 file changed, 541 insertions(+) create mode 100644 lib/pp_admintools/dns_deploy_zones_config.py diff --git a/lib/pp_admintools/dns_deploy_zones_config.py b/lib/pp_admintools/dns_deploy_zones_config.py new file mode 100644 index 0000000..3003f2a --- /dev/null +++ b/lib/pp_admintools/dns_deploy_zones_config.py @@ -0,0 +1,541 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2022 by Frank Brehm, Berlin +@summary: A module for providing a configuration the dns-deploy-zones applications. + It's based on class PdnsConfiguration. +""" +from __future__ import absolute_import + +# Standard module +import logging +import re +import copy +import socket + +from pathlib import Path + +# Third party modules + +# Own modules + +from fb_tools.common import is_sequence, pp, to_bool + +# from .config import ConfigError, BaseConfiguration +from fb_tools.multi_config import DEFAULT_ENCODING + +from . import __version__ as GLOBAL_VERSION +from . import MAX_TIMEOUT, MAX_PORT_NUMBER + +from .pdns_config import PdnsConfigError, PdnsConfiguration +from .mail_config import DEFAULT_CONFIG_DIR + +from .xlate import XLATOR + +__version__ = '0.1.0' +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext + + +# ============================================================================= +class DnsDeployZonesConfigError(PdnsConfigError): + """Base error class for all exceptions happened during + execution this configured application""" + + pass + + +# ============================================================================= +class DnsDeployZonesConfig(PdnsConfiguration): + """ + A class for providing a configuration for an arbitrary PowerDNS Application + and methods to read it from configuration files. + """ + + default_pidfile = Path('/run/dns-deploy-zones.pid') + + default_named_conf_dir = Path('/etc') + default_named_zones_cfg_file = Path('named.zones.conf') + default_named_basedir = Path('/var/named') + default_named_slavedir = Path('slaves') + + default_zone_masters_local = set('dnsmaster-local.pixelpark.com') + default_zone_masters_public = set('dnsmaster-public.pixelpark.com') + + default_rndc = Path('/usr/sbin/rndc') + default_systemctl = Path('/usr/bin/systemctl') + default_named_checkconf = Path('/usr/sbin/named-checkconf') + + default_named_listen_on_v6 = False + default_named_internal = False + + re_split_addresses = re.compile(r'[,;\s]+') + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + append_appname_to_stems=True, additional_stems=None, config_dir=DEFAULT_CONFIG_DIR, + additional_config_file=None, additional_cfgdirs=None, encoding=DEFAULT_ENCODING, + ensure_privacy=True, use_chardet=True, initialized=False): + + self.pidfile = self.default_pidfile + self.named_conf_dir = self.default_named_conf_dir + self.named_zones_cfg_file = self.default_named_zones_cfg_file + self.named_basedir = self.default_named_basedir + self.named_slavedir = self.default_named_slavedir + + self.zone_masters_local = [] + for master in self.default_zone_masters_local: + self.zone_masters_local.append(master) + + self.zone_masters_public = [] + for master in self.default_zone_masters_public: + self.zone_masters_public.append(master) + + self.rndc = self.default_rndc + self.systemctl = self.default_systemctl + self.named_checkconf = self.default_named_checkconf + + self._named_listen_on_v6 = self.default_named_listen_on_v6 + self._named_internal = self.default_named_internal + + self.masters = set() + + add_stems = [] + if additional_stems: + if is_sequence(additional_stems): + for stem in additional_stems: + add_stems.append(stem) + else: + add_stems.append(additional_stems) + + if 'named' not in add_stems: + add_stems.append('named') + + super(DnsDeployZonesConfig, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, + append_appname_to_stems=append_appname_to_stems, config_dir=config_dir, + additional_stems=add_stems, additional_config_file=additional_config_file, + additional_cfgdirs=additional_cfgdirs, encoding=encoding, use_chardet=use_chardet, + ensure_privacy=ensure_privacy, initialized=False, + ) + + if initialized: + self.initialized = True + + # ------------------------------------------------------------------------- + @property + def named_internal(self): + """Is the BIND nameserver on the current host a local resolver (True) + or an authoritative nameserver for outside.""" + return self._named_internal + + @named_internal.setter + def named_internal(self, value): + self._named_internal = to_bool(value) + + # ------------------------------------------------------------------------- + @property + def named_listen_on_v6(self): + """Is the BIND nameserver on the current listening on some IPv6 addresses?""" + return self._named_listen_on_v6 + + @named_listen_on_v6.setter + def named_listen_on_v6(self, value): + self._named_listen_on_v6 = to_bool(value) + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + + res = super(DnsDeployZonesConfig, self).as_dict(short=short) + + res['default_pidfile'] = self.default_pidfile + res['default_named_conf_dir'] = self.default_named_conf_dir + res['default_named_zones_cfg_file'] = self.default_named_zones_cfg_file + res['default_named_basedir'] = self.default_named_basedir + res['default_named_slavedir'] = self.default_named_slavedir + res['default_zone_masters_local'] = copy.copy(self.default_zone_masters_local) + res['default_zone_masters_public'] = copy.copy(self.default_zone_masters_public) + res['default_rndc'] = self.default_rndc + res['default_systemctl'] = self.default_systemctl + res['default_named_checkconf'] = self.default_named_checkconf + res['default_named_listen_on_v6'] = self.default_named_listen_on_v6 + res['default_named_internal'] = self.default_named_internal + res['named_listen_on_v6'] = self.named_listen_on_v6 + res['named_internal'] = self.named_internal + + res['masters'] = copy.copy(self.masters) + + return res + + # ------------------------------------------------------------------------- + def eval_section(self, section_name): + + super(DnsDeployZonesConfig, self).eval_section(section_name) + sn = section_name.lower() + + if sn == 'named': + section = self.cfg[section_name] + return self._eval_named(section_name, section) + + if sn == self.appname.lower() or sn == 'app': + section = self.cfg[section_name] + return self._eval_app(section_name, section) + + # ------------------------------------------------------------------------- + def _eval_named(self, section_name, section): + + if self.verbose > 2: + msg = _("Evaluating config section {!r}:").format(section_name) + LOG.debug(msg + '\n' + pp(section)) + + re_config_dir = re.compile(r'^\s*(?:named[_-]?)?conf(?:ig)?[_-]?dir\s*$', re.IGNORECASE) + re_config_file = re.compile( + r'^\s*(?:named[_-]?)?zones[_-]?(?:conf(?:ig)?|cfg)[_-]*file\s*$', re.IGNORECASE) + re_base_dir = re.compile(r'^\s*(?:named[_-]?)?base[_-]?dir\s*$', re.IGNORECASE) + re_slave_dir = re.compile(r'^\s*(?:named[_-]?)?slave[_-]?dir\s*$', re.IGNORECASE) + re_named_checkconf = re.compile(r'^named[_-]?checkconf$', re.IGNORECASE) + re_internal = re.compile( + r'^\s*(?:named[_-]?)?(?:is[_-]?)?intern(?:al)?\s*$', re.IGNORECASE) + re_listen_v6 = re.compile(r'^\s*listen[_-](?:on[_-])?(?:ip)v6\s*$', re.IGNORECASE) + + for key in section.keys(): + + if key.lower() == 'masters': + self._eval_named_masters(section_name, key, section) + continue + + if key.lower() == 'rndc': + self._eval_named_rndc(section_name, key, section) + continue + + if key.lower() == 'systemctl': + self._eval_named_systemctl(section_name, key, section) + continue + + if re_config_dir.search(key): + self._eval_named_configdir(section_name, key, section) + continue + + if re_config_file.search(key): + self._eval_named_configfile(section_name, key, section) + continue + + if re_base_dir.search(key): + self._eval_named_basedir(section_name, key, section) + continue + + if re_slave_dir.search(key): + self._eval_named_slavedir(section_name, key, section) + continue + + if re_named_checkconf.search(key): + self._eval_named_checkconf(section_name, key, section) + continue + + if re_internal.search(key): + self._eval_named_internal(section_name, key, section) + continue + + if re_listen_v6.search(key): + self._eval_named_listen_v6(section_name, key, section) + continue + + # ------------------------------------------------------------------------- + def _eval_named_masters(self, section_name, key, section): + + val = section[key] + + if not val: + return + + master_list = set() + + if is_sequence(val): + for value in val: + masters = self._eval_named_master_list(value) + if masters: + master_list |= masters + else: + masters = self._eval_named_master_list(val) + if masters: + master_list |= masters + + self.masters = master_list + + # ------------------------------------------------------------------------- + def _eval_named_master_list(self, value): + + masters = set() + + for m in self.re_split_addresses.split(value): + if not m: + continue + + m = m.strip().lower() + if self.verbose > 1: + LOG.debug(_("Checking given master address {!r} ...").format(m)) + addr_list = self.get_addresses(m) + masters |= addr_list + + return masters + + # ------------------------------------------------------------------------- + def get_addresses(self, host): + + addr_list = set() + + try: + addr_infos = socket.getaddrinfo(host, 53, proto=socket.IPPROTO_TCP) + for addr_info in addr_infos: + addr = addr_info[4][0] + addr_list.add(addr) + except socket.gaierror as e: + msg = _("Invalid hostname or address {a!r} found in masters: {e}") + msg = msg.format(a=host, e=e) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return set() + return addr_list + + + # ------------------------------------------------------------------------- + def _eval_named_rndc(self, iname, section): + + val = section[key].strip() + if not val: + return + + path = Path(val) + if not path.is_absolute() + msg = _("The path to {what} must be an absolute path, found {path!r}.") + msg = msg.format(what='rndc', path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what='rndc', path=val) + LOG.debug(msg) + + self.rndc = path + + # ------------------------------------------------------------------------- + def _eval_named_systemctl(self, iname, section): + + val = section[key].strip() + if not val: + return + + path = Path(val) + if not path.is_absolute() + msg = _("The path to {what} must be an absolute path, found {path!r}.") + msg = msg.format(what='systemctl', path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what='systemctl', path=val) + LOG.debug(msg) + + self.systemctl = path + + # ------------------------------------------------------------------------- + def _eval_named_configdir(self, iname, section): + + val = section[key].strip() + if not val: + return + + what = _("the named config directory") + path = Path(val) + + if not path.is_absolute() + msg = _("The path to {what} must be an absolute path, found {path!r}.") + msg = msg.format(what=what, path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what=what, path=val) + LOG.debug(msg) + + self.named_config_dir = path + + # ------------------------------------------------------------------------- + def _eval_named_configfile(self, iname, section): + + val = section[key].strip() + if not val: + return + + what = _("the named config file for zones") + path = Path(val) + + if path.is_absolute() + msg = _("The path to {what} must not be an absolute path, found {path!r}.") + msg = msg.format(what=what, path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what=what, path=val) + LOG.debug(msg) + + self.named_zones_cfg_file = path + + # ------------------------------------------------------------------------- + def _eval_named_basedir(self, iname, section): + + val = section[key].strip() + if not val: + return + + what = _("the named base directory") + path = Path(val) + if not path.is_absolute() + msg = _("The path to {what} must be an absolute path, found {path!r}.") + msg = msg.format(what=what, path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what=what, path=val) + LOG.debug(msg) + + self.named_basedir = path + + # ------------------------------------------------------------------------- + def _eval_named_slavedir(self, iname, section): + + val = section[key].strip() + if not val: + return + + what = _("the directory for slave zones of named") + path = Path(val) + + if path.is_absolute() + msg = _("The path to {what} must not be an absolute path, found {path!r}.") + msg = msg.format(what=what, path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what=what, path=val) + LOG.debug(msg) + + self.named_slavedir = path + + # ------------------------------------------------------------------------- + def _eval_named_checkconf(self, iname, section): + + val = section[key].strip() + if not val: + return + + what = "named-checkconf" + path = Path(val) + if not path.is_absolute() + msg = _("The path to {what} must be an absolute path, found {path!r}.") + msg = msg.format(what=what, path=val) + if self.raise_on_error: + raise DnsDeployZonesConfigError(msg) + else: + LOG.error(msg) + return + + if self.verbose > 2: + msg = _("Found path to {what}: {path!r}.").format(what=what, path=val) + LOG.debug(msg) + + self.named_checkconf = path + + # ------------------------------------------------------------------------- + def _eval_named_internal(self, iname, section): + + val = section[key] + if val is None: + return + + self.named_internal = val + + # ------------------------------------------------------------------------- + def _eval_named_listen_v6(self, iname, section): + + val = section[key] + if val is None: + return + + self.named_listen_on_v6 = val + + # ------------------------------------------------------------------------- + def eval(self): + """Evaluating read configuration and storing them in object properties.""" + + super(DnsDeployZonesConfig, self).eval() + + addr_list = set() + if self.named_internal: + for host in self.default_zone_masters_local: + addr_list |= self.get_addresses(host) + else: + for host in self.default_zone_masters_public: + addr_list |= self.get_addresses(host) + + self.masters |= addr_list + + if not self.named_listen_on_v6: + + addresses = set() + for addr in self.masters: + if ':' not in addr: + addresses.add(addr) + self.masters = addresses + + if self.masters: + if self.verbose > 2: + LOG.debug(_("Using configured masters:") + ' ' + pp(self.masters)) + else: + LOG.warn(_("No valid masters found in configuration.")) + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list -- 2.39.5