#!/usr/bin/env python3 # vim: expandtab shiftwidth=4 softtabstop=4 #iperf -V -Z cubic -t 30 -c fdb2:f689:4248:2bc8::3 -p 12345 import argparse import copy import multiprocessing import os import re import signal import subprocess import tempfile #import threading import time def parse_senders( sender_args, sender ): for sender_string in sender_args: sender_args = sender_string.split('#') if sender_args[0] is "": raise SystemExit if len(sender_args) < 7: new_sender = dict(zip(['src', 'dst', 'port', 'congestion', 'number', 'interface'], sender_args)) # set globals / defaults if 'dst' not in new_sender or new_sender['dst'] is "": new_sender['dst'] = args.receiver if 'port' not in new_sender or new_sender['port'] is "": new_sender['port'] = args.port if 'congestion' not in new_sender or new_sender['congestion'] is "": new_sender['congestion'] = args.congestion if 'number' not in new_sender or new_sender['number'] is "": new_sender['number'] = args.number_senders if 'interface' not in new_sender or new_sender['interface'] is "": new_sender['interface'] = "any" if args.utility == "netperf": new_sender['src_port'] = new_sender['port'] = str(int(args.netperf_dataport)+len(sender)) new_sender['tcpprobe_file'] = open(args.tmp_folder+"/tcpprobe_"+new_sender['congestion']+"_"+str(len(sender)), 'w+') sender.append(new_sender) if int(new_sender['number']) > 1: for j in range(int(new_sender['number'])-1): clone = new_sender.copy() clone['is_clone'] = True clone['tcpprobe_file'] = open(args.tmp_folder+"/tcpprobe_"+clone['congestion']+"_"+str(len(sender)), 'w+') if args.utility == "netperf": new_sender['src_port'] = new_sender['port'] = str(int(args.netperf_dataport)+len(sender)) sender.append(clone) else: print("Malformed senders") raise SystemExit def process_raw_tcpprobe_file(tcpprobe_file, sender): # processing for a continuously recorded tcp_probe file (deprecated) tcpprobe_file.seek(0) for line in tcpprobe_file: bandwith_interval = float(args.samplerate) line_split = line.split() for key, i in enumerate(sender): if line_split[1] == r"["+i['src']+r"]:"+i['src_port']: # and line_split[2][:-5] == r"["+i['dst']+r"]:": i['tcpprobe_file'].write(line) # i['tcpprobe_file'].close() # if not i.get('tcpprobe_time'): # i['tcpprobe_time'] = float(line_split[0]) # i['tcpprobe_seq'] = int(line_split[4], 0) # if float(line_split[0]) - i['tcpprobe_time'] > bandwith_interval: # bandwidth_file = open(args.tmp_folder+"/bw_"+i['congestion']+"_"+str(key), 'a') # write Mbps to file # bandwidth_file.write(" ".join(line_split[:3]+[str(abs(int(line_split[4], 0) - i['tcpprobe_seq']) / bandwith_interval * 8 / 1000 / 1000 )] )+"\n" ) # bandwidth_file.close() # i['tcpprobe_time'] = float(line_split[0]) # i['tcpprobe_seq'] = int(line_split[4], 0) tcpprobe_file.close() def process_proc_tcpprobe(sender): #sampling starty = time.perf_counter() tcpprobe_proc = open("/proc/net/tcpprobe", "r") unprocessed_senders = sender.copy() # collect line from tcpprobe and write to file if it belongs to a # unprocessed (one that didn't have a line written yet) sender while len(unprocessed_senders) > 0: line = tcpprobe_proc.readline() time_dif = time.perf_counter() - start_time line_split = line.split(" ") # try to match current tcpprobe line against all unprocessed senders for key, i in enumerate(unprocessed_senders): if (line_split[1] == r"["+i['src']+r"]:"+i['src_port'] and line_split[2] == r"["+i['dst']+r"]:"+i['port']): # write time + current tcpprobe line to file for this sender unprocessed_senders.pop(key)['tcpprobe_file'].write(str(time_dif)+" "+" ".join(line_split[1:])) break else: # no break occured if line == "": # this shouldn't happen. Try to continue anyway print("CRITICAL ERROR!") unprocessed_senders=[] tcpprobe_proc.close() def process_proc_tcpprobe_optimized(sender): #sampling starty = time.perf_counter() tcpprobe_proc = open("/proc/net/tcpprobe", "r") unprocessed_senders = set([key for key in range(len(sender))]) # collect line from tcpprobe and write to file if it belongs to a # unprocessed (one that didn't have a line written yet) sender while len(unprocessed_senders) > 0: line_split = tcpprobe_proc.readline().split(" ") time_dif = time.perf_counter() - start_time # try to match current tcpprobe line against all unprocessed senders for key in unprocessed_senders: if (line_split[1] == r"["+sender[key]['src']+r"]:"+sender[key]['src_port'] and line_split[2] == r"["+sender[key]['dst']+r"]:"+sender[key]['port']): # write time + current tcpprobe line to file for this sender unprocessed_senders.discard(key) sender[key]['tcpprobe_file'].write(str(time_dif)+" "+" ".join(line_split[1:])) break else: # no break occured if line_split == []: # this shouldn't happen. Try to continue anyway print("CRITICAL ERROR!") unprocessed_senders={} #TIMEOUT if time.perf_counter() - starty > float(args.samplerate): unprocessed_senders={} tcpprobe_proc.close() def signal_handler(signum, frame): clean_up() raise SystemExit def testy(a,b): print(a) print(b) print(sender) return 0 def clean_up(): # Clean up if args.legacy: tcpprobe.terminate() if args.tcpdump == "1": tcpdump.terminate() if args.cpunetlog != "": cpunetlog.terminate() for i in sender: # if i['utility'].returncode == None: try: i['utility'].kill() except ProcessLookupError: pass # i['utility_file'].close() time.sleep(1) def start_utility(sender_key, scheduled_time): time.sleep(abs(scheduled_time - time.perf_counter())) return subprocess.Popen(sender[sender_key]['utility_command'], stdout=sender[sender_key]['utility_file']) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) parser = argparse.ArgumentParser(description='Configure test environment.') parser.add_argument('-l', '--label', default="default") parser.add_argument('-u', '--utility', default='iperf') parser.add_argument('-t', '--time', default="60") parser.add_argument('-s', '--sender', action='append', nargs='+', required=True, \ help="Specify senders: ##### -- individual specifications overwrite globals") parser.add_argument('-r', '--receiver', default="localhost") parser.add_argument('-p', '--port', default="5001") parser.add_argument('-T', '--tmp-folder', default="/tmp") parser.add_argument('-Z', '--congestion', default="cubic") parser.add_argument('-N', '--no-delay', action='store_true') parser.add_argument('-4', '--use-ipv4', action='store_true') parser.add_argument('-n', '--number-senders', default="1", \ help="Amount of clones per specified sender") parser.add_argument('-C', '--comment', default="", \ help="Path to a (txt) file to be included in the final Tarball") parser.add_argument('-P', '--cpunetlog' , default="", \ help="Path to cpunetlog folder") parser.add_argument("--netperf-dataport", default="53699", \ help="Port for the data conncections by netperf. Will be incremented per sender") parser.add_argument('-D', '--tcpdump', default="1", \ help="Enable/Disable tcpdump (Default: enabled") parser.add_argument('-S', '--samplerate', default="0.1", \ help="Samplerate for measurements") parser.add_argument('--legacy', action='store_true', \ help="Record entire tcp_probe output instead of samples by samplerate. \ Can be less CPU intensive for certain samplerates. \ Consider using full=0 for tcp_probe module") args = parser.parse_args() mypid = os.getpid() flat_sender_args = [item for sublist in args.sender for item in sublist] # make 1-dimensional list (argument parser provides list of lists) #sender = [{} for i in range(len(flat_sender_args))] sender = [] args.tmp_folder = os.path.abspath(args.tmp_folder) args.tmp_folder = os.path.abspath(args.tmp_folder+"/test-"+args.label+"-"+str(mypid)) os.mkdir(args.tmp_folder) parse_senders( flat_sender_args, sender) if args.cpunetlog != "": args.cpunetlog = os.path.abspath(args.cpunetlog) cpunetlog = subprocess.Popen(["python3", args.cpunetlog+"/__init__.py", "-l", "-d", "--path", args.tmp_folder+"/cpunetlog"]) #run cpunetlog in headless mode utility_args = {} utility_args['connect'] = "-c" utility_args['time'] = "-t" utility_args['congestion'] = "-Z" utility_args['port'] = "-p" utility_args['no_delay'] = "-N" utility_args['bind'] = "-B" utility_args['ipv6'] = "" if args.use_ipv4 else "-V" if args.utility == "iperf": pass #iperf is default elif args.utility == "iperf3": utility_args['congestion'] = "-C" utility_args['ipv6'] = "-4" if args.use_ipv4 else "-6" elif args.utility == "netperf": utility_args['connect'] = "-H" utility_args['congestion'] = "-K" #test-specific option utility_args['time'] = "-l" utility_args['bind'] = "-L" utility_args['ipv6'] = "-4" if args.use_ipv4 else "-6" utility_args['no_delay'] = "-D" #test-secific option else: print("Utility not supported") raise SystemExit if args.legacy: tcpprobe_file = tempfile.TemporaryFile('w+') tcpprobe = subprocess.Popen(["cat", "/proc/net/tcpprobe"],stdout=tcpprobe_file) if args.tcpdump == "1": tcpdump = subprocess.Popen(["tcpdump", "-iany", "-w"+args.tmp_folder+"/tcpdump_"+str(mypid), "-s100"]) #TODO? if congesion_algo == all check proc and do all for key, i in enumerate(sender): i['utility_command'] = [ args.utility, \ utility_args['connect'], i['dst'], \ utility_args['bind'], i['src'] , \ utility_args['time'], args.time, \ utility_args['port'], args.port, \ utility_args['ipv6'] ] if args.utility == "netperf": i['utility_command'] += [ "--", "-k", "all", "-P", i['src_port'] ] i['utility_command'] += [ utility_args['congestion'], i['congestion'] ] if args.no_delay: i['utility_command'] += [ utility_args['no_delay'] ] i['utility_file'] = open(args.tmp_folder+"/"+args.utility+"_"+i['congestion']+"_"+str(key), 'w+') # print(i['src']+":"+i['src_port']+" "+i['dst']+":"+i['port']) #setup done, run tests in 5 seconds start_time = time.perf_counter() + 5 with multiprocessing.Pool(len(sender)) as pool: # for key, i in enumerate(sender): # i['utility'] = subprocess.Popen(i['utility_command'], stdout=i['utility_file']) # try to run all senders at the same time through threading 5 seconds from now # StartUtilityThread(i, scheduled_time).start() # i['utility'] = pool.apply(start_utility, [key, scheduled_time]) # i['utility'] = pool.apply(testy, [start_time, 0]) pool_results = [pool.apply_async(start_utility, [key, start_time]) for key, i in enumerate(sender)] # print(pool_results[0].get()) # i['thread'] = multiprocessing.Process(target=start_utility, args=(key, scheduled_time)) # i['thread'].start() pool.close() pool.join() #time.sleep(5) for key, i in enumerate(sender): # i['thread'].join() i['utility'] = pool_results[key].get() print(i['utility'].args) if args.utility != "netperf": ss_file = tempfile.TemporaryFile(mode='r+') subprocess.call(["ss", "-t", "-i", "-p", "-n"],stdout=ss_file) ss_file.seek(0) line = ss_file.readline() while line is not "": for key, i in enumerate(sender): if re.search(str(i['utility'].pid), line): start = re.search(i['src'], line).end()+1 print(line[start:start+5]) i['src_port'] = line[start:start+5] line = ss_file.readline() ss_file.close() # parse ss for bandwith measurement (deprecated, now using tcp_probe sequence numbers) #while time.perf_counter() - start_time < float(args.time): # ss_file = tempfile.TemporaryFile(mode='r+') # subprocess.call(["ss", "-t", "-i", "-n"],stdout=ss_file) # time_dif = time.perf_counter() - start_time # #print(time_dif) # ss_file.seek(0) # line = ss_file.readline() # # while line is not "": # if(line.startswith("ESTAB")): # for key, j in enumerate(sender): # if re.search(j['src']+":"+j['src_port'], line): # line = ss_file.readline() # start_mbps = re.search("send ", line).end() # end_mbps = re.search ("Mbps ", line).start() # ss_file_sender = open(args.tmp_folder+"/ss_"+j['congestion']+"_"+str(key), 'a') # ss_file_sender.write(str(time_dif)+" "+j['src']+":"+j['src_port']+" "+line[start_mbps:end_mbps]+"\n") # break # line = ss_file.readline() # # ss_file.close() # time.sleep(0.1) if not args.legacy: while time.perf_counter() - start_time < float(args.time): # print("begin while") while_start = time.perf_counter() # remember beginning of this loop process_proc_tcpprobe_optimized(sender) #heavy lifting here try: # sleep samplerate minus time spent in loop time.sleep(float(args.samplerate) - (time.perf_counter() - while_start)) except ValueError: # negative sleep pass # print("End while") else: time.sleep(float(args.time)) #for i in sender: # i['utility'].wait() # wait doesn't work with multiprocessing (?! returncode always 0, even when netperf still running) time.sleep(5) # sleep 5 more seconds to allow netperf processes to finish clean_up() if args.legacy: process_raw_tcpprobe_file(tcpprobe_file, sender) #calculate bandwidth for each sender: for key, i in enumerate(sender): last_timestamp = 0 last_seq = 0 i['tcpprobe_file'].seek(0) for line in i['tcpprobe_file']: line_split = line.split() if last_timestamp == 0 and last_seq == 0: last_timestamp = float(line_split[0]) last_seq = int(line_split[4], 0) #convert from hex-string else: if abs(float(line_split[0]) - last_timestamp) >= float(args.samplerate): bandwidth_file = open(args.tmp_folder+"/bw_"+i['congestion']+"_"+str(key), 'a') # print bw for current time interval, convert to Mbps # pay attention to seq number wrap-around bw = (int(line_split[4], 0) - last_seq) if (int(line_split[4], 0) - last_seq) >= 0 \ else (int("0xffffffff",0) - last_seq + int(line_split[4], 0)) bw = bw / abs(float(line_split[0]) - last_timestamp) # byte/s bw = bw * 8 / 1000 / 1000 # Mbps bandwidth_file.write(" ".join(line_split[:3]+[str(bw)] )+"\n" ) bandwidth_file.close() last_timestamp = float(line_split[0]) last_seq = int(line_split[4], 0) else: pass for i in sender: i['tcpprobe_file'].close() #tcpprobe_file.close() if args.comment is not "": subprocess.call(['cp', os.path.abspath(args.comment), args.tmp_folder]) tar_return = subprocess.call(["tar", "cJf", args.tmp_folder+"/../"+args.label+"_"+str(mypid)+".tar.xz", "-C", os.path.abspath(args.tmp_folder+"/../"), os.path.basename(args.tmp_folder)]) if tar_return is 0: #user_input = input("Results in tarfile: "+os.path.abspath(args.tmp_folder+"/../"+args.label+"_"+str(mypid)+".tar.xz")+"\nDelete Folder \""+args.tmp_folder+"\" ? (Y/n) ") # if not user_input.lower().startswith("n"): # subprocess.call(["rm", "-rf", args.tmp_folder]) print("Results in tarfile: "+os.path.abspath(args.tmp_folder+"/../"+args.label+"_"+str(mypid)+".tar.xz")+"\nDeleting Folder \""+args.tmp_folder+"\" in 5 seconds (ctrl+c to abort)...") time.sleep(5) subprocess.call(["rm", "-rf", args.tmp_folder]) print("done") #while True: # pass