Commit 049f9059 authored by Deathcrow's avatar Deathcrow
Browse files

Large amounts of changes:

+ Remove open("/proc/net/tcpprobe") and corresponding readline() (it can block when there's nothing to read). select.select(), etc,... don't seem to work on /proc.
	+ Using subprocess with "head" and subprocess.communicate(TIMEOUT) to read a line now

+ fixed ipv4 compatibility bug (should work now, but why u no use ipv6?)

+ added delayed/early_stop senders. You can now specify a time in seconds that a sender gets delayed before sending, and a time in seconds that the sender is stopped early. Each sender will run for <global test length> - <start
delay> - <early stop> seconds.
parent df0b944b
......@@ -10,7 +10,6 @@ import multiprocessing
import os
import re
import random
import select
import signal
import subprocess
import tempfile
......@@ -24,8 +23,8 @@ def parse_senders( sender_args, sender ):
if sender_args[0] is "":
raise SystemExit
if len(sender_args) < 7:
new_sender = dict(zip(['src', 'dst', 'port', 'congestion', 'number', 'interface'], sender_args))
if len(sender_args) < 9:
new_sender = dict(zip(['src', 'dst', 'port', 'congestion', 'number', 'interface','additional_delay', 'early_stop'], sender_args))
# set globals / defaults
if 'dst' not in new_sender or new_sender['dst'] is "":
new_sender['dst'] = args.receiver
......@@ -37,6 +36,14 @@ def parse_senders( sender_args, sender ):
new_sender['number'] = args.number_senders
if 'interface' not in new_sender or new_sender['interface'] is "":
new_sender['interface'] = "any"
if 'additional_delay' not in new_sender or new_sender['additional_delay'] is "":
new_sender['additional_delay'] = 0.0
else:
new_sender['additional_delay'] = float(new_sender['additional_delay'])
if 'early_stop' not in new_sender or new_sender['early_stop'] is "":
new_sender['early_stop'] = 0.0
else:
new_sender['early_stop'] = float(new_sender['early_stop'])
if args.utility == "netperf":
new_sender['src_port'] = str(int(args.netperf_dataport)+len(sender))
new_sender['tcpprobe_file'] = open(args.tmp_folder+"/tcpprobe_"+new_sender['congestion']+"_"+str(len(sender)), 'w+')
......@@ -59,7 +66,7 @@ def process_raw_tcpprobe_file(tcpprobe_file, sender):
# processing for a continuously recorded tcp_probe file (deprecated)
tcpprobe_file.seek(0)
for line in tcpprobe_file:
bandwith_interval = float(args.samplerate)
bandwith_interval = args.samplerate
line_split = line.split()
for key, i in enumerate(sender):
if (args.utility == "netperf" and line_split[1] == r"["+i['src']+r"]:"+i['src_port']) \
......@@ -80,7 +87,7 @@ def process_raw_tcpprobe_file(tcpprobe_file, sender):
# i['tcpprobe_seq'] = int(line_split[4], 0)
tcpprobe_file.close()
def process_proc_tcpprobe(sender, while_start): #sampling
def process_proc_tcpprobe(sender, while_start): #sampling --- DEPRECATED
tcpprobe_proc = open("/proc/net/tcpprobe", "r")
unprocessed_senders = sender.copy()
......@@ -109,18 +116,29 @@ def process_proc_tcpprobe(sender, while_start): #sampling
def process_proc_tcpprobe_optimized(sender, while_start): #sampling
unprocessed_senders = set([key for key in range(len(sender))])
tcpprobe_proc = open("/proc/net/tcpprobe", "r")
tcpprobe_poll = select.poll()
tcpprobe_poll.register(tcpprobe_proc, select.POLLIN) # register poll to check for read-available data in file
# tcpprobe_proc = open("/proc/net/tcpprobe", "r")
for key in set(unprocessed_senders):
if sender[key]['additional_delay'] > 0 and while_start < start_time + sender[key]['additional_delay']: # This sender hasn't started sending yet
unprocessed_senders.discard(key)
if sender[key]['early_stop'] > 0 and while_start > start_time + float(args.time) - sender[key]['early_stop']: # This sender is already finished
unprocessed_senders.discard(key)
# collect line from tcpprobe and write to file if it belongs to a
# unprocessed (one that didn't have a line written yet) sender
while (len(unprocessed_senders) > 0) and (time.perf_counter() - while_start < float(args.samplerate)) and tcpprobe_poll.poll(int(float(args.samplerate) * 1000)): # find line matching all senders && TIMEOUT
line_split = tcpprobe_proc.readline().split(" ")
while (len(unprocessed_senders) > 0) and (time.perf_counter() - while_start < args.samplerate): # find line matching all senders && TIMEOUT (CANT USE SELECT/POLLING HERE BECAUSE /PROC FILE)
head = subprocess.Popen(["head","-n","1","/proc/net/tcpprobe"], stdout=subprocess.PIPE)
try:
stdout, err = head.communicate(timeout=args.samplerate/2)
except subprocess.TimeoutExpired:
head.kill()
break
line_split = stdout.decode().split(" ")
# line_split = tcpprobe_proc.readline().split(" ")
# try to match current tcpprobe line against all unprocessed senders
for key in unprocessed_senders:
if (args.utility == "netperf" and line_split[1] == r"["+sender[key]['src']+r"]:"+sender[key]['src_port']) \
or (args.utility == "iperf3" and line_split[2] == r"["+sender[key]['dst']+r"]:"+sender[key]['port']): #<=== use if src_port doesn't work with iperf3
if (args.utility == "netperf" and line_split[1] == bracket_open+sender[key]['src']+bracket_close+":"+sender[key]['src_port']) \
or (args.utility == "iperf3" and line_split[2] == bracket_open+sender[key]['dst']+bracket_close+":"+sender[key]['port']): #<=== use if src_port doesn't work with iperf3
# write time + current tcpprobe line to file for this sender
unprocessed_senders.discard(key)
time_dif = time.perf_counter() - start_time
......@@ -131,7 +149,7 @@ def process_proc_tcpprobe_optimized(sender, while_start): #sampling
print("CRITICAL ERROR!")
unprocessed_senders={}
tcpprobe_proc.close()
# tcpprobe_proc.close()
......@@ -170,6 +188,7 @@ def start_utility(sender_key, scheduled_time):
process = subprocess.Popen(sender[sender_key]['utility_command'],
stdout=sender[sender_key]['utility_file'])
data = [process.pid, process.args]
print(process.args)
return data
......@@ -184,7 +203,7 @@ parser.add_argument('-u', '--utility', default="netperf", \
help="For iperf3 support run multiple iperf3 servers and specify ports per sender") ## XXX Note: iperf3 support is not tested extensively!!
parser.add_argument('-t', '--time', default="60")
parser.add_argument('-s', '--sender', action='append', nargs='+', required=True, \
help="Specify senders: <src ip>#<dst ip>#<port>#<congestion>#<number>#<interface> -- individual specifications overwrite globals. "+\
help="Specify senders: <src ip>#<dst ip>#<port>#<congestion>#<number>#<interface>#<sender-delay in s>#<early stop in s> -- individual specifications overwrite globals. "+\
"Example: -s 10.0.0.1#10.0.0.2##yeah#2 -- Creates 2 senders using yeah, without changing the global port (-p)")
## "--receiver" sets the receiver if there was no receiver was specified
# (XXX same for --port, etc)
......@@ -224,6 +243,8 @@ args.tmp_folder = os.path.abspath(args.tmp_folder)
args.tmp_folder = os.path.abspath(args.tmp_folder+"/test-"+args.label+"-"+str(mypid))
os.mkdir(args.tmp_folder)
args.samplerate = float(args.samplerate)
parse_senders( flat_sender_args, sender)
#clean slate
......@@ -265,7 +286,7 @@ elif args.utility == "netperf":
utility_args['time'] = "-l"
utility_args['bind'] = "-L"
utility_args['ipv6'] = "-4" if args.use_ipv4 else "-6"
utility_args['no_delay'] = "-D" #test-secific option
utility_args['no_delay'] = "-D" #test-specific option
else:
print("Utility not supported")
raise SystemExit
......@@ -287,7 +308,7 @@ for key, i in enumerate(sender):
i['utility_command'] = [ args.utility, \
utility_args['connect'], i['dst'], \
utility_args['bind'], i['src'] , \
utility_args['time'], args.time, \
utility_args['time'], str(float(args.time)-i['additional_delay']-i['early_stop']), \
utility_args['port'], i['port'], \
utility_args['ipv6'] ]
if args.utility == "iperf3":
......@@ -304,12 +325,18 @@ for key, i in enumerate(sender):
#setup done, run tests in 5 seconds
start_time = time.perf_counter() + 5
#quick and simple ipv4 compatibility without adding additional comparisons in sampling loop
bracket_open = "["
bracket_close = "]"
if args.use_ipv4:
bracket_open = bracket_close = ""
## Start senders in different processes
with multiprocessing.Pool(len(sender)) as pool:
#with multiprocessing.Pool(len(sender)) as pool:
pool = multiprocessing.Pool(len(sender))
# for key, i in enumerate(sender):
# i['utility'] = subprocess.Popen(i['utility_command'], stdout=i['utility_file'])
......@@ -317,19 +344,14 @@ with multiprocessing.Pool(len(sender)) as pool:
# StartUtilityThread(i, scheduled_time).start()
# i['utility'] = pool.apply(start_utility, [key, scheduled_time])
# i['utility'] = pool.apply(testy, [start_time, 0])
pool_results = [pool.apply_async(start_utility, [key, start_time]) for key, i in enumerate(sender)]
pool_results = [pool.apply_async(start_utility, [key, start_time + float(i['additional_delay'])]) for key, i in enumerate(sender)]
# print(pool_results[0].get())
# i['thread'] = multiprocessing.Process(target=start_utility, args=(key, scheduled_time))
# i['thread'].start()
pool.close()
pool.join()
pool.close()
# pool.join()
#time.sleep(5)
for key, i in enumerate(sender):
# i['thread'].join()
i['utility_data'] = pool_results[key].get()
print(i['utility_data'][1])
time.sleep(start_time - time.perf_counter())
## Find out which src_port belongs to which sender (not necessary for netperf) ##
......@@ -382,23 +404,27 @@ if 0 and args.utility != "netperf":
######## MAIN LOOP ########
if not args.legacy:
while (time.perf_counter() - start_time) < (float(args.time)-2*float(args.samplerate)):
# print("begin while")
while (time.perf_counter() - start_time) < (float(args.time)-2*args.samplerate):
while_start = time.perf_counter() # remember beginning of this loop
process_proc_tcpprobe_optimized(sender, while_start) # <-- heavy lifting here
try:
# sleep samplerate minus time spent in loop
time.sleep(float(args.samplerate) - (time.perf_counter() - while_start))
time.sleep(args.samplerate - (time.perf_counter() - while_start))
except ValueError: # negative sleep
pass
# print("End while")
else:
time.sleep(float(args.time))
pool.join()
for key, i in enumerate(sender):
# i['thread'].join()
i['utility_data'] = pool_results[key].get()
# print(i['utility_data'][1])
......@@ -433,7 +459,7 @@ for key, i in enumerate(sender):
last_timestamp = float(line_split[0])
last_seq = int(line_split[4], 0) #convert from hex-string
else:
if abs(float(line_split[0]) - last_timestamp) >= float(args.samplerate):
if abs(float(line_split[0]) - last_timestamp) >= args.samplerate:
bandwidth_file = open(args.tmp_folder+"/bw_"+i['congestion']+"_"+str(key), 'a')
# print bw for current time interval, convert to Mbps
# pay attention to seq number wrap-around
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment