]> Frank Brehm's Git Trees - pixelpark/pp-admin-tools.git/commitdiff
Substituting some classes and mothods by new ones from pp-tools
authorFrank Brehm <frank@brehm-online.com>
Mon, 17 Oct 2022 15:42:44 +0000 (17:42 +0200)
committerFrank Brehm <frank@brehm-online.com>
Mon, 17 Oct 2022 15:42:44 +0000 (17:42 +0200)
lib/pp_admintools/app/__init__.py
lib/pp_admintools/app/dns_deploy_zones.py
lib/pp_admintools/app/pdns.py
lib/pp_admintools/app/remove_ldap_user.py
lib/pp_admintools/argparse_actions.py

index 1a167f8f88f333247891023ef8a565f5cd1a0a1f..0baed8a38058e36395e745c152197e8ee8e5cef2 100644 (file)
@@ -16,8 +16,9 @@ import getpass
 
 # Third party modules
 from fb_tools.common import to_bool
+from fb_tools.argparse_actions import TimeoutOptionAction
 from fb_tools.cfg_app import FbConfigApplication
-from fb_tools.errors import FbAppError, IoTimeoutError
+from fb_tools.errors import FbAppError, IoTimeoutError, TimeoutOnPromptError
 from fb_tools.multi_config import BaseMultiConfig
 
 # Own modules
@@ -33,7 +34,7 @@ LOG = logging.getLogger(__name__)
 _ = XLATOR.gettext
 ngettext = XLATOR.ngettext
 
-__version__ = '0.3.4'
+__version__ = '0.4.1'
 
 
 # =============================================================================
@@ -48,57 +49,16 @@ class AbortAppError(DPXAppError):
     pass
 
 
-# =============================================================================
-class TimeoutOnPromptError(AbortAppError, IoTimeoutError):
-    """Special exception class on timout on a prompt."""
-
-    # -------------------------------------------------------------------------
-    def __init__(self, timeout):
-
-        strerror = _("Timeout on answering on the console.")
-        super(TimeoutOnPromptError, self).__init__(strerror, timeout)
-
-
-# =============================================================================
-class TimeoutOptionAction(argparse.Action):
-
-    # -------------------------------------------------------------------------
-    def __init__(self, option_strings, *args, **kwargs):
-
-        super(TimeoutOptionAction, self).__init__(
-            option_strings=option_strings, *args, **kwargs)
-
-    # -------------------------------------------------------------------------
-    def __call__(self, parser, namespace, given_timeout, option_string=None):
-
-        try:
-            timeout = int(given_timeout)
-            if timeout <= 0 or timeout > MAX_TIMEOUT:
-                msg = _(
-                    "a timeout must be greater than zero and less "
-                    "or equal to {}.").format(MAX_TIMEOUT)
-                raise ValueError(msg)
-        except (ValueError, TypeError) as e:
-            msg = _("Wrong timeout {!r}:").format(given_timeout)
-            msg += ' ' + str(e)
-            raise argparse.ArgumentError(self, msg)
-
-        setattr(namespace, self.dest, timeout)
-
 # =============================================================================
 class BaseDPXApplication(FbConfigApplication):
     """
     Base class for all DPX application classes.
     """
 
-    default_prompt_timeout = 30
-    max_prompt_timeout = 600
-
-    yes_list = ['y', 'yes']
-    no_list = ['n', 'no']
-
-    pattern_yes_no = r'^\s*(' + '|'.join(yes_list) + '|' + '|'.join(no_list) + r')?\s*$'
-    re_yes_no = re.compile(pattern_yes_no, re.IGNORECASE)
+    show_assume_options = True
+    show_console_timeout_option = True
+    # show_force_option = False
+    show_simulate_option = True
 
     # -------------------------------------------------------------------------
     def __init__(
@@ -119,88 +79,6 @@ class BaseDPXApplication(FbConfigApplication):
             env_prefix=env_prefix, config_dir=config_dir
         )
 
-    # -------------------------------------------------------------------------
-    @classmethod
-    def init_yes_no_lists(cls):
-
-        yes = _('yes')
-        if yes not in cls.yes_list:
-            cls.yes_list.append(yes)
-        yes_fc = yes[0]
-        if yes_fc not in cls.yes_list:
-            cls.yes_list.append(yes_fc)
-
-        no = _('no')
-        if no not in cls.no_list and no not in cls.yes_list:
-            cls.no_list.append(no)
-        no_fc = no[0]
-        if no_fc not in cls.no_list and no_fc not in cls.yes_list:
-            cls.no_list.append(no_fc)
-
-        cls.pattern_yes_no = (
-            r'^\s*(' + '|'.join(cls.yes_list) + '|' + '|'.join(cls.no_list) + r')?\s*$')
-        cls.re_yes_no = re.compile(cls.pattern_yes_no, re.IGNORECASE)
-
-    # -----------------------------------------------------------
-    @property
-    def yes(self):
-        """Assume 'yes' as an answer to all questions."""
-        return self._yes
-
-    @yes.setter
-    def yes(self, value):
-        self._yes = to_bool(value)
-
-    # -----------------------------------------------------------
-    @property
-    def prompt_timeout(self):
-        """The timeout in seconds for waiting for an answer on a prompt."""
-        return getattr(self, '_prompt_timeout', self.default_prompt_timeout)
-
-    @prompt_timeout.setter
-    def prompt_timeout(self, value):
-        v = int(value)
-        if v < 0 or v > self.max_prompt_timeout:
-            msg = _(
-                "Wrong prompt timeout {v!r}, must be greater or equal to Null "
-                "and less or equal to {max}.").format(v=value, max=self.max_prompt_timeout)
-            LOG.warning(msg)
-        else:
-            self._prompt_timeout = v
-
-    # -------------------------------------------------------------------------
-    def as_dict(self, short=True):
-        """
-        Transforms the elements of the object into a dict
-
-        @param short: don't include local properties in resulting dict.
-        @type short: bool
-
-        @return: structure as dict
-        @rtype:  dict
-        """
-
-        res = super(BaseDPXApplication, self).as_dict(short=short)
-
-        res['prompt_timeout'] = self.prompt_timeout
-        res['yes'] = self.yes
-        res['yes_list'] = self.yes_list
-        res['no_list'] = self.no_list
-        res['pattern_yes_no'] = self.pattern_yes_no
-
-        return res
-
-    # -------------------------------------------------------------------------
-    def init_arg_parser(self):
-        """
-        Public available method to initiate the argument parser.
-        """
-
-        super(BaseDPXApplication, self).init_arg_parser()
-
-        self.arg_parser.add_argument(
-            '-Y', '--yes', '--assume-yes', action="store_true", dest="yes", help=argparse.SUPPRESS)
-
     # -------------------------------------------------------------------------
     def post_init(self):
         """
@@ -218,176 +96,5 @@ class BaseDPXApplication(FbConfigApplication):
 
         super(BaseDPXApplication, self).post_init()
 
-        self.yes = self.args.yes
-
-    # -------------------------------------------------------------------------
-    def get_password(self, first_prompt=None, second_prompt=None, may_empty=True, repeat=True):
-        """
-        Ask the user for a password on the console.
-
-        @raise AbortAppError: if the user presses Ctrl-D (EOF)
-        @raise TimeoutOnPromptError: if the user does not finishing after a time
-
-        @param first_prompt: the prompt for the first password question
-        @type first_prompt: str
-        @param second_prompt: the prompt for the second password question
-        @type second_prompt: str
-        @param may_empty: The behaviour, if the user inputs an empty password:
-                          if True, an empty password will be returned
-                          if False, the question will be repeated.
-        @type may_empty: bool
-        @param repeat: Asking for the password a second time, which must be equal
-                       to the first given password.
-        @type repeat: bool
-
-        @return: The entered password
-        @rtype: str
-
-        """
-
-        if not first_prompt:
-            first_prompt = _('Password:') + ' '
-
-        if not second_prompt:
-            second_prompt = _('Repeat password:') + ' '
-
-        ret_passwd = None
-        second_passwd = None
-
-        while True:
-
-            if not self.quiet:
-                print()
-            ret_passwd = self._get_password(first_prompt, may_empty)
-            if ret_passwd:
-                if repeat:
-                    second_passwd = self._get_password(second_prompt, may_empty=False)
-                    if ret_passwd != second_passwd:
-                        msg = _("The entered passwords does not match.")
-                        LOG.error(msg)
-                        continue
-            break
-
-        return ret_passwd
-
-    # -------------------------------------------------------------------------
-    def _get_password(self, prompt, may_empty=True):
-
-        def passwd_alarm_caller(signum, sigframe):
-            raise TimeoutOnPromptError(self.prompt_timeout)
-
-        msg_intr = _("Interrupted on demand.")
-        ret_passwd = ''
-
-        try:
-            signal.signal(signal.SIGALRM, passwd_alarm_caller)
-            signal.alarm(self.prompt_timeout)
-
-            while True:
-
-                try:
-                    ret_passwd = getpass.getpass(prompt)
-                except EOFError:
-                    raise AbortAppError(msg_intr)
-
-                signal.alarm(self.prompt_timeout)
-
-                if ret_passwd == '':
-                    if may_empty:
-                        return ''
-                    else:
-                        continue
-                else:
-                    break
-
-        except (TimeoutOnPromptError, AbortAppError) as e:
-            msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e)
-            LOG.error(msg)
-            self.exit(10)
-
-        except KeyboardInterrupt:
-            msg = _("Got a {}:").format('KeyboardInterrupt') + ' ' + msg_intr
-            LOG.error(msg)
-            self.exit(10)
-
-        finally:
-            signal.alarm(0)
-
-        return ret_passwd
-
-    # -------------------------------------------------------------------------
-    def ask_for_yes_or_no(self, prompt, default_on_empty=None):
-        """
-        Ask the user for yes or no.
-
-        @raise AbortAppError: if the user presses Ctrl-D (EOF)
-        @raise TimeoutOnPromptError: if the user does not correct answer after a time
-
-        @param prompt: the prompt for the question
-        @type prompt: str
-        @param default_on_empty: behaviour on an empty reply:
-                                 * if None, repeat the question
-                                 * if True, return True
-                                 * else return False
-        @type default_on_empty: bool or None
-
-        @return: True, if the user answered Yes, else False
-        @rtype: bool
-
-        """
-
-        if not prompt:
-            prompt = _('Yes/No') + ' '
-
-        def prompt_alarm_caller(signum, sigframe):
-            raise TimeoutOnPromptError(self.prompt_timeout)
-
-        msg_intr = _("Interrupted on demand.")
-        try:
-            signal.signal(signal.SIGALRM, prompt_alarm_caller)
-            signal.alarm(self.prompt_timeout)
-
-            reply = ''
-            while True:
-                try:
-                    reply = input(prompt)
-                except EOFError:
-                    raise AbortAppError(msg_intr)
-                signal.alarm(self.prompt_timeout)
-                match = self.re_yes_no.match(reply)
-                if match:
-                    if match.group(1) is None:
-                        if default_on_empty is None:
-                            continue
-                        return bool(default_on_empty)
-                    # There is an answer
-                    r = match.group(1).lower()
-                    if r in self.no_list:
-                        # Continue == no
-                        return False
-                    elif r in self.yes_list:
-                        # Continue == yes
-                        return True
-                    else:
-                        continue
-                else:
-                    continue
-                # Repeat the question
-
-        except (TimeoutOnPromptError, AbortAppError) as e:
-            print()
-            msg = _("Got a {}:").format(e.__class__.__name__) + ' ' + str(e)
-            LOG.error(msg)
-            self.exit(10)
-
-        except KeyboardInterrupt:
-            msg = _("Got a {}:").format('KeyboardInterrupt') + ' ' + msg_intr
-            print()
-            LOG.error(msg)
-            self.exit(10)
-
-        finally:
-            signal.alarm(0)
-
 
 # vim: ts=4 et list
index c2cbcdf709825dcc259ba2b1bca8db1dfd4ef8c7..464b3ebf9a7dbca5e02687f1a6517811034b6e2d 100644 (file)
@@ -44,7 +44,7 @@ from ..config.dns_deploy_zones import DnsDeployZonesConfig
 
 from ..xlate import XLATOR
 
-__version__ = '0.8.3'
+__version__ = '0.8.4'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -256,20 +256,24 @@ class PpDeployZonesApp(PpPDNSApplication):
     # -------------------------------------------------------------------------
     def init_arg_parser(self):
 
-        super(PpDeployZonesApp, self).init_arg_parser()
+        deploy_group = self.arg_parser.add_argument_group(_("Options for {}").format(
+            self.appname))
+
 
-        self.arg_parser.add_argument(
+        deploy_group.add_argument(
             '-B', '--backup', dest="keep_backup", action='store_true',
             help=_("Keep a backup file for each changed configuration file."),
         )
 
-        self.arg_parser.add_argument(
+        deploy_group.add_argument(
             '-K', '--keep-tempdir', dest='keep_tempdir', action='store_true',
             help=_(
                 "Keeping the temporary directory instead of removing it at the end "
                 "(e.g. for debugging purposes)"),
         )
 
+        super(PpDeployZonesApp, self).init_arg_parser()
+
     # -------------------------------------------------------------------------
     def perform_arg_parser(self):
         """
index 2c5a68a25e1e7aeca364c4859788741759e4d76a..df73747f8235bd5617e46e3d1ae25daba21e743a 100644 (file)
@@ -22,16 +22,17 @@ import psutil
 
 # Own modules
 from fb_tools.common import pp
+from fb_tools.argparse_actions import TimeoutOptionAction
+from fb_tools.xlate import format_list
 
 from fb_pdnstools.zone import PowerDNSZone
 from fb_pdnstools.server import PowerDNSServer
 from fb_pdnstools.errors import PDNSApiNotFoundError
 from fb_pdnstools.errors import PDNSApiValidationError
-from fb_tools.xlate import format_list
 
 from .. import __version__ as GLOBAL_VERSION
 
-from ..argparse_actions import PortOptionAction, TimeoutOptionAction
+from ..argparse_actions import PortOptionAction
 
 from .mail import MailAppError, BaseMailApplication
 
@@ -40,7 +41,7 @@ from ..config.pdns import PdnsConfiguration
 
 from ..xlate import XLATOR
 
-__version__ = '0.9.3'
+__version__ = '0.9.4'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -235,8 +236,6 @@ class PpPDNSApplication(BaseMailApplication):
         methods in descendant classes.
         """
 
-        super(PpPDNSApplication, self).init_arg_parser()
-
         pdns_group = self.arg_parser.add_argument_group(_('PowerDNS API options'))
         inst_group = pdns_group.add_mutually_exclusive_group()
 
@@ -279,13 +278,7 @@ class PpPDNSApplication(BaseMailApplication):
                 PdnsConfiguration.default_pdns_api_port),
         )
 
-        pdns_group.add_argument(
-            '-t', '--timeout',
-            metavar=_("SECS"), type=int, dest='timeout', default=default_timeout,
-            what=_("PowerDNS API access"), action=TimeoutOptionAction,
-            help=_("The timeout in seconds to request the PowerDNS API, default: {}.").format(
-                default_timeout),
-        )
+        super(PpPDNSApplication, self).init_arg_parser()
 
     # -------------------------------------------------------------------------
     def perform_arg_parser(self):
index 9c75561b53b99a20cf99fc6fb905fce0a78ac3ef..393dbb4cdccf2ffbb11e2f7f46c9ced1f6b8b3d9 100644 (file)
@@ -25,7 +25,7 @@ from . import AbortAppError, TimeoutOnPromptError
 from .ldap import LdapAppError, FatalLDAPError
 from .ldap import BaseLdapApplication
 
-__version__ = '0.5.3'
+__version__ = '0.5.4'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -110,8 +110,6 @@ class RemoveLdapUserApplication(BaseLdapApplication):
     # -------------------------------------------------------------------------
     def init_arg_parser(self):
 
-        super(RemoveLdapUserApplication, self).init_arg_parser()
-
         remove_group = self.arg_parser.add_argument_group(_('Removing options'))
 
         remove_mode_group = remove_group.add_mutually_exclusive_group()
@@ -141,6 +139,8 @@ class RemoveLdapUserApplication(BaseLdapApplication):
                 "different in the particular LDAP instances).")
         )
 
+        super(RemoveLdapUserApplication, self).init_arg_parser()
+
     # -------------------------------------------------------------------------
     def _verify_instances(self):
 
index 00ef3f16e34a059bd2782bd6af325e46dba13b19..314a999e38a7e4b2a132a8775e21812ffb187e5e 100644 (file)
@@ -16,7 +16,7 @@ from . import MAX_PORT_NUMBER, MAX_TIMEOUT
 
 from .xlate import XLATOR
 
-__version__ = '0.1.0'
+__version__ = '0.2.1'
 LOG = logging.getLogger(__name__)
 
 _ = XLATOR.gettext
@@ -54,40 +54,6 @@ class PortOptionAction(argparse.Action):
         setattr(namespace, self.dest, port)
 
 
-# =============================================================================
-class TimeoutOptionAction(argparse.Action):
-
-    # -------------------------------------------------------------------------
-    def __init__(self, option_strings, what, *args, **kwargs):
-
-        self.what = what
-
-        super(TimeoutOptionAction, self).__init__(option_strings=option_strings, *args, **kwargs)
-
-    # -------------------------------------------------------------------------
-    def __call__(self, parser, namespace, values, option_string=None):
-
-        if values is None:
-            setattr(namespace, self.dest, None)
-            return
-
-        try:
-            timeout = int(values)
-        except (ValueError, TypeError) as e:
-            msg = _("Value {v!r} for a timeout of {what} is invalid:").format(
-                v=values, what=self.what)
-            msg += ' ' + str(e)
-            raise argparse.ArgumentError(self, msg)
-
-        if timeout <= 0 or timeout > MAX_TIMEOUT:
-            msg = _(
-                "Value {v!r} for a timeout of {what} must be greater than 0 and less than {max}.")
-            msg = msg.format(v=values, what=self.what, max=(MAX_TIMEOUT + 1))
-            raise argparse.ArgumentError(self, msg)
-
-        setattr(namespace, self.dest, timeout)
-
-
 # =============================================================================
 if __name__ == "__main__":