Commit 0215cf7b authored by Deathcrow's avatar Deathcrow

optimizations + timeout fixes

parent 88ee83bc
......@@ -6,12 +6,13 @@
import argparse
import copy
import multiprocessing
import os
import re
import signal
import subprocess
import tempfile
import threading
#import threading
import time
def parse_senders( sender_args, sender ):
......@@ -77,6 +78,7 @@ def process_raw_tcpprobe_file(tcpprobe_file, sender):
tcpprobe_file.close()
def process_proc_tcpprobe(sender): #sampling
starty = time.perf_counter()
tcpprobe_proc = open("/proc/net/tcpprobe", "r")
unprocessed_senders = sender.copy()
......@@ -95,15 +97,51 @@ def process_proc_tcpprobe(sender): #sampling
break
else: # no break occured
if line == "": # this shouldn't happen. Try to continue anyway
print("CRITICAL ERROR!")
unprocessed_senders=[]
tcpprobe_proc.close()
def process_proc_tcpprobe_optimized(sender): #sampling
starty = time.perf_counter()
tcpprobe_proc = open("/proc/net/tcpprobe", "r")
unprocessed_senders = set([key for key in range(len(sender))])
# collect line from tcpprobe and write to file if it belongs to a
# unprocessed (one that didn't have a line written yet) sender
line_split = [tcpprobe_proc.readline().split() for i in unprocessed_senders]
while len(unprocessed_senders) > 0:
line_split = tcpprobe_proc.readline().split()
time_dif = time.perf_counter() - start_time
# try to match current tcpprobe line against all unprocessed senders
for key in unprocessed_senders:
if (line_split[1] == r"["+sender[key]['src']+r"]:"+sender[key]['src_port'] and line_split[2] == r"["+sender[key]['dst']+r"]:"+sender[key]['port']):
# write time + current tcpprobe line to file for this sender
unprocessed_senders.discard(key)
sender[key]['tcpprobe_file'].write(str(time_dif)+" "+" ".join(line_split[1:]))
break
else: # no break occured
if line_split == []: # this shouldn't happen. Try to continue anyway
print("CRITICAL ERROR!")
unprocessed_senders={}
#TIMEOUT
if time.perf_counter() - starty > float(args.samplerate):
unprocessed_senders={}
tcpprobe_proc.close()
def signal_handler(signum, frame):
clean_up()
raise SystemExit
def testy(a,b):
print(a)
print(b)
print(sender)
return 0
def clean_up():
# Clean up
if args.legacy:
......@@ -117,10 +155,10 @@ def clean_up():
# i['utility_file'].close()
time.sleep(1)
def start_utility(single_sender, scheduled_time):
def start_utility(sender_key, scheduled_time):
time.sleep(abs(scheduled_time - time.perf_counter()))
single_sender['utility'] = subprocess.Popen(single_sender['utility_command'],
stdout=single_sender['utility_file'])
return subprocess.Popen(sender[sender_key]['utility_command'],
stdout=sender[sender_key]['utility_file'])
......@@ -230,19 +268,27 @@ for key, i in enumerate(sender):
#setup done, run tests
start_time = time.perf_counter()
for key, i in enumerate(sender):
with multiprocessing.Pool(min(multiprocessing.cpu_count(), len(sender))) as pool:
scheduled_time = time.perf_counter() + 5
# for key, i in enumerate(sender):
# i['utility'] = subprocess.Popen(i['utility_command'], stdout=i['utility_file'])
# try to run all senders at the same time through threading 5 seconds from now
scheduled_time = time.perf_counter() + 5
# StartUtilityThread(i, scheduled_time).start()
i['thread'] = threading.Thread(target=start_utility, args=(i, scheduled_time))
i['thread'].start()
# i['utility'] = pool.apply(start_utility, [key, scheduled_time])
# i['utility'] = pool.apply(testy, [start_time, 0])
pool_results = [pool.apply_async(start_utility, [key, scheduled_time]) for key, i in enumerate(sender)]
# print(pool_results[0].get())
# i['thread'] = multiprocessing.Process(target=start_utility, args=(key, scheduled_time))
# i['thread'].start()
pool.close()
pool.join()
#time.sleep(5)
for key, i in enumerate(sender):
i['thread'].join()
print(i['utility'].args)
for key, i in enumerate(sender):
# i['thread'].join()
i['utility'] = pool_results[key].get()
print(i['utility'].args)
if args.utility != "netperf":
ss_file = tempfile.TemporaryFile(mode='r+')
......@@ -285,15 +331,17 @@ if args.utility != "netperf":
if not args.legacy:
while time.perf_counter() - start_time < float(args.time):
# print("begin while")
while_start = time.perf_counter() # remember beginning of this loop
process_proc_tcpprobe(sender) #heavy lifting here
process_proc_tcpprobe_optimized(sender) #heavy lifting here
try:
# sleep samplerate minus time spent in loop
time.sleep(float(args.samplerate) - (time.perf_counter() - while_start))
except ValueError: # negative sleep
pass
# print("End while")
else:
time.sleep(float(args.time))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment