tcpliveplot.py 5.59 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import threading
import sys

from collections import deque

# Constants
#VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw', 'loss'] # (only values for Y-axis)
# VALUES_TO_PLOT_ON_SECOND_AXIS = ['rtt']
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw'] # (only values for Y-axis)
VALUES_TO_PROCESS = ['time']  + VALUES_TO_PLOT #helper to init all data structures

DEFAULT_LINES_TO_SHOW = ['cwnd']
DEFAULT_FILTER_PORT = 5001

19
LOG_FORMAT = ['time', 'rTime', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'sst', 'rtt', 'minRtt', 'maxRtt', 'avgRtt', 'meanRtt', 'throughput', 'smoothedThroughput', 'assumedLosses']
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)

THREAD_STOPFLAG_WAIT = 0.000001 # in seconds
THREAD_JOIN_TIMEOUT = 1 # in seconds
THREAD_TESTTIMEOUTS_WAIT = 0.5 # in seconds
THREAD_MISC_WAIT = 2 # in seconds

class TcpLivePlot():
    def __init__(self, inputBackend, outputBackend, options, infoRegistry):
        self.inputBackend = inputBackend
        self.outputBackend = outputBackend
        self.options = options
        self.infoRegistry = infoRegistry

        self.__stopped = threading.Event()
        self.__processInputThread = None
        self.__processInputFilteringThread = None
        self.__processGuiThread = None

        # initialize vars
41 42
        self.incomeBuffer = deque(maxlen=self.options.bufferLength)
        self.connectionBuffer = {}
43 44
        self.__tmpTimestamp = 0

45 46
        self.outputBackend.setConnectionBuffer(self.connectionBuffer)

47 48
        if(len(self.options.filterPorts) < 1):
            self.options.filterPorts.append(DEFAULT_FILTER_PORT)
49 50
        # for i in self.options.filterPorts:
        #     self.__connectionBuffer[i] = deque(maxlen=self.options.bufferLength)
51 52 53

        # init threads
        self.__processInputThread = threading.Thread(target=self.processInput)
54
        # self.__processInputFilteringThread= threading.Thread(target=self.processInputFiltering)
55 56
        self.__processGuiThread = threading.Thread(target=self.processGui)
        self.__processInputThread.daemon = True
57
        # self.__processInputFilteringThread.daemon = True
58 59
        self.__processGuiThread.daemon = True
        self.__processInputThread.start()
60 61
        # self.__processInputFilteringThread.start()
        # self.__processGuiThread.start()
62 63

        while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
64 65
            # self.__stopped.wait(0.5)
            self.processGui()
66 67 68 69 70 71

        if(self.options.debug):
            print("End of main thread reached.")


    def processInput(self):
72 73 74
        """
        Thread reads from input sources (socket/file/stdin)
        """
75
        while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
76 77 78 79 80 81 82 83 84 85 86
            newSamples = self.inputBackend.retrieveNewSamples()
            for sample in newSamples:
                tmpData = sample.split(" ")
                if(len(tmpData) == len(LOG_FORMAT)):
                    data = dict(zip(LOG_FORMAT, tmpData))
                    # print(data)
                    port = int(data['dstPort'])
                    if(port not in self.connectionBuffer):
                        self.connectionBuffer[port] = deque()
                    self.connectionBuffer[port].append(data)
                    # self.incomeBuffer.append(sample)
87 88

    def processGui(self):
89 90 91 92 93 94 95 96 97
        # while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
        #     try:
        #         line = self.incomeBuffer.popleft()
        #     except IndexError:
        #         pass
        #     else:
        # import matplotlib.pyplot as plt
        self.outputBackend.plotGraph()
        # plt.show()
98 99 100


    def processInputFiltering(self):
101 102 103
        """
        Filters retrieved data by selected port. Drops malformed data.
        """
104 105 106 107 108
        return

        lastTimestamp = 0
        while(True):
            try:
109
                line = self.incomeBuffer.popleft()
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
            except IndexError:
                time.sleep(0.00001)
            else:
                tmpData = line.split(" ")
                if(len(tmpData) is NUMBER_OF_VALUES):
                    data = dict(zip(LOG_FORMAT, tmpData))
                else:
                    continue

                try:
                    timestamp = float(data['time'])
                    port = int(data['dstPort'])
                except ValueError:
                    continue
                else:
                    if(port not in self.options.filterPorts):
                        continue

                filteredData = {}
                try:
                    for val in VALUES_TO_PROCESS:
                        filteredData[val] = float(data[val])
                except ValueError:
                    continue
                else:
                    timestampDelta = lastTimestamp - timestamp
                    if(timestampDelta > self.options.plotResolution):
                        lastTimestamp = timestamp
                        continue
139
                    self.connectionBuffer[port].append(filteredData)
140 141 142
                    lastTimestamp = timestamp

    def handleSignals(self, signal, frame):
143 144 145
        """
        Callback handler for signals
        """
146 147 148 149 150 151 152 153
        if(not self.options.quiet and not self.options.outputBackend == 'stdout'):
            print("Exiting...")
        self.initTearDown()
        self.tearDown()
        raise SystemExit
        sys.exit(0)

    def initTearDown(self):
154 155 156
        """
        Sets stops signal for all threads.
        """
157 158 159
        self.__stopped.set()

    def tearDown(self):
160 161 162 163 164
        """
        Performs the cleanup at programm termination.
        """
        self.inputBackend.tearDown()
        self.outputBackend.tearDown()