#!/usr/bin/env python
## @file csfile.py
# @brief Provide a module to read, write and process csback checksum files.
#
# -----------------------------------------------------------------------------
#
# $Id$
# @author Daniel Armbruster
# \date 15/09/2011
#
# Purpose: Provide a module to read, write and process csback checksum files.
#
# ----
# This file is part of csback.
#
# csback is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# csback is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with csback.  If not, see <http://www.gnu.org/licenses/>.
# ----
#
# Copyright (c) 2011-2012 by Daniel Armbruster
#
# REVISIONS and CHANGES
# 15/09/2011  V0.1    Daniel Armbruster
# 01/01/2012  V0.1.1  finished implementation
# 04/01/2012  V0.1.3  implemented a debug mode using the python logging module
#
# =============================================================================
"""
CsFile module to handle checksum files.
"""

import os
import re
import sys
import pwd
import hashlib
import logging
from datetime import datetime

__version__ = "V0.1"
__subversion__ = "$Id$"
# NOTE(review): the header above states GPL version 3 or later -- confirm
# which of the two is intended.
__license__ = "GPLv2"
__author__ = "Daniel Armbruster"
__copyright__ = "Copyright (c) 2012 by Daniel Armbruster"

# -----------------------------------------------------------------------------
# global variables

# Base chunk size (128kB). CsLine multiplies it by the block_size of the hash
# object to obtain the number of bytes requested per read() call.
chunkSize = 1024 * 128
# Prefix of the logger names used by CsFile (".CsFile") and CsLine (".CsLine").
csfileLoggerName = ''
# Extra context dictionary handed to the logging.LoggerAdapter instances.
csfileLogInfo = {}

# -----------------------------------------------------------------------------
# functions


def _formatOSError(err):
    """
    Build the '[Errno N] message: filename' text used in CsFileError messages.

    str.format() is used instead of string concatenation so that an error
    object whose strerror or filename is None does not cause a TypeError.
    """
    return "[Errno {0}] {1}: {2}".format(err.errno, err.strerror, err.filename)


def _requireCsLines(cslines, line):
    """
    Raise CsFileError(line, ...) if any element of cslines is not a CsLine.

    Called before a checksumfile is opened for writing so that invalid input
    never leaves a truncated or partially written file behind.
    """
    for csline in cslines:
        if not isinstance(csline, CsLine):
            raise CsFileError(line, "Argument must be of type CsLine.")


def _feedHash(hashfunc, fileObj, blockSize):
    """
    Update hashfunc with the content of fileObj, read blockSize bytes at once.
    """
    data = fileObj.read(blockSize)
    while data:
        hashfunc.update(data)
        data = fileObj.read(blockSize)


def getSubDirectories(path, regexes, followLinks=False):
    """
    To generate a set of subdirectories of path using os.walk().

    path itself is not part of the result. Every directory whose full path
    matches (re.match, i.e. anchored at the start) one of the regular
    expressions in regexes is removed from the result.

    Returns a set of directory paths.
    Raises CsFileError if os.walk() raises an OSError.
    """
    subDirs = set()
    try:
        # collect subdirectories
        for root, dirNames, _ in os.walk(path, followlinks=followLinks):
            for dirName in dirNames:
                subDirs.add(os.path.join(root, dirName))
    except OSError as err:
        raise CsFileError(83, _formatOSError(err))
    # exclude directories matching regexes
    for regex in regexes:
        subDirs -= set(subDir for subDir in subDirs
                       if re.match(regex, subDir) is not None)
    return subDirs


def hasCsFile(path):
    """
    Checks if path contains a checksumfile.

    Returns True if path contains a regular file named CsFile.filename.
    """
    return os.path.isfile(os.path.join(path, CsFile.filename))

# -----------------------------------------------------------------------------


class CsFileError(Exception):
    """
    Exception class of csfile module.

    line is the numeric identifier passed by the raising code, msg the
    human readable description.
    """

    def __init__(self, line, msg):
        # Pass both values on so that Exception.args stays (line, msg).
        super(CsFileError, self).__init__(line, msg)
        self.msg = msg
        self.line = line

    def __str__(self):
        # Show the message (not the args tuple) in tracebacks and logs.
        return str(self.msg)

    def display(self):
        """
        Write the error message and its line identifier to stderr.
        """
        sys.stderr.write("csfile (ERROR): " + str(self.msg) + "\n")
        sys.stderr.write("triggered in line: " + str(self.line) + "\n")

# -----------------------------------------------------------------------------


class CsFile:
    """
    Provides an interface to handle a csback checksumfile.

    The checksumfile is located in filedir. The files the checksums are
    generated from are taken from srcpath, which may be a different
    directory. A checksumfile contains checksumlines (type CsLine) of files;
    if recursive is True this includes the files of the subdirectories of
    srcpath, too.
    """

    # name of the checksumfile inside filedir
    filename = ".cs"

    def __init__(self, filedir, srcpath, hashfunc='sha256', recursive=True,
                 followLinks=False):
        self.filedir = filedir
        self.__cslines = []
        self.__hashfunc = hashfunc
        self.srcpath = srcpath
        self.recursive = recursive
        # Previously hard-coded to False, which silently ignored the argument.
        self.followLinks = followLinks
        self.logger = logging.LoggerAdapter(
            logging.getLogger(csfileLoggerName + ".CsFile"), csfileLogInfo)

    def read(self):
        """
        Read a checksumfile.

        If filedir does not contain a checksumfile an empty one is created.
        Otherwise every line which is neither blank nor starts with '#' is
        split at whitespace and converted into a CsLine.

        Raises CsFileError if filedir does not exist or the file cannot be
        created/opened.
        """
        if not os.access(self.filedir, os.F_OK):
            raise CsFileError(134, "Invalid directory path.")
        path = os.path.join(self.filedir, CsFile.filename)

        # no checksumfile available -> create new file
        if not os.path.isfile(path):
            self.logger.info("Creating checksumfile in '%s'", self.filedir)
            try:
                open(path, 'w').close()
            except IOError as err:
                raise CsFileError(142, _formatOSError(err))
            # a freshly created file has no lines; drop stale in-memory ones
            self.__cslines = []
            return

        # checksumfile available -> read file
        self.logger.debug("Start reading checksumfile '%s'", path)
        try:
            with open(path) as csfile:
                self.__cslines = [CsLine(line.split()) for line in csfile
                                  if len(line.rstrip()) and line[0] != '#']
        except IOError as err:
            raise CsFileError(154, _formatOSError(err))
        self.logger.debug("Finished reading checksumfile '%s'", path)

    def write(self):
        """
        Write the entire checksumfile.

        The file is overwritten with the checksum lines currently held in
        memory. Raises CsFileError if a held line is not a CsLine or the file
        cannot be written.
        """
        path = os.path.join(self.filedir, CsFile.filename)
        # validate before opening with 'w' so the file is not truncated
        _requireCsLines(self.__cslines, 173)
        self.logger.debug("Start writing checksumfile '%s'", path)
        try:
            with open(path, 'w') as csfile:
                for csline in self.__cslines:
                    self.logger.debug("Writing line: '%s'", str(csline))
                    csfile.write(str(csline) + '\n')
        except IOError as err:
            raise CsFileError(175, _formatOSError(err))
        self.logger.debug("Finished writing checksumfile '%s'", path)

    def append(self, cslines):
        """
        Append checksum lines to the checksumfile.

        cslines is a list of CsLine objects. An empty list is a no-op.
        Raises CsFileError if an element is not a CsLine or the file cannot
        be written.
        """
        path = os.path.join(self.filedir, CsFile.filename)
        if not cslines:
            self.logger.debug("Empty list passed. Nothing to append.")
            return
        # validate first so that nothing is written for invalid input
        _requireCsLines(cslines, 197)
        self.logger.debug("Start appending to checksumfile '%s'", path)
        try:
            with open(path, 'a') as csfile:
                for csline in cslines:
                    self.logger.debug("Writing line '%s'", str(csline))
                    csfile.write(str(csline) + '\n')
        except IOError as err:
            raise CsFileError(199, _formatOSError(err))
        self.logger.debug("Finished appending to checksumfile '%s'", path)

    def update(self, regexes=None):
        """
        Update a checksum file.

        Reads the checksumfile, collects the files in srcpath (recursively if
        self.recursive is set), generates a checksum line for every file that
        is neither registered yet nor matches one of the regular expressions
        in regexes, and appends those lines to the checksumfile.

        The checksumfile located directly in srcpath is never registered.
        The list passed as regexes is not modified. The lines appended to the
        file are not added to the lines held in memory; call read() to
        refresh them.

        Raises CsFileError if regexes is not a list or file access fails.
        """
        if regexes is None:
            regexes = []
        if not isinstance(regexes, list):
            raise CsFileError(211, "Pass regular expressions in a list.")
        self.logger.debug("Updating checksumfile ...")
        # fetch cslines in current csfile
        self.read()
        self.logger.debug("Fetching files not registered yet.")
        registeredFiles = set(csline.path for csline in self.__cslines)

        # fetch files (paths)
        newFiles = set()
        if self.recursive:
            self.logger.debug("Fetching files recursively.")
            for root, _, fileNames in os.walk(self.srcpath,
                                              followlinks=self.followLinks):
                for fileName in fileNames:
                    newFiles.add(os.path.join(root, fileName))
        else:
            self.logger.debug("Fetching files not recursively.")
            for entry in os.listdir(self.srcpath):
                entryPath = os.path.join(self.srcpath, entry)
                if os.path.isfile(entryPath):
                    newFiles.add(entryPath)

        # Exclude the checksumfile itself by exact path. It used to be
        # appended to regexes as an unescaped pattern, which also dropped
        # unrelated files such as '.cshrc' and mutated the caller's list.
        newFiles.discard(os.path.join(self.srcpath, CsFile.filename))
        # exclude files matching regexes
        for regex in regexes:
            newFiles -= set(filePath for filePath in newFiles
                            if re.match(regex, filePath) is not None)
        # exclude registered files
        newFiles -= registeredFiles

        # generate cslines of newFiles (sorted for a reproducible file order)
        cslines = []
        for filePath in sorted(newFiles):
            csline = CsLine(filePath, self.__hashfunc)
            csline.generate(chunkSize)
            cslines.append(csline)
        self.append(cslines)
        path = os.path.join(self.filedir, CsFile.filename)
        self.logger.debug("Update of checksumfile '%s' finished.", path)

    def check(self, regexes, beTolerant=False):
        """
        Check a checksum file which means: Calculate the checksum of each
        registered file as located in self.srcpath and compare the results.

        Lines whose path matches one of the regexes are excluded. If
        beTolerant is set to True a file listed in the checksumfile which is
        not available anymore gets the status 'warning' instead of 'error'.

        Note that this function does not check if there are unregistered
        files in the directory. Adding checksum lines to the checksumfile has
        to be done by using the update function.
        """
        if not self.__cslines:
            self.logger.debug(
                "CSFILE does not contain any lines or had not been read yet.")
        # exclude those files matching regex in regexes
        self.logger.debug("Exclude files matching regexes")
        excluded = set()
        for regex in regexes:
            excluded.update(csline for csline in self.__cslines
                            if re.match(regex, csline.path) is not None)
        # perform check
        self.logger.debug("Start checking checksums ...")
        for csline in self.__cslines:
            if csline in excluded:
                continue
            # map the registered path from filedir onto the source directory
            path = csline.path.replace(self.filedir, self.srcpath)
            self.logger.debug(
                "Performing check of csline-file '%s' with file '%s'",
                csline.path, path)
            csline.check(path, beTolerant)
        self.logger.debug("Finished checking checksums.")

    def display(self):
        """
        Display the content of the checksum file to stdout, one line per
        checksum line.
        """
        if not self.__cslines:
            self.logger.info("CSFILE does not contain any lines.")
        for csline in self.__cslines:
            # sys.stdout.write() needs a string, not the CsLine object
            sys.stdout.write(str(csline) + '\n')

# -----------------------------------------------------------------------------


class CsLine:
    """
    Class to handle a checksum and further data for a registered file.

    Construct either with a single list of at least eight fields (checksum,
    path, hashfunc, creationDateFile, creationLocationChecksum,
    creationDateChecksum, dateLastCheck, statusLastCheck) or with two strings
    (path, hashfunc). Any other argument combination raises CsFileError.
    """

    def __init__(self, *args):
        self.logger = logging.LoggerAdapter(
            logging.getLogger(csfileLoggerName + ".CsLine"), csfileLogInfo)
        if args and isinstance(args[0], list):
            argList = args[0]
            if len(argList) < 8:
                raise CsFileError(325, "Invalid argument(s).")
            (self.checksum, self.path, self.hashfunc, self.creationDateFile,
             self.creationLocationChecksum, self.creationDateChecksum,
             self.dateLastCheck, self.statusLastCheck) = argList[:8]
        elif (len(args) >= 2 and isinstance(args[0], str)
              and isinstance(args[1], str)):
            self.checksum = ''
            self.path = args[0]
            self.hashfunc = args[1]
            self.creationDateFile = ''
            self.creationLocationChecksum = ''
            self.creationDateChecksum = ''
            self.dateLastCheck = ''
            self.statusLastCheck = ''
        else:
            # The exception used to be constructed but never raised.
            raise CsFileError(325, "Invalid argument(s).")

    def generate(self, chunkSize):
        """
        Generate the checksum and establish corresponding data for a file.

        chunkSize is multiplied by the block size of the hash object to get
        the number of bytes read at once. The result is a fully configured
        checksum line with status 'ok'.

        Raises CsFileError if the file cannot be read.
        """
        self.logger.debug("Calculating checksum for '%s'", self.path)
        # generate checksum
        try:
            hashfunc = hashlib.new(self.hashfunc)
            blockSize = chunkSize * hashfunc.block_size
            with open(self.path, 'rb') as srcFile:
                _feedHash(hashfunc, srcFile, blockSize)
            self.checksum = hashfunc.hexdigest()
        except IOError as err:
            raise CsFileError(344, _formatOSError(err))
        # set remaining data
        timeFormat = "%Y/%m/%d-%H:%M:%S"
        self.creationDateFile = datetime.fromtimestamp(
            os.path.getctime(self.path)).strftime(timeFormat)
        self.creationLocationChecksum = os.uname()[1]
        self.creationDateChecksum = datetime.now().strftime(timeFormat)
        self.dateLastCheck = self.creationDateChecksum
        self.statusLastCheck = 'ok'

    def check(self, src, beTolerant=False):
        """
        Check a checksum line.

        The checksum of the line is compared with the checksum of the file
        located in src. If src cannot be opened the status of the line is set
        to 'warning' when beTolerant is True, else to 'error'. If the
        checksums match the status is set to 'ok', else to 'error'.
        """
        # NOTE(review): dateLastCheck is not refreshed here -- confirm whether
        # a check is supposed to update it.
        hashfunc = hashlib.new(self.hashfunc)
        blockSize = chunkSize * hashfunc.block_size
        try:
            srcFile = open(src, 'rb')
        except IOError:
            if beTolerant:
                self.statusLastCheck = 'warning'
                self.logger.warning(
                    "While checking file '%s' does not exist.", src)
            else:
                self.statusLastCheck = 'error'
                self.logger.error(
                    "While checking file '%s' does not exist.", src)
            return
        # calculate checksum; the file is closed even if reading fails
        with srcFile:
            _feedHash(hashfunc, srcFile, blockSize)
        # checks
        if hashfunc.hexdigest() == self.checksum:
            self.statusLastCheck = 'ok'
            self.logger.debug("Check of file '%s' was successful.", src)
        else:
            self.statusLastCheck = 'error'
            self.logger.critical("File '%s' has no integrity anymore.", src)

    def __str__(self):
        """
        String representation of a checksum line: the eight fields separated
        by single spaces.
        """
        return '{0} {1} {2} {3} {4} {5} {6} {7}'.format(
            self.checksum, self.path, self.hashfunc, self.creationDateFile,
            self.creationLocationChecksum, self.creationDateChecksum,
            self.dateLastCheck, self.statusLastCheck)

# -----------------------------------------------------------------------------
# Tests
if __name__ == '__main__':
    try:
        # CsFile requires a source path as well; use the same directory.
        csfile = CsFile('/home/daniel/', '/home/daniel/')
        csfile.read()
    except CsFileError as err:
        err.display()
        sys.exit()

# ----- END OF csfile.py -----