#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import threading
import sys

from collections import deque

# Constants

# Names of the values that may be plotted on the Y-axis.
# Alternative set that was previously used (includes 'loss'):
#   ['cwnd', 'sst', 'rtt', 'bw', 'loss']
# Candidate for a second Y-axis (currently not used): ['rtt']
# NOTE(review): 'bw' has no matching entry in LOG_FORMAT below (which uses
# 'throughput') -- confirm the mapping before converting values by these names.
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw']

# Helper list used to initialise data structures: X-axis value + Y-axis values.
VALUES_TO_PROCESS = ['time'] + VALUES_TO_PLOT

DEFAULT_LINES_TO_SHOW = ['cwnd']
# Destination port used for filtering when no filter port was configured.
DEFAULT_FILTER_PORT = 5001

# Field names of one space-separated log sample, in positional order.
# At least 'time' and 'dstPort' are required.
LOG_FORMAT = [
    'time', 'rTime', 'srcIp', 'srcPort', 'dstIp', 'dstPort',
    'cwnd', 'sst', 'rtt', 'minRtt', 'maxRtt', 'avgRtt', 'meanRtt',
    'throughput', 'smoothedThroughput', 'assumedLosses',
]
NUMBER_OF_VALUES = len(LOG_FORMAT)

# Timing constants for the worker threads (all in seconds).
THREAD_STOPFLAG_WAIT = 0.000001  # poll interval when waiting on the stop flag
THREAD_JOIN_TIMEOUT = 1
THREAD_TESTTIMEOUTS_WAIT = 0.5
THREAD_MISC_WAIT = 2

class TcpLivePlot():
    """
    Connects an input backend (sample source) with an output backend (plot).

    A daemon thread pulls samples from the input backend, keeps those whose
    destination port is in ``options.filterPorts`` and stores them per flow
    in ``connectionBuffer``. The output backend is driven from the thread
    that constructs this object.
    """

    def __init__(self, inputBackend, outputBackend, options, infoRegistry):
        """
        Sets up the buffers, starts the input thread and runs the GUI loop.

        NOTE: this constructor blocks. It keeps calling ``processGui()``
        until the stop flag is set (see ``initTearDown()``).

        :param inputBackend: object providing ``retrieveNewSamples()`` and
                             ``tearDown()``
        :param outputBackend: object providing ``setConnectionBuffer()``,
                              ``plotGraph()`` and ``tearDown()``
        :param options: object with the attributes ``bufferLength``,
                        ``filterPorts``, ``debug``, ``quiet`` and
                        ``outputBackend``
        :param infoRegistry: stored as-is; not used inside this class
        """
        self.inputBackend = inputBackend
        self.outputBackend = outputBackend
        self.options = options
        self.infoRegistry = infoRegistry

        self.__stopped = threading.Event()
        self.__processInputThread = None
        # Kept for compatibility; no separate filtering or GUI thread is
        # started -- filtering happens in processInput() and the GUI is
        # driven by the loop at the end of this constructor.
        self.__processInputFilteringThread = None
        self.__processGuiThread = None

        # initialize vars
        self.incomeBuffer = deque(maxlen=self.options.bufferLength)
        # Maps "<srcPort>-<dstPort>" to a deque of parsed samples (dicts).
        self.connectionBuffer = {}
        self.__tmpTimestamp = 0

        # The output backend gets the very same dict object that
        # processInput() fills.
        self.outputBackend.setConnectionBuffer(self.connectionBuffer)

        # Fall back to the default port if no filter port was given.
        if not self.options.filterPorts:
            self.options.filterPorts.append(DEFAULT_FILTER_PORT)

        # init threads
        self.__processInputThread = threading.Thread(target=self.processInput)
        self.__processInputThread.daemon = True
        self.__processInputThread.start()

        # GUI loop: runs in the calling thread until the stop flag is set.
        while not self.__stopped.wait(THREAD_STOPFLAG_WAIT):
            self.processGui()

        if self.options.debug:
            print("End of main thread reached.")

    def processInput(self):
        """
        Thread reads from input sources (socket/file/stdin)

        Each sample is split at single spaces and mapped onto LOG_FORMAT.
        Samples with a wrong number of fields or non-integer ports are
        dropped silently, as are samples whose destination port is not in
        ``options.filterPorts``. Accepted samples are appended (as dicts of
        strings) to ``connectionBuffer["<srcPort>-<dstPort>"]``.
        """
        while not self.__stopped.wait(THREAD_STOPFLAG_WAIT):
            for sample in self.inputBackend.retrieveNewSamples():
                fields = sample.split(" ")
                if len(fields) != len(LOG_FORMAT):
                    continue  # malformed sample

                data = dict(zip(LOG_FORMAT, fields))
                try:
                    srcPort = int(data['srcPort'])
                    dstPort = int(data['dstPort'])
                except ValueError:
                    continue  # ports are not numeric

                if dstPort not in self.options.filterPorts:
                    continue

                flowIdentifier = "{}-{}".format(srcPort, dstPort)
                if flowIdentifier not in self.connectionBuffer:
                    self.connectionBuffer[flowIdentifier] = deque()
                self.connectionBuffer[flowIdentifier].append(data)

    def processGui(self):
        """
        Performs one GUI step by delegating to the output backend.
        """
        self.outputBackend.plotGraph()

    def handleSignals(self, signal, frame):
        """
        Callback handler for signals

        Stops the threads, tears down both backends and terminates the
        program by raising SystemExit (exit status 0).

        :param signal: signal number (unused)
        :param frame: current stack frame (unused)
        :raises SystemExit: always
        """
        if not self.options.quiet and self.options.outputBackend != 'stdout':
            print("Exiting...")
        self.initTearDown()
        self.tearDown()
        sys.exit(0)

    def initTearDown(self):
        """
        Sets stops signal for all threads.
        """
        self.__stopped.set()

    def tearDown(self):
        """
        Performs the cleanup at program termination.

        Tears down the input backend first, then the output backend.
        """
        self.inputBackend.tearDown()
        self.outputBackend.tearDown()