]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Adding lib/pp_admintools/mail_config.py
authorFrank Brehm <frank@brehm-online.com>
Thu, 24 Mar 2022 16:40:59 +0000 (17:40 +0100)
committerFrank Brehm <frank@brehm-online.com>
Thu, 24 Mar 2022 16:40:59 +0000 (17:40 +0100)
lib/pp_admintools/mail_config.py [new file with mode: 0644]

diff --git a/lib/pp_admintools/mail_config.py b/lib/pp_admintools/mail_config.py
new file mode 100644 (file)
index 0000000..df4709e
--- /dev/null
@@ -0,0 +1,327 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2022 by Frank Brehm, Berlin
+@summary: A module for providing a configuration for applications,
+          which are sending mails
+"""
+from __future__ import absolute_import
+
+# Standard module
+import logging
+import pwd
+import re
+import copy
+
+from numbers import Number
+
+# Third party modules
+
+# Own modules
+
+from fb_tools.common import is_sequence
+
+# from .config import ConfigError, BaseConfiguration
+from fb_tools.multi_config import MultiConfigError, BaseMultiConfig
+from fb_tools.multi_config import DEFAULT_ENCODING
+
+from .mailaddress import MailAddress
+
+from .xlate import XLATOR
+
+__version__ = '0.1.0'
+LOG = logging.getLogger(__name__)
+
+_ = XLATOR.gettext
+
+DEFAULT_CONFIG_DIR = 'pixelpark'
+VALID_MAIL_METHODS = ('smtp', 'sendmail')
+MAX_PORT_NUMBER = (2 ** 16) -1
+
+
+# =============================================================================
+class MailConfigError(MultiConfigError):
+    """Base error class for all exceptions happened during
+    execution this configured application"""
+
+    pass
+
+
+# =============================================================================
+class MailConfiguration(BaseMultiConfig):
+    """
+    A class for providing a configuration for an arbitrary PowerDNS Application
+    and methods to read it from configuration files.
+    """
+
+    default_mail_recipients = [
+        'frank.brehm@pixelpark.com'
+    ]
+    default_mail_cc = [
+        'thomas.dalichow@pixelpark.com',
+    ]
+
+    default_reply_to = 'solution@pixelpark.com'
+
+    default_mail_server = 'prd-mail.pixelpark.com'
+
+    current_user_name = pwd.getpwuid(os.getuid()).pw_name
+    current_user_gecos = pwd.getpwuid(os.getuid()).pw_gecos
+    default_mail_from = MailAddress(current_user_name, socket.getfqdn())
+
+    valid_mail_methods = VALID_MAIL_METHODS
+
+    whitespace_re = re.compile(r'(?:[,;]+|\s*[,;]*\s+)+')
+
+    # -------------------------------------------------------------------------
+    def __init__(
+        self, appname=None, verbose=0, version=__version__, base_dir=None,
+            append_appname_to_stems=True, additional_stems=None, config_dir=DEFAULT_CONFIG_DIR,
+            additional_config_file=None, additional_cfgdirs=None, encoding=DEFAULT_ENCODING,
+            use_chardet=True, initialized=False):
+
+        add_stems = []
+        if additional_stems:
+            if is_sequence(additional_stems):
+                for stem in additional_stems:
+                    add_stems.append(stem)
+            else:
+                add_stems.append(additional_stems)
+
+        if 'mail' not in add_stems:
+            add_stems.append('mail')
+
+        self.mail_recipients = copy.copy(self.default_mail_recipients)
+        self.mail_from = '{n} <{m}>'.format(
+            n=self.current_user_gecos, m=self.default_mail_from)
+        self.mail_cc = copy.copy(self.default_mail_cc)
+        self.reply_to = self.default_reply_to
+        self.mail_method = 'smtp'
+        self.mail_server = self.default_mail_server
+        self.smtp_port = 25
+        self._mail_cc_configured = False
+
+        super(MailConfiguration, self).__init__(
+            appname=appname, verbose=verbose, version=version, base_dir=base_dir,
+            append_appname_to_stems=append_appname_to_stems, config_dir=config_dir,
+            additional_stems=add_stems, additional_config_file=additional_config_file,
+            additional_cfgdirs=additional_cfgdirs, encoding=encoding, use_chardet=use_chardet,
+            ensure_privacy=True, initialized=False,
+        )
+
+        if initialized:
+            self.initialized = True
+
+    # -------------------------------------------------------------------------
+    def as_dict(self, short=True):
+        """
+        Transforms the elements of the object into a dict
+
+        @param short: don't include local properties in resulting dict.
+        @type short: bool
+
+        @return: structure as dict
+        @rtype:  dict
+        """
+
+        res = super(MailConfiguration, self).as_dict(short=short)
+
+        res['default_mail_recipients'] = self.default_mail_recipients
+        res['default_mail_cc'] = self.default_mail_cc
+        res['default_reply_to'] = self.default_reply_to
+        res['default_mail_server'] = self.default_mail_server
+        res['current_user_name'] = self.current_user_name
+        res['current_user_gecos'] = self.current_user_gecos
+        res['default_mail_from'] = self.default_mail_from
+
+        return res
+
+    # -------------------------------------------------------------------------
+    def eval(self):
+
+        self.mail_recipients = []
+        self.mail_cc = []
+
+        super(MailConfiguration, self).eval()
+
+        if not self.mail_recipients:
+            self.mail_recipients = copy.copy(self.default_mail_recipients)
+
+        if not self.mail_cc and not self._mail_cc_configured:
+            self.mail_cc = copy.copy(self.default_mail_cc)
+
+    # -------------------------------------------------------------------------
+    def eval_section(self, section_name):
+
+        super(MailConfiguration, self).eval_section(section_name)
+        sn = section_name.lower()
+
+        if sn == 'mail':
+            section = self.cfg[section_name]
+            return self._eval_mail(section_name, section)
+
+    # -------------------------------------------------------------------------
+    def _eval_mail(self, section_name, section):
+
+        if self.verbose > 2:
+            msg = _("Evaluating config section {!r}:").format(section_name)
+            LOG.debug(msg + '\n' + pp(section))
+
+        self._eval_mail_rcpt(section_name, section)
+        self._eval_mail_cc(section_name, section)
+        self._eval_mail_reply_to(section_name, section)
+        self._eval_mail_method(section_name, section)
+        self._eval_mail_server(section_name, section)
+        self._eval_smtp_port(section_name, section)
+
+    # -------------------------------------------------------------------------
+    def _split_mailaddress_tokens(self, value, what=None):
+
+        result = []
+
+        tokens = self.whitespace_re.split(value)
+        for token in tokens:
+            if MailAddress.valid_address(token):
+                result.append(token)
+            else:
+                msg = _("Found invalid {what} {addr!r} in configuration.")
+                LOG.error(msg.format(what=what, addr=token))
+
+        return result
+
+    # -------------------------------------------------------------------------
+    def _eval_mail_rcpt(self, section_name, section):
+
+        re_rcpt = re.compile(r'^\s*(mail[_-]?)?(recipients?|rcpt)\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+            if not re_rcpt.search(key):
+                continue
+
+            val = section[key]
+            if not val:
+                continue
+            if is_sequence(val):
+                for v in val:
+                    result = self._split_mailaddress_tokens(v, _("recipient mail address"))
+                    if result:
+                        self.mail_recipients.expand(result)
+            else:
+                result = self._split_mailaddress_tokens(val, _("recipient mail address"))
+                if result:
+                    self.mail_recipients.expand(result)
+
+    # -------------------------------------------------------------------------
+    def _eval_mail_cc(self, section_name, section):
+
+        re_cc = re.compile(r'^\s*(mail[_-]?)?cc\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+
+            self._mail_cc_configured = True
+            if not re_cc.search(key):
+                continue
+
+            val = section[key]
+            if not val:
+                continue
+            if is_sequence(val):
+                for v in val:
+                    result = self._split_mailaddress_tokens(v, _("cc mail address"))
+                    if result:
+                        self.mail_cc.expand(result)
+            else:
+                result = self._split_mailaddress_tokens(val, _("cc mail address"))
+                if result:
+                    self.mail_cc.expand(result)
+
+    # -------------------------------------------------------------------------
+    def _eval_mail_reply_to(self, section_name, section):
+
+        re_reply = re.compile(r'^\s*(mail[_-]?)?reply([-_]?to)?\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+            if not re_reply.search(key):
+                continue
+
+            val = section[key]
+
+            if is_sequence(val):
+                if not len(val):
+                    continue
+                val = val[0]
+
+            if MailAddress.valid_address(val):
+                self.reply_to = val
+            else:
+                msg = _("Found invalid {what} {addr!r} in configuration.")
+                LOG.error(msg.format(what=_("reply to address"), addr=val))
+
+    # -------------------------------------------------------------------------
+    def _eval_mail_method(self, section_name, section):
+
+        re_method = re.compile(r'^\s*(mail[_-]?)?method\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+            if not re_reply.search(key):
+                continue
+
+            val = section[key].strip().lower()
+            if not val:
+                continue
+
+            if val not in self.valid_mail_methods:
+                msg = _("Found invalid mail method {!r} in configuration.")
+                LOG.error(msg.format(section[key]))
+                continue
+
+            self.mail_method = val
+
+    # -------------------------------------------------------------------------
+    def _eval_mail_server(self, section_name, section):
+
+        re_server = re.compile(r'^\s*(mail[_-]?)?server\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+            if not re_server.search(key):
+                continue
+
+            val = section[key].strip().lower()
+            if not val:
+                continue
+
+            self.mail_server = val
+
+    # -------------------------------------------------------------------------
+    def _eval_smtp_port(self, section_name, section):
+
+        re_server = re.compile(r'^\s*(smtp[_-]?)?port\s*$', re.IGNORECASE)
+
+        for key in section.keys():
+            if not re_server.search(key):
+                continue
+
+            val = section[[key]
+            try:
+                port = int(val)
+            except (ValueError, TypeError) as e:
+                msg = _("Value {!r} for SMTP port is invalid:").format(val)
+                LOG.error(msg)
+                continue
+            if port <= 0 or port > MAX_PORT_NUMBER:
+                msg = _("Found invalid SMTP port number {} in configuration.").format(port)
+                LOG.error(msg)
+                continue
+
+            self.smtp_port = port
+
+# =============================================================================
+if __name__ == "__main__":
+
+    pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list