import time
import gzip
import bz2
+import zipfile
from LogRotateConfig import LogrotateConfigurationError
from LogRotateConfig import LogrotateConfigurationReader
if definition['copytruncate'] or definition['copy']:
# Copying logfile to target
- msg = _("Copying file '%(from)s' => '%(to)'.") \
+ msg = _("Copying file '%(from)s' => '%(to)s'.") \
% {'from': file_from, 'to': file_to }
self.logger.info(msg)
if not self.test:
self.logger.debug(msg)
found_files = glob.glob(pattern)
for oldfile in found_files:
- if os.path.samefile(oldfile, logfile):
+ oldfile = os.path.abspath(oldfile)
+ if oldfile == logfile:
continue
statinfo = os.stat(oldfile)
result[oldfile] = statinfo.st_mtime
self._compress_internal_gzip(logfile, target)
elif command == 'internal_bzip2':
self._compress_internal_bzip2(logfile, target)
+ elif command == 'internal_zip':
+ self._compress_internal_zip(logfile, target)
else:
self._compress_external(logfile, target, command, compress_opts)
Compression of the given source file to the target file
with an external command.
- It raises a LogrotateHandlerError on some errors.
+ It raises a LogrotateHandlerError on uncoverable errors.
@param source: the source file to compress
@type source: str
% {'source': source, 'target': target, 'cmd': command}
self.logger.debug(msg)
+ #------------------------------------------------------------
+ def _copy_file_metadata(self, target, source=None, statinfo=None):
+ '''
+ Copy all metadata (owner, permissions, timestamps a.s.o) from
+ a source file onto a target file.
+ The target file must be exists.
+ Either an existing source file (parameter 'source') or the
+ statinfo of a former existing file (parameter 'statinfo') must
+ be given.
+
+ It raises a LogrotateHandlerError on uncoverable errors.
+
+ @param target: filename of an existing target file or directory
+ @type target: str
+ @param source: filename of an existing source file or directory
+ or None, if statinfo was given,
+ has precedence before a given statinfo
+ @type source: str or None
+ @param statinfo: stat object from os.stat() or None, if source was given
+ @type statinfo: stat-object or None
+
+ @return: success or not
+ @rtype: bool
+ '''
+
+ _ = self.t.lgettext
+
+ if source is None and statinfo is None:
+ msg = _("Neither 'target' nor 'statinfo' was given on calling _copy_file_metadata().")
+ raise LogrotateHandlerError(msg)
+ return False
+
+ if not os.path.exists(target):
+ msg = _("File or directory '%s' doesn't exists.") % (target)
+ if self.test:
+ self.logger.info(msg)
+ return True
+ self.logger.error(msg)
+ return False
+
+ new_statinfo = statinfo
+ old_statinfo = os.stat(target)
+
+ msg = _("Copying all file metadata to target '%s' ...") % (target)
+ self.logger.info(msg)
+
+ if source is not None:
+
+ # a source object was given
+
+ if not os.path.exists(source):
+ msg = _("File or directory '%s' doesn't exists.") % (source)
+ self.logger.error(msg)
+ return False
+
+ new_statinfo = os.stat(source)
+
+ # Copying permissions and timestamps from source to target
+ if self.verbose > 1:
+ msg = _("Copying permissions and timestamps from source '%(src)s' to target '%(target)s'.") \
+ % {'src': source, 'target': target}
+ self.logger.debug(msg)
+ if not self.test:
+ shutil.copystat(source, target)
+
+ else:
+
+ # a source statinfo was given
+
+ atime = new_statinfo.st_atime
+ mtime = new_statinfo.st_mtime
+ mode = new_statinfo.st_mode
+
+ # Setting atime and mtime
+ if self.verbose > 1:
+ msg = _("Setting atime and mtime of target '%s'.") % (target)
+ self.logger.debug(msg)
+ if not self.test:
+ try:
+ os.utime(target, (atime, mtime))
+ except OSError, e:
+ msg = _("Error on setting times on target file '%(target)s': %(err)s") \
+ % {'target': target, 'err': e.strerror}
+ self.logger.warning(msg)
+ return False
+
+ # Setting permissions
+ old_mode = old_statinfo.st_mode
+ if mode != old_mode:
+ if self.verbose > 1:
+ msg = _("Setting permissions of '%(target)s' to %(mode)4o.") \
+ % {'target': target, 'mode': new_mode}
+ self.logger.info(msg)
+ if not self.test:
+ try:
+ os.chmod(target, mode)
+ except OSError, e:
+ msg = _("Error on chmod of '%(target)s': %(err)s") \
+ % {'target': target, 'err': e.strerror}
+ self.logger.warning(msg)
+ return False
+
+ # Copying ownership from source to target
+ new_uid = new_statinfo.st_uid
+ new_gid = new_statinfo.st_gid
+ old_uid = old_statinfo.st_uid
+ old_gid = old_statinfo.st_gid
+
+ if (old_uid != new_uid) or (old_gid != new_gid):
+ if self.verbose > 1:
+ msg = _("Copying ownership from source to target.")
+ self.logger.debug(msg)
+ myuid = os.geteuid()
+ if myuid != 0:
+ msg = _("Only root may execute chown().")
+ if self.test:
+ self.logger.info(msg)
+ return True
+ else:
+ self.logger.warning(msg)
+ return False
+ if not self.test:
+ try:
+ os.chown(target, old_uid, old_gid)
+ except OSError, e:
+ msg = _("Error on chown of '%(file)s': %(err)s") \
+ % {'file': target, 'err': e.strerror}
+ self.logger.warning(msg)
+ return False
+
+ return True
+
+ #------------------------------------------------------------
+ def _compress_internal_zip(self, source, target):
+ '''
+ Compression of the given source file to the target file
+ with the Python module zipfile.
+
+ It raises a LogrotateHandlerError on some errors.
+
+ @param source: the source file to compress
+ @type source: str
+ @param target: the filename of the compressed file.
+ @type target: str
+
+ @return: success or not
+ @rtype: bool
+ '''
+
+ _ = self.t.lgettext
+
+ if self.verbose > 1:
+ msg = _("Compressing source '%(source)s' to target'%(target)s' with module zipfile.") \
+ % {'source': source, 'target': target}
+ self.logger.debug(msg)
+
+ if not self.test:
+
+ # open target for writing
+ f_out = None
+ try:
+ f_out = zipfile.ZipFile(
+ file=target,
+ mode='w',
+ compression=zipfile.ZIP_DEFLATED
+ )
+ except IOError, e:
+ msg = _("Error on open file '%(file)s' on writing: %(err)s") \
+ % {'file': target, 'err': str(e)}
+ self.logger.error(msg)
+ return False
+
+ basename = os.path.basename(source)
+ f_out.write(source, basename)
+ f_out.close()
+
+ self._copy_file_metadata(source=source, target=target)
+
+ # And last, but not least, delete uncompressed file
+ if self.verbose > 1:
+ msg = _("Deleting uncompressed file '%s' ...") % (source)
+ self.logger.debug(msg)
+
+ if not self.test:
+ try:
+ os.remove(source)
+ except OSError, e:
+ msg = _("Error removing uncompressed file '%(file)s': %(msg)") \
+ % {'file': source, 'msg': str(e) }
+ self.logger.error(msg)
+ return False
+
+ return True
#------------------------------------------------------------
def _compress_internal_gzip(self, source, target):
f_out.close()
f_in.close()
- # Copying permissions and timestamps from source to target
- if self.verbose > 1:
- msg = _("Copying permissions and timestamps from source to target.")
- self.logger.debug(msg)
- if not test_mode:
- shutil.copystat(source, target)
-
- # Copying ownership from source to target
- statinfo = os.stat(source)
- old_uid = statinfo.st_uid
- old_gid = statinfo.st_gid
- statinfo = os.stat(target)
- new_uid = statinfo.st_uid
- new_gid = statinfo.st_gid
-
- if (old_uid != new_uid) or (old_gid != new_gid):
- if self.verbose > 1:
- msg = _("Copying ownershipfrom source to target.")
- self.logger.debug(msg)
- if not test_mode:
- try:
- os.chown(target, old_uid, old_gid)
- except OSError, e:
- msg = _("Error on chown of '%(file)s': %(err)s") \
- % {'file': target, 'err': e.strerror}
- self.logger.warning(msg)
+ self._copy_file_metadata(source=source, target=target)
# And last, but not least, delete uncompressed file
if self.verbose > 1:
f_out.close()
f_in.close()
- # Copying permissions and timestamps from source to target
- if self.verbose > 1:
- msg = _("Copying permissions and timestamps from source to target.")
- self.logger.debug(msg)
- if not test_mode:
- shutil.copystat(source, target)
-
- # Copying ownership from source to target
- statinfo = os.stat(source)
- old_uid = statinfo.st_uid
- old_gid = statinfo.st_gid
- statinfo = os.stat(target)
- new_uid = statinfo.st_uid
- new_gid = statinfo.st_gid
-
- if (old_uid != new_uid) or (old_gid != new_gid):
- if self.verbose > 1:
- msg = _("Copying ownershipfrom source to target.")
- self.logger.debug(msg)
- if not test_mode:
- try:
- os.chown(target, old_uid, old_gid)
- except OSError, e:
- msg = _("Error on chown of '%(file)s': %(err)s") \
- % {'file': target, 'err': e.strerror}
- self.logger.warning(msg)
+ self._copy_file_metadata(source=source, target=target)
# And last, but not least, delete uncompressed file
if self.verbose > 1: