From f2829bbfc618e67e64eb0a8d874631d1320d2e7c Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Thu, 24 Mar 2022 17:40:59 +0100 Subject: [PATCH] Adding lib/pp_admintools/mail_config.py --- lib/pp_admintools/mail_config.py | 327 +++++++++++++++++++++++++++++++ 1 file changed, 327 insertions(+) create mode 100644 lib/pp_admintools/mail_config.py diff --git a/lib/pp_admintools/mail_config.py b/lib/pp_admintools/mail_config.py new file mode 100644 index 0000000..df4709e --- /dev/null +++ b/lib/pp_admintools/mail_config.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2022 by Frank Brehm, Berlin +@summary: A module for providing a configuration for applications, + which are sending mails +""" +from __future__ import absolute_import + +# Standard module +import logging +import pwd +import re +import copy + +from numbers import Number + +# Third party modules + +# Own modules + +from fb_tools.common import is_sequence + +# from .config import ConfigError, BaseConfiguration +from fb_tools.multi_config import MultiConfigError, BaseMultiConfig +from fb_tools.multi_config import DEFAULT_ENCODING + +from .mailaddress import MailAddress + +from .xlate import XLATOR + +__version__ = '0.1.0' +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext + +DEFAULT_CONFIG_DIR = 'pixelpark' +VALID_MAIL_METHODS = ('smtp', 'sendmail') +MAX_PORT_NUMBER = (2 ** 16) -1 + + +# ============================================================================= +class MailConfigError(MultiConfigError): + """Base error class for all exceptions happened during + execution this configured application""" + + pass + + +# ============================================================================= +class MailConfiguration(BaseMultiConfig): + """ + A class for providing a configuration for an arbitrary PowerDNS Application + and methods to read it from configuration files. + """ + + default_mail_recipients = [ + 'frank.brehm@pixelpark.com' + ] + default_mail_cc = [ + 'thomas.dalichow@pixelpark.com', + ] + + default_reply_to = 'solution@pixelpark.com' + + default_mail_server = 'prd-mail.pixelpark.com' + + current_user_name = pwd.getpwuid(os.getuid()).pw_name + current_user_gecos = pwd.getpwuid(os.getuid()).pw_gecos + default_mail_from = MailAddress(current_user_name, socket.getfqdn()) + + valid_mail_methods = VALID_MAIL_METHODS + + whitespace_re = re.compile(r'(?:[,;]+|\s*[,;]*\s+)+') + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + append_appname_to_stems=True, additional_stems=None, config_dir=DEFAULT_CONFIG_DIR, + additional_config_file=None, additional_cfgdirs=None, encoding=DEFAULT_ENCODING, + use_chardet=True, initialized=False): + + add_stems = [] + if additional_stems: + if is_sequence(additional_stems): + for stem in additional_stems: + add_stems.append(stem) + else: + add_stems.append(additional_stems) + + if 'mail' not in add_stems: + add_stems.append('mail') + + self.mail_recipients = copy.copy(self.default_mail_recipients) + self.mail_from = '{n} <{m}>'.format( + n=self.current_user_gecos, m=self.default_mail_from) + self.mail_cc = copy.copy(self.default_mail_cc) + self.reply_to = self.default_reply_to + self.mail_method = 'smtp' + self.mail_server = self.default_mail_server + self.smtp_port = 25 + self._mail_cc_configured = False + + super(MailConfiguration, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, + append_appname_to_stems=append_appname_to_stems, config_dir=config_dir, + additional_stems=add_stems, additional_config_file=additional_config_file, + additional_cfgdirs=additional_cfgdirs, encoding=encoding, use_chardet=use_chardet, + ensure_privacy=True, initialized=False, + ) + + if initialized: + self.initialized = True + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + """ + Transforms the elements of the object into a dict + + @param short: don't include local properties in resulting dict. + @type short: bool + + @return: structure as dict + @rtype: dict + """ + + res = super(MailConfiguration, self).as_dict(short=short) + + res['default_mail_recipients'] = self.default_mail_recipients + res['default_mail_cc'] = self.default_mail_cc + res['default_reply_to'] = self.default_reply_to + res['default_mail_server'] = self.default_mail_server + res['current_user_name'] = self.current_user_name + res['current_user_gecos'] = self.current_user_gecos + res['default_mail_from'] = self.default_mail_from + + return res + + # ------------------------------------------------------------------------- + def eval(self): + + self.mail_recipients = [] + self.mail_cc = [] + + super(MailConfiguration, self).eval() + + if not self.mail_recipients: + self.mail_recipients = copy.copy(self.default_mail_recipients) + + if not self.mail_cc and not self._mail_cc_configured: + self.mail_cc = copy.copy(self.default_mail_cc) + + # ------------------------------------------------------------------------- + def eval_section(self, section_name): + + super(MailConfiguration, self).eval_section(section_name) + sn = section_name.lower() + + if sn == 'mail': + section = self.cfg[section_name] + return self._eval_mail(section_name, section) + + # ------------------------------------------------------------------------- + def _eval_mail(self, section_name, section): + + if self.verbose > 2: + msg = _("Evaluating config section {!r}:").format(section_name) + LOG.debug(msg + '\n' + pp(section)) + + self._eval_mail_rcpt(section_name, section) + self._eval_mail_cc(section_name, section) + self._eval_mail_reply_to(section_name, section) + self._eval_mail_method(section_name, section) + self._eval_mail_server(section_name, section) + self._eval_smtp_port(section_name, section) + + # ------------------------------------------------------------------------- + def _split_mailaddress_tokens(self, value, what=None): + + result = [] + + tokens = self.whitespace_re.split(value) + for token in tokens: + if MailAddress.valid_address(token): + result.append(token) + else: + msg = _("Found invalid {what} {addr!r} in configuration.") + LOG.error(msg.format(what=what, addr=token)) + + return result + + # ------------------------------------------------------------------------- + def _eval_mail_rcpt(self, section_name, section): + + re_rcpt = re.compile(r'^\s*(mail[_-]?)?(recipients?|rcpt)\s*$', re.IGNORECASE) + + for key in section.keys(): + if not re_rcpt.search(key): + continue + + val = section[key] + if not val: + continue + if is_sequence(val): + for v in val: + result = self._split_mailaddress_tokens(v, _("recipient mail address")) + if result: + self.mail_recipients.expand(result) + else: + result = self._split_mailaddress_tokens(val, _("recipient mail address")) + if result: + self.mail_recipients.expand(result) + + # ------------------------------------------------------------------------- + def _eval_mail_cc(self, section_name, section): + + re_cc = re.compile(r'^\s*(mail[_-]?)?cc\s*$', re.IGNORECASE) + + for key in section.keys(): + + self._mail_cc_configured = True + if not re_cc.search(key): + continue + + val = section[key] + if not val: + continue + if is_sequence(val): + for v in val: + result = self._split_mailaddress_tokens(v, _("cc mail address")) + if result: + self.mail_cc.expand(result) + else: + result = self._split_mailaddress_tokens(val, _("cc mail address")) + if result: + self.mail_cc.expand(result) + + # ------------------------------------------------------------------------- + def _eval_mail_reply_to(self, section_name, section): + + re_reply = re.compile(r'^\s*(mail[_-]?)?reply([-_]?to)?\s*$', re.IGNORECASE) + + for key in section.keys(): + if not re_reply.search(key): + continue + + val = section[key] + + if is_sequence(val): + if not len(val): + continue + val = val[0] + + if MailAddress.valid_address(val): + self.reply_to = val + else: + msg = _("Found invalid {what} {addr!r} in configuration.") + LOG.error(msg.format(what=_("reply to address"), addr=val)) + + # ------------------------------------------------------------------------- + def _eval_mail_method(self, section_name, section): + + re_method = re.compile(r'^\s*(mail[_-]?)?method\s*$', re.IGNORECASE) + + for key in section.keys(): + if not re_reply.search(key): + continue + + val = section[key].strip().lower() + if not val: + continue + + if val not in self.valid_mail_methods: + msg = _("Found invalid mail method {!r} in configuration.") + LOG.error(msg.format(section[key])) + continue + + self.mail_method = val + + # ------------------------------------------------------------------------- + def _eval_mail_server(self, section_name, section): + + re_server = re.compile(r'^\s*(mail[_-]?)?server\s*$', re.IGNORECASE) + + for key in section.keys(): + if not re_server.search(key): + continue + + val = section[key].strip().lower() + if not val: + continue + + self.mail_server = val + + # ------------------------------------------------------------------------- + def _eval_smtp_port(self, section_name, section): + + re_server = re.compile(r'^\s*(smtp[_-]?)?port\s*$', re.IGNORECASE) + + for key in section.keys(): + if not re_server.search(key): + continue + + val = section[[key] + try: + port = int(val) + except (ValueError, TypeError) as e: + msg = _("Value {!r} for SMTP port is invalid:").format(val) + LOG.error(msg) + continue + if port <= 0 or port > MAX_PORT_NUMBER: + msg = _("Found invalid SMTP port number {} in configuration.").format(port) + LOG.error(msg) + continue + + self.smtp_port = port + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list -- 2.39.5