From f3427c523b84eaf21c711be13764ccf5b8458fc6 Mon Sep 17 00:00:00 2001 From: Frank Brehm Date: Fri, 10 Feb 2023 16:30:10 +0100 Subject: [PATCH] Adding module dpx_puppettools.forge.mod_info for class ForgeModuleInfo. --- lib/dpx_puppettools/forge/mod_info.py | 823 ++++++++++++++++++++++++++ 1 file changed, 823 insertions(+) create mode 100644 lib/dpx_puppettools/forge/mod_info.py diff --git a/lib/dpx_puppettools/forge/mod_info.py b/lib/dpx_puppettools/forge/mod_info.py new file mode 100644 index 0000000..dcf34b9 --- /dev/null +++ b/lib/dpx_puppettools/forge/mod_info.py @@ -0,0 +1,823 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@author: Frank Brehm +@contact: frank.brehm@pixelpark.com +@copyright: © 2023 by Frank Brehm, Digitas Pixelpark GmbH, Berlin +@summary: A module for encapsulating all information about + a Puppet module, which is provided by Puppet forge. +""" +from __future__ import absolute_import + +# Standard modules +import logging +import copy +import warnings +import datetime +import collections +import time + +from pathlib import Path + +# Third party modules +import six +import requests +import pytz +import yaml + +from requests.exceptions import ConnectionError, ReadTimeout, ConnectTimeout + +from fb_tools.common import to_bool + +# Own modules +from .. import pp, DEFAULT_FORGE_API_URL, DEFAULT_HTTP_TIMEOUT, MAX_HTTP_TIMEOUT +from .. import DEFAULT_VAR_DIR + +from ..errors import BaseHookError, BaseModuleInfoError + +from ..xlate import XLATOR + +from ..base_module_info import BaseModuleInfo + +from . import parse_forge_date, ForgeModuleInfoError + +from .mod_release_info import ModuleReleaseInfo +from .mod_release_list import ModuleReleaseList +from .cur_mod_release_info import CurrentModuleReleaseInfo +from .owner_info import ForgeOwnerInfo + + +__version__ = '0.4.0' + +LOG = logging.getLogger(__name__) + +_ = XLATOR.gettext +ngettext = XLATOR.ngettext + + +# ============================================================================= +class ReadForgeModuleInfoError(ForgeModuleInfoError): + """Exception, if the persistent forge module info file could not be read.""" + pass + + +# ============================================================================= +class WriteForgeModuleInfoError(ForgeModuleInfoError): + """Exception, if the persistent forge module info file could not be written.""" + pass + + +# ============================================================================= +class RetrieveForgeDataError(BaseHookError): + """Exception, there was an known exception in retrieving the module data + from Puppet forge.""" + + +# ============================================================================= +class ForgeModuleInfo(BaseModuleInfo): + """Class for encapsulating all information about a Puppet module from Puppet Forge.""" + + default_forge_uri = DEFAULT_FORGE_API_URL + default_http_timeout = DEFAULT_HTTP_TIMEOUT + max_http_timeout = MAX_HTTP_TIMEOUT + + open_args = {} + if six.PY3: + open_args = { + 'encoding': 'utf-8', + 'errors': 'surrogateescape', + } + + # ------------------------------------------------------------------------- + def __init__( + self, appname=None, verbose=0, version=__version__, base_dir=None, + initialized=None, name=None, vendor=None, full_name=None, + forge_uri=DEFAULT_FORGE_API_URL, http_timeout=DEFAULT_HTTP_TIMEOUT, + var_dir=DEFAULT_VAR_DIR): + + self.current_release = None + self.releases = None + self.owner = None + + self._created_at = None + self._deprecated_at = None + self._deprecated_for = None + self._downloads = None + self._endorsement = None + self._feedback_score = None + self._homepage_url = None + self._issues_url = None + self._module_group = None + self._slug = None + self._superseded_by = None + self._supported = None + self._updated_at = None + self._uri = None + + self._forge_uri = self.default_forge_uri + self._http_timeout = self.default_http_timeout + self._response_code = None + self._response_msg = None + self._ts_checked = None + self._var_dir = DEFAULT_VAR_DIR + + super(ForgeModuleInfo, self).__init__( + appname=appname, verbose=verbose, version=version, base_dir=base_dir, + initialized=False, name=name, vendor=vendor, full_name=full_name + ) + + self.forge_uri = forge_uri + self.http_timeout = http_timeout + self.var_dir = var_dir + + if initialized is not None: + self.initialized = initialized + + # ------------------------------------------------------------------------- + def as_dict(self, short=True): + + res = super(ForgeModuleInfo, self).as_dict(short=short) + + res['created_at'] = self.created_at + res['deprecated_for'] = self.deprecated_for + res['downloads'] = self.downloads + res['endorsement'] = self.endorsement + res['feedback_score'] = self.feedback_score + res['forge_uri'] = self.forge_uri + res['homepage_url'] = self.homepage_url + res['http_timeout'] = self.http_timeout + res['issues_url'] = self.issues_url + res['last_download_date'] = self.last_download_date + res['last_download_ts'] = self.last_download_ts + res['module_group'] = self.module_group + res['response_code'] = self.response_code + res['response_msg'] = self.response_msg + res['slug'] = self.slug + res['superseded_by'] = self.superseded_by + res['supported'] = self.supported + res['updated_at'] = self.updated_at + res['ts_checked'] = self.ts_checked + res['uri'] = self.uri + res['var_dir'] = self.var_dir + + return res + + # ------------------------------------------------------------------------- + @property + def created_at(self): + """Creation date of this forge module.""" + return self._created_at + + @created_at.setter + def created_at(self, value): + if value is None: + self._created_at = None + return + if isinstance(value, datetime.datetime): + self._created_at = value + return + v = str(value).strip() + if v == '': + self._created_at = None + return + self._created_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def deprecated_at(self): + """Date of deprecation of this forge module.""" + return self._deprecated_at + + @deprecated_at.setter + def deprecated_at(self, value): + if value is None: + self._deprecated_at = None + return + if isinstance(value, datetime.datetime): + self._deprecated_at = value + return + v = str(value).strip() + if v == '': + self._deprecated_at = None + return + self._deprecated_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def deprecated_for(self): + """The reason of deprecation of this forge module.""" + return self._deprecated_for + + @deprecated_for.setter + def deprecated_for(self, value): + if value is None: + self._deprecated_for = None + return + v = str(value).strip() + if v == '': + self._deprecated_for = None + return + self._deprecated_for = v + + # ------------------------------------------------------------------------- + @property + def downloads(self): + """The number of downloads of this module.""" + return self._downloads + + @downloads.setter + def downloads(self, value): + if value is None: + self._downloads = None + return + v = int(value) + if v < 0: + msg = _( + "The number of {w!r} must be greater or equal to zero " + "(Given: {v}).").format(w='downloads', v=value) + raise ValueError(msg) + self._downloads = v + + # ------------------------------------------------------------------------- + @property + def endorsement(self): + """The endorsement of this forge module..""" + return self._endorsement + + @endorsement.setter + def endorsement(self, value): + if value is None: + self._endorsement = None + return + v = str(value).strip() + if v == '': + self._endorsement = None + return + self._endorsement = v + + # ------------------------------------------------------------------------- + @property + def feedback_score(self): + """The number of feedback_scores of this module.""" + return self._feedback_score + + @feedback_score.setter + def feedback_score(self, value): + if value is None: + self._feedback_score = None + return + v = int(value) + if v < 0: + msg = _( + "The number of {w!r} must be greater or equal to zero " + "(Given: {v}).").format(w='feedback_score', v=value) + raise ValueError(msg) + self._feedback_score = v + + # ------------------------------------------------------------------------- + @property + def homepage_url(self): + """The homepage URL of this forge module.""" + return self._homepage_url + + @homepage_url.setter + def homepage_url(self, value): + if value is None: + self._homepage_url = None + return + v = str(value).strip() + if v == '': + self._homepage_url = None + return + self._homepage_url = v + + # ------------------------------------------------------------------------- + @property + def issues_url(self): + """The issues URL of this forge module.""" + return self._issues_url + + @issues_url.setter + def issues_url(self, value): + if value is None: + self._issues_url = None + return + v = str(value).strip() + if v == '': + self._issues_url = None + return + self._issues_url = v + + # ------------------------------------------------------------------------- + @property + def module_group(self): + """The module group of this forge module.""" + return self._module_group + + @module_group.setter + def module_group(self, value): + if value is None: + self._module_group = None + return + v = str(value).strip() + if v == '': + self._module_group = None + return + self._module_group = v + + # ------------------------------------------------------------------------- + @property + def slug(self): + """The slug of this forge module.""" + return self._slug + + @slug.setter + def slug(self, value): + if value is None: + self._slug = None + return + v = str(value).strip() + if v == '': + self._slug = None + return + self._slug = v + + # ------------------------------------------------------------------------- + @property + def superseded_by(self): + """The name of the superseding module this forge module.""" + return self._superseded_by + + @superseded_by.setter + def superseded_by(self, value): + if value is None: + self._superseded_by = None + return + if isinstance(value, collections.Mapping): + self._superseded_by = copy.copy(value) + return + + v = str(value).strip() + if v == '': + self._superseded_by = None + return + self._superseded_by = v + + # ------------------------------------------------------------------------- + @property + def supported(self): + """Is this forge module supported by Puppetlabs?.""" + return self._supported + + @supported.setter + def supported(self, value): + if value is None: + self._supported = None + return + self._supported = to_bool(value) + + # ------------------------------------------------------------------------- + @property + def updated_at(self): + """Last update date of this forge module.""" + return self._updated_at + + @updated_at.setter + def updated_at(self, value): + if value is None: + self._updated_at = None + return + if isinstance(value, datetime.datetime): + self._updated_at = value + return + v = str(value).strip() + if v == '': + self._updated_at = None + return + self._updated_at = parse_forge_date(v) + + # ------------------------------------------------------------------------- + @property + def uri(self): + """The URI of this forge module.""" + return self._uri + + @uri.setter + def uri(self, value): + if value is None: + self._uri = None + return + v = str(value).strip() + if v == '': + self._uri = None + return + self._uri = v + + # ------------------------------------------------------------------------- + @property + def ts_checked(self): + """The timestamp, when this module was checked on Puppet forge.""" + return self._ts_checked + + @ts_checked.setter + def ts_checked(self, value): + if value is None: + self._ts_checked = None + return + self._ts_checked = float(value) + + # ------------------------------------------------------------------------- + @property + def date_checked(self): + """A string representation of the check timestamp, if such one is given.""" + if self.ts_checked is None: + return None + dt = datetime.datetime.fromtimestamp(self.ts_checked, tz=pytz.utc) + return dt.strftime('%Y-%m-%d %H:%M:%S %Z') + + # ------------------------------------------------------------------------- + def set_ts_checked(self): + self._ts_checked = time.time() + + # ------------------------------------------------------------------------- + @property + def forge_uri(self): + """The URI of the Puppet forge API.""" + return self._forge_uri + + @forge_uri.setter + def forge_uri(self, value): + if value is None: + msg = _("The URI of the Puppet forge API must not be None.") + raise BaseModuleInfoError(msg) + v = str(value).strip() + if v == '': + msg = _("The URI of the Puppet forge API must not be empty.") + raise BaseModuleInfoError(msg) + self._forge_uri = v + + # ------------------------------------------------------------------------- + @property + def http_timeout(self): + """The number of downloads of this module.""" + return self._http_timeout + + @http_timeout.setter + def http_timeout(self, value): + if value is None: + msg = _("The HTTP timeout value must not be None.") + raise BaseModuleInfoError(msg) + try: + v = int(value) + except ValueError as e: + msg = _("Invalid value {!r} for a HTTP timeout:").format(value) + msg += ' ' + str(e) + raise BaseModuleInfoError(msg) + if v <= 0: + msg = _( + "The HTTP timeout must be greater than zero " + "(Given: {}).").format(value) + raise ValueError(msg) + if v > self.max_http_timeout: + msg = _( + "The HTTP timeout must not be greater than {max} seconds. " + "(Given: {v}.").formmat(max=self.max_http_timeout, v=value) + raise ValueError(msg) + self._http_timeout = v + + # ------------------------------------------------------------------------- + @property + def response_code(self): + """The numeric HTTP response code from the Puppet forge API.""" + return self._response_code + + # ------------------------------------------------------------------------- + @property + def response_msg(self): + """The textual HTTP response message from the Puppet forge API.""" + return self._response_msg + + # ------------------------------------------------------------------------- + @property + def var_dir(self): + """The directory containing variable data.""" + return self._var_dir + + @var_dir.setter + def var_dir(self, value) + if value is None: + msg = _("The var directory must not be None.") + raise BaseModuleInfoError(msg) + v = Path(value) + if not v.is_absolute(): + msg = _("The var directory must be an absolute path.").format(str(value)) + raise BaseModuleInfoError(msg) + self._var_dir = v + + # ------------------------------------------------------------------------- + def to_data(self): + """Returning a dict, which can be used to re-instantiate this module info.""" + + res = {} + + res['forge_data'] = {} + + res['forge_data']['deprecated_for'] = self.deprecated_for + res['forge_data']['downloads'] = self.downloads + res['forge_data']['endorsement'] = self.endorsement + res['forge_data']['feedback_score'] = self.feedback_score + res['forge_data']['homepage_url'] = self.homepage_url + res['forge_data']['issues_url'] = self.issues_url + res['forge_data']['module_group'] = self.module_group + res['forge_data']['name'] = self.name + res['forge_data']['slug'] = self.slug + res['forge_data']['superseded_by'] = self.superseded_by + res['forge_data']['supported'] = self.supported + res['forge_data']['uri'] = self.uri + + res['forge_data']['created_at'] = None + if self.created_at: + res['forge_data']['created_at'] = self.created_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['forge_data']['deprecated_at'] = None + if self.deprecated_at: + res['forge_data']['deprecated_at'] = self.deprecated_at.strftime( + '%Y-%m-%d %H:%M:%S %z') + + res['forge_data']['updated_at'] = None + if self.updated_at: + res['forge_data']['updated_at'] = self.updated_at.strftime('%Y-%m-%d %H:%M:%S %z') + + res['forge_data']['releases'] = [] + for release in self.releases: + res['forge_data']['releases'].append(release.to_data()) + + res['forge_data']['current_release'] = None + if self.current_release: + res['forge_data']['current_release'] = self.current_release.to_data() + + res['forge_data']['owner'] = None + if self.owner: + res['forge_data']['owner'] = self.owner.to_data() + + res['ts_checked'] = self.ts_checked + res['response_code'] = self.response_code + res['response_msg'] = self.response_msg + + return res + + # ------------------------------------------------------------------------- + def reset_all_data(self): + """Resetting all date, which are depending from forge, to empty values.""" + + if self.verbose > 2: + LOG.debug(_("Resetting all forge data ...")) + + self.current_release = None + self.releases = None + self.owner = None + + self._created_at = None + self._deprecated_at = None + self._deprecated_for = None + self._downloads = None + self._endorsement = None + self._feedback_score = None + self._homepage_url = None + self._issues_url = None + self._module_group = None + self._slug = None + self._superseded_by = None + self._supported = None + self._updated_at = None + self._uri = None + + self._response_code = None + self._response_msg = None + self._ts_checked = None + + # ------------------------------------------------------------------------- + def read(self, data_file=None): + """Reading the forge data from given .yaml-file.""" + + if not data_file: + for ext in ('.yaml', 'yml'): + fn = self.var_dir / 'forge' / (self.full_name + ext) + if fn.exists(): + data_file = fn + break + if not data_file: + fn = str(self.var_dir / 'forge' / (self.full_name + '.y(a?)ml')) + msg = _("Did not found the forge module info file {!r}.").format(fn) + raise ReadForgeModuleInfoError(msg) + + if not data_file.is_file(): + msg = _("Frge module info file {!r} is not a regular file.").format(str(data_file)) + raise ReadForgeModuleInfoError(msg) + if not os.access(data_file, os.R_OK): + msg = _("Frge module info file {!r} is not readable.").format(str(data_file)) + raise ReadForgeModuleInfoError(msg) + + data = None + try: + with data_file.open('r', **self.open_args) as fh: + data = yaml.safe_load(fh) + except yaml.YAMLError as e: + msg = _("Invalid YAML data found in file {!r):").format(str(data_file)) + msg += ' ' + str(e) + raise ReadForgeModuleInfoError(msg) + + self.apply_data(data) + + # ------------------------------------------------------------------------- + def apply_data(self, data): + + if 'forge_data' not in data: + msg = _("The given data seems not to be valid cached puppet forge data:") + msg += '\n' + pp(data) + raise ReadForgeModuleInfoError(msg) + + fd = data['forge_data'] + + self.releases = ModuleReleaseList( + appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + + for prop_name in ( + 'created_at', 'deprecated_at', 'deprecated_for', 'downloads', 'endorsement', + 'feedback_score', 'homepage_url', 'issues_url', 'module_group', 'slug', + 'superseded_by', 'updated_at', 'uri'): + if prop_name in fd and fd[prop_name]: + setattr(self, prop_name, fd[prop_name]) + + if 'supported' in fd: + self.supported = fd['supported'] + + if 'current_release' in fd and fd['current_release']: + self.current_release = CurrentModuleReleaseInfo.from_data( + fd['current_release'], appname=self.appname, + verbose=self.verbose, base_dir=self.base_dir) + + if 'releases' in fd: + for rel in fd['releases']: + release = ModuleReleaseInfo.from_data( + rel, appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + if release: + self.releases.append(release) + + self.releases.initialized = True + + if 'owner' in fd and fd['owner']: + self.owner = ForgeOwnerInfo.from_data( + fd['owner'], appname=self.appname, verbose=self.verbose, base_dir=self.base_dir) + + for prop_name in ('response_code', 'response_msg', 'ts_checked'): + if prop_name in data and data[prop_name]: + setattr(self, prop_name, data[prop_name]) + + # ------------------------------------------------------------------------- + def write(self, data_file=None): + """Writing the forge data into the given .yaml-file.""" + + if not data_file: + data_file = self.var_dir / 'forge' / (self.full_name + '.yaml') + + forge_dir = data_file.parent + if not forge_dir.is_dir(): + LOG.info(_("Creating directory {!r} ...").format(str(forge_dir))) + os.mkdir(str(forge_dir), mode=0o755) + + else: + forge_dir = data_file.parent + + data = self.to_data() + + try: + with data_file.open('w', **self.open_args) as fh: + yaml.safe_dump(data, fh) + except Exception as e: + msg = _("Got {ec} on writing puppet forge cache file {fn!r}: {e}").format( + ec=e.__class__.__name__, fn=str(data_file), e=e) + raise WriteForgeModuleInfoError(msg) + + # ------------------------------------------------------------------------- + @classmethod + def from_data(cls, data, appname=None, verbose=0, base_dir=None): + + if verbose > 3: + LOG.debug(_( + "Trying to instantiate a {}-object from:").format( + cls.__name__) + '\n' + pp(data)) + + if 'slug' not in data: + msg = _("Did not found {!r}-definition in data for forge module:").format( + 'slug') + '\n' + pp(data) + LOG.error(msg) + return None + + full_name = data['slug'].strip() + if full_name == '': + msg = _("Found empty {!r}-definition in data for forge module:").format( + 'slug') + '\n' + pp(data) + LOG.error(msg) + return None + + module_info = cls(appname=appname, verbose=verbose, base_dir=base_dir, full_name=full_name) + module_info.apply_data(data) + + return module_info + + # ------------------------------------------------------------------------- + def retrieve_forge_data(self): + + url = "{url}/{name}".format(url=self.forge_uri, name=self.full_name) + module_info = None + + self.reset_all_data() + + LOG.info(_("Trying to get module {m!r} from Puppet forge {u!r} ...").format( + m=self.full_name, u=url)) + + session = requests.Session() + response = None + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + try: + response = session.request('GET', url, timeout=self.http_timeout) + except (ConnectionError, ReadTimeout, ConnectTimeout) as e: + msg = "{c} on getting module info for {m}: {e}".format( + c=e.__class__.__name__, m=full_name, e=e) + raise RetrieveForgeDataErrormsg) + if w: + warn_class = w[-1].category.__name__ + warn_msg = '{}: {}'.format( + warn_class, w[-1].message) + self.response_msg = warn_msg + if warn_class == 'SubjectAltNameWarning': + LOG.debug(warn_msg) + else: + LOG.warn(warn_msg) + + LOG.debug("Got status code: {}.".format(response.status_code)) + self.response_code = response.status_code + if not response.ok: + err = response.json() + err_msg = err['error'] + if self.response_msg: + self.response_msg += '\n' + err_msg + else: + self.response_msg = err_msg + LOG.debug("Did not found module {} on Puppet forge.".format(self.full_name)) + return None + + if not response.text: + LOG.warn(_("No output for URL {!r}.").format(url)) + return None + if self.verbose > 3: + msg = "Output:\n{}".format(response.text) + LOG.debug(msg) + + self.set_ts_checked() + data = response.json() + if self.verbose > 3: + LOG.debug("Performing forge data:\n" + pp(data)) + self.apply_data(data) + + if self.verbose > 2: + msg = _("Got {}:").format(self.__class__.__name__) + msg += '\n' + pp(self.as_dict()) + LOG.debug(msg) + + if self.superseded_by: + subst = self.superseded_by + if self.verbose > 2: + LOG.debug("Superseded info:\n" + pp(subst)) + if 'slug' in subst: + subst = subst['slug'] + LOG.info(_( + "Module {c!r} is deprecated at Puppet forge and should be substituted " + "by module {n!r}.").format(c=self.slug, n=subst)) + + # ------------------------------------------------------------------------- + @classmethod + def get_from_forge( + cls, full_name, forge_uri=DEFAULT_FORGE_API_URL, http_timeout=DEFAULT_HTTP_TIMEOUT, + appname=None, verbose=0, base_dir=None): + + module_info = cls( + appname=appname, verbose=verbose, base_dir=base_dir, full_name=full_name, + ) + + module_info.retrieve_forge_data() + + return module_info + + +# ============================================================================= +if __name__ == "__main__": + + pass + +# ============================================================================= + +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 list -- 2.39.5