setattr(namespace, self.dest, data_dir)
+# =============================================================================
+class LogDirOptionAction(argparse.Action, UncriticalHookError):
+
+ # -------------------------------------------------------------------------
+ def __call__(self, parser, namespace, log_dir, option_string=None):
+
+ if not log_dir.exists():
+ msg = _("Logging directory {!r} does not exists.").format(str(log_dir))
+ raise argparse.ArgumentError(self, msg)
+
+ if not data_dir.is_dir():
+ msg = _("Path to logging directory {!r} is not a directory.").format(str(log_dir))
+ raise argparse.ArgumentError(self, msg)
+
+ setattr(namespace, self.dest, log_dir)
+
# =============================================================================
class BaseHookApp(BaseApplication):
"""
cgi_bin_dir = pathlib.Path(__file__).parent.parent.resolve()
- puppetlabs_cfg_dir = os.sep + os.path.join('etc', 'puppetlabs')
- puppet_envs_dir = os.path.join(puppetlabs_cfg_dir, 'code', 'environments')
- fileserver_dir = os.path.join(puppetlabs_cfg_dir, 'code', 'fileserver')
+ root_path = pathlib.Path(os.sep)
+ puppetlabs_cfg_dir = root_path / 'etc' / 'puppetlabs'
+ puppet_code_dir = puppetlabs_cfg_dir / 'code'
+ puppet_envs_dir = puppet_code_dir / 'environments'
+ fileserver_dir = puppet_code_dir / 'fileserver'
+
+ default_data_dir = root_path / 'var' / 'lib' / 'webhooks'
+ default_log_dir = root_path / 'var' / 'log' / 'webhooks'
- default_data_dir = os.sep + os.path.join('var', 'lib', 'webhooks')
default_tz_name = 'Europe/Berlin'
- default_cachefile_stem = 'modules-info.yaml'
+ default_cachefile_stem = pathlib.Path('modules-info.yaml')
special_chars_re = re.compile(r'[^a-z0-9_\-]', re.IGNORECASE)
dev_re = re.compile(r'^dev')
if not description:
description = _("Base gitlab webhook application.")
- self.data_dir = self.default_data_dir
+ self._data_dir = self.default_data_dir
self._read_stdin = True
self.tz_name = self.default_tz_name
self._html_title = None
self.smtp_server = 'localhost'
self.smtp_port = 25
- self.default_parent_dir = '/var/log/webhooks'
+ self.default_parent_dir = self.fileserver_dir
self.default_email = DEFAULT_TO_EMAIL
self.mail_to_addresses = []
self.mail_cc_addresses = []
self.mail_cc_addresses = [self.default_email, ]
self.sender_address = DEFAULT_FROM_SENDER
- self._log_directory = pathlib.Path('/var/log/webhooks')
+ self._log_directory = self.default_log_dir
try:
pwd_info = pwd.getpwuid(os.geteuid())
self.perform_arg_parser_logging()
self.init_logging()
self.perform_arg_parser()
+ self.read_config()
if self.is_cgi:
LOG.debug("We are in a CGI: REQUEST_METHOD {!r}".format(os.environ['REQUEST_METHOD']))
else:
LOG.debug("We are NOT in CGI.")
+ self.init_cgi_logging()
+
self.search_curl_bin()
self.handler = BaseHandler(
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
- simulate=self.simulate)
+ quiet=self.quiet, simulate=self.simulate)
self.tz = pytz.timezone(self.tz_name)
self._start_verbose = self.verbose
help=_("Data directory, default: {!r}.").format(self.data_dir),
)
+ hook_group.add_argument(
+ "-L", '--log-dir', metavar=_('DIR'), dest='log_dir',
+ action=LogDirOptionAction, type=pathlib.Path,
+ help=_("Logging directory, default: {!r}.").format(self.data_dir),
+ )
+
sort_group = hook_group.add_mutually_exclusive_group()
sort_group.add_argument(
# -------------------------------------------------------------------------
def perform_arg_parser_logging(self):
- if self.args.data_dir:
- self._log_directory = self.data_dir
+ if self.args.log_dir:
+ path = self.args.log_dir
+ if not path.is_absolute():
+ path = path.resolve()
+ self.log_directory = path
+
+ if self.args.cgi:
+ if not os.environ.get('REQUEST_METHOD', None):
+ os.environ['REQUEST_METHOD'] = 'GET'
# -------------------------------------------------------------------------
def perform_arg_parser(self):
path = path.resolve()
self.data_dir = path
- if self.args.cgi:
- if not os.environ.get('REQUEST_METHOD', None):
- os.environ['REQUEST_METHOD'] = 'GET'
-
if self.args.query:
if not os.environ.get('QUERY_STRING', None):
os.environ['QUERY_STRING'] = self.args.query
self._mail_headline = val
+ # -----------------------------------------------------------
+ @property
+ def data_dir(self):
+ """The directory containing some volatile data."""
+ return self._data_dir
+
+ @data_dir.setter
+ def data_dir(self, value):
+ self._data_dir = pathlib.Path(value)
+
# -----------------------------------------------------------
@property
def log_directory(self):
"""The directory containing the logfiles of this application."""
return self._log_directory
+ @log_directory.setter
+ def log_directory(self, value):
+ self._log_directory = pathlib.Path(value)
+
# -----------------------------------------------------------
@property
def logfile(self):
"""The logfile of this application."""
- return self.log_directory.joinpath(self.appname + '.log')
+ return self.log_directory / (self.appname + '.log')
# -----------------------------------------------------------
@property
def error_logfile(self):
"""The logfile for STDERR of this application."""
- return self.log_directory.joinpath(self.appname + '.error.log')
+ return self.log_directory / (self.appname + '.error.log')
# -----------------------------------------------------------
@property
cur_env = getattr(self, 'ref', None)
if cur_env is None:
return None
- return os.path.join(self.puppet_envs_dir, cur_env)
+ return self.puppet_envs_dir / cur_env
# -----------------------------------------------------------
@property
@property
def cachefile(self):
"""The filename of the cacheing file."""
-
- if os.path.isabs(self._cachefile):
- return os.path.normpath(self._cachefile)
- return os.path.normpath(os.path.join(self.data_dir, self._cachefile))
+ if self._cachefile.is_absolute():
+ return self._cachefile.resolve()
+ return self.data_dir.joinpath(self._cachefile).resolve()
# -------------------------------------------------------------------------
def as_dict(self, short=True):
res['is_cgi'] = self.is_cgi
res['cgi_bin_dir'] = self.cgi_bin_dir
res['log_directory'] = self.log_directory
+ res['data_dir'] = self.data_dir
res['error_logfile'] = self.error_logfile
res['logfile'] = self.logfile
res['puppetlabs_cfg_dir'] = self.puppetlabs_cfg_dir
+ res['puppet_code_dir'] = self.puppet_code_dir
res['puppet_envs_dir'] = self.puppet_envs_dir
res['env_dir'] = self.env_dir
res['fileserver_dir'] = self.fileserver_dir
self._perform_args(arg_parser)
-# # -------------------------------------------------------------------------
-# def __init_arg_parser(self):
-#
-# self.init_arg_parser(arg_parser)
-#
-# general_group = arg_parser.add_argument_group('General options')
-#
-# general_group.add_argument(
-# "-N", "--no-error-mail", action='store_true', dest='no_error_mail',
-# help="Don't send error messages in case of some exceptions.",
-# )
-#
-# general_group.add_argument(
-# "-D", '--data', '--data-dir', metavar='DIR', dest='data_dir',
-# help="Data directory, default: {!r}.".format(self.data_dir),
-# )
-#
-# sort_group = general_group.add_mutually_exclusive_group()
-#
-# sort_group.add_argument(
-# '-o', '--sort-by-name', action="store_true", dest='sort_by_name',
-# help="Sorting all lists of modules by name and vendor, in this order."
-# )
-#
-# sort_group.add_argument(
-# '-O', '--sort-by-fullname', action="store_false", dest='sort_by_name',
-# help="Sorting all lists of modules by the full name of the module (default)."
-# )
-#
-# general_group.add_argument(
-# "-v", "--verbose", action="count", dest='verbose',
-# help='Increase the verbosity level',
-# )
-#
-# general_group.add_argument(
-# '-s', '--simulate', '--test', action='store_true', dest='simulate',
-# help="Simulation mode, nothing is really done.",
-# )
-#
-# general_group.add_argument(
-# "-C", '--cgi', action='store_true', dest='cgi',
-# help='Enforces behaviour as called as a CGI script.',
-# )
-#
-# general_group.add_argument(
-# "-h", "--help", action='help', dest='help',
-# help='Show this help message and exit'
-# )
-#
-# general_group.add_argument(
-# "--usage", action='store_true', dest='usage',
-# help="Display brief usage message and exit"
-# )
-#
-# general_group.add_argument(
-# "-V", '--version', action='version',
-# version='Version of %(prog)s: {}'.format(self.version),
-# help="Show program's version number and exit"
-# )
-#
-# arg_parser.add_argument(
-# 'query', nargs='?',
-# help="An optional query string like on HTTP GET requests."
-# )
-#
-# # -------------------------------------------------------------------------
-# def __perform_args(self, arg_parser):
-#
-# self.cmdline_args = arg_parser.parse_args()
-#
-# if self.cmdline_args.usage:
-# arg_parser.print_usage(sys.stdout)
-# sys.exit(0)
-#
-# if self.cmdline_args.no_error_mail:
-# self.no_error_mail = True
-#
-# if self.cmdline_args.data_dir:
-# path = self.cmdline_args.data_dir
-# if not os.path.isabs(path):
-# path = os.path.normpath(os.path.join(os.getcwd(), path))
-# self.data_dir = path
-#
-# if self.cmdline_args.verbose is not None:
-# if self.cmdline_args.verbose > self.verbose:
-# self.verbose = self.cmdline_args.verbose
-# if self.cmdline_args.verbose > self._start_verbose:
-# self._start_verbose = self.cmdline_args.verbose
-#
-# if self.cmdline_args.cgi:
-# if not os.environ.get('REQUEST_METHOD', None):
-# os.environ['REQUEST_METHOD'] = 'GET'
-#
-# if self.cmdline_args.query:
-# if not os.environ.get('QUERY_STRING', None):
-# os.environ['QUERY_STRING'] = self.cmdline_args.query
-#
-# if self.cmdline_args.simulate:
-# sys.stderr.write("\nSimulation mode - nothing is really done.\n\n")
-# self.simulate = True
-#
-# if self.cmdline_args.sort_by_name:
-# self._sort_by_name = True
-#
-# self._get_query()
-# if 'output_type' in self.query:
-# try:
-# self.output_type = self.query['output_type']
-# except ValueError as e:
-# LOG.error(str(e))
-#
-# self.perform_args(arg_parser)
-#
-# # -------------------------------------------------------------------------
-# def perform_args(self, arg_parser):
-#
-# pass
-#
# -------------------------------------------------------------------------
def _get_query(self):
yaml_files = []
# ./hooks.yaml
- yaml_files.append(self.base_dir.joinpath('hooks.yaml'))
- # ./hooks.local.yaml
- yaml_files.append(self.base_dir.joinpath('hooks.local.yaml'))
+ yaml_files.append(self.base_dir / 'hooks.yaml')
# ./<appname>.yaml
- yaml_files.append(self.base_dir.joinpath(self.appname + '.yaml'))
- # ./<appname>.local.yaml
- yaml_files.append(self.base_dir.joinpath(self.appname + '.local.yaml'))
+ yaml_files.append(self.base_dir / (self.appname + '.yaml'))
# /etc/pixelpark/hooks.yaml
- yaml_files.append(pp_conf_path.joinpath('hooks.yaml'))
+ yaml_files.append(pp_conf_path / 'hooks.yaml')
# /etc/pixelpark/<appname>.yaml
- yaml_files.append(pp_conf_path.joinpath(self.appname + '.yaml'))
+ yaml_files.append(pp_conf_path / (self.appname + '.yaml'))
+ # ./hooks.local.yaml
+ yaml_files.append(self.base_dir / 'hooks.local.yaml')
+ # ./<appname>.local.yaml
+ yaml_files.append(self.base_dir / (self.appname + '.local.yaml'))
+
+ if self.verbose > 2:
+ LOG.debug("YAML file list to read config:\n{}".format(pp(yaml_files)))
for yaml_file in yaml_files:
self.read_from_yaml(yaml_file)
f = str(yaml_file)
- if self._start_verbose > 1:
- self.print_err(_("Trying to read config from {!r} ...").format(f))
+ if self.verbose > 1:
+ LOG.debug(_("Trying to read config from {!r} ...").format(f))
if not os.access(f, os.F_OK):
return
- if self._start_verbose > 1:
- self.print_err(_("Reading config from {!r} ...").format(f))
+ if self.verbose > 1:
+ LOG.debug(_("Reading config from {!r} ...").format(f))
config = {}
with open(f, 'rb') as fh:
config = yaml.load(fh.read())
- if self._start_verbose > 2:
- self.print_err(_("Read config:\n{}").format(pp(config)))
+ if self.verbose > 2:
+ LOG.debug(_("Read config:\n{}").format(pp(config)))
if config and isinstance(config, dict):
self.evaluate_config(config, f)
# -------------------------------------------------------------------------
def _evaluate_general_config(self, config, yaml_file):
+ error_title = _("Configuration error")
+
if 'verbose' in config:
try:
v = int(config['verbose'])
if v >= 0:
if v >= self.verbose:
- self._verbose = v
+ self.verbose = v
else:
LOG.warn(_("Wrong verbose level {v!d} in file {f!r}, must be >= 0").format(
v=v, f=yaml_file))
except ValueError as e:
msg = _("Wrong verbose level {v!r} in file {f!r}: {e}").format(
v=config['verbose'], f=yaml_file, e=e)
- LOG.warn(msg)
+ self.handle_error(msg, error_title)
if 'simulate' in config and not self.simulate:
self.simulate = config['simulate']
if 'do_sudo' in config:
self.do_sudo = to_bool(config['do_sudo'])
- if 'log_dir' in config and config['log_dir']:
- self._log_directory = config['log_dir']
+ if 'log_dir' in config and config['log_dir'] and not self.args.log_dir:
+ path = pathlib.Path(config['log_dir'])
+ if path.is_absolute():
+ self.log_directory = path
+ else:
+ msg = _("Logging directory {p!r} in file {f!r} must be an absolute path.").format(
+ p=str(path), f=yaml_file)
+ self.handle_error(msg, error_title)
if 'default_parent_dir' in config and config['default_parent_dir']:
- pdir = config['default_parent_dir']
- if os.path.isabs(pdir):
+ pdir = pathlib.Path(config['default_parent_dir'])
+ if pdir.is_absolute():
self.default_parent_dir = pdir
+ else:
+ msg = _(
+ "Default parent directory {p!r} in file {f!r} "
+ "must be an absolute path.").format(p=str(pdir), f=yaml_file)
+ self.handle_error(msg, error_title)
if 'tz' in config and config['tz']:
tz = str(config['tz']).strip()
if tz:
self.tz_name = tz
- if 'data_dir' in config and config['data_dir'] and not self.cmdline_args.data_dir:
- path = config['data_dir']
- if not os.path.isabs(path):
- path = os.path.join(str(self.base_dir), path)
+ if 'data_dir' in config and config['data_dir'] and not self.args.data_dir:
+ path = pathlib.Path(config['data_dir'])
+ if not path.is_absolute():
+ path = self.base_dir / path
self.data_dir = path
if 'cachefile' in config:
- self._cachefile = config['cachefile']
+ self._cachefile = pathlib.Path(config['cachefile'])
elif 'cache_file' in config:
- self._cachefile = config['cache_file']
+ self._cachefile = pathlib.Path(config['cache_file'])
# -------------------------------------------------------------------------
def __evaluate_mail_config(self, config, yaml_file):
self.mail_cc_addresses = config['mail_cc_addresses']
# -------------------------------------------------------------------------
- def init_logging(self):
- """
- Initialize the logger object.
- It creates a colored loghandler with all output to STDERR.
- Maybe overridden in descendant classes.
-
- @return: None
- """
-
- root_log = logging.getLogger()
- root_log.setLevel(logging.INFO)
- if self.verbose:
- root_log.setLevel(logging.DEBUG)
+ def _get_log_formatter(self):
# create formatter
format_str = ''
- if 'REQUEST_METHOD' in os.environ or self.verbose > 1:
+ if self.is_cgi or self.verbose > 1:
format_str = '[%(asctime)s]: '
format_str += self.appname + ': '
if self.verbose:
format_str += '%(levelname)s - %(message)s'
formatter = logging.Formatter(format_str)
- if 'REQUEST_METHOD' in os.environ:
+ return formatter
- se = None
- try:
- se = open(self.error_logfile, 'ab', 0)
- except Exception as e:
- msg = _("Could not open error logfile {f!r}: {e}").format(
- f=self.error_logfile, e=e) + '\n\n'
- sys.stderr.write(msg)
- sys.exit(7)
-
- sys.stderr.flush()
- os.dup2(se.fileno(), sys.stderr.fileno())
-
- # we are in a CGI environment
- if os.path.isdir(self.log_directory) and os.access(self.log_directory, os.W_OK):
- lh_file = logging.FileHandler(
- self.logfile, mode='a', encoding='utf-8', delay=True)
- if self.verbose:
- lh_file.setLevel(logging.DEBUG)
- else:
- lh_file.setLevel(logging.INFO)
- lh_file.setFormatter(formatter)
- root_log.addHandler(lh_file)
+ # -------------------------------------------------------------------------
+ def init_logging(self):
+ """
+ Initialize the logger object.
+ It creates a colored loghandler with all output to STDERR.
+ Maybe overridden in descendant classes.
- else:
+ @return: None
+ """
+
+ root_log = logging.getLogger()
+ root_log.setLevel(logging.INFO)
+ if self.verbose:
+ root_log.setLevel(logging.DEBUG)
+
+ formatter = self._get_log_formatter()
+
+ if not self.is_cgi:
# create log handler for console output
lh_console = logging.StreamHandler(sys.stderr)
if self.verbose:
return
+ # -------------------------------------------------------------------------
+ def init_cgi_logging(self):
+ """
+ Initializes file logging after reading the configuration.
+ """
+
+ error_title = _("Initialization_error")
+
+ if not self.is_cgi:
+ return
+ if self.verbose:
+ self.handle_info("Initializing file logging for CGI ...")
+
+ if not self.log_directory.exists():
+ self.handle_error(_("Logging directory {!r} does not exists.").format(
+ str(self.log_directory)), error_title)
+ sys.stderr.write('\n')
+ sys.exit(7)
+
+ if not self.log_directory.is_dir():
+ self.handle_error(_("Path to logging directory {!r} is not a directory.").format(
+ str(self.log_directory)), error_title)
+ sys.stderr.write('\n')
+ sys.exit(7)
+
+ if not os.access(str(self.log_directory), os.W_OK):
+ self.handle_error(_("Logging directory {!r} is not writeable.").format(
+ str(self.log_directory)), error_title)
+ sys.stderr.write('\n')
+ sys.exit(7)
+
+ root_log = logging.getLogger()
+ formatter = self._get_log_formatter()
+
+ # Redirecting STDERR into logfile
+ if self.verbose > 1:
+ self.handle_info("Redirecting STDERR => {!r} ...".format(str(self.error_logfile)))
+ se = None
+ try:
+ se = open(str(self.error_logfile), 'ab', 0)
+ except Exception as e:
+ msg = _("Could not open error logfile {f!r}: {e}").format(
+ f=str(self.error_logfile), e=e)
+ self.handle_error(msg, error_title)
+ sys.stderr.write('\n')
+ sys.exit(7)
+
+ sys.stderr.flush()
+ os.dup2(se.fileno(), sys.stderr.fileno())
+
+ lh_file = logging.FileHandler(str(self.logfile), mode='a', encoding='utf-8', delay=True)
+ if self.verbose:
+ lh_file.setLevel(logging.DEBUG)
+ else:
+ lh_file.setLevel(logging.INFO)
+ lh_file.setFormatter(formatter)
+ root_log.addHandler(lh_file)
+
# -------------------------------------------------------------------------
def print_err(self, *objects, sep=' ', end='\n', file=sys.stderr.buffer, flush=True):
self.print_out(*objects, sep=sep, end=end, file=file, flush=flush)
self.run_hook()
except BaseHookError as e:
cn = e.__class__.__name__
- msg = _("Got a {cn} performing {a}: {e}").format(n=n, cn=cn, a=self.appname, e=e)
+ msg = _("Got a {cn} performing {a}: {e}").format(cn=cn, a=self.appname, e=e)
self.error_data.append(msg)
LOG.error(msg)
except Exception as e:
cn = e.__class__.__name__
- msg = _("Got a {cn} performing {a}: {e}").format(n=n, cn=cn, a=self.appname, e=e)
+ msg = _("Got a {cn} performing {a}: {e}").format(cn=cn, a=self.appname, e=e)
msg += "\n\nTraceback:\n{}".format(traceback.format_exc())
self.error_data.append(msg)
LOG.error(msg)
except Exception as e:
cn = e.__class__.__name__
- msg = _("Got a {cn} reading input data as JSON: {e}").format(n=n, cn=cn, e=e)
+ msg = _("Got a {cn} reading input data as JSON: {e}").format(cn=cn, e=e)
msg += "\n" + _("Input data: {!r}").format(self.data)
LOG.error(msg)
self.error_data.append(msg)
body = ngettext(
'Error while processing {!r}:',
'Errors while processing {!r}:',
- s=s, a=self.appname)
+ len(self.error_data)).format(self.appname) + '\n\n'
subject = ngettext(
'Puppetmaster error processing {!r}',
'Puppetmaster errors processing {!r}',
return
server = smtplib.SMTP(self.smtp_server, self.smtp_port)
- if 'REQUEST_METHOD' not in os.environ:
+ if not self.is_cgi:
if self.verbose > 2:
server.set_debuglevel(2)
elif self.verbose > 1:
# -------------------------------------------------------------------------
def read_cache_file(self, only_main_branches=True):
- LOG.debug("Searching for {!r} ...".format(self.cachefile))
- if not os.path.exists(self.cachefile):
+ LOG.debug("Searching for {!r} ...".format(str(self.cachefile)))
+ if not self.cachefile.exists():
raise UncriticalHookError(
- _("Cache file {!r} not found.").format(self.cachefile))
+ _("Cache file {!r} not found.").format(str(self.cachefile)))
- if not os.access(self.cachefile, os.R_OK):
+ if not os.access(str(self.cachefile), os.R_OK):
raise UncriticalHookError(
- _("Cache file {!r} not readable.").format(self.cachefile))
+ _("Cache file {!r} not readable.").format(str(self.cachefile)))
modules = ModuleInfoDict(
appname=self.appname, verbose=self.verbose, base_dir=self.base_dir,
sort_by_name=self.sort_by_name)
- LOG.debug("Reading {!r} ...".format(self.cachefile))
+ LOG.debug("Reading {!r} ...".format(str(self.cachefile)))
try:
- with open(self.cachefile, 'r', **self.open_args) as fh:
+ with open(str(self.cachefile), 'r', **self.open_args) as fh:
for struct in yaml.load(fh):
module_info = ModuleInfo.init_from_data(
struct, appname=self.appname, verbose=self.verbose,
modules.append(module_info)
except yaml.YAMLError as e:
raise UncriticalHookError(
- _("Could not evaluate content of {f!r}: {e}").format(f=self.cachefile, e=e))
+ _("Could not evaluate content of {f!r}: {e}").format(f=str(self.cachefile), e=e))
if self.verbose > 3:
- LOG.debug("Content of {f!r}:\n{c}".format(f=self.cachefile, c=pp(modules.as_list())))
+ LOG.debug("Content of {f!r}:\n{c}".format(
+ f=str(self.cachefile), c=pp(modules.as_list())))
if not len(modules):
LOG.debug("Did not found any matching modules.")
from .base_app import BaseHookError, UncriticalHookError, BaseHookApp
+from .xlate import XLATOR
+
LOG = logging.getLogger(__name__)
+_ = XLATOR.gettext
+ngettext = XLATOR.ngettext
+
# =============================================================================
class GetModuleChangesError(BaseHookError):
def __str__(self):
"""Typecasting into a string for error output."""
- return "Puppet environment {!r} does not exists.".format(self.env_name)
+ return _("Puppet environment {!r} does not exists.").format(self.env_name)
# =============================================================================
re_env_name = re.compile(r'^[a-z0-9][a-z0-9_-]*[a-z0-9]$', re.IGNORECASE)
# -------------------------------------------------------------------------
- def __init__(self, appname=None, verbose=0, version=__version__):
+ def __init__(self, appname=None, base_dir=None, verbose=0, version=__version__):
"""Constructor."""
- description = textwrap.dedent('''\
- Generates a list of all Puppets modules, which are newer
- in Puppet forge than in a defined environment
- ''').strip()
+ description = _(
+ "Generates a list of all Puppets modules, which are newer "
+ "in Puppet forge than in a defined environment")
self._environment = self.default_env
super(GetModuleChangesApp, self).__init__(
- appname=appname, verbose=verbose, version=version, no_cc=True,
- description=description)
+ appname=appname, base_dir=base_dir, verbose=verbose, version=version,
+ no_cc=True, description=description)
# -----------------------------------------------------------
@property
@environment.setter
def environment(self, value):
if value is None:
- raise TypeError("An environment may not be None.")
+ raise TypeError(_("An environment may not be None."))
val = str(value).strip()
if val == '':
- raise ValueError("Invalid environment name: {!r}.".format(value))
+ raise ValueError(_("Invalid environment name: {!r}.").format(value))
if not self.re_env_name.match(val):
- raise ValueError("Invalid environment name: {!r}.".format(value))
+ raise ValueError(_("Invalid environment name: {!r}.").format(value))
self._environment = val
super(GetModuleChangesApp, self).init_arg_parser()
- arg_parser.add_argument(
- '-E', '--env', '--environment', metavar='ENVIRONMENT', dest='env',
- help=(
+ self.arg_parser.add_argument(
+ '-E', '--env', '--environment', metavar=_('ENVIRONMENT'), dest='env',
+ help=_(
"The Puppet environmment, which to compare with Puppet forge, "
"default: {!r}.").format(self.default_env),
)
super(GetModuleChangesApp, self).perform_arg_parser()
- if self.cmdline_args.env:
+ if self.args.env:
self.environment = self.cmdline_args.env
# -------------------------------------------------------------------------
def run_hook(self):
"""Main routine."""
- LOG.info("Here I go. ...")
+ LOG.info(_("Here I go. ..."))
module_infos = []
try:
module_infos = self.read_cache_file(only_main_branches=False)
env_found = False
- LOG.info("Checking verions of modules ...")
+ LOG.info(_("Checking verions of modules ..."))
version_infos = []
m=module_info.full_name, v=str(forge_version)))
if local_version < forge_version:
- LOG.info((
+ LOG.info(_(
"Version of module {m!r} on Puppet forge {fv!r} is newer than "
"the local version {lv!r}.").format(
m=module_info.full_name, lv=str(local_version), fv=str(forge_version)))
else:
- LOG.debug((
+ LOG.debug(_(
"Version of module {m!r} on Puppet forge {fv!r} is equal or older than "
"the local version {lv!r}.").format(
m=module_info.full_name, lv=str(local_version), fv=str(forge_version)))
# -------------------------------------------------------------------------
def generate_version_msgs(self, version_infos):
- self.mail_subject = (
+ self.mail_subject = _(
"Check for newer versions of Puppet modules in environment {!r}").format(
self.environment)
- self.mail_headline = (
+ self.mail_headline = _(
"Results of checking for newer versions of Puppet modules "
"in environment {!r}:").format(self.environment)
dt_str = dt.strftime('%Y-%m-%d %H:%M:%S %Z')
if not version_infos:
- msg = (
+ msg = _(
"Didn't found any modules in environment {!r} with a\n"
"newer version on Puppet Forge.\n\n:-D"
).format(self.environment)
if len(version_infos) != 1:
s = 's'
- self.error_data.append((
+ self.error_data.append(_(
"Found {n} module{s} in environment {e!r} with a "
"newer version on Puppet Forge.\n\n:-(\n").format(
n=len(version_infos), s=s, e=self.environment))
label = {
- 'name': 'Module',
- 'full_name': 'Full Module name',
- 'local_version': 'Used Version',
- 'forge_version': 'Version on Puppet Forge',
+ 'name': _('Module'),
+ 'full_name': _('Full Module name'),
+ 'local_version': _('Used Version'),
+ 'forge_version': _('Version on Puppet Forge'),
}
width = {}
for key in label.keys():
forge_version=str(version_info['forge_version'])
))
- self.error_data.append("\nChecked at: {}".format(dt_str))
+ self.error_data.append("\n" + _("Checked at: {}").format(dt_str))
# =============================================================================