Commit 58a8518c authored by Sebastian Hack's avatar Sebastian Hack
Browse files

Rewrote anything

adaption to new statev

[r15723]
parent 737e2130
#! /usr/bin/env python
import sys
import fileinput
import os
import re
import sha
import time
import stat
import profile
import sqlite3
import MySQLdb
import fileinput
import tempfile
import optparse
class DummyFilter:
def match(self, dummy):
return True
class EmitBase:
def create_table(self, cols, name, type, unique):
create = 'create table if not exists %s (id int %s' % (name, unique)
sorted = [None] * len(cols)
for x in cols.iterkeys():
sorted[cols[x]] = x
for x in sorted:
create += (', %s %s' % (x, type))
create += ');'
return create
class EmitMysqlInfile(EmitBase):
tmpfile_mode = stat.S_IREAD | stat.S_IROTH | stat.S_IWUSR
def ex(self, args, tab, fname):
res = os.fork()
if res == 0:
stmt = """load data infile '%s' into table %s fields terminated by ';'""" % (fname, tab)
conn = MySQLdb.connect(**args)
c = conn.cursor()
c.execute(stmt)
conn.commit()
sys.exit(0)
return res
def __init__(self, options, ctxcols, evcols):
args = dict()
if options.password:
args['passwd'] = options.password
if not options.host:
options.host = 'localhost'
args['user'] = options.user
args['host'] = options.host
args['db'] = options.database
self.conn = MySQLdb.connect(**args)
self.ctxcols = ctxcols
self.evcols = evcols
self.options = options
params = (tempfile.gettempdir(), os.sep, os.getpid())
self.evfifo = '%s%sstatev_ev_%d' % params
self.ctxfifo = '%s%sstatev_ctx_%d' % params
os.mkfifo(self.evfifo)
os.mkfifo(self.ctxfifo)
os.chmod(self.evfifo, self.tmpfile_mode)
os.chmod(self.ctxfifo, self.tmpfile_mode)
c = self.conn.cursor()
c.execute('drop table if exists ev')
c.execute('drop table if exists ctx')
c.execute(self.create_table(self.ctxcols, 'ctx', 'char(80)', 'unique'))
c.execute(self.create_table(self.evcols, 'ev', 'double default null', ''))
self.conn.commit()
if options.verbose:
print 'go for gold'
self.pidev = self.ex(args, 'ev', self.evfifo)
self.pidctx = self.ex(args, 'ctx', self.ctxfifo)
if options.verbose:
print "forked two mysql leechers: %d, %d" % (self.pidev, self.pidctx)
self.evfile = open(self.evfifo, 'w+t')
self.ctxfile = open(self.ctxfifo, 'w+t')
if options.verbose:
print 'fifo: %s, %o' % (self.evfile.name, os.stat(self.evfile.name).st_mode)
print 'fifo: %s, %o' % (self.ctxfile.name, os.stat(self.ctxfile.name).st_mode)
def ev(self, curr_id, evitems):
field = ['\N'] * len(self.evcols)
for key, val in evitems.iteritems():
index = self.evcols[key]
field[index] = val
print >> self.evfile, ('%d;' % curr_id) + ';'.join(field)
def ctx(self, curr_id, ctxitems):
field = ['\N'] * len(self.ctxcols)
for key, val in ctxitems.iteritems():
index = self.ctxcols[key]
field[index] = val
print >> self.ctxfile, ('%d;' % curr_id) + ';'.join(field)
def commit(self):
self.evfile.close()
self.ctxfile.close()
os.waitpid(self.pidev, 0)
os.waitpid(self.pidctx, 0)
os.unlink(self.evfile.name)
os.unlink(self.ctxfile.name)
class EmitSqlite3(EmitBase):
def __init__(self, options, ctxcols, evcols):
if os.path.isfile(options.database):
os.unlink(options.database)
self.conn = sqlite3.connect(options.database)
self.conn.execute(self.create_table(ctxcols, 'ctx', 'text', 'unique'))
self.conn.execute(self.create_table(evcols, 'ev', 'double', ''))
q = []
self.quests = []
for i in xrange(0, max(len(ctxcols), len(evcols))):
self.quests.append(','.join(q))
q.append('?')
def ev(self, curr_id, evitems):
keys = ','.join(evitems.keys())
stmt = 'insert into ev (id, %s) values (%s)' % (keys, self.quests[len(evitems) + 1])
self.conn.execute(stmt, (curr_id,) + tuple(evitems.values()))
from optparse import OptionParser
def ev_create_stmt(heads):
create = 'create table if not exists ev (id int, time int, timeev int'
for x in heads:
create += (', %s real' % (x,))
create += ');'
return create
def ctx_create_stmt(heads):
create = 'create table if not exists ctx (id int unique on conflict ignore'
for x in heads:
create += (', %s text' % (x,))
create += ');'
return create
def find_heads(file):
ctx_heads = set()
ev_heads = set()
for line in fileinput.input(file):
items = re.split('\s+', line)
if items[0] == 'E':
ev_heads.add(items[2])
elif items[0] == 'P':
ctx_heads.add(items[2])
return (ev_heads, ctx_heads)
def fill_tables(conn, file):
curr_id = 0
ids = dict()
valstack = []
keystack = []
for line in fileinput.input(file):
items = re.split('\s+', line)
op = items[0]
id = items[1]
if op == 'P':
key = items[2]
val = items[3]
keystack.append(key)
valstack.append(val)
dig = sha.new()
for i in xrange(0,len(keystack)):
dig.update(".")
dig.update(keystack[i])
dig.update("=")
dig.update(valstack[i])
hash = dig.digest()
if hash in ids:
id = ids[hash]
def ctx(self, curr_id, ctxitems):
keys = ','.join(ctxitems.keys())
stmt = 'insert into ctx (id, %s) values (%s)' % (keys, self.quests[len(ctxitems) + 1])
self.conn.execute(stmt, (curr_id,) + tuple(ctxitems.values()))
def commit(self):
self.conn.commit()
class Conv:
engines = { 'sqlite3': EmitSqlite3, 'mysql': EmitMysqlInfile }
def find_heads(self):
n_ev = 0
ctxind = 0
evind = 0
ctxcols = dict()
evcols = dict()
self.valid_keys = set()
for line in self.input():
heads = None
if line[0] == 'P':
ind = line.index(';', 2)
key = line[2:ind]
if not key in ctxcols:
ctxcols[key] = ctxind
ctxind += 1
elif line[0] == 'E':
ind = line.index(';', 2)
key = line[2:ind]
if self.filter.match(key):
self.n_events += 1
if not key in evcols:
self.valid_keys.add(key)
evcols[key] = evind
evind += 1
return (ctxcols, evcols)
def input(self):
return fileinput.FileInput(files=self.files, openhook=fileinput.hook_compressed)
def fill_tables(self):
ids = 0
curr_id = 0
keystack = []
idstack = []
curr_event = 0
last_prec = -1
evcols = dict()
ctxcols = dict()
for line in self.input():
items = line.strip().split(';')
op = items[0]
if op == 'P':
# flush the current events
if len(evcols):
self.emit.ev(curr_id, evcols)
evcols.clear()
# push the key
key = items[1]
val = items[2]
keystack.append(key)
curr_id = ids
ids += 1
idstack.append(curr_id)
ctxcols[key] = val
self.emit.ctx(curr_id, ctxcols)
elif op == 'O':
key = keystack.pop()
idstack.pop()
if len(idstack) > 0:
if len(evcols) > 0:
self.emit.ev(curr_id, evcols)
evcols.clear()
del ctxcols[key]
curr_id = idstack[-1]
else:
curr_id = -1
elif op == 'E':
key = items[1]
if key in self.valid_keys:
curr_event += 1
evcols[key] = items[2]
if self.verbose:
prec = curr_event * 10 / self.n_events
if prec > last_prec:
last_prec = prec
print '%10d / %10d' % (curr_event, self.n_events)
def __init__(self):
parser = optparse.OptionParser('usage: %prog [options] <event file...>')
parser.add_option("-c", "--clean", dest="clean", help="delete tables in advance", action="store_true", default=False)
parser.add_option("-v", "--verbose", dest="verbose", help="verbose messages", action="store_true", default=False)
parser.add_option("-f", "--filter", dest="filter", help="regexp to filter event keys", metavar="REGEXP")
parser.add_option("-u", "--user", dest="user", help="user", metavar="USER")
parser.add_option("-H", "--host", dest="host", help="host", metavar="HOST")
parser.add_option("-p", "--password", dest="password", help="password", metavar="PASSWORD")
parser.add_option("-d", "--db", dest="database", help="database", metavar="DB")
parser.add_option("-e", "--engine", dest="engine", help="eingine", metavar="ENG", default='sqlite3')
(options, args) = parser.parse_args()
self.n_events = 0
self.stmts = dict()
self.verbose = options.verbose
if len(args) < 1:
parser.print_help()
sys.exit(1)
self.files = []
files = args
for file in files:
if not os.path.isfile(file):
print "cannot find input file %s" % (file, )
else:
id = curr_id
ids[hash] = curr_id
curr_id = curr_id + 1
stmt = 'insert into ctx (id'
for x in keystack:
stmt += ', ' + x
stmt += ') values (' + str(id)
for x in valstack:
stmt += ', \'' + x + '\''
stmt += ');'
conn.execute(stmt)
elif op == 'O':
keystack.pop()
valstack.pop()
elif op == 'E':
key = items[2]
val = items[3]
time = items[4]
timeev = items[5]
t = (id, val, time, timeev)
stmt = 'insert into ev (id, %s, time, timeev) values (?, ?, ?, ?)' % (key,)
conn.execute(stmt, (id, val, time, timeev))
def main():
parser = OptionParser('usage: %prog [options] <sqlite3 database> <event file>')
parser.add_option("-c", "--clean", action="store_true", help="clean existing data base before adding data")
(options, args) = parser.parse_args()
if len(args) < 2:
parser.print_help()
sys.exit(1)
db = args[0]
file = args[1]
if not os.path.isfile(file):
print "cannot find input file %s" % (file, )
sys.exit(2)
have_to_clean = 0
if os.path.isfile(db):
if options.clean:
have_to_clean = 1
else:
print "database %s already exists (use different name or use -c)" % (db, )
self.files.append(file)
if len(self.files) < 1:
print "no input file to process"
sys.exit(3)
(ev_heads, ctx_heads) = find_heads(file)
if options.filter:
self.filter = re.compile(options.filter)
else:
self.filter = DummyFilter()
if options.engine in self.engines:
engine = self.engines[options.engine]
else:
print 'engine %s not found' % options.engine
print 'we offer: %s' % self.engines.keys()
sys.exit(0)
if options.verbose:
print "determining schema..."
(ctxcols, evcols) = self.find_heads()
if options.verbose:
print "context schema:"
print ctxcols
print "event schema:"
print evcols
conn = sqlite3.connect(db)
if have_to_clean:
conn.execute("drop table ctx")
conn.execute("drop table ev")
self.emit = engine(options, ctxcols, evcols)
conn.execute(ev_create_stmt(ev_heads))
conn.execute(ctx_create_stmt(ctx_heads))
fill_tables(conn, file)
conn.commit()
if options.verbose:
print "filling tables..."
self.fill_tables()
if options.verbose:
print "comitting..."
self.emit.commit()
if __name__ == "__main__":
main()
Conv()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment