wrapper.py 20.3 KB
Newer Older
Deathcrow's avatar
Deathcrow committed
1
#!/usr/bin/env python3
Deathcrow's avatar
Deathcrow committed
2 3 4
# vim: expandtab shiftwidth=4 softtabstop=4


Deathcrow's avatar
Deathcrow committed
5 6 7 8
#iperf -V -Z cubic -t 30 -c fdb2:f689:4248:2bc8::3 -p 12345

import argparse
import copy
Deathcrow's avatar
Deathcrow committed
9
import multiprocessing
Deathcrow's avatar
Deathcrow committed
10 11
import os
import re
12
import random
Deathcrow's avatar
Deathcrow committed
13 14 15
import signal
import subprocess
import tempfile
Deathcrow's avatar
Deathcrow committed
16
#import threading
Deathcrow's avatar
Deathcrow committed
17 18 19
import time

def parse_senders(sender_args, sender):
    """Parse sender specifications and append one dict per flow to *sender*.

    Each entry of *sender_args* is a '#'-separated string with the fields
    src, dst, port, congestion, number, interface, additional_delay and
    early_stop, in that order.  Only src is mandatory; empty or missing
    fields fall back to the global command line options (``args``), to
    "any" for the interface and to 0.0 for the two timing fields.

    For every specification one primary flow plus ``number - 1`` clones
    (marked with ``is_clone``) are appended to *sender*.  Every flow gets its
    own ``tcpprobe_file`` and, for netperf, its own ``src_port`` derived from
    its index in *sender*.

    Raises SystemExit if src is empty or more than eight fields are given.
    """
    field_names = ['src', 'dst', 'port', 'congestion', 'number', 'interface',
                   'additional_delay', 'early_stop']
    # Fields that fall back to a global option (or a constant) when unset.
    fallbacks = (
        ('dst', args.receiver),
        ('port', args.port),
        ('congestion', args.congestion),
        ('number', args.number_senders),
        ('interface', "any"),
    )

    def attach_resources(flow):
        # The index the flow is about to get in `sender` makes both the data
        # port and the file name unique.
        index = len(sender)
        if args.utility == "netperf":
            flow['src_port'] = str(int(args.netperf_dataport) + index)
        flow['tcpprobe_file'] = open(args.tmp_folder+"/tcpprobe_"+flow['congestion']+"_"+str(index), 'w+')

    for sender_string in sender_args:
        fields = sender_string.split('#')

        # Compare by value: identity checks against "" only work by accident.
        if fields[0] == "":
            raise SystemExit

        if len(fields) > len(field_names):
            print("Malformed senders")
            raise SystemExit

        new_sender = dict(zip(field_names, fields))

        # set globals / defaults
        for name, fallback in fallbacks:
            if new_sender.get(name, "") == "":
                new_sender[name] = fallback
        for name in ('additional_delay', 'early_stop'):
            raw_value = new_sender.get(name, "")
            new_sender[name] = float(raw_value) if raw_value != "" else 0.0

        attach_resources(new_sender)
        sender.append(new_sender)

        for _ in range(int(new_sender['number']) - 1):
            clone = new_sender.copy()
            clone['is_clone'] = True
            # The port belongs to the clone; the primary flow keeps its own.
            attach_resources(clone)
            sender.append(clone)

Deathcrow's avatar
Deathcrow committed
66 67
def process_raw_tcpprobe_file(tcpprobe_file, sender):
    """Split a continuously recorded tcp_probe capture into per-sender files.

    Used in legacy mode.  *tcpprobe_file* is rewound and read line by line;
    every line whose endpoint matches a sender is written to that sender's
    ``tcpprobe_file``.  For netperf the second field is compared with the
    sender's source address and ``src_port``; for iperf3 the third field is
    compared with the destination address and ``port``.  *tcpprobe_file* is
    closed afterwards.
    """
    # The bandwidth post-processing in this script reads field 4 of every
    # stored line, so shorter lines are useless.  They do occur: the capture
    # process is terminated while it may still be writing its last line.
    min_fields = 5

    tcpprobe_file.seek(0)
    for line in tcpprobe_file:
        line_split = line.split()
        if len(line_split) < min_fields:
            continue
        for i in sender:
            if (args.utility == "netperf" and line_split[1] == bracket_open+i['src']+bracket_close+":"+i['src_port']) \
            or (args.utility == "iperf3" and line_split[2] == bracket_open+i['dst']+bracket_close+":"+i['port']):
                i['tcpprobe_file'].write(line)
    tcpprobe_file.close()

Deathcrow's avatar
Deathcrow committed
91
def process_proc_tcpprobe(sender, while_start): #sampling --- DEPRECATED
    """Write one tcp_probe sample per sender (deprecated sampling variant).

    Reads /proc/net/tcpprobe line by line until every sender has had one
    matching line written to its ``tcpprobe_file``.  The first field of the
    stored line is replaced by the time elapsed since the global
    ``start_time``.  *while_start* is accepted for signature compatibility
    but not used.

    Matching needs ``src_port``, which parse_senders only sets for netperf.
    """
    unprocessed_senders = sender.copy()

    with open("/proc/net/tcpprobe", "r") as tcpprobe_proc:
        # collect lines from tcpprobe and write them to file if they belong
        # to a sender that didn't have a line written yet
        while unprocessed_senders:
            line = tcpprobe_proc.readline()
            time_dif = time.perf_counter() - start_time

            # End of file has to be detected before the fields are indexed:
            # "".split(" ") is [""], so indexing would raise IndexError.
            if line == "": # this shouldn't happen. Try to continue anyway
                print("CRITICAL ERROR!")
                break

            line_split = line.split(" ")
            if len(line_split) < 3:
                continue  # malformed line, cannot contain both endpoints

            # try to match current tcpprobe line against all unprocessed senders
            for key, i in enumerate(unprocessed_senders):
                if (line_split[1] == r"["+i['src']+r"]:"+i['src_port'] and line_split[2] == r"["+i['dst']+r"]:"+i['port']):
                    # write time + current tcpprobe line to file for this sender
                    unprocessed_senders.pop(key)['tcpprobe_file'].write(str(time_dif)+" "+" ".join(line_split[1:]))
                    break

115 116 117



Deathcrow's avatar
Deathcrow committed
118
def process_proc_tcpprobe_optimized(sender, while_start): #sampling
    """Write at most one tcp_probe sample per active sender for this interval.

    *while_start* is the ``time.perf_counter()`` value at which the current
    sampling interval began.  Senders that have not started yet
    (``additional_delay``) or have already finished (``early_stop``) are
    skipped.  Single lines are then fetched from /proc/net/tcpprobe with
    ``head -n 1`` until every remaining sender got one line or the interval
    (``args.samplerate``) is used up.  The first field of a stored line is
    replaced by the time elapsed since the global ``start_time``.
    """
    unprocessed_senders = set(range(len(sender)))

    # Iterate over a copy because entries are removed inside the loop.
    for key in list(unprocessed_senders):
        if sender[key]['additional_delay'] > 0 and while_start < start_time + sender[key]['additional_delay']: # This sender hasn't started sending yet
            unprocessed_senders.discard(key)
        if sender[key]['early_stop'] > 0 and while_start > start_time + float(args.time) - sender[key]['early_stop']: # This sender is already finished
            unprocessed_senders.discard(key)

    # find a line matching each sender && TIMEOUT
    # (can't use select/polling here because it is a /proc file)
    while unprocessed_senders and (time.perf_counter() - while_start < args.samplerate):
        head = subprocess.Popen(["head","-n","1","/proc/net/tcpprobe"], stdout=subprocess.PIPE)
        try:
            stdout, err = head.communicate(timeout=args.samplerate/2)
        except subprocess.TimeoutExpired:
            head.kill()
            # Finish communication so the killed child is reaped and its
            # pipe is closed instead of piling up as a zombie.
            head.communicate()
            break

        line = stdout.decode()
        # An empty read has to be detected on the raw text: "".split(" ")
        # is [""], never [], and indexing it would raise IndexError.
        if line == "": # this shouldn't happen. Try to continue anyway
            print("CRITICAL ERROR!")
            break

        line_split = line.split(" ")
        if len(line_split) < 3:
            continue  # malformed line, cannot contain both endpoints

        # try to match current tcpprobe line against all unprocessed senders
        for key in unprocessed_senders:
            if (args.utility == "netperf" and line_split[1] == bracket_open+sender[key]['src']+bracket_close+":"+sender[key]['src_port']) \
            or (args.utility == "iperf3" and line_split[2] == bracket_open+sender[key]['dst']+bracket_close+":"+sender[key]['port']):      #<=== use if src_port doesn't work with iperf3
                # write time + current tcpprobe line to file for this sender
                unprocessed_senders.discard(key)
                time_dif = time.perf_counter() - start_time
                sender[key]['tcpprobe_file'].write(str(time_dif)+" "+" ".join(line_split[1:]))
                break
Deathcrow's avatar
Deathcrow committed
152

Deathcrow's avatar
Deathcrow committed
153
#    tcpprobe_proc.close()
Deathcrow's avatar
Deathcrow committed
154

155 156 157 158




Deathcrow's avatar
Deathcrow committed
159 160 161 162
def signal_handler(signum, frame):
    """Signal callback: stop all helper processes, then exit the script.

    Both arguments are supplied by the signal module and are not used.
    """
    clean_up()
    raise SystemExit()

Deathcrow's avatar
Deathcrow committed
163 164 165 166 167 168
def testy(a,b):
    """Debug helper: print both arguments and the global sender list; return 0."""
    print(a, b, sep="\n")
    print(sender)
    return 0

Deathcrow's avatar
Deathcrow committed
169 170
def clean_up():
    """Stop the helper processes and kill the traffic generators.

    Terminates the tcp_probe capture (legacy mode), tcpdump and cpunetlog if
    they were enabled on the command line.  Each sender's utility is then
    killed by the pid stored in ``utility_data``.  As soon as a sender without
    ``utility_data`` is found, all processes named ``args.utility`` are killed
    instead and the loop stops.  Finally waits one second.
    """
    if args.legacy:
        tcpprobe.terminate()
    if args.tcpdump == "1":
        tcpdump.terminate()
    if args.cpunetlog != "":
        cpunetlog.terminate()

    for flow in sender:
        if 'utility_data' not in flow:
            # pid unknown: fall back to killing by name
            subprocess.Popen(["killall", "-s", "9", args.utility])
            break
        pid = flow['utility_data'][0]
        subprocess.Popen(["kill", "-9", str(pid)])

    time.sleep(1)

Deathcrow's avatar
Deathcrow committed
191
def start_utility(sender_key, scheduled_time):
    """Start the traffic utility of one sender at *scheduled_time*.

    *sender_key* is the index into the global ``sender`` list and
    *scheduled_time* a ``time.perf_counter()`` value.  The command stored in
    ``utility_command`` is started with its stdout redirected to
    ``utility_file``.

    Returns ``[pid, args]`` of the started process.
    """
    # Sleep only if the start time is still ahead.  Taking abs() of the
    # difference would make a late worker wait for the overdue amount again.
    delay = scheduled_time - time.perf_counter()
    if delay > 0:
        time.sleep(delay)

    process = subprocess.Popen(sender[sender_key]['utility_command'],
                            stdout=sender[sender_key]['utility_file'])
    print(process.args)
    return [process.pid, process.args]
198 199 200



Deathcrow's avatar
Deathcrow committed
201 202 203 204 205 206
# Run the clean-up handler when the script is interrupted or terminated.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

## Command line interface ##
# All values are kept as strings here; they are converted where they are used.
parser = argparse.ArgumentParser(description='Configure test environment.')

parser.add_argument('-l', '--label', default="default")
# XXX Note: iperf3 support is not tested extensively!!
parser.add_argument(
    '-u', '--utility', default="netperf",
    help="For iperf3 support run multiple iperf3 servers and specify ports per sender")
parser.add_argument('-t', '--time', default="60")
parser.add_argument(
    '-s', '--sender', action='append', nargs='+', required=True,
    help="Specify senders: <src ip>[#<dst ip>#<port>#<congestion>#<number>#<interface>#<sender-delay in s>#<early stop in s>] -- individual specifications overwrite globals. "
         "Example: -s 10.0.0.1#10.0.0.2##yeah#2 -- Creates 2 senders using yeah, without changing the global port (-p). All optional settings can be omitted.")

# Global defaults: "--receiver" is used for every sender that does not
# specify a receiver itself (same for --port, --congestion, ...).
parser.add_argument('-r', '--receiver', default="localhost")
parser.add_argument('-p', '--port', default="5001")
parser.add_argument('-T', '--tmp-folder', default="/tmp")
parser.add_argument('-Z', '--congestion', default="cubic")
parser.add_argument('-N', '--no-delay', default="1")
parser.add_argument('-4', '--use-ipv4', action='store_true')
parser.add_argument(
    '-n', '--number-senders', default="1",
    help="Amount of clones per sender")
parser.add_argument(
    '-C', '--comment', default="",
    help="Path to a (txt) file to be included in the final Tarball")
parser.add_argument(
    '-P', '--cpunetlog', default="",
    help="Path to cpunetlog folder")
# The default data port is picked at random on every invocation.
parser.add_argument(
    "--netperf-dataport", default=str(random.randint(15000,65000)),
    help="Port for the data conncections by netperf. Will be incremented per additional sender")
parser.add_argument(
    '-D', '--tcpdump', default="0",
    help="Enable/Disable tcpdump (Default: disabled)")
parser.add_argument(
    '-S', '--samplerate', default="0.1",
    help="Samplerate for measurements: Defines amount of seconds between samples")
parser.add_argument('--legacy', action='store_true', \
    help="Record entire tcp_probe output instead of samples by samplerate. \
          Can be less CPU intensive for certain samplerates. \
          Consider using full=0 for tcp_probe module")
Deathcrow committed
237 238 239 240 241 242 243 244 245 246 247

args = parser.parse_args()

mypid = os.getpid()

# -s uses action='append' together with nargs='+', so argparse delivers a
# list of lists; flatten it into a 1-dimensional list.
flat_sender_args = [item for sublist in args.sender for item in sublist]

sender = []

# Every run writes into its own folder <tmp-folder>/test-<label>_<pid>.
args.tmp_folder = os.path.abspath(args.tmp_folder)
args.tmp_folder = os.path.abspath(args.tmp_folder+"/test-"+args.label+"_"+str(mypid))
os.mkdir(args.tmp_folder)

args.samplerate = float(args.samplerate)

parse_senders(flat_sender_args, sender)

#clean slate
print("First, kill all remaining instances of '" + args.utility + "'")
# Wait for killall to finish: the slate is only clean once it has returned,
# and a fire-and-forget Popen would leave an unreaped child behind.
subprocess.call(["killall", "-s", "9", args.utility])
print()

if args.cpunetlog != "":
    args.cpunetlog = os.path.abspath(args.cpunetlog)
    #run cpunetlog in headless mode
    cpunetlog = subprocess.Popen(["python3", args.cpunetlog+"/__init__.py", "-l", "-q", "--path", args.tmp_folder+"/cpunetlog"])
Deathcrow's avatar
Deathcrow committed
263

264 265 266 267 268 269 270 271 272 273





## Prepare commandline to start experiment utilities (netperf, etc.) ##


# General conversion between commandline syntax of the tools/utilities

Deathcrow's avatar
Deathcrow committed
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
# Option names of the selected utility; the initial values are those of iperf.
utility_args = {
    'connect': "-c",
    'time': "-t",
    'congestion': "-Z",
    'port': "-p",
    'no_delay': "-N",
    'bind': "-B",
    'ipv6': "" if args.use_ipv4 else "-V",
}

if args.utility == "iperf3":
    utility_args.update({
        'congestion': "-C",
        'ipv6': "-4" if args.use_ipv4 else "-6",
    })
elif args.utility == "netperf":
    utility_args.update({
        'connect': "-H",
        'congestion': "-K",  # test-specific option
        'time': "-l",
        'bind': "-L",
        'ipv6': "-4" if args.use_ipv4 else "-6",
        'no_delay': "-D",  # test-specific option
    })
elif args.utility != "iperf":  # iperf needs no changes
    print("Utility not supported")
    raise SystemExit

# Legacy mode: record the complete tcp_probe output into a temporary file.
if args.legacy:
    tcpprobe_file = tempfile.TemporaryFile('w+')
    tcpprobe = subprocess.Popen(["cat", "/proc/net/tcpprobe"],stdout=tcpprobe_file)

# Optional packet capture on all interfaces, stored in the result folder.
if args.tcpdump == "1":
    tcpdump = subprocess.Popen(["tcpdump", "-iany", "-w"+args.tmp_folder+"/tcpdump_"+str(mypid), "-s100"])
Deathcrow's avatar
Deathcrow committed
304
            
305 306
# Prepare command line for each sender

Deathcrow's avatar
Deathcrow committed
307
for key, i in enumerate(sender):
    # Run time of this flow: total time minus its start delay and early stop.
    i['utility_command'] = [ args.utility,
        utility_args['connect'], i['dst'],
        utility_args['bind'], i['src'],
        utility_args['time'], str(float(args.time)-i['additional_delay']-i['early_stop']),
        utility_args['port'], i['port'] ]
    # The address family flag is empty for iperf with --use-ipv4; an empty
    # string must not be handed to the utility as an argument of its own.
    if utility_args['ipv6']:
        i['utility_command'] += [ utility_args['ipv6'] ]
    if args.utility == "iperf3":
        i['utility_command'] += [ "-J" ]
    if args.utility == "netperf":
        # everything after "--" is a test-specific netperf option
        i['utility_command'] += [ "--", "-k", "all", "-P", i['src_port'] ]
    i['utility_command'] += [ utility_args['congestion'], i['congestion'] ]
    # args.no_delay is a string, so "0" would be truthy; test it explicitly
    # to let "-N 0" switch the option off.
    if args.no_delay not in ("", "0"):
        i['utility_command'] += [ utility_args['no_delay'] ]

    # stdout of the utility is collected in this file
    i['utility_file'] = open(args.tmp_folder+"/"+args.utility+"_"+i['congestion']+"_"+str(key), 'w+')

#quick and simple ipv4 compatibility without adding additional comparisons in sampling loop
bracket_open = "["
bracket_close = "]"
if args.use_ipv4:
    bracket_open = bracket_close = ""
330

Deathcrow's avatar
Deathcrow committed
331 332
#setup done, run tests in 5 seconds
start_time = time.perf_counter() + 5

## Start senders in different processes
# One worker per sender; each worker sleeps until its scheduled start time
# (common start time plus the sender's additional delay).
pool = multiprocessing.Pool(len(sender))
pool_results = [pool.apply_async(start_utility, [key, start_time + float(i['additional_delay'])]) for key, i in enumerate(sender)]
pool.close()

# Wait for the common start time; never pass a negative value to sleep()
# in case the setup above took longer than the 5 second head start.
time.sleep(max(0.0, start_time - time.perf_counter()))


## Find out which src_port belongs to which sender (not necessary for netperf) ##
# TODO: FIXME, SLEEP AS A WORKAROUND IS UNRELIABLE AND DIRTY.
# Disabled for now: we can use port for iperf because we have multiple servers anyway

if 0 and args.utility != "netperf":
    time.sleep(0.1) # give the connections a moment to show up
    ss_file = tempfile.TemporaryFile(mode='r+')
    subprocess.call(["ss", "-t", "-i", "-p", "-n"],stdout=ss_file)
    ss_file.seek(0)
    line = ss_file.readline()
    # readline() returns "" at end of file; compare by value, not identity
    while line != "":
        for key, i in enumerate(sender):
            if re.search(str(i['utility_data'][0]), line):
                start = re.search(i['src'], line).end()+1
                print(line[start:start+5])
                i['src_port'] = line[start:start+5]
        line = ss_file.readline()
    ss_file.close()

# parse ss for bandwidth measurement (deprecated, now using tcp_probe sequence numbers)

#while time.perf_counter() - start_time < float(args.time):
#    ss_file = tempfile.TemporaryFile(mode='r+')
#    subprocess.call(["ss", "-t", "-i", "-n"],stdout=ss_file)
#    time_dif = time.perf_counter() - start_time
#    #print(time_dif)
#    ss_file.seek(0)
#    line = ss_file.readline()
#
#    while line is not "":
#        if(line.startswith("ESTAB")):
#            for key, j in enumerate(sender):
#                if re.search(j['src']+":"+j['src_port'], line):
#                    line = ss_file.readline()
#                    start_mbps = re.search("send ", line).end()
#                    end_mbps = re.search ("Mbps ", line).start()
#                    ss_file_sender = open(args.tmp_folder+"/ss_"+j['congestion']+"_"+str(key), 'a')
#                    ss_file_sender.write(str(time_dif)+" "+j['src']+":"+j['src_port']+" "+line[start_mbps:end_mbps]+"\n")
#                    break
#        line = ss_file.readline()
#    
#    ss_file.close()
#    time.sleep(0.1)

399 400 401 402 403




######## MAIN LOOP ########
Deathcrow's avatar
Deathcrow committed
404
if args.legacy:
    # the capture process records everything; just wait for the test to end
    time.sleep(float(args.time))
else:
    # sample until two sample intervals before the end of the test
    while (time.perf_counter() - start_time) < (float(args.time)-2*args.samplerate):
        while_start = time.perf_counter() # remember beginning of this loop

        process_proc_tcpprobe_optimized(sender, while_start) # <-- heavy lifting here

        # sleep samplerate minus time spent in loop; skip if already overdue
        remaining = args.samplerate - (time.perf_counter() - while_start)
        if remaining >= 0:
            time.sleep(remaining)

# collect [pid, args] of every started utility from the worker pool
pool.join()
for key, i in enumerate(sender):
    i['utility_data'] = pool_results[key].get()


###  Experiments finished  ###

# wait() doesn't work with multiprocessing (returncode always 0, even when
# netperf is still running), so sleep 5 more seconds to allow the netperf
# processes to finish
time.sleep(5)

clean_up()

Deathcrow's avatar
Deathcrow committed
442 443 444
if args.legacy:
    process_raw_tcpprobe_file(tcpprobe_file, sender)


### After the experiments are finished, parse tcpprobe files again and calculate "goodput" ###

# TCP sequence numbers are 32 bit wide and wrap around modulo 2**32.
SEQ_SPACE = 2**32

# calculate bandwidth for each sender:
for key, i in enumerate(sender):
    last_timestamp = None   # None until the first usable line was seen
    last_seq = None
    bandwidth_lines = []
    i['tcpprobe_file'].seek(0)
    for line in i['tcpprobe_file']:
        line_split = line.split()
        if len(line_split) < 5:
            continue  # truncated line: no sequence number field
        timestamp = float(line_split[0])
        if last_timestamp is None:
            last_timestamp = timestamp
            last_seq = int(line_split[4], 0) #convert from hex-string
            continue

        elapsed = abs(timestamp - last_timestamp)
        if elapsed < args.samplerate or elapsed == 0:
            continue

        seq = int(line_split[4], 0)
        # bytes sent in this interval; the modulo handles sequence number
        # wrap-around (the distance across the wrap is 2**32, not 0xffffffff)
        bw = (seq - last_seq) % SEQ_SPACE
        bw = bw / elapsed # byte/s
        bw = bw * 8 / 1000 / 1000 # Mbps
        bandwidth_lines.append(" ".join(line_split[:3]+[str(bw)] )+"\n" )
        last_timestamp = timestamp
        last_seq = seq

    # Open the output once per sender, and only if there is data, so that no
    # empty bandwidth files are created.
    if bandwidth_lines:
        with open(args.tmp_folder+"/bw_"+i['congestion']+"_"+str(key), 'a') as bandwidth_file:
            bandwidth_file.writelines(bandwidth_lines)

for i in sender:
    i['tcpprobe_file'].close()

# include the user supplied comment file in the results
if args.comment != "":
    subprocess.call(['cp', os.path.abspath(args.comment), args.tmp_folder])

484 485 486 487 488 489 490 491






## packing results into tar file ##

Deathcrow's avatar
Deathcrow committed
492 493 494 495 496
# Pack the result folder into <label>_<pid>.tar.xz next to it.
tar_return = subprocess.call(["tar", "cJf", args.tmp_folder+"/../"+args.label+"_"+str(mypid)+".tar.xz", "-C", os.path.abspath(args.tmp_folder+"/../"), os.path.basename(args.tmp_folder)])
# Compare the exit status by value; "is 0" tests object identity.
if tar_return == 0:
    print("Results in tarfile: "+os.path.abspath(args.tmp_folder+"/../"+args.label+"_"+str(mypid)+".tar.xz"))
    # the folder is only removed after it was packed successfully
    subprocess.call(["rm", "-rf", args.tmp_folder])
else:
    print("tar failed with exit status "+str(tar_return)+", results kept in folder: "+args.tmp_folder)

print("done")

#while True:
#    pass
505