From ce0bf0269f5e219e60c706a66a0246f481c4f856 Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Thu, 8 Sep 2022 11:59:47 +0200 Subject: [PATCH] Built in request for acknoledgement on removing LDAP user --- lib/pp_admintools/app/__init__.py | 28 ++++++-- lib/pp_admintools/app/remove_ldap_user.py | 84 +++++++++++++++++++++-- 2 files changed, 103 insertions(+), 9 deletions(-) diff --git a/lib/pp_admintools/app/__init__.py b/lib/pp_admintools/app/__init__.py index 89e8a35..c3c13bb 100644 --- a/lib/pp_admintools/app/__init__.py +++ b/lib/pp_admintools/app/__init__.py @@ -14,7 +14,7 @@ import signal import re # Third party modules -from fb_tools.common import to_bool +from fb_tools.common import to_bool, pp from fb_tools.cfg_app import FbConfigApplication from fb_tools.errors import FbAppError, IoTimeoutError from fb_tools.multi_config import BaseMultiConfig @@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__) _ = XLATOR.gettext ngettext = XLATOR.ngettext -__version__ = '0.2.0' +__version__ = '0.2.1' # ============================================================================= @@ -211,6 +211,26 @@ class BaseDPXApplication(FbConfigApplication): def prompt_alarm_caller(signum, sigframe): raise TimeoutOnPromptError(self.prompt_timeout) + yes_list = ['y', 'yes'] + yes = _('yes') + if yes not in yes_list: + yes_list.append(yes) + yes_fc = yes[0] + if yes_fc not in yes_list: + yes_list.append(yes_fc) + if self.verbose > 1: + LOG.debug("Allowed values for 'Yes': " + pp(yes_list)) + + no_list = ['n', 'no'] + no = _('no') + if no not in no_list and no not in yes_list: + no_list.append(no) + no_fc = no[0] + if no_fc not in no_list and no_fc not in yes_list: + no_list.append(no_fc) + if self.verbose > 1: + LOG.debug("Allowed values for 'No': " + pp(no_list)) + try: signal.signal(signal.SIGALRM, prompt_alarm_caller) signal.alarm(self.prompt_timeout) @@ -230,10 +250,10 @@ class BaseDPXApplication(FbConfigApplication): return bool(default_on_empty) # There is an answer r = match.group(1).lower() - if r == 'n' or r == 'no': + if r in no_list: # Continue == no return False - elif r == 'y' or r == 'yes': + elif r in yes_list: # Continue == yes return True else: diff --git a/lib/pp_admintools/app/remove_ldap_user.py b/lib/pp_admintools/app/remove_ldap_user.py index 9b5e353..fa458d5 100644 --- a/lib/pp_admintools/app/remove_ldap_user.py +++ b/lib/pp_admintools/app/remove_ldap_user.py @@ -21,10 +21,12 @@ from fb_tools.common import to_bool, is_sequence, pp from ..xlate import XLATOR -from ..app.ldap import LdapAppError -from ..app.ldap import BaseLdapApplication +from . import AbortAppError, TimeoutOnPromptError -__version__ = '0.4.0' +from .ldap import LdapAppError +from .ldap import BaseLdapApplication + +__version__ = '0.4.1' LOG = logging.getLogger(__name__) _ = XLATOR.gettext @@ -169,12 +171,84 @@ class RemoveLdapUserApplication(BaseLdapApplication): LOG.warn(msg) self.exit(1) - msg = _("Evaluated DNs to remove:") - LOG.debug(msg + '\n' + pp(self.dns)) + if self.verbose > 1: + msg = _("Evaluated DNs to remove:") + LOG.debug(msg + '\n' + pp(self.dns)) + + if not self.request_for_remove(): + self.exit(1) + + if not self.quiet: + print() + if self.deactivate: + msg = _("Start disabling user entries in:") + else: + msg = _("Start removing user entries in:") + self.countdown(number=3, delay=1, prompt=msg) + else: + if self.deactivate: + msg = _("Start disabling user entries ...") + else: + msg = _("Start removing user entries ...") + LOG.warn(msg) for inst in self.dns: self.remove_users_from_inst(inst) + # ------------------------------------------------------------------------- + def request_for_remove(self): + + if self.yes or self.quiet: + return True + + print() + if self.deactivate: + msg = _("Do you really want to deactivate the following users?") + else: + msg = _("Do you really want to remove the following users?") + print(self.colored(msg, 'CYAN')) + print(self.colored(('-' * len(msg)), 'CYAN')) + print() + + max_len = 1 + for inst in self.dns: + connect_info = self.cfg.ldap_connection[inst] + uri = connect_info.url + if len(uri) > max_len: + max_len = len(uri) + + for inst in self.dns: + first = True + uri = connect_info.url + for dn in self.dns[inst]: + if not first: + uri = ' ' + msg = " {a} {uri:<{max_len}} - {dn}".format( + a=self.colored('*', 'CYAN'), uri=uri, max_len=max_len, + dn=self.colored(dn, 'CYAN')) + print(msg) + first = False + print() + + if self.deactivate: + msg = _("Deactivate [{yes}/{no}]?") + else: + msg = _("Remove [{yes}/{no}]?") + msg = msg.format( + yes=self.colored(_('yes'), 'RED'), no=self.colored(_('No'), 'GREEN')) + ' ' + try: + return self.ask_for_yes_or_no(msg) + except (TimeoutOnPromptError, AbortAppError) as e: + if self.deactivate: + msg = _("Abort deactivating by {cls}: {e}") + else: + msg = _("Abort removing by {cls}: {e}") + LOG.error(msg.format(cls=e.__class__.__name__, e=e)) + return False + + return True + + # ------------------------------------------------------------------------- def eval_user_dns(self, user): -- 2.39.5