--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Berlin
+@summary: The module for the application object.
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import sys
+import os
+import logging
+import re
+import traceback
+
+# Third party modules
+import argparse
+
+# Own modules
+from .errors import FunctionNotImplementedError, PpAppError
+
+from .common import terminal_can_colors
+from .common import caller_search_path
+
+from .colored import ColoredFormatter, colorstr
+
+from .obj import PpBaseObject
+
+__version__ = '0.3.6'
+LOG = logging.getLogger(__name__)
+
+
+# =============================================================================
+class PpApplication(PpBaseObject):
+ """
+ Class for the application objects.
+ """
+
+ re_prefix = re.compile(r'^[a-z0-9][a-z0-9_]*$', re.IGNORECASE)
+ re_anum = re.compile(r'[^A-Z0-9_]+', re.IGNORECASE)
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=False, usage=None, description=None,
+ argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None):
+
+ self.arg_parser = None
+ """
+ @ivar: argparser object to parse commandline parameters
+ @type: argparse.ArgumentParser
+ """
+
+ self.args = None
+ """
+ @ivar: an object containing all commandline parameters
+ after parsing them
+ @type: Namespace
+ """
+
+ self._exit_value = 0
+ """
+ @ivar: return value of the application for exiting with sys.exit().
+ @type: int
+ """
+
+ self._usage = usage
+ """
+ @ivar: usage text used on argparse
+ @type: str
+ """
+
+ self._description = description
+ """
+ @ivar: a short text describing the application
+ @type: str
+ """
+
+ self._argparse_epilog = argparse_epilog
+ """
+ @ivar: an epilog displayed at the end of the argparse help screen
+ @type: str
+ """
+
+ self._argparse_prefix_chars = argparse_prefix_chars
+ """
+ @ivar: The set of characters that prefix optional arguments.
+ @type: str
+ """
+
+ self._terminal_has_colors = False
+ """
+ @ivar: flag, that the current terminal understands color ANSI codes
+ @type: bool
+ """
+
+ self._quiet = False
+ self._force = False
+ self._simulate = False
+
+ self.env = {}
+ """
+ @ivar: a dictionary with all application specifiv environment variables,
+ they will detected by the env_prefix property of this object,
+ and their names will transformed before saving their values in
+ self.env by removing the env_prefix from the variable name.
+ @type: dict
+ """
+
+ self._env_prefix = None
+ """
+ @ivar: a prefix for environment variables to detect them and to assign
+ their transformed names and their values in self.env
+ @type: str
+ """
+
+ super(PpApplication, self).__init__(
+ appname=appname,
+ verbose=verbose,
+ version=version,
+ base_dir=base_dir,
+ initialized=False,
+ )
+
+ if env_prefix:
+ ep = str(env_prefix).strip()
+ if not ep:
+ msg = "Invalid env_prefix {!r} given - it may not be empty.".format(env_prefix)
+ raise PpAppError(msg)
+ match = self.re_prefix.search(ep)
+ if not match:
+ msg = (
+ "Invalid characters found in env_prefix {!r}, only "
+ "alphanumeric characters and digits and underscore "
+ "(this not as the first character) are allowed.").format(env_prefix)
+ raise PpAppError(msg)
+ self._env_prefix = ep
+ else:
+ ep = self.appname.upper() + '_'
+ self._env_prefix = self.re_anum.sub('_', ep)
+
+ self._init_arg_parser()
+ self._perform_arg_parser()
+
+ self._init_env()
+ self._perform_env()
+
+ # -----------------------------------------------------------
+ @property
+ def exit_value(self):
+ """The return value of the application for exiting with sys.exit()."""
+ return self._exit_value
+
+ @exit_value.setter
+ def exit_value(self, value):
+ v = int(value)
+ if v >= 0:
+ self._exit_value = v
+ else:
+ LOG.warn("Wrong exit_value {!r}, must be >= 0".format(value))
+
+ # -----------------------------------------------------------
+ @property
+ def exitvalue(self):
+ """The return value of the application for exiting with sys.exit()."""
+ return self._exit_value
+
+ @exitvalue.setter
+ def exitvalue(self, value):
+ self.exit_value = value
+
+ # -----------------------------------------------------------
+ @property
+ def usage(self):
+ """The usage text used on argparse."""
+ return self._usage
+
+ # -----------------------------------------------------------
+ @property
+ def description(self):
+ """A short text describing the application."""
+ return self._description
+
+ # -----------------------------------------------------------
+ @property
+ def argparse_epilog(self):
+ """An epilog displayed at the end of the argparse help screen."""
+ return self._argparse_epilog
+
+ # -----------------------------------------------------------
+ @property
+ def argparse_prefix_chars(self):
+ """The set of characters that prefix optional arguments."""
+ return self._argparse_prefix_chars
+
+ # -----------------------------------------------------------
+ @property
+ def terminal_has_colors(self):
+ """A flag, that the current terminal understands color ANSI codes."""
+ return self._terminal_has_colors
+
+ # -----------------------------------------------------------
+ @property
+ def env_prefix(self):
+ """A prefix for environment variables to detect them."""
+ return self._env_prefix
+
+ # -----------------------------------------------------------
+ @property
+ def usage_term(self):
+ """The localized version of 'usage: '"""
+ return 'Usage: '
+
+ # -----------------------------------------------------------
+ @property
+ def usage_term_len(self):
+ """The length of the localized version of 'usage: '"""
+ return len(self.usage_term)
+
+ # -----------------------------------------------------------
+ @property
+ def quiet(self):
+ """Quiet execution of the application,
+ only warnings and errors are emitted."""
+ return self._quiet
+
+ @quiet.setter
+ def quiet(self, value):
+ self._quiet = bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def force(self):
+ """Forced execution of the application."""
+ return self._force
+
+ @force.setter
+ def force(self, value):
+ self._force = bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def simulate(self):
+ """Simulation mode, nothing is really done."""
+ return self._simulate
+
+ @simulate.setter
+ def simulate(self, value):
+ self._simulate = bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def show_force_opt(self):
+ """Flag, whether the command line option '--force' should be shown."""
+ return getattr(self, '_show_force_opt', False)
+
+ @show_force_opt.setter
+ def show_force_opt(self, value):
+ self._show_force_opt = bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def show_simulate_opt(self):
+ """Flag, whether the command line option '--simulate' should be shown."""
+ return getattr(self, '_show_simulate_opt', False)
+
+ @show_simulate_opt.setter
+ def show_simulate_opt(self, value):
+ self._show_simulate_opt = bool(value)
+
+ # -------------------------------------------------------------------------
+ def exit(self, retval=-1, msg=None, trace=False):
+ """
+ Universal method to call sys.exit(). If fake_exit is set, a
+ FakeExitError exception is raised instead (useful for unittests.)
+
+ @param retval: the return value to give back to theoperating system
+ @type retval: int
+ @param msg: a last message, which should be emitted before exit.
+ @type msg: str
+ @param trace: flag to output a stack trace before exiting
+ @type trace: bool
+
+ @return: None
+
+ """
+
+ retval = int(retval)
+ trace = bool(trace)
+
+ root_logger = logging.getLogger()
+ has_handlers = False
+ if root_logger.handlers:
+ has_handlers = True
+
+ if msg:
+ if has_handlers:
+ if retval:
+ LOG.error(msg)
+ else:
+ LOG.info(msg)
+ if not has_handlers:
+ if hasattr(sys.stderr, 'buffer'):
+ sys.stderr.buffer.write(str(msg) + "\n")
+ else:
+ sys.stderr.write(str(msg) + "\n")
+
+ if trace:
+ if has_handlers:
+ if retval:
+ LOG.error(traceback.format_exc())
+ else:
+ LOG.info(traceback.format_exc())
+ else:
+ traceback.print_exc()
+
+ sys.exit(retval)
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = super(PpApplication, self).as_dict(short=short)
+ res['exit_value'] = self.exit_value
+ res['usage'] = self.usage
+ res['quiet'] = self.quiet
+ res['force'] = self.force
+ res['simulate'] = self.simulate
+ res['description'] = self.description
+ res['argparse_epilog'] = self.argparse_epilog
+ res['argparse_prefix_chars'] = self.argparse_prefix_chars
+ res['terminal_has_colors'] = self.terminal_has_colors
+ res['env_prefix'] = self.env_prefix
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def init_logging(self):
+ """
+ Initialize the logger object.
+ It creates a colored loghandler with all output to STDERR.
+ Maybe overridden in descendant classes.
+
+ @return: None
+ """
+
+ log_level = logging.INFO
+ if self.verbose:
+ log_level = logging.DEBUG
+ elif self.quiet:
+ log_level = logging.WARNING
+
+ root_logger = logging.getLogger()
+ root_logger.setLevel(log_level)
+
+ # create formatter
+ format_str = ''
+ if self.verbose > 1:
+ format_str = '[%(asctime)s]: '
+ format_str += self.appname + ': '
+ if self.verbose:
+ if self.verbose > 1:
+ format_str += '%(name)s(%(lineno)d) %(funcName)s() '
+ else:
+ format_str += '%(name)s '
+ format_str += '%(levelname)s - %(message)s'
+ formatter = None
+ if self.terminal_has_colors:
+ formatter = ColoredFormatter(format_str)
+ else:
+ formatter = logging.Formatter(format_str)
+
+ # create log handler for console output
+ lh_console = logging.StreamHandler(sys.stderr)
+ lh_console.setLevel(log_level)
+ lh_console.setFormatter(formatter)
+
+ root_logger.addHandler(lh_console)
+
+ return
+
+ # -------------------------------------------------------------------------
+ def terminal_can_color(self):
+ """
+ Method to detect, whether the current terminal (stdout and stderr)
+ is able to perform ANSI color sequences.
+
+ @return: both stdout and stderr can perform ANSI color sequences
+ @rtype: bool
+
+ """
+
+ term_debug = False
+ if self.verbose > 3:
+ term_debug = True
+ return terminal_can_colors(debug=term_debug)
+
+ # -------------------------------------------------------------------------
+ def post_init(self):
+ """
+ Method to execute before calling run(). Here could be done some
+ finishing actions after reading in commandline parameters,
+ configuration a.s.o.
+
+ This method could be overwritten by descendant classes, these
+ methhods should allways include a call to post_init() of the
+ parent class.
+
+ """
+
+ self.perform_arg_parser()
+ self.init_logging()
+
+ self.initialized = True
+
+ # -------------------------------------------------------------------------
+ def pre_run(self):
+ """
+ Dummy function to run before the main routine.
+ Could be overwritten by descendant classes.
+
+ """
+
+ if self.simulate:
+ LOG.warn("Simulation mode - nothing is really done.")
+
+ # -------------------------------------------------------------------------
+ def _run(self):
+ """
+ Dummy function as main routine.
+
+ MUST be overwritten by descendant classes.
+
+ """
+
+ raise FunctionNotImplementedError('_run', self.__class__.__name__)
+
+ # -------------------------------------------------------------------------
+ def __call__(self):
+ """
+ Helper method to make the resulting object callable, e.g.::
+
+ app = PBApplication(...)
+ app()
+
+ @return: None
+
+ """
+
+ self.run()
+
+ # -------------------------------------------------------------------------
+ def run(self):
+ """
+ The visible start point of this object.
+
+ @return: None
+
+ """
+
+ LOG.debug("Executing {} ...".format(self.__class__.__name__))
+
+ if not self.initialized:
+ self.handle_error(
+ "The application is not completely initialized.", '', True)
+ self.exit(9)
+
+ try:
+ self.pre_run()
+ except Exception as e:
+ self.handle_error(str(e), e.__class__.__name__, True)
+ self.exit(98)
+
+ if not self.initialized:
+ raise PpAppError(
+ "Object {!r} seems not to be completely initialized.".format(
+ self.__class__.__name__))
+
+ try:
+ self._run()
+ except Exception as e:
+ self.handle_error(str(e), e.__class__.__name__, True)
+ self.exit_value = 99
+
+ if self.verbose > 1:
+ LOG.info("Ending.")
+
+ try:
+ self.post_run()
+ except Exception as e:
+ self.handle_error(str(e), e.__class__.__name__, True)
+ self.exit_value = 97
+
+ self.exit(self.exit_value)
+
+ # -------------------------------------------------------------------------
+ def post_run(self):
+ """
+ Dummy function to run after the main routine.
+ Could be overwritten by descendant classes.
+
+ """
+
+ if self.verbose > 1:
+ LOG.info("executing post_run() ...")
+
+ # -------------------------------------------------------------------------
+ def _init_arg_parser(self):
+ """
+ Local called method to initiate the argument parser.
+
+ @raise PBApplicationError: on some errors
+
+ """
+
+ self.arg_parser = argparse.ArgumentParser(
+ prog=self.appname,
+ description=self.description,
+ usage=self.usage,
+ epilog=self.argparse_epilog,
+ prefix_chars=self.argparse_prefix_chars,
+ add_help=False,
+ )
+
+ self.init_arg_parser()
+
+ general_group = self.arg_parser.add_argument_group('General options')
+ general_group.add_argument(
+ '--color',
+ action="store",
+ dest='color',
+ const='yes',
+ default='auto',
+ nargs='?',
+ choices=['yes', 'no', 'auto'],
+ help="Use colored output for messages.",
+ )
+
+ verbose_group = general_group.add_mutually_exclusive_group()
+
+ verbose_group.add_argument(
+ "-v", "--verbose",
+ action="count",
+ dest='verbose',
+ help='Increase the verbosity level',
+ )
+
+ verbose_group.add_argument(
+ "-q", "--quiet",
+ action="store_true",
+ dest='quiet',
+ help='Silent execution, only warnings and errors are emitted.',
+ )
+
+ if self.show_force_opt:
+ general_group.add_argument(
+ "-f", "--force",
+ action="store_true", dest="force",
+ help="Forced execution of this application",
+ )
+
+ if self.show_simulate_opt:
+ help_msg = getattr(self, '_simulate_opt_help', None)
+ if not help_msg or str(help_msg) == '':
+ help_msg = "Simulation af all actions, nothing is really done."
+ general_group.add_argument(
+ "-s", "--simulate",
+ action="store_true", dest="simulate", help=help_msg,
+ )
+
+ general_group.add_argument(
+ "-h", "--help",
+ action='help',
+ dest='help',
+ help='Show this help message and exit'
+ )
+ general_group.add_argument(
+ "--usage",
+ action='store_true',
+ dest='usage',
+ help="Display brief usage message and exit"
+ )
+ general_group.add_argument(
+ "-V", '--version',
+ action='version',
+ version='Version of %(prog)s: {}'.format(self.version),
+ help="Show program's version number and exit"
+ )
+
+ # -------------------------------------------------------------------------
+ def init_arg_parser(self):
+ """
+ Public available method to initiate the argument parser.
+
+ Note::
+ avoid adding the general options '--verbose', '--help', '--usage'
+ and '--version'. These options are allways added after executing
+ this method.
+
+ Descendant classes may override this method.
+
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def _perform_arg_parser(self):
+ """
+ Underlaying method for parsing arguments.
+ """
+
+ self.args = self.arg_parser.parse_args()
+
+ if self.args.usage:
+ self.arg_parser.print_usage(sys.stdout)
+ self.exit(0)
+
+ if self.args.verbose is not None and self.args.verbose > self.verbose:
+ self.verbose = self.args.verbose
+
+ if self.args.quiet:
+ self.quiet = self.args.quiet
+
+ if self.args.color == 'yes':
+ self._terminal_has_colors = True
+ elif self.args.color == 'no':
+ self._terminal_has_colors = False
+ else:
+ self._terminal_has_colors = self.terminal_can_color()
+
+ if getattr(self.args, 'force', False):
+ self.force = True
+
+ if getattr(self.args, 'simulate', False):
+ self.simulate = True
+
+ # -------------------------------------------------------------------------
+ def perform_arg_parser(self):
+ """
+ Public available method to execute some actions after parsing
+ the command line parameters.
+
+ Descendant classes may override this method.
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def _init_env(self):
+ """
+ Initialization of self.env by application specific environment
+ variables.
+
+ It calls self.init_env(), after it has done his job.
+
+ """
+
+ for (key, value) in list(os.environ.items()):
+
+ if not key.startswith(self.env_prefix):
+ continue
+
+ newkey = key.replace(self.env_prefix, '', 1)
+ self.env[newkey] = value
+
+ self.init_env()
+
+ # -------------------------------------------------------------------------
+ def init_env(self):
+ """
+ Public available method to initiate self.env additional to the implicit
+ initialization done by this module.
+ Maybe it can be used to import environment variables, their
+ names not starting with self.env_prefix.
+
+ Currently a dummy method, which ca be overriden by descendant classes.
+
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def _perform_env(self):
+ """
+ Method to do some useful things with the found environment.
+
+ It calls self.perform_env(), after it has done his job.
+
+ """
+
+ # try to detect verbosity level from environment
+ if 'VERBOSE' in self.env and self.env['VERBOSE']:
+ v = 0
+ try:
+ v = int(self.env['VERBOSE'])
+ except ValueError:
+ v = 1
+ if v > self.verbose:
+ self.verbose = v
+
+ self.perform_env()
+
+ # -------------------------------------------------------------------------
+ def perform_env(self):
+ """
+ Public available method to perform found environment variables after
+ initialization of self.env.
+
+ Currently a dummy method, which ca be overriden by descendant classes.
+
+ """
+
+ pass
+
+ # -------------------------------------------------------------------------
+ def colored(self, msg, color):
+ """
+ Wrapper function to colorize the message. Depending, whether the current
+ terminal can display ANSI colors, the message is colorized or not.
+
+ @param msg: The message to colorize
+ @type msg: str
+ @param color: The color to use, must be one of the keys of COLOR_CODE
+ @type color: str
+
+ @return: the colorized message
+ @rtype: str
+
+ """
+
+ if not self.terminal_has_colors:
+ return msg
+ return colorstr(msg, color)
+
+ # -------------------------------------------------------------------------
+ def get_command(self, cmd, quiet=False):
+ """
+ Searches the OS search path for the given command and gives back the
+ normalized position of this command.
+ If the command is given as an absolute path, it check the existence
+ of this command.
+
+ @param cmd: the command to search
+ @type cmd: str
+ @param quiet: No warning message, if the command could not be found,
+ only a debug message
+ @type quiet: bool
+
+ @return: normalized complete path of this command, or None,
+ if not found
+ @rtype: str or None
+
+ """
+
+ if self.verbose > 2:
+ LOG.debug("Searching for command {!r} ...".format(cmd))
+
+ # Checking an absolute path
+ if os.path.isabs(cmd):
+ if not os.path.exists(cmd):
+ LOG.warning("Command {!r} doesn't exists.".format(cmd))
+ return None
+ if not os.access(cmd, os.X_OK):
+ msg = "Command {!r} is not executable.".format(cmd)
+ LOG.warning(msg)
+ return None
+ return os.path.normpath(cmd)
+
+ # Checking a relative path
+ for d in caller_search_path():
+ if self.verbose > 3:
+ LOG.debug("Searching command in {!r} ...".format(d))
+ p = os.path.join(d, cmd)
+ if os.path.exists(p):
+ if self.verbose > 2:
+ LOG.debug("Found {!r} ...".format(p))
+ if os.access(p, os.X_OK):
+ return os.path.normpath(p)
+ else:
+ LOG.debug("Command {!r} is not executable.".format(p))
+
+ # command not found, sorry
+ if quiet:
+ if self.verbose > 2:
+ LOG.debug("Command {!r} not found.".format(cmd))
+ else:
+ LOG.warning("Command {!r} not found.".format(cmd))
+
+ return None
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@summary: additional logging formatter for colored output via console
+"""
+
+# Standard modules
+import logging
+# import os.path
+# import sys
+import copy
+
+# Third party modules
+
+# Own modules
+
+# import pb_provisioning.common
+
+# from pb_provisioning.common import to_unicode_or_bust, to_utf8_or_bust
+
+__version__ = '0.1.4'
+
+# =============================================================================
+# Color coding module variables and helper functions
+
+COLOR_CODE = {
+ 'ENDC': 0, # RESET COLOR
+ 'BOLD': 1,
+ 'UNDERLINE': 4,
+ 'BLINK': 5,
+ 'INVERT': 7,
+ 'CONCEALD': 8,
+ 'STRIKE': 9,
+ 'GREY30': 90,
+ 'GREY40': 2,
+ 'GREY65': 37,
+ 'GREY70': 97,
+ 'GREY20_BG': 40,
+ 'GREY33_BG': 100,
+ 'GREY80_BG': 47,
+ 'GREY93_BG': 107,
+ 'DARK_RED': 31,
+ 'RED': 91,
+ 'RED_BG': 41,
+ 'LIGHT_RED_BG': 101,
+ 'DARK_YELLOW': 33,
+ 'YELLOW': 93,
+ 'YELLOW_BG': 43,
+ 'LIGHT_YELLOW_BG': 103,
+ 'DARK_BLUE': 34,
+ 'BLUE': 94,
+ 'BLUE_BG': 44,
+ 'LIGHT_BLUE_BG': 104,
+ 'DARK_MAGENTA': 35,
+ 'PURPLE': 95,
+ 'MAGENTA_BG': 45,
+ 'LIGHT_PURPLE_BG': 105,
+ 'DARK_CYAN': 36,
+ 'AUQA': 96,
+ 'AQUA': 96,
+ 'CYAN_BG': 46,
+ 'LIGHT_AUQA_BG': 106,
+ 'LIGHT_AQUA_BG': 106,
+ 'DARK_GREEN': 32,
+ 'GREEN': 92,
+ 'GREEN_BG': 42,
+ 'LIGHT_GREEN_BG': 102,
+ 'BLACK': 30,
+}
+
+
+# -----------------------------------------------------------------------------
+def termcode(num):
+ """
+ Output of an ANSII terminal code.
+ """
+
+ return('\033[%sm' % (num))
+
+
+# -----------------------------------------------------------------------------
+def colorstr(message, color):
+ """
+ Wrapper function to colorize the message.
+
+ @param message: The message to colorize
+ @type message: str
+ @param color: The color to use, must be one of the keys of COLOR_CODE
+ @type color: str
+
+ @return: the colorized message
+ @rtype: str
+
+ """
+
+ tcode = ''
+ if isinstance(color, (list, tuple)):
+ for clr in color:
+ tcode += termcode(COLOR_CODE[clr])
+ else:
+ tcode = termcode(COLOR_CODE[color])
+
+ return tcode + message + termcode(COLOR_CODE['ENDC'])
+
+LOG = logging.getLogger(__name__)
+
+
+# =============================================================================
+class ColoredFormatter(logging.Formatter):
+ """
+ A variant of code found at:
+ http://stackoverflow.com/questions/384076/how-can-i-make-the-python-logging-output-to-be-colored
+ """
+
+ LEVEL_COLOR = {
+ 'DEBUG': None,
+ 'INFO': 'GREEN',
+ 'WARNING': 'YELLOW',
+ 'ERROR': ('BOLD', 'RED'),
+ 'CRITICAL': 'RED_BG',
+ }
+
+ # -------------------------------------------------------------------------
+ def __init__(self, fmt=None, datefmt=None):
+ """
+ Initialize the formatter with specified format strings.
+
+ Initialize the formatter either with the specified format string, or a
+ default. Allow for specialized date formatting with the optional
+ datefmt argument (if omitted, you get the ISO8601 format).
+ """
+
+ logging.Formatter.__init__(self, fmt, datefmt)
+
+ # -----------------------------------------------------------
+ @property
+ def color_debug(self):
+ """The color used to output debug messages."""
+ return self.LEVEL_COLOR['DEBUG']
+
+ @color_debug.setter
+ def color_debug(self, value):
+ self.LEVEL_COLOR['DEBUG'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_info(self):
+ """The color used to output info messages."""
+ return self.LEVEL_COLOR['INFO']
+
+ @color_info.setter
+ def color_info(self, value):
+ self.LEVEL_COLOR['INFO'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_warning(self):
+ """The color used to output warning messages."""
+ return self.LEVEL_COLOR['WARNING']
+
+ @color_warning.setter
+ def color_warning(self, value):
+ self.LEVEL_COLOR['WARNING'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_error(self):
+ """The color used to output error messages."""
+ return self.LEVEL_COLOR['ERROR']
+
+ @color_error.setter
+ def color_error(self, value):
+ self.LEVEL_COLOR['ERROR'] = value
+
+ # -----------------------------------------------------------
+ @property
+ def color_critical(self):
+ """The color used to output critical messages."""
+ return self.LEVEL_COLOR['CRITICAL']
+
+ @color_critical.setter
+ def color_critical(self, value):
+ self.LEVEL_COLOR['CRITICAL'] = value
+
+ # -------------------------------------------------------------------------
+ def format(self, record):
+ """
+ Format the specified record as text.
+ """
+
+ record = copy.copy(record)
+ levelname = record.levelname
+
+ if levelname in self.LEVEL_COLOR:
+
+ record.name = colorstr(record.name, 'BOLD')
+ record.filename = colorstr(record.filename, 'BOLD')
+ record.module = colorstr(record.module, 'BOLD')
+ record.funcName = colorstr(record.funcName, 'BOLD')
+ record.pathname = colorstr(record.pathname, 'BOLD')
+ record.processName = colorstr(record.processName, 'BOLD')
+ record.threadName = colorstr(record.threadName, 'BOLD')
+
+ if self.LEVEL_COLOR[levelname] is not None:
+ record.levelname = colorstr(
+ levelname, self.LEVEL_COLOR[levelname])
+ record.msg = colorstr(record.msg, self.LEVEL_COLOR[levelname])
+
+ return logging.Formatter.format(self, record)
+
+
+# =============================================================================
+
+if __name__ == "__main__":
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Berlin
+@summary: The module for common used functions.
+"""
+
+# Standard modules
+import sys
+import os
+import logging
+import re
+import pprint
+import platform
+import locale
+
+# Third party modules
+import six
+
+# Own modules
+
+__version__ = '0.5.3'
+
+LOG = logging.getLogger(__name__)
+
+RE_YES = re.compile(r'^\s*(?:y(?:es)?|true)\s*$', re.IGNORECASE)
+RE_NO = re.compile(r'^\s*(?:no?|false|off)\s*$', re.IGNORECASE)
+PAT_TO_BOOL_TRUE = locale.nl_langinfo(locale.YESEXPR)
+RE_TO_BOOL_TRUE = re.compile(PAT_TO_BOOL_TRUE)
+PAT_TO_BOOL_FALSE = locale.nl_langinfo(locale.NOEXPR)
+RE_TO_BOOL_FALSE = re.compile(PAT_TO_BOOL_FALSE)
+
+RE_DOT = re.compile(r'\.')
+RE_DOT_AT_END = re.compile(r'(\.)*$')
+RE_DECIMAL = re.compile(r'^\d+$')
+RE_IPV4_PTR = re.compile(r'\.in-addr\.arpa\.$', re.IGNORECASE)
+RE_IPV6_PTR = re.compile(r'\.ip6\.arpa\.$', re.IGNORECASE)
+
+
+# =============================================================================
+def pp(value, indent=4, width=99, depth=None):
+ """
+ Returns a pretty print string of the given value.
+
+ @return: pretty print string
+ @rtype: str
+ """
+
+ pretty_printer = pprint.PrettyPrinter(
+ indent=indent, width=width, depth=depth)
+ return pretty_printer.pformat(value)
+
+
+# =============================================================================
+def terminal_can_colors(debug=False):
+ """
+ Method to detect, whether the current terminal (stdout and stderr)
+ is able to perform ANSI color sequences.
+
+ @return: both stdout and stderr can perform ANSI color sequences
+ @rtype: bool
+
+ """
+
+ cur_term = ''
+ if 'TERM' in os.environ:
+ cur_term = os.environ['TERM'].lower().strip()
+
+ colored_term_list = (
+ r'ansi',
+ r'linux.*',
+ r'screen.*',
+ r'[xeak]term.*',
+ r'gnome.*',
+ r'rxvt.*',
+ r'interix',
+ )
+ term_pattern = r'^(?:' + r'|'.join(colored_term_list) + r')$'
+ re_term = re.compile(term_pattern)
+
+ ansi_term = False
+ env_term_has_colors = False
+
+ if cur_term:
+ if cur_term == 'ansi':
+ env_term_has_colors = True
+ ansi_term = True
+ elif re_term.search(cur_term):
+ env_term_has_colors = True
+ if debug:
+ sys.stderr.write(
+ "ansi_term: %r, env_term_has_colors: %r\n" % (
+ ansi_term, env_term_has_colors))
+
+ has_colors = False
+ if env_term_has_colors:
+ has_colors = True
+ for handle in [sys.stdout, sys.stderr]:
+ if (hasattr(handle, "isatty") and handle.isatty()):
+ if debug:
+ sys.stderr.write("%s is a tty.\n" % (handle.name))
+ if (platform.system() == 'Windows' and not ansi_term):
+ if debug:
+ sys.stderr.write("platform is Windows and not ansi_term.\n")
+ has_colors = False
+ else:
+ if debug:
+ sys.stderr.write("%s is not a tty.\n" % (handle.name))
+ if ansi_term:
+ pass
+ else:
+ has_colors = False
+
+ return has_colors
+
+
+# =============================================================================
+def to_bool(value):
+ """
+ Converter from string to boolean values (e.g. from configurations)
+ """
+
+ if not value:
+ return False
+
+ try:
+ v_int = int(value)
+ except ValueError:
+ pass
+ except TypeError:
+ pass
+ else:
+ if v_int == 0:
+ return False
+ else:
+ return True
+
+ global PAT_TO_BOOL_TRUE
+ global RE_TO_BOOL_TRUE
+ global PAT_TO_BOOL_FALSE
+ global RE_TO_BOOL_FALSE
+
+ c_yes_expr = locale.nl_langinfo(locale.YESEXPR)
+ if c_yes_expr != PAT_TO_BOOL_TRUE:
+ PAT_TO_BOOL_TRUE = c_yes_expr
+ RE_TO_BOOL_TRUE = re.compile(PAT_TO_BOOL_TRUE)
+ # LOG.debug("Current pattern for 'yes': %r.", c_yes_expr)
+
+ c_no_expr = locale.nl_langinfo(locale.NOEXPR)
+ if c_no_expr != PAT_TO_BOOL_FALSE:
+ PAT_TO_BOOL_FALSE = c_no_expr
+ RE_TO_BOOL_FALSE = re.compile(PAT_TO_BOOL_FALSE)
+ # LOG.debug("Current pattern for 'no': %r.", c_no_expr)
+
+ v_str = ''
+ if isinstance(value, str):
+ v_str = value
+ if six.PY2:
+ if isinstance(value, unicode): # noqa
+ v_str = value.encode('utf-8')
+ elif six.PY3 and isinstance(value, bytes):
+ v_str = value.decode('utf-8')
+ else:
+ v_str = str(value)
+
+ match = RE_YES.search(v_str)
+ if match:
+ return True
+ match = RE_TO_BOOL_TRUE.search(v_str)
+ if match:
+ return True
+
+ match = RE_NO.search(v_str)
+ if match:
+ return False
+ match = RE_TO_BOOL_FALSE.search(v_str)
+ if match:
+ return False
+
+ return bool(value)
+
+
+# =============================================================================
+def to_unicode(obj, encoding='utf-8'):
+
+ do_decode = False
+ if six.PY2:
+ if isinstance(obj, str):
+ do_decode = True
+ else:
+ if isinstance(obj, bytes):
+ do_decode = True
+
+ if do_decode:
+ obj = obj.decode(encoding)
+
+ return obj
+
+
+# =============================================================================
+def to_utf8(obj):
+
+ return encode_or_bust(obj, 'utf-8')
+
+
+# =============================================================================
+def encode_or_bust(obj, encoding='utf-8'):
+
+ do_encode = False
+ if six.PY2:
+ if isinstance(obj, unicode): # noqa
+ do_encode = True
+ else:
+ if isinstance(obj, str):
+ do_encode = True
+
+ if do_encode:
+ obj = obj.encode(encoding)
+
+ return obj
+
+
+# =============================================================================
+def to_bytes(obj, encoding='utf-8'):
+ "Wrapper for encode_or_bust()"
+
+ return encode_or_bust(obj, encoding)
+
+
+# =============================================================================
+def to_str(obj, encoding='utf-8'):
+ """
+ Transformes the given string-like object into the str-type according
+ to the current Python version.
+ """
+
+ if six.PY2:
+ return encode_or_bust(obj, encoding)
+ else:
+ return to_unicode(obj, encoding)
+
+
+# =============================================================================
+def caller_search_path():
+ """
+ Builds a search path for executables from environment $PATH
+ including some standard paths.
+
+ @return: all existing search paths
+ @rtype: list
+ """
+
+ path_list = []
+ search_path = os.environ['PATH']
+ if not search_path:
+ search_path = os.defpath
+
+ search_path_list = [
+ '/opt/PPlocal/bin',
+ ]
+
+ for d in search_path.split(os.pathsep):
+ search_path_list.append(d)
+
+ default_path = [
+ '/bin',
+ '/usr/bin',
+ '/usr/local/bin',
+ '/sbin',
+ '/usr/sbin',
+ '/usr/local/sbin',
+ '/usr/ucb',
+ '/usr/sfw/bin',
+ '/opt/csw/bin',
+ '/usr/openwin/bin',
+ '/usr/ccs/bin',
+ ]
+
+ for d in default_path:
+ search_path_list.append(d)
+
+ for d in search_path_list:
+ if not os.path.exists(d):
+ continue
+ if not os.path.isdir(d):
+ continue
+ d_abs = os.path.realpath(d)
+ if d_abs not in path_list:
+ path_list.append(d_abs)
+
+ return path_list
+
+# =============================================================================
+def compare_fqdn(x, y):
+
+ # LOG.debug("Comparing {!r} <=> {!r}.".format(x, y))
+
+ # First check for None values
+ if x is None and y is None:
+ return 0
+ if x is None:
+ return -1
+ if y is None:
+ return 1
+
+ # Check for empty FQDNs
+ xs = str(x).strip().lower()
+ ys = str(y).strip().lower()
+
+ if xs == '' and ys == '':
+ return 0
+ if xs == '':
+ return -1
+ if ys == '':
+ return 1
+
+ # Ensure a dot at end
+ xs = RE_DOT_AT_END.sub('.', xs)
+ ys = RE_DOT_AT_END.sub('.', ys)
+
+ if xs == ys:
+ return 0
+
+ # Reverse IPv4 zones first, then reverse IPv6 zones
+ if RE_IPV4_PTR.search(xs):
+ if not RE_IPV4_PTR.search(ys):
+ return -1
+ elif RE_IPV4_PTR.search(ys):
+ if not RE_IPV4_PTR.search(xs):
+ return 1
+ elif RE_IPV6_PTR.search(xs):
+ if not RE_IPV6_PTR.search(ys):
+ return -1
+ elif RE_IPV6_PTR.search(ys):
+ if not RE_IPV6_PTR.search(xs):
+ return 1
+
+ return compare_fqdn_tokens(xs, ys)
+
+# =============================================================================
+def compare_fqdn_tokens(xs, ys):
+
+ xa = RE_DOT.split(xs)
+ xa.reverse()
+ xa.pop(0)
+
+ ya = RE_DOT.split(ys)
+ ya.reverse()
+ ya.pop(0)
+
+ # Compare token from the last to the first
+ nr_tokens = min(len(xa), len(ya))
+ while nr_tokens > 0:
+ token_x = xa.pop(0)
+ token_y = ya.pop(0)
+ if RE_DECIMAL.match(token_x) and RE_DECIMAL.match(token_y):
+ num_x = int(token_x)
+ num_y = int(token_y)
+ if num_x < num_y:
+ return -1
+ elif num_x > num_y:
+ return 1
+ else:
+ if token_x < token_y:
+ return -1
+ elif token_x > token_y:
+ return 1
+ nr_tokens -= 1
+
+ if len(xa):
+ return 1
+ if len(ya):
+ return -1
+
+ return 0
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@author: Frank Brehm
+@contact: frank.brehm@pixelpark.com
+@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
+"""
+from __future__ import absolute_import
+
+# Standard modules
+import sys
+import os
+import logging
+import datetime
+import traceback
+
+# Third party modules
+
+# Own modules
+from .common import pp, to_bytes
+
+from .errors import PpError
+
+__version__ = '0.2.4'
+
+LOG = logging.getLogger(__name__)
+
+
+# =============================================================================
+class PpBaseObjectError(PpError):
+ """
+ Base error class useable by all descendand objects.
+ """
+
+ pass
+
+
+# =============================================================================
+class PpBaseObject(object):
+ """
+ Base class for all objects.
+ """
+
+ # -------------------------------------------------------------------------
+ def __init__(
+ self, appname=None, verbose=0, version=__version__, base_dir=None,
+ initialized=False):
+ """
+ Initialisation of the base object.
+
+ Raises an exception on a uncoverable error.
+ """
+
+ self._appname = None
+ """
+ @ivar: name of the current running application
+ @type: str
+ """
+ if appname:
+ v = str(appname).strip()
+ if v:
+ self._appname = v
+ if not self._appname:
+ self._appname = os.path.basename(sys.argv[0])
+
+ self._version = version
+ """
+ @ivar: version string of the current object or application
+ @type: str
+ """
+
+ self._verbose = int(verbose)
+ """
+ @ivar: verbosity level (0 - 9)
+ @type: int
+ """
+ if self._verbose < 0:
+ msg = "Wrong verbose level {!r}, must be >= 0".format(verbose)
+ raise ValueError(msg)
+
+ self._initialized = False
+ """
+ @ivar: initialisation of this object is complete
+ after __init__() of this object
+ @type: bool
+ """
+
+ self._base_dir = base_dir
+ """
+ @ivar: base directory used for different purposes, must be an existent
+ directory. Defaults to directory of current script daemon.py.
+ @type: str
+ """
+ if base_dir:
+ if not os.path.exists(base_dir):
+ msg = "Base directory {!r} does not exists.".format(base_dir)
+ self.handle_error(msg)
+ self._base_dir = None
+ elif not os.path.isdir(base_dir):
+ msg = "Base directory {!r} is not a directory.".format(base_dir)
+ self.handle_error(msg)
+ self._base_dir = None
+ if not self._base_dir:
+ self._base_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+
+ self._initialized = bool(initialized)
+
+ # -----------------------------------------------------------
+ @property
+ def appname(self):
+ """The name of the current running application."""
+ if hasattr(self, '_appname'):
+ return self._appname
+ return os.path.basename(sys.argv[0])
+
+ @appname.setter
+ def appname(self, value):
+ if value:
+ v = str(value).strip()
+ if v:
+ self._appname = v
+
+ # -----------------------------------------------------------
+ @property
+ def version(self):
+ """The version string of the current object or application."""
+ return getattr(self, '_version', __version__)
+
+ # -----------------------------------------------------------
+ @property
+ def verbose(self):
+ """The verbosity level."""
+ return getattr(self, '_verbose', 0)
+
+ @verbose.setter
+ def verbose(self, value):
+ v = int(value)
+ if v >= 0:
+ self._verbose = v
+ else:
+ LOG.warn("Wrong verbose level {!r}, must be >= 0".format(value))
+
+ # -----------------------------------------------------------
+ @property
+ def initialized(self):
+ """The initialisation of this object is complete."""
+ return getattr(self, '_initialized', False)
+
+ @initialized.setter
+ def initialized(self, value):
+ self._initialized = bool(value)
+
+ # -----------------------------------------------------------
+ @property
+ def base_dir(self):
+ """The base directory used for different purposes."""
+ return self._base_dir
+
+ @base_dir.setter
+ def base_dir(self, value):
+ if value.startswith('~'):
+ value = os.path.expanduser(value)
+ if not os.path.exists(value):
+ msg = "Base directory {!r} does not exists.".format(value)
+ LOG.error(msg)
+ elif not os.path.isdir(value):
+ msg = "Base directory {!r} is not a directory.".format(value)
+ LOG.error(msg)
+ else:
+ self._base_dir = value
+
+ # -------------------------------------------------------------------------
+ def __str__(self):
+ """
+ Typecasting function for translating object structure
+ into a string
+
+ @return: structure as string
+ @rtype: str
+ """
+
+ return pp(self.as_dict(short=True))
+
+ # -------------------------------------------------------------------------
+ def __repr__(self):
+ """Typecasting into a string for reproduction."""
+
+ out = "<%s(" % (self.__class__.__name__)
+
+ fields = []
+ fields.append("appname={!r}".format(self.appname))
+ fields.append("verbose={!r}".format(self.verbose))
+ fields.append("version={!r}".format(self.version))
+ fields.append("base_dir={!r}".format(self.base_dir))
+ fields.append("initialized={!r}".format(self.initialized))
+
+ out += ", ".join(fields) + ")>"
+ return out
+
+ # -------------------------------------------------------------------------
+ def as_dict(self, short=True):
+ """
+ Transforms the elements of the object into a dict
+
+ @param short: don't include local properties in resulting dict.
+ @type short: bool
+
+ @return: structure as dict
+ @rtype: dict
+ """
+
+ res = self.__dict__
+ res = {}
+ for key in self.__dict__:
+ if short and key.startswith('_') and not key.startswith('__'):
+ continue
+ val = self.__dict__[key]
+ if isinstance(val, PpBaseObject):
+ res[key] = val.as_dict(short=short)
+ else:
+ res[key] = val
+ res['__class_name__'] = self.__class__.__name__
+ res['appname'] = self.appname
+ res['version'] = self.version
+ res['verbose'] = self.verbose
+ res['initialized'] = self.initialized
+ res['base_dir'] = self.base_dir
+
+ return res
+
+ # -------------------------------------------------------------------------
+ def handle_error(
+ self, error_message=None, exception_name=None, do_traceback=False):
+ """
+ Handle an error gracefully.
+
+ Print a traceback and continue.
+
+ @param error_message: the error message to display
+ @type error_message: str
+ @param exception_name: name of the exception class
+ @type exception_name: str
+ @param do_traceback: allways show a traceback
+ @type do_traceback: bool
+
+ """
+
+ msg = 'Exception happened: '
+ if exception_name is not None:
+ exception_name = exception_name.strip()
+ if exception_name:
+ msg = exception_name + ': '
+ else:
+ msg = ''
+ if error_message:
+ msg += str(error_message)
+ else:
+ msg += 'undefined error.'
+
+ root_log = logging.getLogger()
+ has_handlers = False
+ if root_log.handlers:
+ has_handlers = True
+
+ if has_handlers:
+ LOG.error(msg)
+ if do_traceback:
+ LOG.error(traceback.format_exc())
+ else:
+ curdate = datetime.datetime.now()
+ curdate_str = "[" + curdate.isoformat(' ') + "]: "
+ msg = curdate_str + msg + "\n"
+ if hasattr(sys.stderr, 'buffer'):
+ sys.stderr.buffer.write(to_bytes(msg))
+ else:
+ sys.stderr.write(msg)
+ if do_traceback:
+ traceback.print_exc()
+
+ return
+
+ # -------------------------------------------------------------------------
+ def handle_info(self, message, info_name=None):
+ """
+ Shows an information. This happens both to STDERR and to all
+ initialized log handlers.
+
+ @param message: the info message to display
+ @type message: str
+ @param info_name: Title of information
+ @type info_name: str
+
+ """
+
+ msg = ''
+ if info_name is not None:
+ info_name = info_name.strip()
+ if info_name:
+ msg = info_name + ': '
+ msg += str(message).strip()
+
+ root_log = logging.getLogger()
+ has_handlers = False
+ if root_log.handlers:
+ has_handlers = True
+
+ if has_handlers:
+ LOG.info(msg)
+ else:
+ curdate = datetime.datetime.now()
+ curdate_str = "[" + curdate.isoformat(' ') + "]: "
+ msg = curdate_str + msg + "\n"
+ if hasattr(sys.stderr, 'buffer'):
+ sys.stderr.buffer.write(to_bytes(msg))
+ else:
+ sys.stderr.write(msg)
+
+ return
+
+# =============================================================================
+
+if __name__ == "__main__":
+
+ pass
+
+# =============================================================================
+
+# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-@author: Frank Brehm
-@contact: frank.brehm@pixelpark.com
-@copyright: © 2018 by Frank Brehm, Berlin
-@summary: The module for the application object.
-"""
-from __future__ import absolute_import
-
-# Standard modules
-import sys
-import os
-import logging
-import re
-import traceback
-
-# Third party modules
-import argparse
-
-# Own modules
-from .errors import FunctionNotImplementedError, PpAppError
-
-from .common import terminal_can_colors
-from .common import caller_search_path
-
-from .colored import ColoredFormatter, colorstr
-
-from .obj import PpBaseObject
-
-__version__ = '0.3.6'
-LOG = logging.getLogger(__name__)
-
-
-# =============================================================================
-class PpApplication(PpBaseObject):
- """
- Class for the application objects.
- """
-
- re_prefix = re.compile(r'^[a-z0-9][a-z0-9_]*$', re.IGNORECASE)
- re_anum = re.compile(r'[^A-Z0-9_]+', re.IGNORECASE)
-
- # -------------------------------------------------------------------------
- def __init__(
- self, appname=None, verbose=0, version=__version__, base_dir=None,
- initialized=False, usage=None, description=None,
- argparse_epilog=None, argparse_prefix_chars='-', env_prefix=None):
-
- self.arg_parser = None
- """
- @ivar: argparser object to parse commandline parameters
- @type: argparse.ArgumentParser
- """
-
- self.args = None
- """
- @ivar: an object containing all commandline parameters
- after parsing them
- @type: Namespace
- """
-
- self._exit_value = 0
- """
- @ivar: return value of the application for exiting with sys.exit().
- @type: int
- """
-
- self._usage = usage
- """
- @ivar: usage text used on argparse
- @type: str
- """
-
- self._description = description
- """
- @ivar: a short text describing the application
- @type: str
- """
-
- self._argparse_epilog = argparse_epilog
- """
- @ivar: an epilog displayed at the end of the argparse help screen
- @type: str
- """
-
- self._argparse_prefix_chars = argparse_prefix_chars
- """
- @ivar: The set of characters that prefix optional arguments.
- @type: str
- """
-
- self._terminal_has_colors = False
- """
- @ivar: flag, that the current terminal understands color ANSI codes
- @type: bool
- """
-
- self._quiet = False
- self._force = False
- self._simulate = False
-
- self.env = {}
- """
- @ivar: a dictionary with all application specifiv environment variables,
- they will detected by the env_prefix property of this object,
- and their names will transformed before saving their values in
- self.env by removing the env_prefix from the variable name.
- @type: dict
- """
-
- self._env_prefix = None
- """
- @ivar: a prefix for environment variables to detect them and to assign
- their transformed names and their values in self.env
- @type: str
- """
-
- super(PpApplication, self).__init__(
- appname=appname,
- verbose=verbose,
- version=version,
- base_dir=base_dir,
- initialized=False,
- )
-
- if env_prefix:
- ep = str(env_prefix).strip()
- if not ep:
- msg = "Invalid env_prefix {!r} given - it may not be empty.".format(env_prefix)
- raise PpAppError(msg)
- match = self.re_prefix.search(ep)
- if not match:
- msg = (
- "Invalid characters found in env_prefix {!r}, only "
- "alphanumeric characters and digits and underscore "
- "(this not as the first character) are allowed.").format(env_prefix)
- raise PpAppError(msg)
- self._env_prefix = ep
- else:
- ep = self.appname.upper() + '_'
- self._env_prefix = self.re_anum.sub('_', ep)
-
- self._init_arg_parser()
- self._perform_arg_parser()
-
- self._init_env()
- self._perform_env()
-
- # -----------------------------------------------------------
- @property
- def exit_value(self):
- """The return value of the application for exiting with sys.exit()."""
- return self._exit_value
-
- @exit_value.setter
- def exit_value(self, value):
- v = int(value)
- if v >= 0:
- self._exit_value = v
- else:
- LOG.warn("Wrong exit_value {!r}, must be >= 0".format(value))
-
- # -----------------------------------------------------------
- @property
- def exitvalue(self):
- """The return value of the application for exiting with sys.exit()."""
- return self._exit_value
-
- @exitvalue.setter
- def exitvalue(self, value):
- self.exit_value = value
-
- # -----------------------------------------------------------
- @property
- def usage(self):
- """The usage text used on argparse."""
- return self._usage
-
- # -----------------------------------------------------------
- @property
- def description(self):
- """A short text describing the application."""
- return self._description
-
- # -----------------------------------------------------------
- @property
- def argparse_epilog(self):
- """An epilog displayed at the end of the argparse help screen."""
- return self._argparse_epilog
-
- # -----------------------------------------------------------
- @property
- def argparse_prefix_chars(self):
- """The set of characters that prefix optional arguments."""
- return self._argparse_prefix_chars
-
- # -----------------------------------------------------------
- @property
- def terminal_has_colors(self):
- """A flag, that the current terminal understands color ANSI codes."""
- return self._terminal_has_colors
-
- # -----------------------------------------------------------
- @property
- def env_prefix(self):
- """A prefix for environment variables to detect them."""
- return self._env_prefix
-
- # -----------------------------------------------------------
- @property
- def usage_term(self):
- """The localized version of 'usage: '"""
- return 'Usage: '
-
- # -----------------------------------------------------------
- @property
- def usage_term_len(self):
- """The length of the localized version of 'usage: '"""
- return len(self.usage_term)
-
- # -----------------------------------------------------------
- @property
- def quiet(self):
- """Quiet execution of the application,
- only warnings and errors are emitted."""
- return self._quiet
-
- @quiet.setter
- def quiet(self, value):
- self._quiet = bool(value)
-
- # -----------------------------------------------------------
- @property
- def force(self):
- """Forced execution of the application."""
- return self._force
-
- @force.setter
- def force(self, value):
- self._force = bool(value)
-
- # -----------------------------------------------------------
- @property
- def simulate(self):
- """Simulation mode, nothing is really done."""
- return self._simulate
-
- @simulate.setter
- def simulate(self, value):
- self._simulate = bool(value)
-
- # -----------------------------------------------------------
- @property
- def show_force_opt(self):
- """Flag, whether the command line option '--force' should be shown."""
- return getattr(self, '_show_force_opt', False)
-
- @show_force_opt.setter
- def show_force_opt(self, value):
- self._show_force_opt = bool(value)
-
- # -----------------------------------------------------------
- @property
- def show_simulate_opt(self):
- """Flag, whether the command line option '--simulate' should be shown."""
- return getattr(self, '_show_simulate_opt', False)
-
- @show_simulate_opt.setter
- def show_simulate_opt(self, value):
- self._show_simulate_opt = bool(value)
-
- # -------------------------------------------------------------------------
- def exit(self, retval=-1, msg=None, trace=False):
- """
- Universal method to call sys.exit(). If fake_exit is set, a
- FakeExitError exception is raised instead (useful for unittests.)
-
- @param retval: the return value to give back to theoperating system
- @type retval: int
- @param msg: a last message, which should be emitted before exit.
- @type msg: str
- @param trace: flag to output a stack trace before exiting
- @type trace: bool
-
- @return: None
-
- """
-
- retval = int(retval)
- trace = bool(trace)
-
- root_logger = logging.getLogger()
- has_handlers = False
- if root_logger.handlers:
- has_handlers = True
-
- if msg:
- if has_handlers:
- if retval:
- LOG.error(msg)
- else:
- LOG.info(msg)
- if not has_handlers:
- if hasattr(sys.stderr, 'buffer'):
- sys.stderr.buffer.write(str(msg) + "\n")
- else:
- sys.stderr.write(str(msg) + "\n")
-
- if trace:
- if has_handlers:
- if retval:
- LOG.error(traceback.format_exc())
- else:
- LOG.info(traceback.format_exc())
- else:
- traceback.print_exc()
-
- sys.exit(retval)
-
- # -------------------------------------------------------------------------
- def as_dict(self, short=True):
- """
- Transforms the elements of the object into a dict
-
- @param short: don't include local properties in resulting dict.
- @type short: bool
-
- @return: structure as dict
- @rtype: dict
- """
-
- res = super(PpApplication, self).as_dict(short=short)
- res['exit_value'] = self.exit_value
- res['usage'] = self.usage
- res['quiet'] = self.quiet
- res['force'] = self.force
- res['simulate'] = self.simulate
- res['description'] = self.description
- res['argparse_epilog'] = self.argparse_epilog
- res['argparse_prefix_chars'] = self.argparse_prefix_chars
- res['terminal_has_colors'] = self.terminal_has_colors
- res['env_prefix'] = self.env_prefix
-
- return res
-
- # -------------------------------------------------------------------------
- def init_logging(self):
- """
- Initialize the logger object.
- It creates a colored loghandler with all output to STDERR.
- Maybe overridden in descendant classes.
-
- @return: None
- """
-
- log_level = logging.INFO
- if self.verbose:
- log_level = logging.DEBUG
- elif self.quiet:
- log_level = logging.WARNING
-
- root_logger = logging.getLogger()
- root_logger.setLevel(log_level)
-
- # create formatter
- format_str = ''
- if self.verbose > 1:
- format_str = '[%(asctime)s]: '
- format_str += self.appname + ': '
- if self.verbose:
- if self.verbose > 1:
- format_str += '%(name)s(%(lineno)d) %(funcName)s() '
- else:
- format_str += '%(name)s '
- format_str += '%(levelname)s - %(message)s'
- formatter = None
- if self.terminal_has_colors:
- formatter = ColoredFormatter(format_str)
- else:
- formatter = logging.Formatter(format_str)
-
- # create log handler for console output
- lh_console = logging.StreamHandler(sys.stderr)
- lh_console.setLevel(log_level)
- lh_console.setFormatter(formatter)
-
- root_logger.addHandler(lh_console)
-
- return
-
- # -------------------------------------------------------------------------
- def terminal_can_color(self):
- """
- Method to detect, whether the current terminal (stdout and stderr)
- is able to perform ANSI color sequences.
-
- @return: both stdout and stderr can perform ANSI color sequences
- @rtype: bool
-
- """
-
- term_debug = False
- if self.verbose > 3:
- term_debug = True
- return terminal_can_colors(debug=term_debug)
-
- # -------------------------------------------------------------------------
- def post_init(self):
- """
- Method to execute before calling run(). Here could be done some
- finishing actions after reading in commandline parameters,
- configuration a.s.o.
-
- This method could be overwritten by descendant classes, these
- methhods should allways include a call to post_init() of the
- parent class.
-
- """
-
- self.perform_arg_parser()
- self.init_logging()
-
- self.initialized = True
-
- # -------------------------------------------------------------------------
- def pre_run(self):
- """
- Dummy function to run before the main routine.
- Could be overwritten by descendant classes.
-
- """
-
- if self.simulate:
- LOG.warn("Simulation mode - nothing is really done.")
-
- # -------------------------------------------------------------------------
- def _run(self):
- """
- Dummy function as main routine.
-
- MUST be overwritten by descendant classes.
-
- """
-
- raise FunctionNotImplementedError('_run', self.__class__.__name__)
-
- # -------------------------------------------------------------------------
- def __call__(self):
- """
- Helper method to make the resulting object callable, e.g.::
-
- app = PBApplication(...)
- app()
-
- @return: None
-
- """
-
- self.run()
-
- # -------------------------------------------------------------------------
- def run(self):
- """
- The visible start point of this object.
-
- @return: None
-
- """
-
- LOG.debug("Executing {} ...".format(self.__class__.__name__))
-
- if not self.initialized:
- self.handle_error(
- "The application is not completely initialized.", '', True)
- self.exit(9)
-
- try:
- self.pre_run()
- except Exception as e:
- self.handle_error(str(e), e.__class__.__name__, True)
- self.exit(98)
-
- if not self.initialized:
- raise PpAppError(
- "Object {!r} seems not to be completely initialized.".format(
- self.__class__.__name__))
-
- try:
- self._run()
- except Exception as e:
- self.handle_error(str(e), e.__class__.__name__, True)
- self.exit_value = 99
-
- if self.verbose > 1:
- LOG.info("Ending.")
-
- try:
- self.post_run()
- except Exception as e:
- self.handle_error(str(e), e.__class__.__name__, True)
- self.exit_value = 97
-
- self.exit(self.exit_value)
-
- # -------------------------------------------------------------------------
- def post_run(self):
- """
- Dummy function to run after the main routine.
- Could be overwritten by descendant classes.
-
- """
-
- if self.verbose > 1:
- LOG.info("executing post_run() ...")
-
- # -------------------------------------------------------------------------
- def _init_arg_parser(self):
- """
- Local called method to initiate the argument parser.
-
- @raise PBApplicationError: on some errors
-
- """
-
- self.arg_parser = argparse.ArgumentParser(
- prog=self.appname,
- description=self.description,
- usage=self.usage,
- epilog=self.argparse_epilog,
- prefix_chars=self.argparse_prefix_chars,
- add_help=False,
- )
-
- self.init_arg_parser()
-
- general_group = self.arg_parser.add_argument_group('General options')
- general_group.add_argument(
- '--color',
- action="store",
- dest='color',
- const='yes',
- default='auto',
- nargs='?',
- choices=['yes', 'no', 'auto'],
- help="Use colored output for messages.",
- )
-
- verbose_group = general_group.add_mutually_exclusive_group()
-
- verbose_group.add_argument(
- "-v", "--verbose",
- action="count",
- dest='verbose',
- help='Increase the verbosity level',
- )
-
- verbose_group.add_argument(
- "-q", "--quiet",
- action="store_true",
- dest='quiet',
- help='Silent execution, only warnings and errors are emitted.',
- )
-
- if self.show_force_opt:
- general_group.add_argument(
- "-f", "--force",
- action="store_true", dest="force",
- help="Forced execution of this application",
- )
-
- if self.show_simulate_opt:
- help_msg = getattr(self, '_simulate_opt_help', None)
- if not help_msg or str(help_msg) == '':
- help_msg = "Simulation af all actions, nothing is really done."
- general_group.add_argument(
- "-s", "--simulate",
- action="store_true", dest="simulate", help=help_msg,
- )
-
- general_group.add_argument(
- "-h", "--help",
- action='help',
- dest='help',
- help='Show this help message and exit'
- )
- general_group.add_argument(
- "--usage",
- action='store_true',
- dest='usage',
- help="Display brief usage message and exit"
- )
- general_group.add_argument(
- "-V", '--version',
- action='version',
- version='Version of %(prog)s: {}'.format(self.version),
- help="Show program's version number and exit"
- )
-
- # -------------------------------------------------------------------------
- def init_arg_parser(self):
- """
- Public available method to initiate the argument parser.
-
- Note::
- avoid adding the general options '--verbose', '--help', '--usage'
- and '--version'. These options are allways added after executing
- this method.
-
- Descendant classes may override this method.
-
- """
-
- pass
-
- # -------------------------------------------------------------------------
- def _perform_arg_parser(self):
- """
- Underlaying method for parsing arguments.
- """
-
- self.args = self.arg_parser.parse_args()
-
- if self.args.usage:
- self.arg_parser.print_usage(sys.stdout)
- self.exit(0)
-
- if self.args.verbose is not None and self.args.verbose > self.verbose:
- self.verbose = self.args.verbose
-
- if self.args.quiet:
- self.quiet = self.args.quiet
-
- if self.args.color == 'yes':
- self._terminal_has_colors = True
- elif self.args.color == 'no':
- self._terminal_has_colors = False
- else:
- self._terminal_has_colors = self.terminal_can_color()
-
- if getattr(self.args, 'force', False):
- self.force = True
-
- if getattr(self.args, 'simulate', False):
- self.simulate = True
-
- # -------------------------------------------------------------------------
- def perform_arg_parser(self):
- """
- Public available method to execute some actions after parsing
- the command line parameters.
-
- Descendant classes may override this method.
- """
-
- pass
-
- # -------------------------------------------------------------------------
- def _init_env(self):
- """
- Initialization of self.env by application specific environment
- variables.
-
- It calls self.init_env(), after it has done his job.
-
- """
-
- for (key, value) in list(os.environ.items()):
-
- if not key.startswith(self.env_prefix):
- continue
-
- newkey = key.replace(self.env_prefix, '', 1)
- self.env[newkey] = value
-
- self.init_env()
-
- # -------------------------------------------------------------------------
- def init_env(self):
- """
- Public available method to initiate self.env additional to the implicit
- initialization done by this module.
- Maybe it can be used to import environment variables, their
- names not starting with self.env_prefix.
-
- Currently a dummy method, which ca be overriden by descendant classes.
-
- """
-
- pass
-
- # -------------------------------------------------------------------------
- def _perform_env(self):
- """
- Method to do some useful things with the found environment.
-
- It calls self.perform_env(), after it has done his job.
-
- """
-
- # try to detect verbosity level from environment
- if 'VERBOSE' in self.env and self.env['VERBOSE']:
- v = 0
- try:
- v = int(self.env['VERBOSE'])
- except ValueError:
- v = 1
- if v > self.verbose:
- self.verbose = v
-
- self.perform_env()
-
- # -------------------------------------------------------------------------
- def perform_env(self):
- """
- Public available method to perform found environment variables after
- initialization of self.env.
-
- Currently a dummy method, which ca be overriden by descendant classes.
-
- """
-
- pass
-
- # -------------------------------------------------------------------------
- def colored(self, msg, color):
- """
- Wrapper function to colorize the message. Depending, whether the current
- terminal can display ANSI colors, the message is colorized or not.
-
- @param msg: The message to colorize
- @type msg: str
- @param color: The color to use, must be one of the keys of COLOR_CODE
- @type color: str
-
- @return: the colorized message
- @rtype: str
-
- """
-
- if not self.terminal_has_colors:
- return msg
- return colorstr(msg, color)
-
- # -------------------------------------------------------------------------
- def get_command(self, cmd, quiet=False):
- """
- Searches the OS search path for the given command and gives back the
- normalized position of this command.
- If the command is given as an absolute path, it check the existence
- of this command.
-
- @param cmd: the command to search
- @type cmd: str
- @param quiet: No warning message, if the command could not be found,
- only a debug message
- @type quiet: bool
-
- @return: normalized complete path of this command, or None,
- if not found
- @rtype: str or None
-
- """
-
- if self.verbose > 2:
- LOG.debug("Searching for command {!r} ...".format(cmd))
-
- # Checking an absolute path
- if os.path.isabs(cmd):
- if not os.path.exists(cmd):
- LOG.warning("Command {!r} doesn't exists.".format(cmd))
- return None
- if not os.access(cmd, os.X_OK):
- msg = "Command {!r} is not executable.".format(cmd)
- LOG.warning(msg)
- return None
- return os.path.normpath(cmd)
-
- # Checking a relative path
- for d in caller_search_path():
- if self.verbose > 3:
- LOG.debug("Searching command in {!r} ...".format(d))
- p = os.path.join(d, cmd)
- if os.path.exists(p):
- if self.verbose > 2:
- LOG.debug("Found {!r} ...".format(p))
- if os.access(p, os.X_OK):
- return os.path.normpath(p)
- else:
- LOG.debug("Command {!r} is not executable.".format(p))
-
- # command not found, sorry
- if quiet:
- if self.verbose > 2:
- LOG.debug("Command {!r} not found.".format(cmd))
- else:
- LOG.warning("Command {!r} not found.".format(cmd))
-
- return None
-
-# =============================================================================
-
-if __name__ == "__main__":
-
- pass
-
-# =============================================================================
-
-# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-@summary: additional logging formatter for colored output via console
-"""
-
-# Standard modules
-import logging
-# import os.path
-# import sys
-import copy
-
-# Third party modules
-
-# Own modules
-
-# import pb_provisioning.common
-
-# from pb_provisioning.common import to_unicode_or_bust, to_utf8_or_bust
-
-__version__ = '0.1.4'
-
-# =============================================================================
-# Color coding module variables and helper functions
-
-COLOR_CODE = {
- 'ENDC': 0, # RESET COLOR
- 'BOLD': 1,
- 'UNDERLINE': 4,
- 'BLINK': 5,
- 'INVERT': 7,
- 'CONCEALD': 8,
- 'STRIKE': 9,
- 'GREY30': 90,
- 'GREY40': 2,
- 'GREY65': 37,
- 'GREY70': 97,
- 'GREY20_BG': 40,
- 'GREY33_BG': 100,
- 'GREY80_BG': 47,
- 'GREY93_BG': 107,
- 'DARK_RED': 31,
- 'RED': 91,
- 'RED_BG': 41,
- 'LIGHT_RED_BG': 101,
- 'DARK_YELLOW': 33,
- 'YELLOW': 93,
- 'YELLOW_BG': 43,
- 'LIGHT_YELLOW_BG': 103,
- 'DARK_BLUE': 34,
- 'BLUE': 94,
- 'BLUE_BG': 44,
- 'LIGHT_BLUE_BG': 104,
- 'DARK_MAGENTA': 35,
- 'PURPLE': 95,
- 'MAGENTA_BG': 45,
- 'LIGHT_PURPLE_BG': 105,
- 'DARK_CYAN': 36,
- 'AUQA': 96,
- 'AQUA': 96,
- 'CYAN_BG': 46,
- 'LIGHT_AUQA_BG': 106,
- 'LIGHT_AQUA_BG': 106,
- 'DARK_GREEN': 32,
- 'GREEN': 92,
- 'GREEN_BG': 42,
- 'LIGHT_GREEN_BG': 102,
- 'BLACK': 30,
-}
-
-
-# -----------------------------------------------------------------------------
-def termcode(num):
- """
- Output of an ANSII terminal code.
- """
-
- return('\033[%sm' % (num))
-
-
-# -----------------------------------------------------------------------------
-def colorstr(message, color):
- """
- Wrapper function to colorize the message.
-
- @param message: The message to colorize
- @type message: str
- @param color: The color to use, must be one of the keys of COLOR_CODE
- @type color: str
-
- @return: the colorized message
- @rtype: str
-
- """
-
- tcode = ''
- if isinstance(color, (list, tuple)):
- for clr in color:
- tcode += termcode(COLOR_CODE[clr])
- else:
- tcode = termcode(COLOR_CODE[color])
-
- return tcode + message + termcode(COLOR_CODE['ENDC'])
-
-LOG = logging.getLogger(__name__)
-
-
-# =============================================================================
-class ColoredFormatter(logging.Formatter):
- """
- A variant of code found at:
- http://stackoverflow.com/questions/384076/how-can-i-make-the-python-logging-output-to-be-colored
- """
-
- LEVEL_COLOR = {
- 'DEBUG': None,
- 'INFO': 'GREEN',
- 'WARNING': 'YELLOW',
- 'ERROR': ('BOLD', 'RED'),
- 'CRITICAL': 'RED_BG',
- }
-
- # -------------------------------------------------------------------------
- def __init__(self, fmt=None, datefmt=None):
- """
- Initialize the formatter with specified format strings.
-
- Initialize the formatter either with the specified format string, or a
- default. Allow for specialized date formatting with the optional
- datefmt argument (if omitted, you get the ISO8601 format).
- """
-
- logging.Formatter.__init__(self, fmt, datefmt)
-
- # -----------------------------------------------------------
- @property
- def color_debug(self):
- """The color used to output debug messages."""
- return self.LEVEL_COLOR['DEBUG']
-
- @color_debug.setter
- def color_debug(self, value):
- self.LEVEL_COLOR['DEBUG'] = value
-
- # -----------------------------------------------------------
- @property
- def color_info(self):
- """The color used to output info messages."""
- return self.LEVEL_COLOR['INFO']
-
- @color_info.setter
- def color_info(self, value):
- self.LEVEL_COLOR['INFO'] = value
-
- # -----------------------------------------------------------
- @property
- def color_warning(self):
- """The color used to output warning messages."""
- return self.LEVEL_COLOR['WARNING']
-
- @color_warning.setter
- def color_warning(self, value):
- self.LEVEL_COLOR['WARNING'] = value
-
- # -----------------------------------------------------------
- @property
- def color_error(self):
- """The color used to output error messages."""
- return self.LEVEL_COLOR['ERROR']
-
- @color_error.setter
- def color_error(self, value):
- self.LEVEL_COLOR['ERROR'] = value
-
- # -----------------------------------------------------------
- @property
- def color_critical(self):
- """The color used to output critical messages."""
- return self.LEVEL_COLOR['CRITICAL']
-
- @color_critical.setter
- def color_critical(self, value):
- self.LEVEL_COLOR['CRITICAL'] = value
-
- # -------------------------------------------------------------------------
- def format(self, record):
- """
- Format the specified record as text.
- """
-
- record = copy.copy(record)
- levelname = record.levelname
-
- if levelname in self.LEVEL_COLOR:
-
- record.name = colorstr(record.name, 'BOLD')
- record.filename = colorstr(record.filename, 'BOLD')
- record.module = colorstr(record.module, 'BOLD')
- record.funcName = colorstr(record.funcName, 'BOLD')
- record.pathname = colorstr(record.pathname, 'BOLD')
- record.processName = colorstr(record.processName, 'BOLD')
- record.threadName = colorstr(record.threadName, 'BOLD')
-
- if self.LEVEL_COLOR[levelname] is not None:
- record.levelname = colorstr(
- levelname, self.LEVEL_COLOR[levelname])
- record.msg = colorstr(record.msg, self.LEVEL_COLOR[levelname])
-
- return logging.Formatter.format(self, record)
-
-
-# =============================================================================
-
-if __name__ == "__main__":
- pass
-
-# =============================================================================
-
-# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-@author: Frank Brehm
-@contact: frank.brehm@pixelpark.com
-@copyright: © 2018 by Frank Brehm, Berlin
-@summary: The module for common used functions.
-"""
-
-# Standard modules
-import sys
-import os
-import logging
-import re
-import pprint
-import platform
-import locale
-
-# Third party modules
-import six
-
-# Own modules
-
-__version__ = '0.5.3'
-
-LOG = logging.getLogger(__name__)
-
-RE_YES = re.compile(r'^\s*(?:y(?:es)?|true)\s*$', re.IGNORECASE)
-RE_NO = re.compile(r'^\s*(?:no?|false|off)\s*$', re.IGNORECASE)
-PAT_TO_BOOL_TRUE = locale.nl_langinfo(locale.YESEXPR)
-RE_TO_BOOL_TRUE = re.compile(PAT_TO_BOOL_TRUE)
-PAT_TO_BOOL_FALSE = locale.nl_langinfo(locale.NOEXPR)
-RE_TO_BOOL_FALSE = re.compile(PAT_TO_BOOL_FALSE)
-
-RE_DOT = re.compile(r'\.')
-RE_DOT_AT_END = re.compile(r'(\.)*$')
-RE_DECIMAL = re.compile(r'^\d+$')
-RE_IPV4_PTR = re.compile(r'\.in-addr\.arpa\.$', re.IGNORECASE)
-RE_IPV6_PTR = re.compile(r'\.ip6\.arpa\.$', re.IGNORECASE)
-
-
-# =============================================================================
-def pp(value, indent=4, width=99, depth=None):
- """
- Returns a pretty print string of the given value.
-
- @return: pretty print string
- @rtype: str
- """
-
- pretty_printer = pprint.PrettyPrinter(
- indent=indent, width=width, depth=depth)
- return pretty_printer.pformat(value)
-
-
-# =============================================================================
-def terminal_can_colors(debug=False):
- """
- Method to detect, whether the current terminal (stdout and stderr)
- is able to perform ANSI color sequences.
-
- @return: both stdout and stderr can perform ANSI color sequences
- @rtype: bool
-
- """
-
- cur_term = ''
- if 'TERM' in os.environ:
- cur_term = os.environ['TERM'].lower().strip()
-
- colored_term_list = (
- r'ansi',
- r'linux.*',
- r'screen.*',
- r'[xeak]term.*',
- r'gnome.*',
- r'rxvt.*',
- r'interix',
- )
- term_pattern = r'^(?:' + r'|'.join(colored_term_list) + r')$'
- re_term = re.compile(term_pattern)
-
- ansi_term = False
- env_term_has_colors = False
-
- if cur_term:
- if cur_term == 'ansi':
- env_term_has_colors = True
- ansi_term = True
- elif re_term.search(cur_term):
- env_term_has_colors = True
- if debug:
- sys.stderr.write(
- "ansi_term: %r, env_term_has_colors: %r\n" % (
- ansi_term, env_term_has_colors))
-
- has_colors = False
- if env_term_has_colors:
- has_colors = True
- for handle in [sys.stdout, sys.stderr]:
- if (hasattr(handle, "isatty") and handle.isatty()):
- if debug:
- sys.stderr.write("%s is a tty.\n" % (handle.name))
- if (platform.system() == 'Windows' and not ansi_term):
- if debug:
- sys.stderr.write("platform is Windows and not ansi_term.\n")
- has_colors = False
- else:
- if debug:
- sys.stderr.write("%s is not a tty.\n" % (handle.name))
- if ansi_term:
- pass
- else:
- has_colors = False
-
- return has_colors
-
-
-# =============================================================================
-def to_bool(value):
- """
- Converter from string to boolean values (e.g. from configurations)
- """
-
- if not value:
- return False
-
- try:
- v_int = int(value)
- except ValueError:
- pass
- except TypeError:
- pass
- else:
- if v_int == 0:
- return False
- else:
- return True
-
- global PAT_TO_BOOL_TRUE
- global RE_TO_BOOL_TRUE
- global PAT_TO_BOOL_FALSE
- global RE_TO_BOOL_FALSE
-
- c_yes_expr = locale.nl_langinfo(locale.YESEXPR)
- if c_yes_expr != PAT_TO_BOOL_TRUE:
- PAT_TO_BOOL_TRUE = c_yes_expr
- RE_TO_BOOL_TRUE = re.compile(PAT_TO_BOOL_TRUE)
- # LOG.debug("Current pattern for 'yes': %r.", c_yes_expr)
-
- c_no_expr = locale.nl_langinfo(locale.NOEXPR)
- if c_no_expr != PAT_TO_BOOL_FALSE:
- PAT_TO_BOOL_FALSE = c_no_expr
- RE_TO_BOOL_FALSE = re.compile(PAT_TO_BOOL_FALSE)
- # LOG.debug("Current pattern for 'no': %r.", c_no_expr)
-
- v_str = ''
- if isinstance(value, str):
- v_str = value
- if six.PY2:
- if isinstance(value, unicode): # noqa
- v_str = value.encode('utf-8')
- elif six.PY3 and isinstance(value, bytes):
- v_str = value.decode('utf-8')
- else:
- v_str = str(value)
-
- match = RE_YES.search(v_str)
- if match:
- return True
- match = RE_TO_BOOL_TRUE.search(v_str)
- if match:
- return True
-
- match = RE_NO.search(v_str)
- if match:
- return False
- match = RE_TO_BOOL_FALSE.search(v_str)
- if match:
- return False
-
- return bool(value)
-
-
-# =============================================================================
-def to_unicode(obj, encoding='utf-8'):
-
- do_decode = False
- if six.PY2:
- if isinstance(obj, str):
- do_decode = True
- else:
- if isinstance(obj, bytes):
- do_decode = True
-
- if do_decode:
- obj = obj.decode(encoding)
-
- return obj
-
-
-# =============================================================================
-def to_utf8(obj):
-
- return encode_or_bust(obj, 'utf-8')
-
-
-# =============================================================================
-def encode_or_bust(obj, encoding='utf-8'):
-
- do_encode = False
- if six.PY2:
- if isinstance(obj, unicode): # noqa
- do_encode = True
- else:
- if isinstance(obj, str):
- do_encode = True
-
- if do_encode:
- obj = obj.encode(encoding)
-
- return obj
-
-
-# =============================================================================
-def to_bytes(obj, encoding='utf-8'):
- "Wrapper for encode_or_bust()"
-
- return encode_or_bust(obj, encoding)
-
-
-# =============================================================================
-def to_str(obj, encoding='utf-8'):
- """
- Transformes the given string-like object into the str-type according
- to the current Python version.
- """
-
- if six.PY2:
- return encode_or_bust(obj, encoding)
- else:
- return to_unicode(obj, encoding)
-
-
-# =============================================================================
-def caller_search_path():
- """
- Builds a search path for executables from environment $PATH
- including some standard paths.
-
- @return: all existing search paths
- @rtype: list
- """
-
- path_list = []
- search_path = os.environ['PATH']
- if not search_path:
- search_path = os.defpath
-
- search_path_list = [
- '/opt/PPlocal/bin',
- ]
-
- for d in search_path.split(os.pathsep):
- search_path_list.append(d)
-
- default_path = [
- '/bin',
- '/usr/bin',
- '/usr/local/bin',
- '/sbin',
- '/usr/sbin',
- '/usr/local/sbin',
- '/usr/ucb',
- '/usr/sfw/bin',
- '/opt/csw/bin',
- '/usr/openwin/bin',
- '/usr/ccs/bin',
- ]
-
- for d in default_path:
- search_path_list.append(d)
-
- for d in search_path_list:
- if not os.path.exists(d):
- continue
- if not os.path.isdir(d):
- continue
- d_abs = os.path.realpath(d)
- if d_abs not in path_list:
- path_list.append(d_abs)
-
- return path_list
-
-# =============================================================================
-def compare_fqdn(x, y):
-
- # LOG.debug("Comparing {!r} <=> {!r}.".format(x, y))
-
- # First check for None values
- if x is None and y is None:
- return 0
- if x is None:
- return -1
- if y is None:
- return 1
-
- # Check for empty FQDNs
- xs = str(x).strip().lower()
- ys = str(y).strip().lower()
-
- if xs == '' and ys == '':
- return 0
- if xs == '':
- return -1
- if ys == '':
- return 1
-
- # Ensure a dot at end
- xs = RE_DOT_AT_END.sub('.', xs)
- ys = RE_DOT_AT_END.sub('.', ys)
-
- if xs == ys:
- return 0
-
- # Reverse IPv4 zones first, then reverse IPv6 zones
- if RE_IPV4_PTR.search(xs):
- if not RE_IPV4_PTR.search(ys):
- return -1
- elif RE_IPV4_PTR.search(ys):
- if not RE_IPV4_PTR.search(xs):
- return 1
- elif RE_IPV6_PTR.search(xs):
- if not RE_IPV6_PTR.search(ys):
- return -1
- elif RE_IPV6_PTR.search(ys):
- if not RE_IPV6_PTR.search(xs):
- return 1
-
- return compare_fqdn_tokens(xs, ys)
-
-# =============================================================================
-def compare_fqdn_tokens(xs, ys):
-
- xa = RE_DOT.split(xs)
- xa.reverse()
- xa.pop(0)
-
- ya = RE_DOT.split(ys)
- ya.reverse()
- ya.pop(0)
-
- # Compare token from the last to the first
- nr_tokens = min(len(xa), len(ya))
- while nr_tokens > 0:
- token_x = xa.pop(0)
- token_y = ya.pop(0)
- if RE_DECIMAL.match(token_x) and RE_DECIMAL.match(token_y):
- num_x = int(token_x)
- num_y = int(token_y)
- if num_x < num_y:
- return -1
- elif num_x > num_y:
- return 1
- else:
- if token_x < token_y:
- return -1
- elif token_x > token_y:
- return 1
- nr_tokens -= 1
-
- if len(xa):
- return 1
- if len(ya):
- return -1
-
- return 0
-
-# =============================================================================
-
-if __name__ == "__main__":
-
- pass
-
-# =============================================================================
-
-# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-@author: Frank Brehm
-@contact: frank.brehm@pixelpark.com
-@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
-"""
-from __future__ import absolute_import
-
-# Standard modules
-import sys
-import os
-import logging
-import datetime
-import traceback
-
-# Third party modules
-
-# Own modules
-from .common import pp, to_bytes
-
-from .errors import PpError
-
-__version__ = '0.2.4'
-
-LOG = logging.getLogger(__name__)
-
-
-# =============================================================================
-class PpBaseObjectError(PpError):
- """
- Base error class useable by all descendand objects.
- """
-
- pass
-
-
-# =============================================================================
-class PpBaseObject(object):
- """
- Base class for all objects.
- """
-
- # -------------------------------------------------------------------------
- def __init__(
- self, appname=None, verbose=0, version=__version__, base_dir=None,
- initialized=False):
- """
- Initialisation of the base object.
-
- Raises an exception on a uncoverable error.
- """
-
- self._appname = None
- """
- @ivar: name of the current running application
- @type: str
- """
- if appname:
- v = str(appname).strip()
- if v:
- self._appname = v
- if not self._appname:
- self._appname = os.path.basename(sys.argv[0])
-
- self._version = version
- """
- @ivar: version string of the current object or application
- @type: str
- """
-
- self._verbose = int(verbose)
- """
- @ivar: verbosity level (0 - 9)
- @type: int
- """
- if self._verbose < 0:
- msg = "Wrong verbose level {!r}, must be >= 0".format(verbose)
- raise ValueError(msg)
-
- self._initialized = False
- """
- @ivar: initialisation of this object is complete
- after __init__() of this object
- @type: bool
- """
-
- self._base_dir = base_dir
- """
- @ivar: base directory used for different purposes, must be an existent
- directory. Defaults to directory of current script daemon.py.
- @type: str
- """
- if base_dir:
- if not os.path.exists(base_dir):
- msg = "Base directory {!r} does not exists.".format(base_dir)
- self.handle_error(msg)
- self._base_dir = None
- elif not os.path.isdir(base_dir):
- msg = "Base directory {!r} is not a directory.".format(base_dir)
- self.handle_error(msg)
- self._base_dir = None
- if not self._base_dir:
- self._base_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
-
- self._initialized = bool(initialized)
-
- # -----------------------------------------------------------
- @property
- def appname(self):
- """The name of the current running application."""
- if hasattr(self, '_appname'):
- return self._appname
- return os.path.basename(sys.argv[0])
-
- @appname.setter
- def appname(self, value):
- if value:
- v = str(value).strip()
- if v:
- self._appname = v
-
- # -----------------------------------------------------------
- @property
- def version(self):
- """The version string of the current object or application."""
- return getattr(self, '_version', __version__)
-
- # -----------------------------------------------------------
- @property
- def verbose(self):
- """The verbosity level."""
- return getattr(self, '_verbose', 0)
-
- @verbose.setter
- def verbose(self, value):
- v = int(value)
- if v >= 0:
- self._verbose = v
- else:
- LOG.warn("Wrong verbose level {!r}, must be >= 0".format(value))
-
- # -----------------------------------------------------------
- @property
- def initialized(self):
- """The initialisation of this object is complete."""
- return getattr(self, '_initialized', False)
-
- @initialized.setter
- def initialized(self, value):
- self._initialized = bool(value)
-
- # -----------------------------------------------------------
- @property
- def base_dir(self):
- """The base directory used for different purposes."""
- return self._base_dir
-
- @base_dir.setter
- def base_dir(self, value):
- if value.startswith('~'):
- value = os.path.expanduser(value)
- if not os.path.exists(value):
- msg = "Base directory {!r} does not exists.".format(value)
- LOG.error(msg)
- elif not os.path.isdir(value):
- msg = "Base directory {!r} is not a directory.".format(value)
- LOG.error(msg)
- else:
- self._base_dir = value
-
- # -------------------------------------------------------------------------
- def __str__(self):
- """
- Typecasting function for translating object structure
- into a string
-
- @return: structure as string
- @rtype: str
- """
-
- return pp(self.as_dict(short=True))
-
- # -------------------------------------------------------------------------
- def __repr__(self):
- """Typecasting into a string for reproduction."""
-
- out = "<%s(" % (self.__class__.__name__)
-
- fields = []
- fields.append("appname={!r}".format(self.appname))
- fields.append("verbose={!r}".format(self.verbose))
- fields.append("version={!r}".format(self.version))
- fields.append("base_dir={!r}".format(self.base_dir))
- fields.append("initialized={!r}".format(self.initialized))
-
- out += ", ".join(fields) + ")>"
- return out
-
- # -------------------------------------------------------------------------
- def as_dict(self, short=True):
- """
- Transforms the elements of the object into a dict
-
- @param short: don't include local properties in resulting dict.
- @type short: bool
-
- @return: structure as dict
- @rtype: dict
- """
-
- res = self.__dict__
- res = {}
- for key in self.__dict__:
- if short and key.startswith('_') and not key.startswith('__'):
- continue
- val = self.__dict__[key]
- if isinstance(val, PpBaseObject):
- res[key] = val.as_dict(short=short)
- else:
- res[key] = val
- res['__class_name__'] = self.__class__.__name__
- res['appname'] = self.appname
- res['version'] = self.version
- res['verbose'] = self.verbose
- res['initialized'] = self.initialized
- res['base_dir'] = self.base_dir
-
- return res
-
- # -------------------------------------------------------------------------
- def handle_error(
- self, error_message=None, exception_name=None, do_traceback=False):
- """
- Handle an error gracefully.
-
- Print a traceback and continue.
-
- @param error_message: the error message to display
- @type error_message: str
- @param exception_name: name of the exception class
- @type exception_name: str
- @param do_traceback: allways show a traceback
- @type do_traceback: bool
-
- """
-
- msg = 'Exception happened: '
- if exception_name is not None:
- exception_name = exception_name.strip()
- if exception_name:
- msg = exception_name + ': '
- else:
- msg = ''
- if error_message:
- msg += str(error_message)
- else:
- msg += 'undefined error.'
-
- root_log = logging.getLogger()
- has_handlers = False
- if root_log.handlers:
- has_handlers = True
-
- if has_handlers:
- LOG.error(msg)
- if do_traceback:
- LOG.error(traceback.format_exc())
- else:
- curdate = datetime.datetime.now()
- curdate_str = "[" + curdate.isoformat(' ') + "]: "
- msg = curdate_str + msg + "\n"
- if hasattr(sys.stderr, 'buffer'):
- sys.stderr.buffer.write(to_bytes(msg))
- else:
- sys.stderr.write(msg)
- if do_traceback:
- traceback.print_exc()
-
- return
-
- # -------------------------------------------------------------------------
- def handle_info(self, message, info_name=None):
- """
- Shows an information. This happens both to STDERR and to all
- initialized log handlers.
-
- @param message: the info message to display
- @type message: str
- @param info_name: Title of information
- @type info_name: str
-
- """
-
- msg = ''
- if info_name is not None:
- info_name = info_name.strip()
- if info_name:
- msg = info_name + ': '
- msg += str(message).strip()
-
- root_log = logging.getLogger()
- has_handlers = False
- if root_log.handlers:
- has_handlers = True
-
- if has_handlers:
- LOG.info(msg)
- else:
- curdate = datetime.datetime.now()
- curdate_str = "[" + curdate.isoformat(' ') + "]: "
- msg = curdate_str + msg + "\n"
- if hasattr(sys.stderr, 'buffer'):
- sys.stderr.buffer.write(to_bytes(msg))
- else:
- sys.stderr.write(msg)
-
- return
-
-# =============================================================================
-
-if __name__ == "__main__":
-
- pass
-
-# =============================================================================
-
-# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4