#!/usr/bin/env python
## @file csfile.py
# @brief Provide a module to read, write and work with csback checksumfiles.
#
# -----------------------------------------------------------------------------
#
# $Id$
# @author Daniel Armbruster
# \date 15/09/2011
#
# Purpose: Provide a module to read, write and work with csback
#          checksumfiles.
#
# ----
# This file is part of csback.
#
# csback is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# csback is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with csback.  If not, see <http://www.gnu.org/licenses/>.
# ----
#
# Copyright (c) 2011-2012 by Daniel Armbruster
#
# REVISIONS and CHANGES
# 15/09/2011  V0.1    Daniel Armbruster
# 01/01/2012  V0.1.1  finished implementation
# 04/01/2012  V0.1.3  implemented a debug mode using the python logging module
#
# =============================================================================
"""
CsFile module to handle checksum files.
"""

import os
import re
import sys
import pwd
import hashlib
import logging
from datetime import datetime

# NOTE(review): the header above states GPL version 3 or later and the
# revision list ends at V0.1.3; the two values below disagree with that.
# They are left untouched because other code may read them - confirm which
# side is authoritative.
__version__ = "V0.1"
__subversion__ = "$Id$"
__license__ = "GPLv2"
__author__ = "Daniel Armbruster"
__copyright__ = "Copyright (c) 2012 by Daniel Armbruster"

# -----------------------------------------------------------------------------
# global variables
chunkSize = 1024 * 128  # multiplied with the hash block size to get the
                        # number of bytes read per I/O call
csfileLoggerName = ''   # prefix of the logger names used by this module
csfileLogInfo = {}      # 'extra' mapping handed to the logger adapters

# format of every timestamp stored in a checksum line
_TIMESTAMP_FORMAT = "%Y/%m/%d-%H:%M:%S"

# -----------------------------------------------------------------------------
# functions
def _formatIOError(err):
    """
    Render an IOError/OSError as "[Errno N] <strerror>: <filename>".

    Uses format() instead of string concatenation so that a missing filename
    (None) does not raise a TypeError while the error is being reported.
    """
    return "[Errno {0}] {1}: {2}".format(err.errno, err.strerror, err.filename)


def _splitCsLine(line):
    """
    Split one text line of a checksumfile into its eight fields.

    The first field (checksum) and the last six fields never contain
    whitespace, so everything in between is the path. Splitting from both
    ends keeps paths which contain blanks intact.

    Raises CsFileError if the line does not provide eight fields.
    """
    head = line.split(None, 1)
    if 2 == len(head):
        tail = head[1].rsplit(None, 6)
        if 7 == len(tail):
            return [head[0]] + tail
    raise CsFileError(142, "Malformed checksum line: " + line.rstrip())


def getSubDirectories(path, regexes, followLinks=False):
    """
    To generate a set of subdirectories of path using os.walk().
    Note that path itself is not part of the result.

    path        -- directory the search starts at
    regexes     -- iterable of regular expressions; a directory is dropped if
                   re.match() succeeds for its path
    followLinks -- passed to os.walk() as followlinks

    Returns a set of directory paths. Raises CsFileError on an OSError.
    """
    subDirs = set()
    try:
        # collect subdirectories
        for root, dirs, files in os.walk(path, True, None, followLinks):
            for name in dirs:
                subDirs.add(os.path.join(root, name))
    except OSError as err:
        raise CsFileError(83, _formatIOError(err))
    # exclude directories matching regexes
    for regex in regexes:
        subDirs = set(subDir for subDir in subDirs
                      if re.match(regex, subDir) is None)
    return subDirs

# -----------------------------------------------------------------------------
class CsFileError(Exception):
    """
    Exception class of csfile module.

    line -- line number identifying the place the error was triggered at
    msg  -- human readable description
    """
    def __init__(self, line, msg):
        # keep args == (line, msg) so pickling / re-creation keeps working
        Exception.__init__(self, line, msg)
        self.msg = msg
        self.line = line

    def __str__(self):
        return str(self.msg)

    def display(self):
        """
        Write the error message and its origin to stderr.
        """
        sys.stderr.write("csfile (ERROR): " + str(self.msg) + "\n")
        sys.stderr.write("triggered in line: " + str(self.line) + "\n")

# -----------------------------------------------------------------------------
class CsFile(object):
    """
    Provides an interface to handle a csback checksumfile.

    A checksumfile possesses the ability to take the files for generating the
    checksums from a different source directory which can be configured with
    the srcpath variable.
    """
    def __init__(self, filedir, srcpath, hashfunc='sha256'):
        self.filedir = filedir
        self.filename = ".cs"
        self.__cslines = []
        self.__hashfunc = hashfunc
        self.srcpath = srcpath
        self.logger = logging.LoggerAdapter(logging.getLogger(
            csfileLoggerName + ".CsFile"), csfileLogInfo)

    def read(self):
        """
        Read a checksumfile.

        If the directory exists but does not contain a checksumfile yet, an
        empty one is created. Empty lines and lines starting with '#' are
        skipped.

        Raises CsFileError if the directory does not exist, on I/O errors and
        on malformed lines.
        """
        if not os.access(self.filedir, os.F_OK):
            raise CsFileError(122, "Invalid directory path.")
        path = os.path.join(self.filedir, self.filename)
        # no checksumfile available -> create new file
        if not os.path.isfile(path):
            self.logger.info("Creating checksumfile in %s", self.filedir)
            try:
                open(path, 'w').close()
            except IOError as err:
                raise CsFileError(130, _formatIOError(err))
            # the file on disk is empty, so must be the in-memory view
            self.__cslines = []
            return
        # checksumfile available -> read file
        self.logger.debug("Start reading checksumfile %s", path)
        try:
            # 'with' closes the file even if parsing a line fails
            with open(path) as csfile:
                cslines = [CsLine(_splitCsLine(line)) for line in csfile
                           if line.rstrip() and line[0] != '#']
        except IOError as err:
            raise CsFileError(142, _formatIOError(err))
        self.__cslines = cslines
        self.logger.debug("Finished reading checksumfile: %s", path)

    def write(self):
        """
        Write the entire checksumfile.

        All lines are validated before the file is opened, so an invalid
        entry cannot leave a truncated checksumfile behind.

        Raises CsFileError on invalid entries and on I/O errors.
        """
        path = os.path.join(self.filedir, self.filename)
        for csline in self.__cslines:
            if not isinstance(csline, CsLine):
                raise CsFileError(161, "Argument must be of type CsLine.")
        self.logger.debug("Start writing checksumfile: %s", path)
        try:
            with open(path, 'w') as csfile:
                for csline in self.__cslines:
                    self.logger.debug("Writing line: %s", str(csline))
                    csfile.write(str(csline) + '\n')
        except IOError as err:
            raise CsFileError(163, _formatIOError(err))
        self.logger.debug("Finished writing checksumfile: %s", path)

    def append(self, cslines):
        """
        Append checksum lines to the checksumfile.

        cslines -- list of CsLine objects; an empty list is a no-op

        Raises CsFileError on invalid entries and on I/O errors.
        """
        path = os.path.join(self.filedir, self.filename)
        if 0 == len(cslines):
            self.logger.debug("Empty list passed. Nothing to append.")
            return
        # validate first so that nothing is written if one entry is invalid
        for csline in cslines:
            if not isinstance(csline, CsLine):
                raise CsFileError(184, "Argument must be of type CsLine.")
        self.logger.debug("Start appending to checksumfile: %s", path)
        try:
            with open(path, 'a') as csfile:
                for csline in cslines:
                    self.logger.debug("Writing line: %s", str(csline))
                    csfile.write(str(csline) + '\n')
        except IOError as err:
            raise CsFileError(186, _formatIOError(err))
        self.logger.debug("Finished appending to checksumfile: %s", path)

    def update(self, regexes=None):
        """
        Update a checksum file. Also includes appending of not registered
        files to checksum file in current directory.

        regexes -- list of regular expressions (or None); a file of srcpath
                   is skipped if re.match() succeeds for its path. The list
                   is not modified.

        Raises CsFileError if regexes is neither None nor a list and on
        errors while reading or appending.
        """
        if regexes is None:
            regexes = []
        elif not isinstance(regexes, list):
            raise CsFileError(198, "Pass regular expressions in a list.")
        self.logger.debug("Updating checksumfile ...")
        # fetch cslines in current csfile
        self.read()
        self.logger.debug("Fetching files not registered yet.")
        registeredFiles = set(csline.path for csline in self.__cslines)
        # fetch files (paths)
        newFiles = set()
        for name in os.listdir(self.srcpath):
            candidate = os.path.join(self.srcpath, name)
            if os.path.isfile(candidate):
                newFiles.add(candidate)
        # The checksumfile itself is never registered. It is removed by exact
        # path: using the path as a regular expression would let the '.' of
        # ".cs" match any character and drop unrelated files.
        newFiles.discard(os.path.join(self.srcpath, self.filename))
        # exclude files matching regexes
        for regex in regexes:
            newFiles = set(candidate for candidate in newFiles
                           if re.match(regex, candidate) is None)
        # exclude registered files
        newFiles -= registeredFiles
        # generate cslines of newFiles (sorted for a reproducible file layout)
        cslines = []
        for newFile in sorted(newFiles):
            csline = CsLine(newFile, self.__hashfunc)
            csline.generate(chunkSize)
            cslines.append(csline)
        self.append(cslines)
        path = os.path.join(self.filedir, self.filename)
        self.logger.debug("Update of checksumfile: %s finished.", path)

    def check(self, srcDir, beTolerant=False):
        """
        Check a checksum file which means:
        1. read checksum file
        2. calculate checksum of file which is located in srcDir and check
           results
        3. write the result to the checksum file

        If the third argument (beTolerant) is set to True the checking process
        will be successful, too, if a file listed in a checksumfile is not
        available anymore. Then the status of the checksum line will be set to
        'warning'.
        Note that this function does not perform a check if there are
        unregistered files in the directory. Adding checksum lines to the
        checksumfile has to be done by the update function.
        """
        self.logger.debug("Start checking checksums.")
        self.read()
        if 0 == len(self.__cslines):
            self.logger.debug("CSFILE does not contain any lines.")
        for csline in self.__cslines:
            # only the file name is taken from the line; the directory is
            # srcDir
            source = os.path.join(srcDir, os.path.basename(csline.path))
            self.logger.debug("Performing check of file with source: %s",
                              source)
            csline.check(source, beTolerant)
        self.write()

    def displayLines(self):
        """
        Display the content of the checksum file at stdout.

        Raises CsFileError if no lines are available.
        """
        if not len(self.__cslines):
            raise CsFileError(258, "CSFILE does not contain any lines.")
        for line in self.__cslines:
            sys.stdout.write(str(line) + '\n')

# -----------------------------------------------------------------------------
class CsLine(object):
    """
    Class to handle a checksum and further data for a registered file.

    Construct either from a list of the eight fields of a checksum line
    (checksum, path, hash function, creation date of the file, host the
    checksum was created at, creation date of the checksum, date of the last
    check, status of the last check) or from two strings (path, hash
    function), which yields a line without checksum data.
    """
    def __init__(self, *args):
        self.logger = logging.LoggerAdapter(logging.getLogger(
            csfileLoggerName + ".CsLine"), csfileLogInfo)
        if args and isinstance(args[0], list):
            argList = args[0]
            if len(argList) < 8:
                raise CsFileError(290, "Invalid argument(s).")
            self.checksum = argList[0]
            self.path = argList[1]
            self.hashfunc = argList[2]
            self.creationDateFile = argList[3]
            self.creationLocationChecksum = argList[4]
            self.creationDateChecksum = argList[5]
            self.dateLastCheck = argList[6]
            self.statusLastCheck = argList[7]
        elif len(args) >= 2 and isinstance(args[0], str) \
                and isinstance(args[1], str):
            self.checksum = ''
            self.path = args[0]
            self.hashfunc = args[1]
            self.creationDateFile = ''
            self.creationLocationChecksum = ''
            self.creationDateChecksum = ''
            self.dateLastCheck = ''
            self.statusLastCheck = ''
        else:
            raise CsFileError(290, "Invalid argument(s).")

    def generate(self, chunkSize):
        """
        Generate the checksum and establish corresponding data for a file.
        The result is a fully configured checksum line.

        chunkSize -- multiplied with the block size of the hash function to
                     get the number of bytes read at once

        Raises CsFileError if the file cannot be read.
        """
        self.logger.debug("Calculating checksum for file: %s", self.path)
        # generate checksum
        hashfunc = hashlib.new(self.hashfunc)
        blockSize = chunkSize * hashfunc.block_size
        try:
            with open(self.path, 'rb') as stream:
                data = stream.read(blockSize)
                while data:
                    hashfunc.update(data)
                    data = stream.read(blockSize)
        except IOError as err:
            raise CsFileError(309, _formatIOError(err))
        self.checksum = hashfunc.hexdigest()
        # set remaining data
        self.creationDateFile = datetime.fromtimestamp(
            os.path.getctime(self.path)).strftime(_TIMESTAMP_FORMAT)
        self.creationLocationChecksum = os.uname()[1]
        self.creationDateChecksum = datetime.now().strftime(_TIMESTAMP_FORMAT)
        self.dateLastCheck = self.creationDateChecksum
        self.statusLastCheck = 'ok'

    def check(self, src, beTolerant=False):
        """
        Check a checksum line. The checksum of the line will be compared with
        the checksum of the file located in src.
        In case no file had been found in src and the beTolerant flag had been
        set to True the status of the line will be set to 'warning' else to
        'error'. If the checksums match the status is set to 'ok' else to
        'error', too. The date of the last check is set to the current time.
        """
        hashfunc = hashlib.new(self.hashfunc)
        blockSize = chunkSize * hashfunc.block_size
        self.dateLastCheck = datetime.now().strftime(_TIMESTAMP_FORMAT)
        try:
            stream = open(src, 'rb')
        except IOError:
            if beTolerant:
                self.statusLastCheck = 'warning'
                self.logger.warning("While checking file %s does not exist",
                                    src)
            else:
                self.statusLastCheck = 'error'
                self.logger.error("While checking file %s does not exist", src)
            return
        # calculate checksum
        with stream:
            data = stream.read(blockSize)
            while data:
                hashfunc.update(data)
                data = stream.read(blockSize)
        # checks
        if hashfunc.hexdigest() == self.checksum:
            self.statusLastCheck = 'ok'
            self.logger.debug("Check of file: %s was successful.", src)
        else:
            # without this a stale 'ok' of an earlier check would survive
            self.statusLastCheck = 'error'
            self.logger.critical("File %s has no integrity anymore.", src)

    def __str__(self):
        """
        String representation of a checksum line.
        """
        return '{0} {1} {2} {3} {4} {5} {6} {7}'.format(
            self.checksum, self.path, self.hashfunc, self.creationDateFile,
            self.creationLocationChecksum, self.creationDateChecksum,
            self.dateLastCheck, self.statusLastCheck)

# -----------------------------------------------------------------------------
# Tests
if __name__ == '__main__':
    # directory may be given as first argument; the historic default is kept
    directory = sys.argv[1] if len(sys.argv) > 1 else '/home/daniel/'
    try:
        csfile = CsFile(directory, directory)
        csfile.read()
    except CsFileError as err:
        err.display()
        sys.exit(1)

# ----- END OF csfile.py -----