import pwd
import copy
import glob
+import stat
# Third party modules
import six
from .cfg_app import PpCfgAppError, PpConfigApplication
-__version__ = '0.1.1'
+__version__ = '0.2.1'
LOG = logging.getLogger(__name__)
+UTC = datetime.timezone.utc
# =============================================================================
self.status_dir = self.default_status_dir
self.statusfile_base = self.default_statusfile_base
self.statusfile = os.path.join(self.status_dir, self.statusfile_base)
+ self.status_data = {}
self.passwd_data = {}
self.map_uid = {}
+ self.now = datetime.datetime.now(UTC)
description = textwrap.dedent('''\
This checks the utilization of the home directories on the NFS server
self.initialized = True
+ # -------------------------------------------------------------------------
+ def init_arg_parser(self):
+ """
+ Method to initiate the argument parser.
+
+ This method should be explicitely called by all init_arg_parser()
+ methods in descendant classes.
+ """
+
+ super(PpQuotaCheckApp, self).init_arg_parser()
+
+ def_mb = self.quota_kb / 1024
+
+ self.arg_parser.add_argument(
+ '-Q', '--quota',
+ metavar="MB", type=int, dest='quota_mb',
+ help="Quota value in MB (default: {} MB).".format(def_mb),
+ )
+
# -------------------------------------------------------------------------
def perform_config(self):
self.home_root_rel = os.path.relpath(self.home_root_abs, os.sep)
self.home_root_real = os.path.join(self.chroot_homedir, self.home_root_rel)
+ cmdline_quota = getattr(self.args, 'quota_mb', None)
+ if cmdline_quota is not None:
+ self.quota_kb = cmdline_quota * 1024
+
# -------------------------------------------------------------------------
def _run(self):
+ self.status_data = self.read_status_data()
+ self.status_data['last_check'] = self.now
self.read_passwd_data()
+ self.write_status_data()
+
+ # -------------------------------------------------------------------------
+ def pre_run(self):
+ """
+ Dummy function to run before the main routine.
+ Could be overwritten by descendant classes.
+
+ """
+
+ if os.geteuid():
+ msg = "Only root may execute this application."
+ LOG.error(msg)
+ self.exit(1)
+
+ super(PpQuotaCheckApp, self).pre_run()
+
+ # -------------------------------------------------------------------------
+ def read_status_data(self):
+
+ LOG.info("Reading status data from {!r} ...".format(self.statusfile))
+
+ if not os.path.exists(self.statusfile):
+ LOG.debug("Status file {!r} does not exists.".format(self.statusfile))
+ return
+
+ if not os.path.isfile(self.statusfile):
+ msg = "Status file {!r} is not a regular file.".format(self.statusfile)
+ LOG.error(msg)
+ self.exit(5)
+
+ if not os.access(self.statusfile, os.R_OK):
+ msg = "No read access to status file {!r}.".format(self.statusfile)
+ LOG.error(msg)
+ self.exit(6)
+
+ open_args = {}
+ if six.PY3:
+ open_args['encoding'] = 'utf-8'
+ open_args['errors'] = 'surrogateescape'
+
+ status = {}
+
+ with open(self.statusfile, 'r', **open_args) as fh:
+ try:
+ status = yaml.load(fh)
+ except yaml.YAMLError as e:
+ msg = "YAML error in status file {f!r}: {e}".format(
+ f=self.statusfile, e=e)
+ LOG.error(msg)
+ return {}
+
+ if not isinstance(status, dict):
+ status = {}
+
+ if self.verbose > 2:
+ LOG.debug("Status from {f!r}:\n{s}".format(
+ f=self.statusfile, s=pp(status)))
+
+ return status
+
+ # -------------------------------------------------------------------------
+ def write_status_data(self):
+
+ LOG.info("Writing status data from {!r} ...".format(self.statusfile))
+
+ if self.verbose > 2:
+ LOG.debug("Status to write:\n{}".format(pp(self.status_data)))
+
+ open_args = {}
+ if six.PY3:
+ open_args['encoding'] = 'utf-8'
+ open_args['errors'] = 'surrogateescape'
+
+ if not os.path.exists(self.status_dir):
+ LOG.info("Creating {!r} ...".format(self.status_dir))
+ mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IXOTH
+ os.makedirs(self.status_dir, mode, exist_ok=True)
+ elif not os.path.isdir(self.status_dir):
+ msg = "Status directory {!r} exists, but is ot a directory.".format(self.status_dir)
+ LOG.error(msg)
+ return
+
+ status_dump = yaml.dump(self.status_data, default_flow_style=False)
+ if self.verbose > 2:
+ LOG.debug("Writing YAML data:\n{}".format(status_dump))
+
+ with open(self.statusfile, 'w', **open_args) as fh:
+ fh.write(status_dump)
+
# -------------------------------------------------------------------------
def read_passwd_data(self):