#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import threading
import sys

from collections import deque

# Constants

# Values that can be drawn on the Y-axis.
# Alternative configurations kept for reference:
#   VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw', 'loss']
#   VALUES_TO_PLOT_ON_SECOND_AXIS = ['rtt']
# NOTE(review): 'bw' has no field of the same name in LOG_FORMAT below
# (the log carries 'throughput') -- confirm the intended mapping.
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw']
# Helper to initialise all data structures: X-axis value plus Y-axis values.
VALUES_TO_PROCESS = ['time'] + VALUES_TO_PLOT

DEFAULT_LINES_TO_SHOW = ['cwnd']
# Destination port used for filtering when no filter port is configured.
DEFAULT_FILTER_PORT = 5001

# Field names of one space-separated log line, in order.
# At least 'time' and 'dstPort' are required; TcpLivePlot.processInput also
# reads 'srcPort' to build the flow identifier.
LOG_FORMAT = [
    'time', 'rTime', 'srcIp', 'srcPort', 'dstIp', 'dstPort',
    'cwnd', 'sst', 'rtt', 'minRtt', 'maxRtt', 'avgRtt', 'meanRtt',
    'throughput', 'smoothedThroughput', 'assumedLosses',
]
NUMBER_OF_VALUES = len(LOG_FORMAT)

# Timeout passed to the stop-flag wait in the worker loops.
THREAD_STOPFLAG_WAIT = 0.000001 # in seconds
THREAD_JOIN_TIMEOUT = 1 # in seconds
THREAD_TESTTIMEOUTS_WAIT = 0.5 # in seconds
THREAD_MISC_WAIT = 2 # in seconds

class TcpLivePlot():
    """
    Couples an input backend with an output backend for live plotting.

    A daemon thread (processInput) pulls raw samples from the input backend,
    parses them and appends them to a per-flow buffer in
    self.connectionBuffer, keyed "<srcPort>-<dstPort>". The output backend is
    handed a reference to that dict via setConnectionBuffer() and is asked to
    draw (plotGraph) from the thread that constructs this object.

    NOTE: the constructor does not return until the stop flag has been set
    (see initTearDown / handleSignals).
    """

    def __init__(self, inputBackend, outputBackend, options, infoRegistry):
        """
        Wires up the backends, starts the input thread and runs the GUI loop.

        inputBackend  -- object providing retrieveNewSamples() and tearDown()
        outputBackend -- object providing setConnectionBuffer(), plotGraph()
                         and tearDown()
        options       -- settings object; this class reads bufferLength,
                         filterPorts, debug, quiet and outputBackend
        infoRegistry  -- stored as-is; not used by this class itself
        """
        self.inputBackend = inputBackend
        self.outputBackend = outputBackend
        self.options = options
        self.infoRegistry = infoRegistry

        self.__stopped = threading.Event()
        # The separate filtering stage is currently disabled (see TODO below).
        self.__processInputFilteringThread = None

        # initialize vars
        self.incomeBuffer = deque(maxlen=self.options.bufferLength)
        self.connectionBuffer = {}
        self.__tmpTimestamp = 0

        # The output backend shares the very same dict the input thread fills.
        self.outputBackend.setConnectionBuffer(self.connectionBuffer)

        # Fall back to the default port when no filter port was configured.
        if not self.options.filterPorts:
            self.options.filterPorts.append(DEFAULT_FILTER_PORT)

        # init threads
        self.__processInputThread = threading.Thread(target=self.processInput)
        self.__processInputThread.daemon = True
        # The GUI thread object is created but never started: processGui() is
        # driven from the loop below instead.
        # NOTE(review): presumably because the plotting toolkit has to run on
        # the main thread -- confirm.
        self.__processGuiThread = threading.Thread(target=self.processGui)
        self.__processGuiThread.daemon = True

        self.__processInputThread.start()

        # Keep drawing until the stop flag is set.
        while not self.__stopped.wait(THREAD_STOPFLAG_WAIT):
            self.processGui()

        if self.options.debug:
            print("End of main thread reached.")

    @staticmethod
    def _parseSample(sample, logFormat):
        """
        Splits one raw sample into its named fields.

        sample    -- one log line, fields separated by a single space
        logFormat -- list of field names, in the order they appear in sample

        Returns a tuple (srcPort, dstPort, data), where the ports are ints and
        data maps every field name to its (string) value. Returns None when
        the number of fields does not match logFormat or a port is not an
        integer, so that one malformed line cannot terminate the input thread.
        """
        fields = sample.split(" ")
        if len(fields) != len(logFormat):
            return None
        data = dict(zip(logFormat, fields))
        try:
            srcPort = int(data['srcPort'])
            dstPort = int(data['dstPort'])
        except ValueError:
            return None
        return srcPort, dstPort, data

    def processInput(self):
        """
        Thread reads from input sources (socket/file/stdin)

        Runs until the stop flag is set. Every well-formed sample whose
        destination port is listed in options.filterPorts is appended to the
        buffer of its flow; malformed samples are skipped.
        """
        while not self.__stopped.wait(THREAD_STOPFLAG_WAIT):
            for sample in self.inputBackend.retrieveNewSamples():
                parsed = self._parseSample(sample, LOG_FORMAT)
                if parsed is None:
                    continue
                srcPort, dstPort, data = parsed
                if dstPort not in self.options.filterPorts:
                    continue
                flowIdentifier = "{}-{}".format(srcPort, dstPort)
                # NOTE(review): per-flow buffers are unbounded here;
                # options.bufferLength is not applied -- confirm that the
                # output backend drains them.
                if flowIdentifier not in self.connectionBuffer:
                    self.connectionBuffer[flowIdentifier] = deque()
                self.connectionBuffer[flowIdentifier].append(data)

    def processGui(self):
        """
        Asks the output backend to draw the graph once.
        """
        self.outputBackend.plotGraph()

    # TODO: a separate processInputFiltering() stage used to live here and is
    # disabled. It read lines from incomeBuffer, dropped samples with a wrong
    # field count or non-numeric values, converted the VALUES_TO_PROCESS
    # fields to float and skipped samples based on options.plotResolution.
    # Restore it from version control if that behaviour is needed again.

    def handleSignals(self, signal, frame):
        """
        Callback handler for signals

        Stops all threads, tears down both backends and then terminates the
        program by raising SystemExit.
        """
        if not self.options.quiet and self.options.outputBackend != 'stdout':
            print("Exiting...")
        self.initTearDown()
        self.tearDown()
        raise SystemExit

    def initTearDown(self):
        """
        Sets stops signal for all threads.
        """
        self.__stopped.set()

    def tearDown(self):
        """
        Performs the cleanup at program termination.

        The output backend is torn down even if the input backend's tearDown
        raises; the exception still propagates to the caller afterwards.
        """
        try:
            self.inputBackend.tearDown()
        finally:
            self.outputBackend.tearDown()