wrapper.py 16.7 KB
Newer Older
Deathcrow's avatar
Deathcrow committed
1
#!/usr/bin/env python3
Deathcrow's avatar
Deathcrow committed
2
3
4
# vim: expandtab shiftwidth=4 softtabstop=4


Deathcrow's avatar
Deathcrow committed
5
6
7
8
#iperf -V -Z cubic -t 30 -c fdb2:f689:4248:2bc8::3 -p 12345

import argparse
import copy
Deathcrow's avatar
Deathcrow committed
9
import multiprocessing
Deathcrow's avatar
Deathcrow committed
10
11
12
13
14
import os
import re
import signal
import subprocess
import tempfile
Deathcrow's avatar
Deathcrow committed
15
#import threading
Deathcrow's avatar
Deathcrow committed
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time

def parse_senders( sender_args, sender ):
    """Expand '#'-separated sender specifications into per-flow dicts.

    Every string in *sender_args* has the form
    ``<src>#<dst>#<port>#<congestion>#<number>#<interface>``.  Missing or
    empty fields (except the source address) fall back to the command line
    globals in ``args``; the interface defaults to ``"any"``.

    One dict per flow is appended to *sender*: a specification with
    ``number`` N yields N entries, the additional ones being marked with
    ``is_clone = True``.  Each entry owns a ``tcpprobe_file`` opened in
    ``args.tmp_folder``.  For netperf every flow gets its own data port
    (``args.netperf_dataport`` + index of the flow in *sender*), stored as
    both ``port`` and ``src_port``.

    Raises SystemExit if the source address is empty or a specification
    has more than six fields.
    """
    field_names = ['src', 'dst', 'port', 'congestion', 'number', 'interface']

    for sender_string in sender_args:
        fields = sender_string.split('#')

        if fields[0] == "":
            raise SystemExit

        if len(fields) > len(field_names):
            print("Malformed senders")
            raise SystemExit

        template = dict(zip(field_names, fields))

        # set globals / defaults for everything that was left out
        defaults = (
            ('dst', args.receiver),
            ('port', args.port),
            ('congestion', args.congestion),
            ('number', args.number_senders),
            ('interface', "any"),
        )
        for name, default in defaults:
            if template.get(name, "") == "":
                template[name] = default

        # the specified sender itself plus (number - 1) clones
        flow_count = max(int(template['number']), 1)
        for flow_number in range(flow_count):
            flow = template.copy()
            if flow_number > 0:
                flow['is_clone'] = True

            # index this flow is going to have inside `sender`
            index = len(sender)
            if args.utility == "netperf":
                # every flow (clones included) needs a unique data port
                flow['src_port'] = flow['port'] = str(int(args.netperf_dataport) + index)
            flow['tcpprobe_file'] = open(
                args.tmp_folder + "/tcpprobe_" + flow['congestion'] + "_" + str(index), 'w+')

            sender.append(flow)

def process_raw_tcpprobe_file(tcpprobe_file, sender):
    """Split a continuously recorded tcp_probe capture per sender (deprecated).

    *tcpprobe_file* is rewound and read line by line.  A line is copied
    verbatim to the ``tcpprobe_file`` of every entry in *sender* whose
    ``[src]:src_port`` equals the second whitespace separated field of the
    line (only the source endpoint is compared).  Lines with fewer than two
    fields are skipped, as are senders without a known ``src_port``.

    *tcpprobe_file* is closed before returning, also on error.
    """
    # (source endpoint, output file) for every sender that can be matched
    targets = [
        ("[" + i['src'] + "]:" + i['src_port'], i['tcpprobe_file'])
        for i in sender
        if 'src_port' in i
    ]

    tcpprobe_file.seek(0)
    try:
        for line in tcpprobe_file:
            line_split = line.split()
            if len(line_split) < 2:
                # blank or truncated line, nothing to match against
                continue
            for endpoint, out_file in targets:
                if line_split[1] == endpoint:
                    out_file.write(line)
    finally:
        tcpprobe_file.close()

def process_proc_tcpprobe(sender, probe_path="/proc/net/tcpprobe"): #sampling
    """Take one tcp_probe sample for every sender.

    Lines are read from *probe_path* until each entry of *sender* has had
    exactly one matching line written to its ``tcpprobe_file``.  A line
    matches a sender when its second and third space separated fields equal
    ``[src]:src_port`` and ``[dst]:port``.  The first field of the written
    line is replaced by the time elapsed since the global ``start_time``.

    Reading stops early (after printing "CRITICAL ERROR!") when the probe
    reports end of file.  *sender* itself is not modified.
    """
    unprocessed_senders = list(sender)

    with open(probe_path, "r") as tcpprobe_proc:
        # collect lines from tcpprobe and write each one to the file of the
        # unprocessed sender (one without a line written yet) it belongs to
        while unprocessed_senders:
            line = tcpprobe_proc.readline()
            if line == "": # EOF: this shouldn't happen, give up on this sample
                print("CRITICAL ERROR!")
                break

            time_dif = time.perf_counter() - start_time
            line_split = line.split(" ")
            if len(line_split) < 3:
                # blank or truncated line, cannot carry both endpoints
                continue

            # try to match current tcpprobe line against all unprocessed senders
            for key, i in enumerate(unprocessed_senders):
                if (line_split[1] == "[" + i['src'] + "]:" + i['src_port']
                        and line_split[2] == "[" + i['dst'] + "]:" + i['port']):
                    # write time + current tcpprobe line to file for this sender
                    i['tcpprobe_file'].write(str(time_dif) + " " + " ".join(line_split[1:]))
                    del unprocessed_senders[key]
                    break

def process_proc_tcpprobe_optimized(sender, probe_path="/proc/net/tcpprobe"): #sampling
    """Take one tcp_probe sample for every sender, bounded by the samplerate.

    Lines are read from *probe_path* until each entry of *sender* has had
    one matching line written to its ``tcpprobe_file``.  A line matches a
    sender when its second and third space separated fields equal
    ``[src]:src_port`` and ``[dst]:port``.  The first field of the written
    line is replaced by the time elapsed since the global ``start_time``.

    The loop also ends when the probe reports end of file (after printing
    "CRITICAL ERROR!") or, checked after each line, once more than
    ``args.samplerate`` seconds have passed since the call started.
    """
    sample_start = time.perf_counter()
    timeout = float(args.samplerate)
    # indices into `sender` that did not have a line written yet
    unprocessed_senders = set(range(len(sender)))

    with open(probe_path, "r") as tcpprobe_proc:
        while unprocessed_senders:
            line = tcpprobe_proc.readline()
            if line == "": # EOF: this shouldn't happen, give up on this sample
                print("CRITICAL ERROR!")
                break

            time_dif = time.perf_counter() - start_time
            line_split = line.split(" ")

            # blank or truncated lines cannot carry both endpoints
            if len(line_split) >= 3:
                # try to match current tcpprobe line against all unprocessed senders
                for key in unprocessed_senders:
                    entry = sender[key]
                    if (line_split[1] == "[" + entry['src'] + "]:" + entry['src_port']
                            and line_split[2] == "[" + entry['dst'] + "]:" + entry['port']):
                        # write time + current tcpprobe line to file for this sender
                        entry['tcpprobe_file'].write(str(time_dif) + " " + " ".join(line_split[1:]))
                        # safe: the iteration is left right after the set changed
                        unprocessed_senders.discard(key)
                        break

            #TIMEOUT
            if time.perf_counter() - sample_start > timeout:
                break

Deathcrow's avatar
Deathcrow committed
134
135
136
137
def signal_handler(signum, frame):
    """Signal callback: stop everything via clean_up(), then end the script."""
    del signum, frame  # required by the handler signature, not used
    clean_up()
    raise SystemExit()

Deathcrow's avatar
Deathcrow committed
138
139
140
141
142
143
def testy(a,b):
    print(a)
    print(b)
    print(sender)
    return 0

Deathcrow's avatar
Deathcrow committed
144
145
def clean_up():
    """Stop the helper processes and all started sender utilities.

    Terminates tcpprobe (legacy mode), tcpdump and cpunetlog when they are
    enabled in ``args``, kills the ``utility`` process of every sender and
    finally sleeps one second.

    The function is also called from the signal handler, which may fire
    before some (or all) of these globals exist; anything that has not
    been created yet is skipped instead of raising NameError/KeyError.
    """
    state = globals()
    options = state.get('args')

    # names of the module level Popen objects that are enabled by the options
    enabled_helpers = []
    if options is not None:
        if options.legacy:
            enabled_helpers.append('tcpprobe')
        if options.tcpdump == "1":
            enabled_helpers.append('tcpdump')
        if options.cpunetlog != "":
            enabled_helpers.append('cpunetlog')

    for name in enabled_helpers:
        helper = state.get(name)
        if helper is not None:
            helper.terminate()

    for i in state.get('sender') or []:
        utility = i.get('utility')
        if utility is None:
            # this sender was never started
            continue
        try:
            utility.kill()
        except ProcessLookupError:
            # process is already gone
            pass

    time.sleep(1)

Deathcrow's avatar
Deathcrow committed
161
def start_utility(sender_key, scheduled_time):
    """Start the measurement utility of one sender at a scheduled time.

    Sleeps until ``time.perf_counter()`` reaches *scheduled_time*, then runs
    ``sender[sender_key]['utility_command']`` with its standard output
    redirected to ``sender[sender_key]['utility_file']``.  If the scheduled
    time has already passed the utility is started immediately.

    Returns the ``subprocess.Popen`` object of the started utility.
    """
    delay = scheduled_time - time.perf_counter()
    if delay > 0:
        time.sleep(delay)

    entry = sender[sender_key]
    return subprocess.Popen(entry['utility_command'],
                            stdout=entry['utility_file'])
165
166
167



Deathcrow's avatar
Deathcrow committed
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# Make sure started processes are stopped when the script is interrupted.
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

# Command line interface. All values are kept as strings and converted
# where they are used.
parser = argparse.ArgumentParser(description='Configure test environment.')

parser.add_argument('-l', '--label', default="default")
parser.add_argument('-u', '--utility', default='iperf')
parser.add_argument('-t', '--time', default="60")
parser.add_argument('-s', '--sender', action='append', nargs='+', required=True,
    help="Specify senders: <src ip>#<dst ip>#<port>#<congestion>#<number>#<interface> -- individual specifications overwrite globals")
parser.add_argument('-r', '--receiver', default="localhost")
parser.add_argument('-p', '--port', default="5001")
parser.add_argument('-T', '--tmp-folder', default="/tmp")
parser.add_argument('-Z', '--congestion', default="cubic")
parser.add_argument('-N', '--no-delay', action='store_true')
parser.add_argument('-4', '--use-ipv4', action='store_true')
parser.add_argument('-n', '--number-senders', default="1",
    help="Amount of clones per specified sender")
parser.add_argument('-C', '--comment', default="",
    help="Path to a (txt) file to be included in the final Tarball")
parser.add_argument('-P', '--cpunetlog', default="",
    help="Path to cpunetlog folder")
parser.add_argument("--netperf-dataport", default="53699",
    help="Port for the data connections by netperf. Will be incremented per sender")
parser.add_argument('-D', '--tcpdump', default="1",
    help="Enable/Disable tcpdump; \"1\" enables it (Default: enabled)")
parser.add_argument('-S', '--samplerate', default="0.1",
    help="Samplerate for measurements")
parser.add_argument('--legacy', action='store_true',
    help="Record entire tcp_probe output instead of samples by samplerate. "
         "Can be less CPU intensive for certain samplerates. "
         "Consider using full=0 for tcp_probe module")

args = parser.parse_args()

# Command line switches of the selected utility (iperf spelling is the
# default). This is resolved before anything is created on disk or started,
# so an unsupported utility leaves no folder or stray process behind.
utility_args = {
    'connect': "-c",
    'time': "-t",
    'congestion': "-Z",
    'port': "-p",
    'no_delay': "-N",
    'bind': "-B",
    'ipv6': "" if args.use_ipv4 else "-V",
}

if args.utility == "iperf":
    pass #iperf is default
elif args.utility == "iperf3":
    utility_args['congestion'] = "-C"
    utility_args['ipv6'] = "-4" if args.use_ipv4 else "-6"
elif args.utility == "netperf":
    utility_args['connect'] = "-H"
    utility_args['congestion'] = "-K" #test-specific option
    utility_args['time'] = "-l"
    utility_args['bind'] = "-L"
    utility_args['ipv6'] = "-4" if args.use_ipv4 else "-6"
    utility_args['no_delay'] = "-D" #test-specific option
else:
    print("Utility not supported")
    raise SystemExit(1)

mypid = os.getpid()

# make 1-dimensional list (argument parser provides list of lists)
flat_sender_args = [item for sublist in args.sender for item in sublist]

sender = []

# all results of this run go to <tmp-folder>/test-<label>-<pid>
args.tmp_folder = os.path.abspath(os.path.abspath(args.tmp_folder)+"/test-"+args.label+"-"+str(mypid))
os.mkdir(args.tmp_folder)

parse_senders(flat_sender_args, sender)

if args.cpunetlog != "":
    args.cpunetlog = os.path.abspath(args.cpunetlog)
    #run cpunetlog in headless mode
    cpunetlog = subprocess.Popen(["python3", args.cpunetlog+"/__init__.py", "-l", "-d", "--path", args.tmp_folder+"/cpunetlog"])

Deathcrow's avatar
Deathcrow committed
245
246
247
# Legacy mode: let `cat` record the complete tcp_probe output into a
# temporary file; it is split per sender after the measurement.
if args.legacy:
    tcpprobe_file = tempfile.TemporaryFile('w+')
    tcpprobe = subprocess.Popen(["cat", "/proc/net/tcpprobe"], stdout=tcpprobe_file)

# Optional packet capture on all interfaces, written into the result folder.
if args.tcpdump == "1":
    tcpdump_command = [
        "tcpdump",
        "-iany",
        "-w{}/tcpdump_{}".format(args.tmp_folder, mypid),
        "-s100",
    ]
    tcpdump = subprocess.Popen(tcpdump_command)
Deathcrow's avatar
Deathcrow committed
250
251
            

Deathcrow's avatar
Deathcrow committed
252
#TODO? if congestion_algo == all: check proc and do all
Deathcrow's avatar
Deathcrow committed
253
254
255


# Build the command line and the output file for every sender.
for key, i in enumerate(sender):
    # For netperf the per-sender 'port' holds the data port (handed over via
    # -P below), so the global port is used for the port switch. For all
    # other utilities the per-sender port overrides the global one.
    if args.utility == "netperf":
        target_port = args.port
    else:
        target_port = i['port']

    command = [
        args.utility,
        utility_args['connect'], i['dst'],
        utility_args['bind'], i['src'],
        utility_args['time'], args.time,
        utility_args['port'], target_port,
    ]
    # the address family switch may be empty; never pass "" as an argument
    if utility_args['ipv6']:
        command.append(utility_args['ipv6'])
    if args.utility == "netperf":
        # everything after "--" is a test-specific netperf option
        command += ["--", "-k", "all", "-P", i['src_port']]
    command += [utility_args['congestion'], i['congestion']]
    if args.no_delay:
        command.append(utility_args['no_delay'])

    i['utility_command'] = command
    i['utility_file'] = open(args.tmp_folder+"/"+args.utility+"_"+i['congestion']+"_"+str(key), 'w+')
Deathcrow's avatar
Deathcrow committed
269
270
#    print(i['src']+":"+i['src_port']+" "+i['dst']+":"+i['port'])

Deathcrow's avatar
Deathcrow committed
271
272
# Setup is done: every sender is scheduled to start 5 seconds from now.
start_time = time.perf_counter() + 5

# One pool worker per sender, so all utilities can be launched at the same
# scheduled point in time.
with multiprocessing.Pool(len(sender)) as pool:
    pool_results = [
        pool.apply_async(start_utility, [sender_key, start_time])
        for sender_key in range(len(sender))
    ]
    pool.close()
    pool.join()

    # hand the started process of each worker over to its sender entry
    for i, result in zip(sender, pool_results):
        i['utility'] = result.get()
        print(i['utility'].args)
Deathcrow's avatar
Deathcrow committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333

# The source port of iperf/iperf3 flows is not set by this script (netperf
# gets an explicit one), so look it up in the socket table: find the `ss`
# line that mentions the pid of the started utility and read the port
# that follows the source address.
if args.utility != "netperf":
    with tempfile.TemporaryFile(mode='r+') as ss_file:
        subprocess.call(["ss", "-t", "-i", "-p", "-n"], stdout=ss_file)
        ss_file.seek(0)
        for line in ss_file:
            for i in sender:
                # NOTE(review): plain substring match on the pid, as before;
                # it can also hit other numbers in the line
                if str(i['utility'].pid) not in line:
                    continue
                address_start = line.find(i['src'])
                if address_start < 0:
                    continue
                # accept both "addr:port" and "[addr]:port"
                port_match = re.match(r"\]?:(\d+)", line[address_start + len(i['src']):])
                if port_match is None:
                    continue
                print(port_match.group(1))
                i['src_port'] = port_match.group(1)

# parse ss for bandwidth measurement (deprecated, now using tcp_probe sequence numbers)

#while time.perf_counter() - start_time < float(args.time):
#    ss_file = tempfile.TemporaryFile(mode='r+')
#    subprocess.call(["ss", "-t", "-i", "-n"],stdout=ss_file)
#    time_dif = time.perf_counter() - start_time
#    #print(time_dif)
#    ss_file.seek(0)
#    line = ss_file.readline()
#
#    while line is not "":
#        if(line.startswith("ESTAB")):
#            for key, j in enumerate(sender):
#                if re.search(j['src']+":"+j['src_port'], line):
#                    line = ss_file.readline()
#                    start_mbps = re.search("send ", line).end()
#                    end_mbps = re.search ("Mbps ", line).start()
#                    ss_file_sender = open(args.tmp_folder+"/ss_"+j['congestion']+"_"+str(key), 'a')
#                    ss_file_sender.write(str(time_dif)+" "+j['src']+":"+j['src_port']+" "+line[start_mbps:end_mbps]+"\n")
#                    break
#        line = ss_file.readline()
#    
#    ss_file.close()
#    time.sleep(0.1)

Deathcrow's avatar
Deathcrow committed
334
335
# Measurement phase, lasting args.time seconds from the scheduled start.
if args.legacy:
    # the complete tcp_probe output is recorded in the background, just wait
    time.sleep(float(args.time))
else:
    while time.perf_counter() - start_time < float(args.time):
        while_start = time.perf_counter() # remember beginning of this loop

        process_proc_tcpprobe_optimized(sender) #heavy lifting here

        # sleep samplerate minus time spent in loop (no sleep if overrun)
        remaining = float(args.samplerate) - (time.perf_counter() - while_start)
        if remaining >= 0:
            time.sleep(remaining)

# Waiting on the utility processes doesn't work with multiprocessing
# (?! returncode always 0, even when netperf still running), so instead
# sleep 5 more seconds to allow netperf processes to finish.
time.sleep(5)

clean_up()

# Legacy mode: split the raw recording into the per-sender files.
if args.legacy:
    process_raw_tcpprobe_file(tcpprobe_file, sender)

Deathcrow's avatar
bugfix    
Deathcrow committed
363
#calculate bandwidth for each sender:
Deathcrow's avatar
Deathcrow committed
364
365
366
367
368
369
370
371
372
373
# Calculate the bandwidth of each sender from its tcp_probe samples.
# Field 0 of a sample is the timestamp, field 4 the sequence number.
sequence_modulus = 2 ** 32 # the sequence numbers wrap around at 32 bit
sample_interval = float(args.samplerate)

for key, i in enumerate(sender):
    last_timestamp = None
    last_seq = None
    bandwidth_lines = []

    i['tcpprobe_file'].seek(0)
    for line in i['tcpprobe_file']:
        line_split = line.split()
        if len(line_split) < 5:
            # blank or truncated sample
            continue

        timestamp = float(line_split[0])
        seq = int(line_split[4], 0) #convert from hex-string

        if last_timestamp is None:
            # first sample only provides the reference point
            last_timestamp = timestamp
            last_seq = seq
            continue

        elapsed = abs(timestamp - last_timestamp)
        if elapsed < sample_interval or elapsed == 0:
            continue

        # bytes sent in the current time interval; the modulo takes care
        # of a sequence number wrap-around
        bw = (seq - last_seq) % sequence_modulus
        bw = bw / elapsed # byte/s
        bw = bw * 8 / 1000 / 1000 # Mbps
        bandwidth_lines.append(" ".join(line_split[:3]+[str(bw)])+"\n")

        last_timestamp = timestamp
        last_seq = seq

    # the bandwidth file is only created if there is something to write
    if bandwidth_lines:
        with open(args.tmp_folder+"/bw_"+i['congestion']+"_"+str(key), 'a') as bandwidth_file:
            bandwidth_file.writelines(bandwidth_lines)

for i in sender:
    i['tcpprobe_file'].close()


#tcpprobe_file.close()

# Add the optional comment file to the results.
if args.comment != "":
    subprocess.call(['cp', os.path.abspath(args.comment), args.tmp_folder])

# Pack the result folder into <label>_<pid>.tar.xz next to it.
tarball = args.tmp_folder+"/../"+args.label+"_"+str(mypid)+".tar.xz"
tar_return = subprocess.call(["tar", "cJf", tarball, "-C", os.path.abspath(args.tmp_folder+"/../"), os.path.basename(args.tmp_folder)])
if tar_return == 0:
    # the folder is only removed after it has been archived successfully
    print("Results in tarfile: "+os.path.abspath(tarball)+"\nDeleting Folder \""+args.tmp_folder+"\" in 5 seconds (ctrl+c to abort)...")
    time.sleep(5)
    subprocess.call(["rm", "-rf", args.tmp_folder])
else:
    print("tar failed with exit status "+str(tar_return)+", keeping folder \""+args.tmp_folder+"\"")

print("done")

#while True:
#    pass