@type: file or None
'''
+ self.was_read = False
+ '''
+ @ivar: flag, whether the status file was read
+ @type: bool
+ '''
+
self.status_version = None
'''
@ivar: the version of the status file (2 or 3)
# add ch to logger
self.logger.addHandler(ch)
- # Initial read
+ # Initial read and check for permissions
self._read(must_exists = False)
+ self._check_permissions()
#-------------------------------------------------------
def as_dict(self):
res['test_mode'] = self.test_mode
res['logger'] = self.logger
res['file_state'] = self.file_state
+ res['was_read'] = self.was_read
return res
+ #------------------------------------------------------------
+ def get_rotation_date(self, logfile):
+ '''
+ Gives back the date of the last rotation of a particular logfile.
+ If this logfile is not found in the state file, datetime.min() is given back.
+
+ @param logfile: the logfile to query
+ @type logfile: str
+
+ @return: date of last rotation of this logfile
+ @rtype: datetime
+ '''
+
+ if not self.was_read:
+ self._read(must_exists = False)
+
+ rotate_date = datetime.min()
+ if logfile in self.file_state:
+ rotate_date = self.file_state[logfile]
+
+ return rotate_date
+
+ #------------------------------------------------------------
+ def set_rotation_date(self, logfile, rotate_date = None):
+ '''
+ Sets the rotation date of the given logfile.
+ If the rotation date is not given, datetime.utcnow() is used.
+
+ @param logfile: the logfile to set
+ @type logfile: str
+ @param rotate_date: the rotation date of this logfile
+ @type rotate_date: datetime or None
+
+ @return: date of rotation of this logfile (relative to UTC)
+ @rtype: datetime
+ '''
+
+ date_utc = datetime.utcnow()
+ if rotate_date:
+ date_utc = rotate_date.astimezone(utc)
+
+ _ = self.t.lgettext
+ msg = _("Setting rotation date of '%(file)s' to '%(date)s' ...") \
+ % {'file': logfile, 'date': date_utc.isoformat(' ') }
+ self.logger.debug(msg)
+
+ self._read(must_exists = False)
+ self.file_state[logfile] = date_utc
+
+
+ return date_utc
+
#------------------------------------------------------------
def __str__(self):
'''
pp = pprint.PrettyPrinter(indent=4)
return pp.pformat(self.as_dict())
+ #------------------------------------------------------------
+ def _check_permissions(self):
+ '''
+ Checks the permissions of the state file and/or his parent directory.
+ Throws a LogrotateStatusFileError on a error.
+
+ @return: success of check
+ @rtype: bool
+ '''
+
+ _ = self.t.lgettext
+ msg = _("Checking permissions of status file '%s' ...") % (self.file_name)
+ self.logger.debug(msg)
+
+ if os.path.exists(self.file_name):
+ # Check for write access to the status file
+ if os.access(self.file_name, os.W_OK):
+ msg = _("Access to status file '%s' is OK.") % (self.file_name)
+ self.logger.debug(msg)
+ return True
+ else:
+ msg = _("No write access to status file '%s'.") % (self.file_name)
+ if self.test_mode:
+ self.logger.error(msg)
+ else:
+ raise LogrotateStatusFileError(msg)
+ return False
+
+ parent_dir = os.path.dirname(self.file_name)
+ msg = _("Checking permissions of parent directory '%s' ...") % (parent_dir)
+ self.logger.debug(msg)
+
+ # Check for existence of parent dir
+ if not os.path.exists(parent_dir):
+ msg = _("Directory '%s' doesn't exists.") % (parent_dir)
+ if self.test_mode:
+ self.logger.error(msg)
+ else:
+ raise LogrotateStatusFileError(msg)
+ return False
+
+ # Check whether parent dir is a directory
+ if not os.path.isdir(parent_dir):
+ msg = _("Parent directory '%(dir)s' of status file '%(file)s' is not a directory.") \
+ % {'dir': parent_dir, 'file': self.file_name }
+ if self.test_mode:
+ self.logger.error(msg)
+ else:
+ raise LogrotateStatusFileError(msg)
+ return False
+
+ # Check for write access to parent dir
+ if not os.access(parent_dir, os.W_OK):
+ msg = _("No write access to parent directory '%(dir)s' of status file '%(file)s'.") \
+ % {'dir': parent_dir, 'file': self.file_name }
+ if self.test_mode:
+ self.logger.error(msg)
+ else:
+ raise LogrotateStatusFileError(msg)
+ return False
+
+ msg = _("Permissions to parent directory '%s' are OK.") % (parent_dir)
+ self.logger.debug(msg)
+ return True
+
#-------------------------------------------------------
def _read(self, must_exists = True):
'''
fd.close
self.fd = None
+ self.was_read = True
return True