Commit 60f51c0f authored by Daniel Armbruster's avatar Daniel Armbruster Committed by thomas.forbriger
Browse files

Implementation of CsFile and CsLine classes finished

This is a legacy commit from before 2015-05-18.
It may be incomplete as well as inconsistent.
See COPYING.legacy and README.history for details.

Not tested properly yet.


SVN Path:     http://gpitrsvn.gpi.uni-karlsruhe.de/repos/TFSoftware/trunk
SVN Revision: 4326
SVN UUID:     67feda4a-a26e-11df-9d6e-31afc202ad0c
parent f0cf2f09
#!/usr/bin/env python
## @file csfile.py
# @brief Provide a module to read, write and treat with a csback
# checksumfiles.
......@@ -34,6 +35,7 @@
# 15/09/2011 V0.1 Daniel Armbruster
#
# =============================================================================
""" CsFile module to handle checksum files. """
import os
import re
......@@ -55,66 +57,104 @@ chunkSize = 131072 # 128kB
# -----------------------------------------------------------------------------
class CsFileError(Exception):
def __init__(self, msg):
def __init__(self, msg, line):
self.msg = msg
self.line = line
def display(self):
sys.stderr.write("csFile (ERROR): " + self.msg + "\n")
sys.stderr.write("csfile (ERROR): " + self.msg + "\n")
sys.stderr.write("triggered in line: " + str(self.line) + "\n")
# -----------------------------------------------------------------------------
class CsFile:
"""
Provides an interface to handle a csback checksum file.
"""
def __init__(self, filepath, hashfunc='sha256'):
self.__filepath = filepath
self.__filename = ".cs"
self.__cslines = []
self.__hashfunc = hashfunc
createFile()
self.createFile()
def createFile(self):
path = self.__filepath+os.sep+self.__filename
if not os.path.isfile(path):
try:
csfile = open(path, 'w')
finally:
except IOError as err:
raise CsFileError("[Errno "+str(err.errno)+"] "+err.strerror+": " \
+err.filename, 87)
else:
csfile.close()
def read(self):
"""
Read a checksum file.
"""
path = self.__filepath+os.sep+self.__filename
if not os.path.isfile(path):
raise CsFileError("CSFILE does not exist.")
try:
csfile = open(path)
self.__cslines = [CsLine(line.split()) for line in csfile \
if len(line.rstrip()) and line[0] != '#']
finally:
except IOError as err:
# Maybe better to create the file here and keep on going.
# Will be managed later during further dev.
raise CsFileError("[Errno "+str(err.errno)+"] "+err.strerror+": " \
+err.filename, 104)
else:
csfile.close()
def write(self):
"""
Write the entire checksumfile.
"""
path = self.__filepath+os.sep+self.__filename
try:
csfile = open(path, 'w')
for csline in self.__cslines:
if isinstance(csline, CsLine):
csfile.write(str(csline) + '\n')
else:
raise CsFileError("Argument must be of type CsLine.", 116)
except IOError as err:
raise CsFileError("[Errno "+str(err.errno)+"] "+err.strerror+": " \
+err.filename, 119)
else:
csfile.close()
def append(self, cslines):
"""
Append checksum lines to the checksum file.
"""
path = self.__filepath+os.sep+self.__filename
# Maybe not necessary to raise an exception here
if 0 == len(cslines):
raise CsFileError("Invalid argument. Empty list.")
raise CsFileError("Invalid argument. Empty list.", 130)
try:
csfile = open(path, 'a')
for csline in cslines:
if isinstance(csline, CsLine):
csfile.write(str(csline) + '\n')
else:
raise CsFileError("Argument must be of type CsLine.")
raise CsFileError("Argument must be of type CsLine.", 137)
except IOError as err:
raise CsFileError(err.msg)
finally:
raise CsFileError("[Errno "+str(err.errno)+"] "+err.strerror+": " \
+err.filename, 140)
else:
csfile.close()
def update(self, regexes=[]):
if not isinstance(regexes, list)
raise CsFileError("Pass regular expressions in a list of strings")
"""
Update a checksum file. Also includes appending of not registered files to
checksum file in current directory.
"""
if not isinstance(regexes, list):
raise CsFileError("Pass regular expressions in a list of strings", 150)
# fetch cslines in current csfile
read()
self.read()
# IMPORTANT NOTE: Check again if it is more convenient to whole path or
sep = os.sep
registeredFiles = [csline.path.split(sep)[len(csline.path.split(sep))] \
registeredFiles = [csline.path.split(os.sep)[-1] \
for csline in self.__cslines]
# fetch files excluding regex matches
newFiles = list(set([file for file in os.listdir(self.__filepath) \
......@@ -126,37 +166,42 @@ class CsFile:
# generate cslines of newFiles
csLines = [CsLine(self.__filepath+os.sep+file, self.__hashfunc) \
for file in newFiles]
for csline in csLines
csline.generate(chunkSize)
append(csLines)
for line in csLines:
line.generate(chunkSize)
self.append(csLines)
def checkLines(self, srcDir):
def check(self, srcDir=''):
"""
Check a checksum file which means:
1. read checksum file
2, calculate checksum of file which is located in srcDir and check results
3. write the result to the checksum file
"""
self.read()
if 0 == len(self.__cslines):
raise CsFileError("CSFILE does not contain any lines.")
for line in self.__cslines:
# TODO: get here filename of file in checksum
line.check(srcDir)
raise CsFileError("CSFILE does not contain any lines.", 179)
for csline in self.__cslines:
if not len(srcDir):
csline.check()
else:
filename = csline.path.split(os.sep)[-1]
csline.check(srcDir+os.sep+filename)
self.write()
def displayLines(self):
if 0 == len(self.__cslines):
raise CsFileError("CSFILE does not contain any lines.")
"""
Display the content of the checksum file at stdout.
"""
if not len(self.__cslines):
raise CsFileError("CSFILE does not contain any lines.", 193)
for line in self.__cslines:
sys.stdout.write(line)
def addLine(self, csline):
if isinstance(csline, CsLine):
self.__cslines.append(csline)
else:
raise CsFileError("Argument must be of type CsLine.")
def removeLine(self, index):
if 0 <= index and len(self.__cslines) >= index:
del self.__cslines[index]
else:
raise CsFileError("Index out of range.")
# -----------------------------------------------------------------------------
class CsLine:
"""
Class to handle a checksum and further data for a registered file.
"""
def __init__(self, path, hashfunc='sha256'):
self.checksum = ''
......@@ -181,16 +226,24 @@ class CsLine:
self.statusLastCheck = argList[7]
def generate(self, chunkSize):
"""
Generate the checksum and establish corresponding data for a file. The
result is a fully configured checksum line.
"""
# generate checksum
try:
hashfunc = hashlib.new(self.hashfunc)
blockSize = chunkSize * hashfunc.block_size
with open(self.path, "rb") as file:
for chunk in iter(lambda: f.read(blockSize), ''):
for chunk in iter(lambda: file.read(blockSize), ''):
hasfunc.update(chunk)
self.checksum = hashfunc.hexdigest()
except IOError as err:
raise CsFileError(err.msg)
raise CsFileError("[Errno "+str(err.errno)+"] "+err.strerror+": " \
+err.filename, 240)
else:
file.close()
# set remaining data
self.creationDateFile = \
datetime.fromtimestamp(os.path.getctime(self.path)).strftime( \
......@@ -201,17 +254,55 @@ class CsLine:
self.statusLastCheck = 'ok'
def check(self, src=''):
pass
"""
Check a checksum line. Either compare the checksum of the line with the
checksum of the file located at src or compare the checksum with the checksum
of the file located at the path of its one checksum line.
"""
# calculate checksum
try:
hashfunc = hashlib.new(self.hashfunc)
blockSize = chunkSize * hashfunc.block_size
if 0 == len(src):
with open(self.path, "rb") as file:
for chunk in iter(lambda: file.read(blockSize), ''):
hasfunc.update(chunk)
else:
with open(src, "rb") as file:
for chunk in iter(lambda: file.read(blockSize), ''):
hasfunc.update(chunk)
checksum = hashfunc.hexdigest()
except IOError as err:
# Maybe better to avoid Exception here and just put the status to notice
raise CsFileError("[Errno "+str(err.errno)+"] "+err.strerror+": " \
+err.filename, 275)
else:
file.close()
# checks
if not len(self.checksum):
raise CsFileError("No checksum calculated yet.", 280)
if checksum == self.checksum:
self.statusLastCheck = 'ok'
else:
self.statusLastCheck = 'error'
def __str__(self):
"""
String representation of a checksum line.
"""
return '{0} {1} {2} {3} {4} {5} {6} {7}'.format(self.checksum, self.path, \
self.hashfunc, self.creationDateFile, self.creationLocationChecksum, \
self.creationDateChecksum, self.dateLastCheck, self.statusLastCheck)
# -----------------------------------------------------------------------------
# Tests
if __name__ == '__main__':
pass
try:
file = CsFile('/home/daniel/')
file.read()
except CsFileError as err:
err.display()
sys.exit()
# ----- END OF csfile.py -----
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment