tcpliveplot.py 4.82 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import threading
import sys

from collections import deque

# Constants
#VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw', 'loss'] # (only values for Y-axis)
# VALUES_TO_PLOT_ON_SECOND_AXIS = ['rtt']
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw'] # (only values for Y-axis)
VALUES_TO_PROCESS = ['time']  + VALUES_TO_PLOT #helper to init all data structures

DEFAULT_LINES_TO_SHOW = ['cwnd']
DEFAULT_FILTER_PORT = 5001


LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)

THREAD_STOPFLAG_WAIT = 0.000001 # in seconds
THREAD_JOIN_TIMEOUT = 1 # in seconds
THREAD_TESTTIMEOUTS_WAIT = 0.5 # in seconds
THREAD_MISC_WAIT = 2 # in seconds

class TcpLivePlot():
    def __init__(self, inputBackend, outputBackend, options, infoRegistry):
        self.inputBackend = inputBackend
        self.outputBackend = outputBackend
        self.options = options
        self.infoRegistry = infoRegistry

        self.__stopped = threading.Event()
        self.__processInputThread = None
        self.__processInputFilteringThread = None
        self.__processGuiThread = None

        # initialize vars
        self.__incomeBuffer = deque(maxlen=self.options.bufferLength)
        self.__tmpBuffer = deque(maxlen=self.options.bufferLength)
        self.__connectionBuffer = {}
        self.__tmpTimestamp = 0

        if(len(self.options.filterPorts) < 1):
            self.options.filterPorts.append(DEFAULT_FILTER_PORT)
        for i in self.options.filterPorts:
            self.__connectionBuffer[i] = deque(maxlen=self.options.bufferLength)

        if(len(self.options.linesToShow) < 1):
            self.options.linesToShow.extend(DEFAULT_LINES_TO_SHOW)

        # init threads
        self.__processInputThread = threading.Thread(target=self.processInput)
        self.__processInputFilteringThread= threading.Thread(target=self.processInputFiltering)
        self.__processGuiThread = threading.Thread(target=self.processGui)
        self.__processInputThread.daemon = True
        self.__processInputFilteringThread.daemon = True
        self.__processGuiThread.daemon = True
        self.__processInputThread.start()
        self.__processInputFilteringThread.start()
        self.__processGuiThread.start()

        while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
            self.__stopped.wait(0.5)

        if(self.options.debug):
            print("End of main thread reached.")


    def processInput(self):
        """Macro to initialize data retrieval and processing from various sources."""
        while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
            pass
            # newSamples = self.inputBackend.retrieveNewSamples()

    def processGui(self):
        pass


    def processInputFiltering(self):
        """Filters retrieved data by selected port. Drops malformed data."""
        return

        lastTimestamp = 0
        while(True):
            try:
                line = self.__tmpBuffer.popleft()
            except IndexError:
                time.sleep(0.00001)
            else:
                tmpData = line.split(" ")
                if(len(tmpData) is NUMBER_OF_VALUES):
                    data = dict(zip(LOG_FORMAT, tmpData))
                else:
                    continue

                try:
                    timestamp = float(data['time'])
                    port = int(data['dstPort'])
                except ValueError:
                    continue
                else:
                    if(port not in self.options.filterPorts):
                        continue

                filteredData = {}
                try:
                    for val in VALUES_TO_PROCESS:
                        filteredData[val] = float(data[val])
                except ValueError:
                    continue
                else:
                    timestampDelta = lastTimestamp - timestamp
                    if(timestampDelta > self.options.plotResolution):
                        lastTimestamp = timestamp
                        continue
                    self.__connectionBuffer[port].append(filteredData)
                    lastTimestamp = timestamp

    def handleSignals(self, signal, frame):
        """Callback handler for signals"""
        if(not self.options.quiet and not self.options.outputBackend == 'stdout'):
            print("Exiting...")
        self.initTearDown()
        self.tearDown()
        raise SystemExit
        sys.exit(0)

    def initTearDown(self):
        self.__stopped.set()

    def tearDown(self):
        """Performs the cleanup at programm termination."""
        # use input.tearDown()
        if(self.__logType == "socket"):
            self.__socket.close()
        else:
            self.__logFileHandler.close()
        raise SystemExit
        sys.exit