Commit f5f05487 authored by Michael König's avatar Michael König

refactoring++ & better auto-scaling

parent 34a7f0e9
......@@ -5,3 +5,4 @@ __pycache__/
.old
*.egg-info
*.pyc
todo.txt
TCPlivePLOT is based on Python3.
TCPlivePLOT
================================================================================
Realtime-visualization of TCP flows logged by TCPlog.
TCPlivePLOT is based on Python3 and tested on GNU/Linux 4.{1-7}.
For TCPlog see https://git.scc.kit.edu/CPUnetLOG/TCPlog/
Tested on GNU/Linux 4.{1-6}
Requirements TCPlivePLOT:
================================================================================
--------------------------------------------------------------------------------
* python3
* python-matplotlib
* matplotlib
* numpy
* matplotlib-interactive backend
recommended:
* recommended backends:
* Qt4Agg or
* Qt5Agg
Running TCPplot:
================================================================================
Running TCPlivePLOT:
--------------------------------------------------------------------------------
* ./tcpliveplot.py OR
* python3 -m TCPlivePLOT OR
* tcpliveplot (after installation)
Installation of TCPplot:
================================================================================
Installation of TCPlivePLOT:
--------------------------------------------------------------------------------
* via pip3:
* sudo pip3 install . # system-wide installation
* pip3 install --user . # local installation
* via setup.py:
* python3 setup.py install # system-wide installation
* python3 setup.py install --user # local installation
......@@ -3,9 +3,7 @@
from setuptools import setup
with open("VERSION.txt", "rb") as f:
version = f.read().decode("utf-8")
version = 0.1.0
desc = "Realtime-visualization of TCP flows logged by TCPlog."
setup(
......@@ -41,7 +39,7 @@ setup(
#
],
extras_require = {
'Logging': ["tcplog>=0.2"]
'Logging': ["tcplog"]
},
keywords = ['tcp', 'flow', 'plot', 'visualize', 'graph', 'live', 'analyze', 'network', 'traffic'],
classifiers = [
......
......@@ -3,7 +3,7 @@
"""Convenience wrapper for running TCPlivePLOT directly from source tree."""
from TCPlivePLOT.main import main
from tcpliveplot.main import main
if __name__ == '__main__':
main()
This diff is collapsed.
# -*- coding: utf-8 -*-
import os
import sys
import select
from .input_base import InputBase
from ...utils.utilty import Utility
THREAD_STOPFLAG_WAIT = 0.000001 # in seconds
......@@ -10,39 +13,39 @@ class FileInput(InputBase):
def __init__(self, options, infoRegistry):
self.options = options
self.infoRegistry = infoRegistry
self.__logFileHandler = None
def startupCheck(self):
pass
if not os.access(self.options.logFilePath, os.R_OK):
Utility.eprint("Error: Input file " + self.options.logFilePath + " not readable. Exiting...")
sys.exit(1)
def startUp(self):
pass
def tearDown(self):
pass
def retrieveNewSamples(self):
pass
try:
self.__logFileHandler = open(self.options.logFilePath, 'r')
except IOError:
print ("Error: while parsing " + self.options.logFilePath)
SystemExit
return
Utility.eprint("Error: Input file " + self.options.logFilePath + " not readable. Exiting...")
sys.exit(1)
else:
inputready,outputready,exceptready = select.select([self.__logFileHandler.fileno()],[],[])
while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
if self.__stopped.isSet():
return
else:
for s in inputready:
if(s == self.__logFileHandler.fileno()):
for line in self.__logFileHandler:
if self.__stopped.isSet():
return
#ignore comments
if not line.startswith("#"):
line = line.strip()
self.__tmpBuffer.append(line,)
if(self.options.debug):
print("Openend file '" + self.options.logFilePath + "'")
def tearDown(self):
self.__logFileHandler.close()
if(self.options.debug):
print("Closed file '" + self.options.logFilePath + "'")
def retrieveNewSamples(self):
inputready,outputready,exceptready = select.select([self.__logFileHandler.fileno()],[],[])
for s in inputready:
if(s == self.__logFileHandler.fileno()):
tmpBuffer = []
for line in self.__logFileHandler:
#ignore comments
if not line.startswith("#"):
line = line.strip()
tmpBuffer.append(line,)
return tmpBuffer
......@@ -4,8 +4,6 @@ import time
import socket
from ipaddress import ip_address
import threading
from .input_base import InputBase
# to remove --> use optioms
......@@ -22,61 +20,64 @@ class SocketInput(InputBase):
self.options = options
self.infoRegistry = infoRegistry
ip, separator, port = self.options.logServer.rpartition(':')
if(':' not in self.options.logServer or port is ''):
self.logServerPort = DEFAULT_SOCKETSERVER_PORT
self.logServerIp = ip_address(socket.gethostbyname(self.options.logServer.strip("[]")))
else:
self.logServerPort = int(port)
self.logServerIp = ip_address(socket.gethostbyname(ip.strip("[]")))
self.dst = str(self.logServerIp) + ":" + str(self.logServerPort)
def startupCheck(self):
pass
def startUp(self):
pass
#TODO: eprint + debug..
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
print("Failed to create socket")
else:
print("Socket created.")
try:
self.socket.connect((str(self.logServerIp), self.logServerPort))
print("Successfully connected to " + self.dst + "")
except socket.error:
print("\
Error: Could not connect to " + self.dst + "\
Retrying in " + str(LOGSERVER_CONNECT_RETRY_TIME) + "s ...\
")
time.sleep(LOGSERVER_CONNECT_RETRY_TIME)
else:
pass
def tearDown(self):
self.socket.close()
pass
def retrieveNewSamples(self):
"""Reads data (in blocks) from a socket and adds the received data to an income buffer."""
self.__processSocketLogData = threading.Thread(target=self.processSocketLogData)
self.__processSocketLogData.start()
ip, separator, port = self.options.logServer.rpartition(':')
if(':' not in self.options.logServer or port is ''):
logServerPort = DEFAULT_SOCKETSERVER_PORT
logServerIp = ip_address(socket.gethostbyname(self.options.logServer.strip("[]")))
else:
logServerPort = int(port)
logServerIp = ip_address(socket.gethostbyname(ip.strip("[]")))
dst = str(logServerIp) + ":" + str(logServerPort)
"""
Reads data (in blocks) from a socket and adds the received data to an temporaray income buffer.
"""
try:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
print("Failed to create socket")
while(not self.__stopped.wait(0.00001)):
try:
self.__socket.connect((str(logServerIp), logServerPort))
print("Successfully connected to " + dst + "")
except socket.error:
print("Error: Could not connect to " + dst + " Retrying in " + str(LOGSERVER_CONNECT_RETRY_TIME) + "s ...")
time.sleep(LOGSERVER_CONNECT_RETRY_TIME)
else:
break
while(not self.__stopped.wait(0.00001)):
try:
data = self.__socket.recv(4096)
except socket.timeout:
print("Connection timeout.")
self.__socket.close()
return
except IOError:
print("Error: Could not retrieve data from " + dst)
self.__socket.close()
return
else:
if(len(data) == 0):
print("Connection closed by foreign host.")
self.__socket.close()
break;
self.__incomeBuffer.append(data)
data = self.socket.recv(4096)
except socket.timeout:
print("Connection timeout.")
self.socket.close()
return ""
except IOError:
print("Error: Could not retrieve data from " + self.dst)
self.socket.close()
return ""
else:
if(len(data) == 0):
print("Connection closed by foreign host.")
self.socket.close()
return data
self.tmpBuffer.append(data)
def processSocketLogData(self):
......@@ -94,11 +95,11 @@ class SocketInput(InputBase):
data = i.split(" ")
if(tmpBuffer != ""):
tmpBuffer += i
self.__tmpBuffer.append(tmpBuffer)
self.incomeBuffer.append(tmpBuffer)
tmpBuffer = ""
continue
if(len(data) < NUMBER_OF_VALUES):
tmpBuffer += i
else:
self.__tmpBuffer.append(i)
self.incomeBuffer.append(i)
......@@ -5,6 +5,7 @@ import argparse
import sys
import signal
from .utils.utilty import Utility
from .tcpliveplot import TcpLivePlot
from .info_registry import InfoRegistry
......@@ -13,6 +14,7 @@ from .info_registry import InfoRegistry
TCP_PLOT_VERSION = "0.2.0"
TCP_LOG_FORMAT_VERSION_MIN = "2"
DEFAULT_LOGFILE_PATH = "/tmp/tcplog.log"
DEFAULT_INPUT_BACKEND = "socket"
# default values
......@@ -72,20 +74,23 @@ def main():
guiBackend.startupCheck()
guiBackend.startUp()
try:
tcpLivePlot = TcpLivePlot(inputBackend, guiBackend, options, infoRegistry)
signal.signal(signal.SIGINT, tcpLivePlot.handleSignals)
signal.signal(signal.SIGTERM, tcpLivePlot.handleSignals)
tcpLivePlot.run()
# ^-- starts the main programme
except Exception as e:
# utility.eprint(str(e))
print(str(e))
raise e
finally:
if(not options.quiet):
print("Goodbye cruel world!")
sys.exit(0)
tcpLivePlot = TcpLivePlot(inputBackend, guiBackend, options, infoRegistry)
signal.signal(signal.SIGINT, tcpLivePlot.handleSignals)
signal.signal(signal.SIGTERM, tcpLivePlot.handleSignals)
tcpLivePlot.run()
# try:
# tcpLivePlot = TcpLivePlot(inputBackend, guiBackend, options, infoRegistry)
# signal.signal(signal.SIGINT, tcpLivePlot.handleSignals)
# signal.signal(signal.SIGTERM, tcpLivePlot.handleSignals)
# tcpLivePlot.run()
# # ^-- starts the main programme
# except Exception as e:
# Utility.eprint(str(e))
# raise e
# finally:
# if(not options.quiet):
# print("Goodbye cruel world!")
# sys.exit(0)
def parse_options():
......@@ -125,6 +130,7 @@ def parse_options():
dest="adaptivePlaybackSpeed",
default=False)
# TODO: remove
parser.add_argument(
"--buffer-size", help="Number of elements to buffer from socket per filter (5000)",
dest="bufferLength",
......@@ -133,10 +139,10 @@ def parse_options():
parser.add_argument(
"-z",
"--blit", help="Deactivate blitting",
action="store_false",
"--blit", help="Activate blitting for better performance - but broken axis labels (default: False)",
action="store_true",
dest="blitting",
default=True)
default=False)
logTypeGroup = parser.add_mutually_exclusive_group()
......@@ -149,10 +155,12 @@ def parse_options():
logTypeGroup.add_argument(
"-f",
"--filepath",
help="Path where the log file is stored",
help="Path where the log file is stored - only usable with file input-backend (default: " + DEFAULT_LOGFILE_PATH + ")",
type=str,
default=DEFAULT_LOGFILE_PATH,
dest="logFilePath")
# TODO: remove as param
parser.add_argument(
"-df",
help="Number of FPS to draw",
......@@ -164,21 +172,23 @@ def parse_options():
help="Draw intervall",
dest="drawIntervall",
type=int,
default=30)
default=100)
parser.add_argument(
"-x",
help="Seconds to plot (20)",
help="Seconds to plot (default: 20)",
dest="xDelta",
type=int,
default=20)
# TODO: remove as param
parser.add_argument(
"-r",
"--resolution",
help="Plot resolution (in seconds, default: 0.001)",
help="Plot resolution (in seconds, default: 0.01)",
dest="plotResolution",
type=float,
default=0.01)
default=0.1)
# Filter
parser.add_argument(
......@@ -189,25 +199,18 @@ def parse_options():
action='append',
type=int,
default=[])
parser.add_argument(
"-l",
"--line",
help="Plot lines to show initially. Multiple occurrences possible. Possible values: " + ', '.join(VALUES_TO_PLOT),
dest="linesToShow",
action='append',
default=[])
parser.add_argument(
"-d",
"--debug", help="Debug mode - ignores \"-q\"",
"--debug", help="Debug mode - ignores quiet mode (default: false)",
action="store_true",
dest="debug",
default=False)
parser.add_argument(
"-q",
"--quiet", help="Whether to ouput anything at all (false)",
"--quiet", help="Whether to ouput anything at all on console (default: false)",
action="store_true",
dest="quiet",
default=False)
......
......@@ -16,8 +16,7 @@ VALUES_TO_PROCESS = ['time'] + VALUES_TO_PLOT #helper to init all data structur
DEFAULT_LINES_TO_SHOW = ['cwnd']
DEFAULT_FILTER_PORT = 5001
LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
LOG_FORMAT = ['time', 'rTime', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'sst', 'rtt', 'minRtt', 'maxRtt', 'avgRtt', 'meanRtt', 'throughput', 'smoothedThroughput', 'assumedLosses']
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)
......@@ -39,55 +38,75 @@ class TcpLivePlot():
self.__processGuiThread = None
# initialize vars
self.__incomeBuffer = deque(maxlen=self.options.bufferLength)
self.__tmpBuffer = deque(maxlen=self.options.bufferLength)
self.__connectionBuffer = {}
self.incomeBuffer = deque(maxlen=self.options.bufferLength)
self.connectionBuffer = {}
self.__tmpTimestamp = 0
self.outputBackend.setConnectionBuffer(self.connectionBuffer)
if(len(self.options.filterPorts) < 1):
self.options.filterPorts.append(DEFAULT_FILTER_PORT)
for i in self.options.filterPorts:
self.__connectionBuffer[i] = deque(maxlen=self.options.bufferLength)
if(len(self.options.linesToShow) < 1):
self.options.linesToShow.extend(DEFAULT_LINES_TO_SHOW)
# for i in self.options.filterPorts:
# self.__connectionBuffer[i] = deque(maxlen=self.options.bufferLength)
# init threads
self.__processInputThread = threading.Thread(target=self.processInput)
self.__processInputFilteringThread= threading.Thread(target=self.processInputFiltering)
# self.__processInputFilteringThread= threading.Thread(target=self.processInputFiltering)
self.__processGuiThread = threading.Thread(target=self.processGui)
self.__processInputThread.daemon = True
self.__processInputFilteringThread.daemon = True
# self.__processInputFilteringThread.daemon = True
self.__processGuiThread.daemon = True
self.__processInputThread.start()
self.__processInputFilteringThread.start()
self.__processGuiThread.start()
# self.__processInputFilteringThread.start()
# self.__processGuiThread.start()
while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
self.__stopped.wait(0.5)
# self.__stopped.wait(0.5)
self.processGui()
if(self.options.debug):
print("End of main thread reached.")
def processInput(self):
"""Macro to initialize data retrieval and processing from various sources."""
"""
Thread reads from input sources (socket/file/stdin)
"""
while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
pass
# newSamples = self.inputBackend.retrieveNewSamples()
newSamples = self.inputBackend.retrieveNewSamples()
for sample in newSamples:
tmpData = sample.split(" ")
if(len(tmpData) == len(LOG_FORMAT)):
data = dict(zip(LOG_FORMAT, tmpData))
# print(data)
port = int(data['dstPort'])
if(port not in self.connectionBuffer):
self.connectionBuffer[port] = deque()
self.connectionBuffer[port].append(data)
# self.incomeBuffer.append(sample)
def processGui(self):
pass
# while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
# try:
# line = self.incomeBuffer.popleft()
# except IndexError:
# pass
# else:
# import matplotlib.pyplot as plt
self.outputBackend.plotGraph()
# plt.show()
def processInputFiltering(self):
"""Filters retrieved data by selected port. Drops malformed data."""
"""
Filters retrieved data by selected port. Drops malformed data.
"""
return
lastTimestamp = 0
while(True):
try:
line = self.__tmpBuffer.popleft()
line = self.incomeBuffer.popleft()
except IndexError:
time.sleep(0.00001)
else:
......@@ -117,11 +136,13 @@ class TcpLivePlot():
if(timestampDelta > self.options.plotResolution):
lastTimestamp = timestamp
continue
self.__connectionBuffer[port].append(filteredData)
self.connectionBuffer[port].append(filteredData)
lastTimestamp = timestamp
def handleSignals(self, signal, frame):
"""Callback handler for signals"""
"""
Callback handler for signals
"""
if(not self.options.quiet and not self.options.outputBackend == 'stdout'):
print("Exiting...")
self.initTearDown()
......@@ -130,15 +151,14 @@ class TcpLivePlot():
sys.exit(0)
def initTearDown(self):
"""
Sets stops signal for all threads.
"""
self.__stopped.set()
def tearDown(self):
"""Performs the cleanup at programm termination."""
# use input.tearDown()
if(self.__logType == "socket"):
self.__socket.close()
else:
self.__logFileHandler.close()
raise SystemExit
sys.exit
"""
Performs the cleanup at programm termination.
"""
self.inputBackend.tearDown()
self.outputBackend.tearDown()
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import math
class Utility:
@staticmethod
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@staticmethod
def truncate(number, digits) -> float:
stepper = pow(10.0, digits)
return math.trunc(stepper * number) / stepper
ALL
================================================================================
* update meta-data and dist-tools
TCPplot
================================================================================
## TODO(michael):
# * rtt on 2nd axis
# * better scaling (maybe auto-scaling Y?)
# * force fullscreen at startup?
# * refresh legend/labels period. (@blitting)
# * detect flows (maybe server sends info?)
# * set x{Min,Max} to relative currentTimestamp (fix e^x)
# * look into preloading-samples and calc. resolution and buffered time
# * try to re-connect upon connection loss
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment