+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
-@author: Frank Brehm
-@contact: frank.brehm@pixelpark.com
-@copyright: © 2018 by Frank Brehm, Publicies Pixelpark GmbH, Berlin
-@summary: A base handler module for a handler object, that can call or spawn
- OS commands and read and write files.
-"""
-from __future__ import absolute_import
-
-# Standard modules
-import os
-import logging
-import subprocess
-import pwd
-import signal
-import errno
-import locale
-import time
-import pipes
-from fcntl import fcntl, F_GETFL, F_SETFL
-
-# Third party modules
-import six
-
-# Own modules
-from fb_tools.common import caller_search_path
-from fb_tools.errors import ReadTimeoutError, WriteTimeoutError
-from fb_tools.errors import CommandNotFoundError
-from fb_tools.obj import FbBaseObjectError, FbBaseObject
-
-__version__ = '1.2.2'
-
-LOG = logging.getLogger(__name__)
-
-
-# Some module varriables
-CHOWN_CMD = os.sep + os.path.join('bin', 'chown')
-ECHO_CMD = os.sep + os.path.join('bin', 'echo')
-SUDO_CMD = os.sep + os.path.join('usr', 'bin', 'sudo')
-
-
-# =============================================================================
-class BaseHandlerError(FbBaseObjectError):
- """Base error class for all exceptions happened during
- execution this application"""
-
- pass
-
-
-# =============================================================================
-class BaseHandler(FbBaseObject):
- """
- Base class for handler objects.
- """
-
- # -------------------------------------------------------------------------
- def __init__(
- self, appname=None, verbose=0, version=__version__, base_dir=None,
- initialized=None, simulate=False, sudo=False, quiet=False):
- """
- Initialisation of the base handler object.
-
- @raise CommandNotFoundError: if the commands 'chmod', 'echo' and
- 'sudo' could not be found.
- @raise BaseHandlerError: on a uncoverable error.
-
- @param appname: name of the current running application
- @type appname: str
- @param verbose: verbose level
- @type verbose: int
- @param version: the version string of the current object or application
- @type version: str
- @param base_dir: the base directory of all operations
- @type base_dir: str
- @param initialized: initialisation is complete after __init__()
- of this object
- @type initialized: bool
- @param simulate: don't execute actions, only display them
- @type simulate: bool
- @param sudo: should the command executed by sudo by default
- @type sudo: bool
- @param quiet: don't display ouput of action after calling
- @type quiet: bool
-
- @return: None
- """
-
- super(BaseHandler, self).__init__(
- appname=appname, verbose=verbose, version=version,
- base_dir=base_dir, initialized=False,
- )
-
- failed_commands = []
-
- self._simulate = bool(simulate)
- """
- @ivar: don't execute actions, only display them
- @type: bool
- """
-
- self._quiet = quiet
- """
- @ivar: don't display ouput of action after calling
- (except output on STDERR)
- @type: bool
- """
-
- self._sudo = sudo
- """
- @ivar: should the command executed by sudo by default
- @type: bool
- """
-
- self._chown_cmd = CHOWN_CMD
- """
- @ivar: the chown command for changing ownership of file objects
- @type: str
- """
- if not os.path.exists(self.chown_cmd) or not os.access(
- self.chown_cmd, os.X_OK):
- self._chown_cmd = self.get_command('chown')
- if not self.chown_cmd:
- failed_commands.append('chown')
-
- self._echo_cmd = ECHO_CMD
- """
- @ivar: the echo command for simulating execution
- @type: str
- """
- if not os.path.exists(self.echo_cmd) or not os.access(
- self.echo_cmd, os.X_OK):
- self._echo_cmd = self.get_command('echo')
- if not self.echo_cmd:
- failed_commands.append('echo')
-
- self._sudo_cmd = SUDO_CMD
- """
- @ivar: the sudo command for execute commands as root
- @type: str
- """
- if not os.path.exists(self.sudo_cmd) or not os.access(
- self._sudo_cmd, os.X_OK):
- self.sudo_cmd = self.get_command('sudo')
- if not self.sudo_cmd:
- failed_commands.append('sudo')
-
- # Some commands are missing
- if failed_commands:
- raise CommandNotFoundError(failed_commands)
-
- if initialized is None:
- self.initialized = True
- else:
- self.initialized = initialized
- if self.verbose > 3:
- LOG.debug("Initialized.")
-
- # -----------------------------------------------------------
- @property
- def simulate(self):
- """Simulation mode."""
- return self._simulate
-
- @simulate.setter
- def simulate(self, value):
- self._simulate = bool(value)
-
- # -----------------------------------------------------------
- @property
- def quiet(self):
- """Don't display ouput of action after calling."""
- return self._quiet
-
- @quiet.setter
- def quiet(self, value):
- self._quiet = bool(value)
-
- # -----------------------------------------------------------
- @property
- def sudo(self):
- """Should the command executed by sudo by default."""
- return self._sudo
-
- @sudo.setter
- def sudo(self, value):
- self._sudo = bool(value)
-
- # -----------------------------------------------------------
- @property
- def chown_cmd(self):
- """The absolute path to the OS command 'chown'."""
- return self._chown_cmd
-
- # -----------------------------------------------------------
- @property
- def echo_cmd(self):
- """The absolute path to the OS command 'echo'."""
- return self._echo_cmd
-
- # -----------------------------------------------------------
- @property
- def sudo_cmd(self):
- """The absolute path to the OS command 'sudo'."""
- return self._sudo_cmd
-
- # -------------------------------------------------------------------------
- def as_dict(self, short=True):
- """
- Transforms the elements of the object into a dict
-
- @param short: don't include local properties in resulting dict.
- @type short: bool
-
- @return: structure as dict
- @rtype: dict
- """
-
- res = super(BaseHandler, self).as_dict(short=short)
- res['simulate'] = self.simulate
- res['quiet'] = self.quiet
- res['sudo'] = self.sudo
- res['chown_cmd'] = self.chown_cmd
- res['echo_cmd'] = self.echo_cmd
- res['sudo_cmd'] = self.sudo_cmd
-
- return res
-
- # -------------------------------------------------------------------------
- def __repr__(self):
- """Typecasting into a string for reproduction."""
-
- out = "<%s(" % (self.__class__.__name__)
-
- fields = []
- fields.append("appname=%r" % (self.appname))
- fields.append("verbose=%r" % (self.verbose))
- fields.append("version=%r" % (self.version))
- fields.append("base_dir=%r" % (self.base_dir))
- fields.append("initialized=%r" % (self.initialized))
- fields.append("simulate=%r" % (self.simulate))
- fields.append("sudo=%r" % (self.sudo))
- fields.append("quiet=%r" % (self.quiet))
-
- out += ", ".join(fields) + ")>"
- return out
-
- # -------------------------------------------------------------------------
- def get_cmd(self, cmd):
-
- return self.get_command(cmd)
-
- # -------------------------------------------------------------------------
- def get_command(self, cmd, quiet=False):
- """
- Searches the OS search path for the given command and gives back the
- normalized position of this command.
- If the command is given as an absolute path, it check the existence
- of this command.
-
- @param cmd: the command to search
- @type cmd: str
- @param quiet: No warning message, if the command could not be found,
- only a debug message
- @type quiet: bool
-
- @return: normalized complete path of this command, or None,
- if not found
- @rtype: str or None
-
- """
-
- if self.verbose > 2:
- LOG.debug("Searching for command {!r} ...".format(cmd))
-
- # Checking an absolute path
- if os.path.isabs(cmd):
- if not os.path.exists(cmd):
- LOG.warning("Command {!r} doesn't exists.".format(cmd))
- return None
- if not os.access(cmd, os.X_OK):
- LOG.warning("Command {!r} is not executable.".format(cmd))
- return None
- return os.path.normpath(cmd)
-
- # Checking a relative path
- for d in map(str, caller_search_path()):
- if self.verbose > 3:
- LOG.debug("Searching command in {!r} ...".format(d))
- p = os.path.join(d, cmd)
- if os.path.exists(p):
- if self.verbose > 2:
- LOG.debug("Found {!r} ...".format(p))
- if os.access(p, os.X_OK):
- return os.path.normpath(p)
- else:
- LOG.debug("Command {!r} is not executable.".format(p))
-
- # command not found, sorry
- if quiet:
- if self.verbose > 2:
- LOG.debug("Command {!r} not found.".format(cmd))
- else:
- LOG.warning("Command {!r} not found.".format(cmd))
-
- return None
-
- # -------------------------------------------------------------------------
- def call(
- self, cmd, sudo=None, simulate=None, quiet=None, shell=False,
- stdout=None, stderr=None, bufsize=0, drop_stderr=False,
- close_fds=False, hb_handler=None, hb_interval=2.0,
- poll_interval=0.2, log_output=True, **kwargs):
- """
- Executing a OS command.
-
- @param cmd: the cmd you wanne call
- @type cmd: list of strings or str
- @param sudo: execute the command with sudo
- @type sudo: bool (or none, if self.sudo will be be asked)
- @param simulate: simulate execution or not,
- if None, self.simulate will asked
- @type simulate: bool or None
- @param quiet: quiet execution independend of self.quiet
- @type quiet: bool
- @param shell: execute the command with a shell
- @type shell: bool
- @param stdout: file descriptor for stdout,
- if not given, self.stdout is used
- @type stdout: int
- @param stderr: file descriptor for stderr,
- if not given, self.stderr is used
- @type stderr: int
- @param bufsize: size of the buffer for stdout
- @type bufsize: int
- @param drop_stderr: drop all output on stderr, independend
- of any value of stderr
- @type drop_stderr: bool
- @param close_fds: closing all open file descriptors
- (except 0, 1 and 2) on calling subprocess.Popen()
- @type close_fds: bool
- @param kwargs: any optional named parameter (must be one
- of the supported suprocess.Popen arguments)
- @type kwargs: dict
-
- @return: tuple of::
- - return value of calling process,
- - output on STDOUT,
- - output on STDERR
-
- """
-
- cmd_list = cmd
- if isinstance(cmd, str):
- cmd_list = [cmd]
-
- pwd_info = pwd.getpwuid(os.geteuid())
-
- if sudo is None:
- sudo = self.sudo
- if sudo:
- cmd_list.insert(0, '-n')
- cmd_list.insert(0, self.sudo_cmd)
-
- if simulate is None:
- simulate = self.simulate
- if simulate:
- cmd_list.insert(0, self.echo_cmd)
- quiet = False
-
- if quiet is None:
- quiet = self.quiet
-
- use_shell = bool(shell)
-
- cmd_list = [str(element) for element in cmd_list]
- cmd_str = ' '.join(map(lambda x: pipes.quote(x), cmd_list))
-
- if not quiet or self.verbose > 1:
- LOG.debug("Executing: {}".format(cmd_list))
-
- if quiet and self.verbose > 1:
- LOG.debug("Quiet execution")
-
- used_stdout = subprocess.PIPE
- if stdout is not None:
- used_stdout = stdout
- use_stdout = True
- if used_stdout is None:
- use_stdout = False
-
- used_stderr = subprocess.PIPE
- if drop_stderr:
- used_stderr = None
- elif stderr is not None:
- used_stderr = stderr
- use_stderr = True
- if used_stderr is None:
- use_stderr = False
-
- cur_locale = locale.getlocale()
- cur_encoding = cur_locale[1]
- if (cur_locale[1] is None or cur_locale[1] == '' or
- cur_locale[1].upper() == 'C' or
- cur_locale[1].upper() == 'POSIX'):
- cur_encoding = 'UTF-8'
-
- cmd_obj = subprocess.Popen(
- cmd_list,
- shell=use_shell,
- close_fds=close_fds,
- stderr=used_stderr,
- stdout=used_stdout,
- bufsize=bufsize,
- env={'USER': pwd_info.pw_name},
- **kwargs
- )
- # cwd=self.base_dir,
-
- # Display Output of executable
- if hb_handler is not None:
-
- (stdoutdata, stderrdata) = self._wait_for_proc_with_heartbeat(
- cmd_obj=cmd_obj, cmd_str=cmd_str, hb_handler=hb_handler, hb_interval=hb_interval,
- use_stdout=use_stdout, use_stderr=use_stderr,
- poll_interval=poll_interval, quiet=quiet)
-
- else:
-
- if not quiet or self.verbose > 1:
- LOG.debug("Starting synchronous communication with '{}'.".format(cmd_str))
- (stdoutdata, stderrdata) = cmd_obj.communicate()
-
- if not quiet or self.verbose > 1:
- LOG.debug("Finished communication with '{}'.".format(cmd_str))
-
- ret = cmd_obj.wait()
-
- return self._eval_call_results(
- ret, stderrdata, stdoutdata, cur_encoding=cur_encoding,
- log_output=log_output, quiet=quiet)
-
- # -------------------------------------------------------------------------
- def _eval_call_results(
- self, ret, stderrdata, stdoutdata,
- cur_encoding='utf-8', log_output=True, quiet=False):
-
- if not quiet:
- LOG.debug("Returncode: {}".format(ret))
-
- if stderrdata:
- if six.PY3:
- if self.verbose > 2:
- LOG.debug("Decoding {what} from {enc!r}.".format(
- what='STDERR', enc=cur_encoding))
- stderrdata = stderrdata.decode(cur_encoding)
- if not quiet:
- msg = "Output on {where}:\n{what}.".format(
- where="STDERR", what=stderrdata.strip())
- if ret:
- LOG.warn(msg)
- elif log_output:
- LOG.info(msg)
- else:
- LOG.debug(msg)
-
- if stdoutdata:
- if six.PY3:
- if self.verbose > 2:
- LOG.debug("Decoding {what} from {enc!r}.".format(
- what='STDOUT', enc=cur_encoding))
- stdoutdata = stdoutdata.decode(cur_encoding)
- if not quiet:
- msg = "Output on {where}:\n{what}.".format(
- where="STDOUT", what=stdoutdata.strip())
- if log_output:
- LOG.info(msg)
- else:
- LOG.debug(msg)
-
- return (ret, stdoutdata, stderrdata)
-
- # -------------------------------------------------------------------------
- def _wait_for_proc_with_heartbeat(
- self, cmd_obj, cmd_str, hb_handler, hb_interval, use_stdout=True, use_stderr=True,
- poll_interval=0.2, quiet=False):
-
- stdoutdata = ''
- stderrdata = ''
- if six.PY3:
- stdoutdata = bytearray()
- stderrdata = bytearray()
-
- if not quiet or self.verbose > 1:
- LOG.debug((
- "Starting asynchronous communication with '{cmd}', "
- "heartbeat interval is {interval:0.1f} seconds.").format(
- cmd=cmd_str, interval=hb_interval))
-
- out_flags = fcntl(cmd_obj.stdout, F_GETFL)
- err_flags = fcntl(cmd_obj.stderr, F_GETFL)
- fcntl(cmd_obj.stdout, F_SETFL, out_flags | os.O_NONBLOCK)
- fcntl(cmd_obj.stderr, F_SETFL, err_flags | os.O_NONBLOCK)
-
- start_time = time.time()
-
- while True:
-
- if self.verbose > 3:
- LOG.debug("Checking for the end of the communication ...")
- if cmd_obj.poll() is not None:
- cmd_obj.wait()
- break
-
- # Heartbeat handling ...
- cur_time = time.time()
- time_diff = cur_time - start_time
- if time_diff >= hb_interval:
- if not quiet or self.verbose > 1:
- LOG.debug("Time to execute the heartbeat handler.")
- hb_handler()
- start_time = cur_time
- if self.verbose > 3:
- LOG.debug("Sleeping {:0.2f} seconds ...".format(poll_interval))
- time.sleep(poll_interval)
-
- # Reading out file descriptors
- if use_stdout:
- try:
- stdoutdata += os.read(cmd_obj.stdout.fileno(), 1024)
- if self.verbose > 3:
- LOG.debug(" stdout is now: {!r}".format(stdoutdata))
- except OSError:
- pass
-
- if use_stderr:
- try:
- stderrdata += os.read(cmd_obj.stderr.fileno(), 1024)
- if self.verbose > 3:
- LOG.debug(" stderr is now: {!r}".format(stderrdata))
- except OSError:
- pass
-
- return (stdoutdata, stderrdata)
-
- # -------------------------------------------------------------------------
- def read_file(self, filename, timeout=2, quiet=False):
- """
- Reads the content of the given filename.
-
- @raise IOError: if file doesn't exists or isn't readable
- @raise ReadTimeoutError: on timeout reading the file
-
- @param filename: name of the file to read
- @type filename: str
- @param timeout: the amount in seconds when this method should timeout
- @type timeout: int
- @param quiet: increases the necessary verbosity level to
- put some debug messages
- @type quiet: bool
-
- @return: file content
- @rtype: str
-
- """
-
- needed_verbose_level = 1
- if quiet:
- needed_verbose_level = 3
-
- def read_alarm_caller(signum, sigframe):
- '''
- This nested function will be called in event of a timeout
-
- @param signum: the signal number (POSIX) which happend
- @type signum: int
- @param sigframe: the frame of the signal
- @type sigframe: object
- '''
-
- raise ReadTimeoutError(timeout, filename)
-
- timeout = abs(int(timeout))
-
- if not os.path.isfile(filename):
- raise IOError(
- errno.ENOENT, "File doesn't exists.", filename)
- if not os.access(filename, os.R_OK):
- raise IOError(
- errno.EACCES, 'Read permission denied.', filename)
-
- if self.verbose > needed_verbose_level:
- LOG.debug("Reading file content of {!r} ...".format(filename))
-
- signal.signal(signal.SIGALRM, read_alarm_caller)
- signal.alarm(timeout)
-
- content = ''
- fh = open(filename, 'r')
- for line in fh.readlines():
- content += line
- fh.close()
-
- signal.alarm(0)
-
- return content
-
- # -------------------------------------------------------------------------
- def write_file(self, filename, content, timeout=2, must_exists=True, quiet=False):
- """
- Writes the given content into the given filename.
- It should only be used for small things, because it writes unbuffered.
-
- @raise IOError: if file doesn't exists or isn't writeable
- @raise WriteTimeoutError: on timeout writing into the file
-
- @param filename: name of the file to write
- @type filename: str
- @param content: the content to write into the file
- @type content: str
- @param timeout: the amount in seconds when this method should timeout
- @type timeout: int
- @param must_exists: the file must exists before writing
- @type must_exists: bool
- @param quiet: increases the necessary verbosity level to
- put some debug messages
- @type quiet: bool
-
- @return: None
-
- """
-
- def write_alarm_caller(signum, sigframe):
- '''
- This nested function will be called in event of a timeout
-
- @param signum: the signal number (POSIX) which happend
- @type signum: int
- @param sigframe: the frame of the signal
- @type sigframe: object
- '''
-
- raise WriteTimeoutError(timeout, filename)
-
- verb_level1 = 0
- verb_level2 = 1
- verb_level3 = 3
- if quiet:
- verb_level1 = 2
- verb_level2 = 3
- verb_level3 = 4
-
- timeout = int(timeout)
-
- if must_exists:
- if not os.path.isfile(filename):
- raise IOError(errno.ENOENT, "File doesn't exists.", filename)
-
- if os.path.exists(filename):
- if not os.access(filename, os.W_OK):
- if self.simulate:
- LOG.error("Write permission to {!r} denied.".format(filename))
- else:
- raise IOError(errno.EACCES, 'Write permission denied.', filename)
- else:
- parent_dir = os.path.dirname(filename)
- if not os.access(parent_dir, os.W_OK):
- if self.simulate:
- LOG.error("Write permission to {!r} denied.".format(parent_dir))
- else:
- raise IOError(errno.EACCES, 'Write permission denied.', parent_dir)
-
- if self.verbose > verb_level1:
- if self.verbose > verb_level2:
- LOG.debug("Write {what!r} into {to!r}.".format(
- what=content, to=filename))
- else:
- LOG.debug("Writing {!r} ...".format(filename))
-
- if self.simulate:
- if self.verbose > verb_level2:
- LOG.debug("Simulating write into {!r}.".format(filename))
- return
-
- signal.signal(signal.SIGALRM, write_alarm_caller)
- signal.alarm(timeout)
-
- # Open filename for writing unbuffered
- if self.verbose > verb_level3:
- LOG.debug("Opening {!r} for write unbuffered ...".format(filename))
- fh = open(filename, 'w', 0)
-
- try:
- fh.write(content)
- finally:
- if self.verbose > verb_level3:
- LOG.debug("Closing {!r} ...".format(filename))
- fh.close()
-
- signal.alarm(0)
-
- return
-
-
-# =============================================================================
-if __name__ == "__main__":
-
- pass
-
-# =============================================================================
-
-# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list