Commit ff6a55aa authored by Michael König (Student)'s avatar Michael König (Student)
Browse files

dynamic detection of new flows

parent 0ca7ef2e
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# TODO:
# * dynamic flow detection
# * flowIdentifer i/o "port"
# * fix socket-input
# * dynamic value detection
import numpy as np import numpy as np
import math import math
import sys import sys
import time import time
...@@ -19,7 +12,7 @@ VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'smoothedThroughput'] # (only values for ...@@ -19,7 +12,7 @@ VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'smoothedThroughput'] # (only values for
VALUES_TO_PROCESS = ['time'] + VALUES_TO_PLOT #helper to init all data structures VALUES_TO_PROCESS = ['time'] + VALUES_TO_PLOT #helper to init all data structures
# Strings for UI-elements # Strings for UI-elements
FIGURE_TITLE = "TCPplot" FIGURE_TITLE = "TCPlivePLOT"
PLOT_TITLE = "Data from" PLOT_TITLE = "Data from"
PAUSE = "Pause" PAUSE = "Pause"
QUIT = "Quit" QUIT = "Quit"
...@@ -40,6 +33,14 @@ class LiveGui(GuiBase): ...@@ -40,6 +33,14 @@ class LiveGui(GuiBase):
self.infoRegistry = infoRegistry self.infoRegistry = infoRegistry
self.__stopped = threading.Event() self.__stopped = threading.Event()
self.timestampOfLastGuiRefresh = 0 self.timestampOfLastGuiRefresh = 0
self.__lastPlotTimestamp = {}
self.__lastDrawTimestamp = 0
self.__initRealtimeTimestamp = 0
self.__initSampletimeTimestamp = -1
self.xmin = 0
self.xmax = self.xmin + self.options.xDelta
self.flows = []
self.lineVisibility = {}
if(self.options.debug): if(self.options.debug):
print("matplotlib-version: " + matplotlib.__version__) print("matplotlib-version: " + matplotlib.__version__)
...@@ -58,7 +59,8 @@ class LiveGui(GuiBase): ...@@ -58,7 +59,8 @@ class LiveGui(GuiBase):
pass pass
def startupCheck(self): def startupCheck(self):
pass for val in VALUES_TO_PLOT:
self.lineVisibility[val] = True
def pause(self, event): def pause(self, event):
"""Toggles pause flag.""" """Toggles pause flag."""
...@@ -67,17 +69,18 @@ class LiveGui(GuiBase): ...@@ -67,17 +69,18 @@ class LiveGui(GuiBase):
def toggleVisibility(self, lineID): def toggleVisibility(self, lineID):
"""Toggles visibility for given line.""" """Toggles visibility for given line."""
for port in self.options.filterPorts: self.lineVisibility[lineID] ^= True
self.__plotLineConfigs[port][lineID] ^= True for flowIdentifier in self.__connectionBuffer:
self.__plotLines[port][lineID].set_visible(self.__plotLineConfigs[port][lineID]) self.__plotLineConfigs[flowIdentifier][lineID] ^= True
self.__plotLines[flowIdentifier][lineID].set_visible(self.__plotLineConfigs[flowIdentifier][lineID])
self.drawPlotLegend() self.drawPlotLegend()
def updateValueVisibility(self, label): def updateValueVisibility(self, label):
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
for i in range(1, len(VALUES_TO_PLOT)+1): for i in range(1, len(VALUES_TO_PLOT)+1):
self.__plotLineConfigs[port][VALUES_TO_PLOT[(i-1)]] = False self.__plotLineConfigs[flowIdentifier][VALUES_TO_PLOT[(i-1)]] = False
self.__plotLines[port][(VALUES_TO_PLOT[(i-1)])].set_visible(self.__plotLineConfigs[port][(VALUES_TO_PLOT[(i-1)])]) self.__plotLines[flowIdentifier][(VALUES_TO_PLOT[(i-1)])].set_visible(self.__plotLineConfigs[flowIdentifier][(VALUES_TO_PLOT[(i-1)])])
if label == 'cwnd': if label == 'cwnd':
self.toggleVisibility(VALUES_TO_PLOT[0]) self.toggleVisibility(VALUES_TO_PLOT[0])
elif label == 'sst': elif label == 'sst':
...@@ -91,17 +94,22 @@ class LiveGui(GuiBase): ...@@ -91,17 +94,22 @@ class LiveGui(GuiBase):
def drawPlotLegend(self): def drawPlotLegend(self):
"""(Re)draws legend with visible lines.""" """(Re)draws legend with visible lines."""
labelObjs = [] if len(self.__connectionBuffer) > 0:
labelTexts = [] labelObjs = []
for port in self.options.filterPorts: labelTexts = []
for val in VALUES_TO_PLOT: for flowIdentifier in self.__connectionBuffer:
if(self.__plotLineConfigs[port][val]): if(flowIdentifier in self.flows):
labelObjs.append(self.__plotLines[port][val]) for val in VALUES_TO_PLOT:
labelTexts.append(self.__plotLines[port][val].get_label()) if(self.__plotLineConfigs[flowIdentifier][val]):
if(len(labelObjs) > 0): labelObjs.append(self.__plotLines[flowIdentifier][val])
self.__ax.legend(labelObjs, labelTexts, fontsize='small') labelTexts.append(self.__plotLines[flowIdentifier][val].get_label())
else: if(len(labelObjs) > 0):
self.__ax.legend_.remove() self.legendVisible = True
self.__ax.legend(labelObjs, labelTexts, fontsize='small')
else:
if self.legendVisible:
self.legendVisible = False
self.__ax.legend_.remove()
def plotKeyPressCallback(self, event): def plotKeyPressCallback(self, event):
"""Callback to handle key presses.""" """Callback to handle key presses."""
...@@ -129,8 +137,63 @@ class LiveGui(GuiBase): ...@@ -129,8 +137,63 @@ class LiveGui(GuiBase):
self.__tmpTimestamp = time.perf_counter() self.__tmpTimestamp = time.perf_counter()
self.tearDown() self.tearDown()
def updateFlowDataStructures(self):
for flowIdentifier in self.__connectionBuffer:
if(flowIdentifier not in self.flows):
self.initFlowDataStructures(flowIdentifier)
else:
outdated = self.__lastPlotTimestamp[flowIdentifier] < self.xmin
if(outdated):
self.destroyFlowDataStructures(flowIdentifier)
self.drawPlotLegend()
def initFlowDataStructures(self, flowIdentifier):
splittedFlowidentifier = flowIdentifier.split("-")
srcPort = splittedFlowidentifier[0]
dstPort = splittedFlowidentifier[1]
dstPortCount = 1
for preexistingFlow in self.flows:
flowDstPort = preexistingFlow.split("-")[1]
if flowDstPort == dstPort:
dstPortCount += 1
self.flows.append(flowIdentifier)
self.__lastPlotTimestamp[flowIdentifier] = 0
self.__plotLines[flowIdentifier] = {}
self.__plotValues[flowIdentifier] = {}
self.__plotValuesMin[flowIdentifier] = {}
self.__plotValuesMax[flowIdentifier] = {}
self.__plotLineConfigs[flowIdentifier] = {}
self.__plotLineConfigs[flowIdentifier]['lastTimestamp'] = 0
for val in VALUES_TO_PROCESS:
self.__plotValuesMin[flowIdentifier][val] = math.inf
self.__plotValuesMax[flowIdentifier][val] = -math.inf
self.__plotValues[flowIdentifier][val] = deque(maxlen=(int(self.options.xDelta / self.options.plotResolution * 10)))
index = 1
for val in VALUES_TO_PLOT:
self.__plotLines[flowIdentifier][val], = self.__ax.plot([])
self.__plotLines[flowIdentifier][val].set_label("[" + str(index) + "] " + val + " - " + str(dstPort) + " #" + str(dstPortCount) + "")
self.__plotLineConfigs[flowIdentifier][val] = self.lineVisibility[val]
self.__plotLines[flowIdentifier][val].set_visible(self.lineVisibility[val])
self.__plotLines[flowIdentifier][val].set_data([], [])
index += 1
def destroyFlowDataStructures(self, flowIdentifier):
self.flows.remove(flowIdentifier)
pass
def plotGraph(self): def plotGraph(self):
"""Initializes plot configuration and starts the plotting.""" """Initializes plot configuration and starts the plotting."""
while(len(self.__connectionBuffer) < 1):
print("waiting for data on filtered flows...")
time.sleep(0.5)
if(self.options.debug):
print("initializing...")
time.sleep(1)
self.__paused = False self.__paused = False
self.__minVal = 9999999999 self.__minVal = 9999999999
self.__maxVal = 0 self.__maxVal = 0
...@@ -148,29 +211,8 @@ class LiveGui(GuiBase): ...@@ -148,29 +211,8 @@ class LiveGui(GuiBase):
self.__plotValuesMin = {} self.__plotValuesMin = {}
self.__plotValuesMax = {} self.__plotValuesMax = {}
self.__plotLineConfigs = {} self.__plotLineConfigs = {}
for port in self.options.filterPorts:
self.__plotLines[port] = {} self.updateFlowDataStructures()
self.__plotValues[port] = {}
self.__plotValuesMin[port] = {}
self.__plotValuesMax[port] = {}
self.__plotLineConfigs[port] = {}
self.__plotLineConfigs[port]['lastTimestamp'] = 0
for val in VALUES_TO_PROCESS:
self.__plotValuesMin[port][val] = math.inf
self.__plotValuesMax[port][val] = -math.inf
self.__plotValues[port][val] = deque(maxlen=(int(self.options.xDelta / self.options.plotResolution * 10)))
index = 1
for val in VALUES_TO_PLOT:
# if(val == "rtt"):
# print("bla")
# self.__plotLines[port][val], = self.__ax2.plot([])
# else:
self.__plotLines[port][val], = self.__ax.plot([])
self.__plotLines[port][val].set_label("[" + str(index) + "] " + val + " - " + str(port) + "")
self.__plotLineConfigs[port][val] = True
self.__plotLines[port][val].set_visible(True)
index += 1
self.drawPlotLegend()
# pause button # pause button
pauseAx = plt.axes([0.8, 0.025, 0.1, 0.04]) pauseAx = plt.axes([0.8, 0.025, 0.1, 0.04])
...@@ -196,12 +238,6 @@ class LiveGui(GuiBase): ...@@ -196,12 +238,6 @@ class LiveGui(GuiBase):
else: else:
self.__preloading = False self.__preloading = False
self.__lastPlotTimestamp = {}
for port in self.options.filterPorts:
self.__lastPlotTimestamp[port] = 0
self.__lastDrawTimestamp = 0
self.__initRealtimeTimestamp = 0
self.__initSampletimeTimestamp = -1
self.__timeOffset = 0 self.__timeOffset = 0
self.__bufferFactor = 1 self.__bufferFactor = 1
...@@ -222,9 +258,9 @@ class LiveGui(GuiBase): ...@@ -222,9 +258,9 @@ class LiveGui(GuiBase):
def returnAllLines(self): def returnAllLines(self):
"""Macro to return all lines as they are.""" """Macro to return all lines as they are."""
allPlotLines = [] allPlotLines = []
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
for val in VALUES_TO_PLOT: for val in VALUES_TO_PLOT:
allPlotLines.append(self.__plotLines[port][val]) allPlotLines.append(self.__plotLines[flowIdentifier][val])
return tuple(allPlotLines) return tuple(allPlotLines)
def returnNanSample(self, time): def returnNanSample(self, time):
...@@ -237,6 +273,9 @@ class LiveGui(GuiBase): ...@@ -237,6 +273,9 @@ class LiveGui(GuiBase):
def plotGraphUpdate(self, i): def plotGraphUpdate(self, i):
"""Animation loop - does the actual plot update.""" """Animation loop - does the actual plot update."""
self.updateFlowDataStructures()
if(self.__initSampletimeTimestamp == -1): if(self.__initSampletimeTimestamp == -1):
self.__initSampletimeTimestamp = 0 self.__initSampletimeTimestamp = 0
return self.returnAllLines() return self.returnAllLines()
...@@ -247,8 +286,8 @@ class LiveGui(GuiBase): ...@@ -247,8 +286,8 @@ class LiveGui(GuiBase):
# fill playback-buffer # fill playback-buffer
if(False and self.__preloading): if(False and self.__preloading):
bufferLength = -1 bufferLength = -1
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
bufferLength = max(bufferLength, len(self.__connectionBuffer[port])) bufferLength = max(bufferLength, len(self.__connectionBuffer[flowIdentifier]))
if(bufferLength > 0): if(bufferLength > 0):
bufferedTime = bufferLength * self.options.plotResolution bufferedTime = bufferLength * self.options.plotResolution
...@@ -275,6 +314,8 @@ class LiveGui(GuiBase): ...@@ -275,6 +314,8 @@ class LiveGui(GuiBase):
currentYmin, currentYmax = self.__ax.get_ylim() currentYmin, currentYmax = self.__ax.get_ylim()
newXmax = currentTimestamp - self.options.preloadBuffer newXmax = currentTimestamp - self.options.preloadBuffer
newXmin = newXmax - self.options.xDelta newXmin = newXmax - self.options.xDelta
self.xmin = newXmin
self.xmax = newXmax
self.__ax.set_xlim(newXmin, newXmax) self.__ax.set_xlim(newXmin, newXmax)
maxYval = -math.inf maxYval = -math.inf
...@@ -285,31 +326,34 @@ class LiveGui(GuiBase): ...@@ -285,31 +326,34 @@ class LiveGui(GuiBase):
if(timestampDelta < self.options.plotResolution): if(timestampDelta < self.options.plotResolution):
return self.returnAllLines() return self.returnAllLines()
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
connectionsData[port] = deque() connectionsData[flowIdentifier] = deque()
whileRun = True whileRun = True
while(len(self.__connectionBuffer[port]) > 0 and whileRun): while(len(self.__connectionBuffer[flowIdentifier]) > 0 and whileRun):
try: try:
data = self.__connectionBuffer[port].popleft() data = self.__connectionBuffer[flowIdentifier].popleft()
except IndexError: except IndexError:
whileRun = False whileRun = False
pass pass
else: else:
if(flowIdentifier not in self.flows):
continue
lineTime = self.__initRealtimeTimestamp + (float(data['time']) - self.__initSampletimeTimestamp) lineTime = self.__initRealtimeTimestamp + (float(data['time']) - self.__initSampletimeTimestamp)
# time in past # time in past
if(lineTime < newXmin): if(lineTime < newXmin):
continue continue
# time older than newst timestamp # time older than newst timestamp
elif(lineTime < self.__lastPlotTimestamp[port]): elif(lineTime < self.__lastPlotTimestamp[flowIdentifier]):
continue continue
# skip this sample due plot plotResolution # skip this sample due plot plotResolution
elif((lineTime - self.__lastPlotTimestamp[port]) < self.options.plotResolution): elif((lineTime - self.__lastPlotTimestamp[flowIdentifier]) < self.options.plotResolution):
continue continue
else: else:
if(self.__lastPlotTimestamp[port] > 0 and ((lineTime - self.__lastPlotTimestamp[port]) > CLEAR_GAP)): if(self.__lastPlotTimestamp[flowIdentifier] > 0 and ((lineTime - self.__lastPlotTimestamp[flowIdentifier]) > CLEAR_GAP)):
self.__lastPlotTimestamp[port] = lineTime self.__lastPlotTimestamp[flowIdentifier] = lineTime
nanSample = self.returnNanSample(lineTime) nanSample = self.returnNanSample(lineTime)
connectionsData[port].append(nanSample) connectionsData[flowIdentifier].append(nanSample)
infinityReached = False infinityReached = False
for val in VALUES_TO_PLOT: for val in VALUES_TO_PLOT:
try: try:
...@@ -324,16 +368,16 @@ class LiveGui(GuiBase): ...@@ -324,16 +368,16 @@ class LiveGui(GuiBase):
# infinityReached = True # infinityReached = True
if(not infinityReached): if(not infinityReached):
self.__lastPlotTimestamp[port] = lineTime self.__lastPlotTimestamp[flowIdentifier] = lineTime
connectionsData[port].append(data) connectionsData[flowIdentifier].append(data)
data = 0 data = 0
for port in connectionsData: for flowIdentifier in connectionsData:
if(len(connectionsData[port]) > 0): if(len(connectionsData[flowIdentifier]) > 0):
data += 1 data += 1
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
if(data < 1 and currentTimestamp > self.__lastPlotTimestamp[port] ): if(data < 1 and currentTimestamp > self.__lastPlotTimestamp[flowIdentifier] ):
if(self.options.debug): if(self.options.debug):
print("No data for any connection.") print("No data for any connection.")
if(self.options.interimBuffering): if(self.options.interimBuffering):
...@@ -411,9 +455,10 @@ class LiveGui(GuiBase): ...@@ -411,9 +455,10 @@ class LiveGui(GuiBase):
def plotInit(self): def plotInit(self):
"""Helper to initialize plot.""" """Helper to initialize plot."""
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
for val in VALUES_TO_PLOT: for val in VALUES_TO_PLOT:
self.__plotLines[port][val].set_data([], []) self.__plotLines[flowIdentifier][val].set_data([], [])
newXmin = 0 newXmin = 0
newXmax = newXmin + self.options.xDelta newXmax = newXmin + self.options.xDelta
...@@ -426,9 +471,9 @@ class LiveGui(GuiBase): ...@@ -426,9 +471,9 @@ class LiveGui(GuiBase):
def calculateSampleTimeOffset(self): def calculateSampleTimeOffset(self):
"""Calculate SampleTime difference at start""" """Calculate SampleTime difference at start"""
for port in self.options.filterPorts: for flowIdentifier in self.__connectionBuffer:
try: try:
data = self.__connectionBuffer[port].popleft() data = self.__connectionBuffer[flowIdentifier].popleft()
except IndexError: except IndexError:
pass pass
except KeyError: except KeyError:
...@@ -436,7 +481,7 @@ class LiveGui(GuiBase): ...@@ -436,7 +481,7 @@ class LiveGui(GuiBase):
else: else:
# print(data) # print(data)
#re-add first sample (to head of dequeue) #re-add first sample (to head of dequeue)
self.__connectionBuffer[port].appendleft(data) self.__connectionBuffer[flowIdentifier].appendleft(data)
self.__initSampletimeTimestamp = float(data['time']) self.__initSampletimeTimestamp = float(data['time'])
return return
...@@ -79,11 +79,14 @@ class TcpLivePlot(): ...@@ -79,11 +79,14 @@ class TcpLivePlot():
if(len(tmpData) == len(LOG_FORMAT)): if(len(tmpData) == len(LOG_FORMAT)):
data = dict(zip(LOG_FORMAT, tmpData)) data = dict(zip(LOG_FORMAT, tmpData))
# print(data) # print(data)
port = int(data['dstPort']) srcPort = int(data['srcPort'])
if(port not in self.connectionBuffer): dstPort = int(data['dstPort'])
self.connectionBuffer[port] = deque() if(dstPort in self.options.filterPorts):
self.connectionBuffer[port].append(data) flowIdentifier = str(srcPort) + "-" + str(dstPort)
# self.incomeBuffer.append(sample) if(flowIdentifier not in self.connectionBuffer):
self.connectionBuffer[flowIdentifier] = deque()
self.connectionBuffer[flowIdentifier].append(data)
# self.incomeBuffer.append(sample)
def processGui(self): def processGui(self):
# while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)): # while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
...@@ -97,47 +100,50 @@ class TcpLivePlot(): ...@@ -97,47 +100,50 @@ class TcpLivePlot():
# plt.show() # plt.show()
def processInputFiltering(self): # def processInputFiltering(self):
""" # """
Filters retrieved data by selected port. Drops malformed data. # Filters retrieved data by selected port. Drops malformed data.
""" # """
return # return
#
lastTimestamp = 0 # lastTimestamp = 0
while(True): # while(True):
try: # try:
line = self.incomeBuffer.popleft() # line = self.incomeBuffer.popleft()
except IndexError: # except IndexError:
time.sleep(0.00001) # time.sleep(0.00001)
else: # else:
tmpData = line.split(" ") # tmpData = line.split(" ")
if(len(tmpData) is NUMBER_OF_VALUES): # if(len(tmpData) is NUMBER_OF_VALUES):
data = dict(zip(LOG_FORMAT, tmpData)) # data = dict(zip(LOG_FORMAT, tmpData))
else: # else:
continue # continue
#
try: # try:
timestamp = float(data['time']) # timestamp = float(data['time'])
port = int(data['dstPort']) # srcPort = int(data['srcPort'])
except ValueError: # dstPort = int(data['dstPort'])
continue # flowIdentifier = str(srcPort) + "-" + str(dstPort)
else: # except ValueError:
if(port not in self.options.filterPorts): # continue
continue # else:
# if(dstPort in self.options.filterPorts):
filteredData = {} # print("bar")
try: # filteredData = {}
for val in VALUES_TO_PROCESS: # try:
filteredData[val] = float(data[val]) # for val in VALUES_TO_PROCESS:
except ValueError: # filteredData[val] = float(data[val])
continue # except ValueError:
else: # continue
timestampDelta = lastTimestamp - timestamp # else:
if(timestampDelta > self.options.plotResolution): # timestampDelta = lastTimestamp - timestamp
lastTimestamp = timestamp # if(timestampDelta > self.options.plotResolution):
continue # lastTimestamp = timestamp
self.connectionBuffer[port].append(filteredData) # continue
lastTimestamp = timestamp # self.connectionBuffer[flowIdentifier].append(filteredData)
# lastTimestamp = timestamp
# else:
# print("bar")
def handleSignals(self, signal, frame): def handleSignals(self, signal, frame):
""" """
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment