#!/usr/bin/env python ## @file csfile.py # @brief Provide a module to read, write and treat with a csback # checksumfiles. # # ----------------------------------------------------------------------------- # # $Id$ # @author Daniel Armbruster # \date 15/09/2011 # # Purpose: Provide a module to read, write and treat with a csback # checksumfiles. # # ---- # This file is part of csback. # # csback is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # csback is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with csback. If not, see . # ---- # # Copyright (c) 2011 by Daniel Armbruster # # REVISIONS and CHANGES # 15/09/2011 V0.1 Daniel Armbruster # 01/01/2012 V0.1.1 finished implementation # 02/01/2012 V0.1.2 implemented debugMode # # ============================================================================= """ CsFile module to handle checksum files. """ import os import re import sys import hashlib from datetime import datetime __version__ = "V0.1" __subversion__ = "$Id$" __license__ = "GPLv2" __author__ = "Daniel Armbruster" __copyright__ = "Copyright (c) 2012 by Daniel Armbruster" # ----------------------------------------------------------------------------- # global variables chunkSize = 1024 * 128 # 128kB debugMode = False # ----------------------------------------------------------------------------- class CsFileError(Exception): """ Exception class of csfile module. """ def __init__(self, line, msg): self.msg = msg self.line = line def display(self): sys.stderr.write("csfile (ERROR): " + str(self.msg) + "\n") sys.stderr.write("triggered in line: " + str(self.line) + "\n") class CsReport(CsFileError): """ Auxiliary class for debugging. """ def display(self): sys.stderr.write("csfile (REPORT): " + str(self.msg) + "\n") sys.stderr.write("triggered in line: " + str(self.line) + "\n") # ----------------------------------------------------------------------------- class CsFile: """ Provides an interface to handle a csback checksum file. """ def __init__(self, filepath, hashfunc='sha256'): self.__filepath = filepath self.__filename = ".cs" self.__cslines = [] self.__hashfunc = hashfunc self.createFile() def createFile(self): path = self.__filepath+os.sep+self.__filename if not os.path.isfile(path): try: csfile = open(path, 'w') except IOError as err: raise CsFileError(99, "[Errno "+str(err.errno)+"] "+err.strerror+": " \ +err.filename) else: csfile.close() def read(self): """ Read a checksum file. """ path = self.__filepath+os.sep+self.__filename try: if debugMode: CsReport(111, "Start reading checksumfile: "+path).display() csfile = open(path) self.__cslines = [CsLine(line.split()) for line in csfile \ if len(line.rstrip()) and line[0] != '#'] except IOError as err: # Maybe better to create the file here and keep on going. # Will be managed later during further dev. raise CsFileError(118, "[Errno "+str(err.errno)+"] "+err.strerror+": " \ +err.filename) else: csfile.close() if debugMode: CsReport(123, "Finished reading checksumfile: "+path).display() def write(self): """ Write the entire checksumfile. """ path = self.__filepath+os.sep+self.__filename try: if debugMode: CsReport(132, "Start writing checksumfile: "+path).display() csfile = open(path, 'w') for csline in self.__cslines: if debugMode: CsReport(136, "Writing line: "+ str(csline)).display() if isinstance(csline, CsLine): csfile.write(str(csline) + '\n') else: raise CsFileError(140, "Argument must be of type CsLine.") except IOError as err: raise CsFileError(142, "[Errno "+str(err.errno)+"] "+err.strerror+": " \ +err.filename) else: csfile.close() if debugMode: CsReport(147, "Finished writing checksumfile: "+path) def append(self, cslines): """ Append checksum lines to the checksum file. """ path = self.__filepath+os.sep+self.__filename if 0 == len(cslines) and debugMode: CsReport(155, "Empty list passed. Nothing to append.").display() else: try: if debugMode: CsReport(159, "Start appending to checksumfile: "+path).display() csfile = open(path, 'a') for csline in cslines: if debugMode: CsReport(163, "Writing line: "+str(csline)).display() if isinstance(csline, CsLine): csfile.write(str(csline) + '\n') else: raise CsFileError(167, "Argument must be of type CsLine.") except IOError as err: raise CsFileError(169, "[Errno "+str(err.errno)+"] "+err.strerror+": " \ +err.filename) else: if debugMode: CsReport(173, "Finished appending to checksumfile: "+path).display() csfile.close() def update(self, regexes=[]): """ Update a checksum file. Also includes appending of not registered files to checksum file in current directory. """ if not isinstance(regexes, list): raise CsFileError(182, "Pass regular expressions in a list.") if debugMode: CsReport(184, "Updating checksumfile ...").display() # fetch cslines in current csfile self.read() if debugMode: CsReport(188, "Fetching files not registered yet.").display() registeredFiles = set(csline.path for csline in self.__cslines) # fetch files (pathes) excluding those matching regexes regexes.append(self.__filename) newFiles = set([os.path.join(self.__filepath, file) \ for file in os.listdir(self.__filepath) \ if os.path.isfile(os.path.join(self.__filepath,file)) \ for regex in regexes if None == re.match(regex, file)]) regexes.remove(self.__filename) # exclude registered files newFiles -= registeredFiles # generate cslines of newFiles cslines = [] for file in newFiles: csline = CsLine(file, self.__hashfunc) csline.generate(chunkSize) cslines.append(csline) print(csline) self.append(cslines) def check(self, srcDir=''): """ Check a checksum file which means: 1. read checksum file 2, calculate checksum of file which is located in srcDir and check results 3. write the result to the checksum file """ if debugMode: CsReport(216, "Start checking checksums.").display() self.read() if 0 == len(self.__cslines) and debugMode: CsReport(219, "CSFILE does not contain any lines.").display() for csline in self.__cslines: if not len(srcDir): if debugMode: CsReport(223, "Performing check of file with file itself.").display() csline.check() else: filename = csline.path.split(os.sep)[-1] if debugMode: CsReport(228, "Performing check of file with source: "+ \ srcDir+os.sep+filename).display() csline.check(srcDir+os.sep+filename) self.write() def displayLines(self): """ Display the content of the checksum file at stdout. """ if not len(self.__cslines): raise CsFileError("CSFILE does not contain any lines.", 193) for line in self.__cslines: sys.stdout.write(line) # ----------------------------------------------------------------------------- class CsLine: """ Class to handle a checksum and further data for a registered file. """ def __init__(self, *args): if isinstance(args[0], list): argList = args[0] self.checksum = argList[0] self.path = argList[1] self.hashfunc = argList[2] self.creationDateFile = argList[3] self.creationLocationChecksum = argList[4] self.creationDateChecksum = argList[5] self.dateLastCheck = argList[6] self.statusLastCheck = argList[7] elif isinstance(args[0], str) and isinstance(args[1], str): self.checksum = '' self.path = args[0] self.hashfunc = args[1] self.creationDateFile = '' self.creationLocationChecksum = '' self.creationDateChecksum = '' self.dateLastCheck = '' self.statusLastCheck = '' else: CsFileError(269, "Invalid argument") def generate(self, chunkSize): """ Generate the checksum and establish corresponding data for a file. The result is a fully configured checksum line. """ if debugMode: CsReport(277, "Calculating checksum for file: "+self.path).display() # generate checksum try: hashfunc = hashlib.new(self.hashfunc) blockSize = chunkSize * hashfunc.block_size file = open(self.path, 'rb') data = file.read(blockSize) while data: hashfunc.update(data) data = file.read(blockSize) self.checksum = hashfunc.hexdigest() except IOError as err: raise CsFileError(289, "[Errno "+str(err.errno)+"] "+err.strerror+": " \ +err.filename) else: file.close() # set remaining data self.creationDateFile = \ datetime.fromtimestamp(os.path.getctime(self.path)).strftime( \ "%Y/%m/%d-%H:%M:%S") self.creationLocationChecksum = os.uname()[1] self.creationDateChecksum = datetime.now().strftime("%Y/%m/%d-%H:%M:%S") self.dateLastCheck = self.creationDateChecksum self.statusLastCheck = 'ok' def check(self, src=''): """ Check a checksum line. Either compare the checksum of the line with the checksum of the file located at src or compare the checksum with the checksum of the file located at the path of its one checksum line. """ # calculate checksum try: hashfunc = hashlib.new(self.hashfunc) blockSize = chunkSize * hashfunc.block_size if 0 == len(src): file = open(self.path, 'rb') else: file = open(src, 'rb') data = file.read(blockSize) while data: hashfunc.update(data) data = file.read(blockSize) checksum = hashfunc.hexdigest() except IOError as err: # Maybe better to avoid Exception here and just put the status to notice raise CsFileError(324, "[Errno "+str(err.errno)+"] "+err.strerror+": " \ +err.filename) else: file.close() # checks if not len(self.checksum): raise CsFileError(330, "Caclulation of checksum was not successful.") if checksum == self.checksum: self.statusLastCheck = 'ok' else: self.statusLastCheck = 'error' def __str__(self): """ String representation of a checksum line. """ return '{0} {1} {2} {3} {4} {5} {6} {7}'.format(self.checksum, self.path, \ self.hashfunc, self.creationDateFile, self.creationLocationChecksum, \ self.creationDateChecksum, self.dateLastCheck, self.statusLastCheck) # ----------------------------------------------------------------------------- # Tests if __name__ == '__main__': try: debugMode = True file = CsFile('/home/daniel/') file.read() except CsFileError as err: err.display() sys.exit() # ----- END OF csfile.py -----