Commit 34a7f0e9 authored by Michael König's avatar Michael König

started to refactor + new file structuring

parent cca4e015
......@@ -4,3 +4,4 @@ __pycache__/
.bkp
.old
*.egg-info
*.pyc
......@@ -6,22 +6,62 @@ from setuptools import setup
with open("VERSION.txt", "rb") as f:
version = f.read().decode("utf-8")
desc = "Realtime-visualization of TCP flows logged by TCPlog."
setup(
name = "tcpplot",
packages = ["TCPplot"],
name = "tcpliveplot",
packages = [
"tcpliveplot",
"tcpliveplot.input"
],
entry_points = {
"console_scripts": [
'tcpplot = TCPplot.tcpplot:main'
'tcpliveplot = tcpliveplot.tcpliveplot:main'
],
"gui_scripts": [
'tcpplot = TCPplot.tcpplot:main'
'tcpliveplot = tcpliveplot.tcpliveplot:main'
]
},
version = version,
description = "Tool to visualize TCP flows logged by TCPlog.",
author = "Michael Koenig",
author_email = "michael.koenig2@student.kit.edu",
url = "https://git.scc.kit.edu/CPUnetLOG/TCPplot/",
license = "BSD2",
install_requires = ['tcpinfo>=0.1']
description = desc,
long_description = desc,
author = "Karlsruhe Institute of Technology - Institute of Telematics",
author_email = "telematics@tm.kit.edu",
maintainer = "Michael Koenig",
maintainer_email = "michael.koenig2@student.kit.edu",
url = "https://git.scc.kit.edu/CPUnetLOG/TCPlivePLOT/",
license = "BSD",
platforms = "Linux",
zip_safe = False,
install_requires = [
# TODO
# numpy
# matplotlib
#
],
extras_require = {
'Logging': ["tcplog>=0.2"]
},
keywords = ['tcp', 'flow', 'plot', 'visualize', 'graph', 'live', 'analyze', 'network', 'traffic'],
classifiers = [
'Development Status :: 4 - Beta',
'License :: OSI Approved :: BSD License',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Operating System :: POSIX :: Linux',
'Natural Language :: English',
'Intended Audience :: Education',
'Intended Audience :: Information Technology',
'Intended Audience :: Science/Research',
'Intended Audience :: System Administrators',
'Intended Audience :: Telecommunications Industry',
'Topic :: Scientific/Engineering',
'Topic :: Internet',
'Topic :: System :: Logging',
'Topic :: System :: Networking',
'Topic :: System :: Networking :: Monitoring',
'Topic :: System :: Operating System Kernels :: Linux',
'Topic :: Utilities'
]
)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Convenience wrapper for running TCPplot directly from source tree."""
from TCPplot.tcpplot import main
"""Convenience wrapper for running TCPlivePLOT directly from source tree."""
from TCPlivePLOT.main import main
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from .tcpliveplot import main
from .main import main
main()
# -*- coding: utf-8 -*-
from abc import ABCMeta, abstractmethod
class BackendBase(metaclass=ABCMeta):
"""
Base template for all backends
"""
@abstractmethod
def startupCheck():
"""
Every backend must be able to determine if all of their necessary dependencies are met
"""
pass
@abstractmethod
def startUp():
"""
Initialize backend's dependencies (eg: file opening...)
"""
pass
@abstractmethod
def tearDown():
"""
Executed at programme shutdown: Every backend must perform a cleanup (eg: close files...)
"""
pass
# -*- coding: utf-8 -*-
from abc import ABCMeta, abstractmethod
from ..backend_base import BackendBase
class GuiBase(BackendBase, metaclass=ABCMeta):
"""
Base template for all GUI backends.
"""
# @abstractmethod
# def preLogging(self):
# """
# Executed before any samples are logged (usage example: write meta-data about log)
# """
# pass
#
# @abstractmethod
# def logSample(self):
# """
# Logs a single sample from the sample queue.
# """
# pass
#
# @abstractmethod
# def postLogging(self):
# """
# Executed after all samples are logged (usage example: write summary about logging-session)
# """
# pass
This diff is collapsed.
# -*- coding: utf-8 -*-
import select
from .input_base import InputBase
THREAD_STOPFLAG_WAIT = 0.000001 # in seconds
class FileInput(InputBase):
def __init__(self, options, infoRegistry):
self.options = options
self.infoRegistry = infoRegistry
def startupCheck(self):
pass
def startUp(self):
pass
def tearDown(self):
pass
def retrieveNewSamples(self):
pass
try:
self.__logFileHandler = open(self.options.logFilePath, 'r')
except IOError:
print ("Error: while parsing " + self.options.logFilePath)
SystemExit
return
else:
inputready,outputready,exceptready = select.select([self.__logFileHandler.fileno()],[],[])
while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
if self.__stopped.isSet():
return
else:
for s in inputready:
if(s == self.__logFileHandler.fileno()):
for line in self.__logFileHandler:
if self.__stopped.isSet():
return
#ignore comments
if not line.startswith("#"):
line = line.strip()
self.__tmpBuffer.append(line,)
# -*- coding: utf-8 -*-
from abc import ABCMeta, abstractmethod
from ..backend_base import BackendBase
class InputBase(BackendBase, metaclass=ABCMeta):
"""
Base template for all input backends.
"""
@abstractmethod
def retrieveNewSamples(self):
"""
Returns list of new samples.
"""
pass
# -*- coding: utf-8 -*-
import time
import socket
from ipaddress import ip_address
import threading
from .input_base import InputBase
# to remove --> use optioms
DEFAULT_SOCKETSERVER_PORT = 11337
DEFAULT_SOCKETSERVER_LOCATION = 'localhost:' + str(DEFAULT_SOCKETSERVER_PORT)
LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)
LOGSERVER_CONNECT_RETRY_TIME = 3 #in s
class SocketInput(InputBase):
def __init__(self, options, infoRegistry):
self.options = options
self.infoRegistry = infoRegistry
def startupCheck(self):
pass
def startUp(self):
pass
def tearDown(self):
pass
def retrieveNewSamples(self):
"""Reads data (in blocks) from a socket and adds the received data to an income buffer."""
self.__processSocketLogData = threading.Thread(target=self.processSocketLogData)
self.__processSocketLogData.start()
ip, separator, port = self.options.logServer.rpartition(':')
if(':' not in self.options.logServer or port is ''):
logServerPort = DEFAULT_SOCKETSERVER_PORT
logServerIp = ip_address(socket.gethostbyname(self.options.logServer.strip("[]")))
else:
logServerPort = int(port)
logServerIp = ip_address(socket.gethostbyname(ip.strip("[]")))
dst = str(logServerIp) + ":" + str(logServerPort)
try:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
print("Failed to create socket")
while(not self.__stopped.wait(0.00001)):
try:
self.__socket.connect((str(logServerIp), logServerPort))
print("Successfully connected to " + dst + "")
except socket.error:
print("Error: Could not connect to " + dst + " Retrying in " + str(LOGSERVER_CONNECT_RETRY_TIME) + "s ...")
time.sleep(LOGSERVER_CONNECT_RETRY_TIME)
else:
break
while(not self.__stopped.wait(0.00001)):
try:
data = self.__socket.recv(4096)
except socket.timeout:
print("Connection timeout.")
self.__socket.close()
return
except IOError:
print("Error: Could not retrieve data from " + dst)
self.__socket.close()
return
else:
if(len(data) == 0):
print("Connection closed by foreign host.")
self.__socket.close()
break;
self.__incomeBuffer.append(data)
def processSocketLogData(self):
"""Reads data from the income buffer and tries to reassemble splitted data."""
tmpBuffer = ""
while(True):
try:
line = self.__incomeBuffer.popleft()
line = line.decode("UTF-8")
lines = line.split("\n")
except IndexError:
time.sleep(0.00001)
else:
for i in lines:
data = i.split(" ")
if(tmpBuffer != ""):
tmpBuffer += i
self.__tmpBuffer.append(tmpBuffer)
tmpBuffer = ""
continue
if(len(data) < NUMBER_OF_VALUES):
tmpBuffer += i
else:
self.__tmpBuffer.append(i)
# -*- coding: utf-8 -*-
from .input_base import InputBase
class StdinInput(InputBase):
def __init__(self, options, infoRegistry):
self.options = options
self.infoRegistry = infoRegistry
def startupCheck(self):
pass
def startUp(self):
pass
def tearDown(self):
pass
def retrieveNewSamples(self):
pass
# -*- coding: utf-8 -*-
class InfoRegistry(object):
"""Holds global information to be shared between threads"""
def __init__(self):
"""Does nothing"""
self.__registry = {}
def save(self, key, value):
self.__registry[key] = value
def load(self, key):
if key in self.__registry:
return self.__registry[key]
else:
return None
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
import signal
from .tcpliveplot import TcpLivePlot
from .info_registry import InfoRegistry
# constants
TCP_PLOT_VERSION = "0.2.0"
TCP_LOG_FORMAT_VERSION_MIN = "2"
DEFAULT_INPUT_BACKEND = "socket"
# default values
DEFAULT_SOCKETSERVER_PORT = 11337
DEFAULT_SOCKETSERVER_LOCATION = 'localhost:' + str(DEFAULT_SOCKETSERVER_PORT)
DEFAULT_LINES_TO_SHOW = ['cwnd']
DEFAULT_FILTER_PORT = 5001
# move
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw'] # (only values for Y-axis)
# main prog
def main():
inputBackend = None
guiBackend = None
infoRegistry = InfoRegistry()
options = parse_options()
startupSelfCheck(options)
if(options.showVersion):
print("TCPlivePLOT: " + TCP_PLOT_VERSION)
sys.exit(0)
options.guiBackend = "live"
# init input backend
if(options.inputBackend == "socket"):
from .backends.input.socket import SocketInput
inputBackend = SocketInput(options, infoRegistry)
elif(options.inputBackend == "file"):
from .backends.input.file import FileInput
inputBackend = FileInput(options, infoRegistry)
# elif(options.inputBackend == "stdin"):
# from .backends.input.stdin import StdinInput
# inputBackend = StdinInput(options, info)
if(inputBackend is None):
print("No valid input backend selected. Exiting...")
sys.exit(1)
# init guiBackend
if(options.guiBackend == "live"):
from .backends.gui.live import LiveGui
guiBackend = LiveGui(options, infoRegistry)
else:
pass
if(guiBackend is None):
print("No valid GUI backend selected. Exiting...")
sys.exit(1)
if(inputBackend is not None):
inputBackend.startupCheck()
inputBackend.startUp()
if(guiBackend is not None):
guiBackend.startupCheck()
guiBackend.startUp()
try:
tcpLivePlot = TcpLivePlot(inputBackend, guiBackend, options, infoRegistry)
signal.signal(signal.SIGINT, tcpLivePlot.handleSignals)
signal.signal(signal.SIGTERM, tcpLivePlot.handleSignals)
tcpLivePlot.run()
# ^-- starts the main programme
except Exception as e:
# utility.eprint(str(e))
print(str(e))
raise e
finally:
if(not options.quiet):
print("Goodbye cruel world!")
sys.exit(0)
def parse_options():
parser = argparse.ArgumentParser()
parser.add_argument(
"-i",
"--input-backend",
help="Available input backends: \"socket\" and \"file\" (default: " + DEFAULT_INPUT_BACKEND + ")",
dest="inputBackend",
default=DEFAULT_INPUT_BACKEND)
parser.add_argument(
"-b",
"--buffer", help="Length of preload buffer (in seconds, default: 1, 0 to deactivate preload buffer)",
type=float,
dest="preloadBuffer",
default=1)
parser.add_argument(
"-ib",
"--interimbuffer", help="Activate interim buffering.",
action="store_true",
dest="interimBuffering",
default=False)
parser.add_argument(
"-ps",
"--playback-speed", help="Playback speed (factor, default: 1)",
type=float,
dest="playbackSpeed",
default=1)
parser.add_argument(
"-aps",
"--adaptive-playback-speed", help="Enable adaptive playback speed (default: false)",
action="store_true",
dest="adaptivePlaybackSpeed",
default=False)
parser.add_argument(
"--buffer-size", help="Number of elements to buffer from socket per filter (5000)",
dest="bufferLength",
default=5000)
parser.add_argument(
"-z",
"--blit", help="Deactivate blitting",
action="store_false",
dest="blitting",
default=True)
logTypeGroup = parser.add_mutually_exclusive_group()
logTypeGroup.add_argument(
"-s",
"--server",
help="IP and Port of socket-logging server (" + DEFAULT_SOCKETSERVER_LOCATION + ")",
dest="logServer",
default=DEFAULT_SOCKETSERVER_LOCATION)
logTypeGroup.add_argument(
"-f",
"--filepath",
help="Path where the log file is stored",
type=str,
dest="logFilePath")
parser.add_argument(
"-df",
help="Number of FPS to draw",
dest="drawFps",
type=int,
default=60)
parser.add_argument(
"-di",
help="Draw intervall",
dest="drawIntervall",
type=int,
default=30)
parser.add_argument(
"-x",
help="Seconds to plot (20)",
dest="xDelta",
type=int,
default=20)
parser.add_argument(
"-r",
"--resolution",
help="Plot resolution (in seconds, default: 0.001)",
dest="plotResolution",
type=float,
default=0.01)
# Filter
parser.add_argument(
"-p",
"--port",
help="Filter by port. Multiple occurrences possible (" + str(DEFAULT_FILTER_PORT) + ")",
dest="filterPorts",
action='append',
type=int,
default=[])
parser.add_argument(
"-l",
"--line",
help="Plot lines to show initially. Multiple occurrences possible. Possible values: " + ', '.join(VALUES_TO_PLOT),
dest="linesToShow",
action='append',
default=[])
parser.add_argument(
"-d",
"--debug", help="Debug mode - ignores \"-q\"",
action="store_true",
dest="debug",
default=False)
parser.add_argument(
"-q",
"--quiet", help="Whether to ouput anything at all (false)",
action="store_true",
dest="quiet",
default=False)
parser.add_argument(
"--version",
help="Print version information",
action="store_true",
dest="showVersion",
default=False)
options = parser.parse_args()
return options
def startupSelfCheck(options):
pass
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import threading
import sys
from collections import deque
# Constants
#VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw', 'loss'] # (only values for Y-axis)
# VALUES_TO_PLOT_ON_SECOND_AXIS = ['rtt']
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw'] # (only values for Y-axis)
VALUES_TO_PROCESS = ['time'] + VALUES_TO_PLOT #helper to init all data structures
DEFAULT_LINES_TO_SHOW = ['cwnd']
DEFAULT_FILTER_PORT = 5001
LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)
THREAD_STOPFLAG_WAIT = 0.000001 # in seconds
THREAD_JOIN_TIMEOUT = 1 # in seconds
THREAD_TESTTIMEOUTS_WAIT = 0.5 # in seconds
THREAD_MISC_WAIT = 2 # in seconds
class TcpLivePlot():
def __init__(self, inputBackend, outputBackend, options, infoRegistry):
self.inputBackend = inputBackend
self.outputBackend = outputBackend
self.options = options
self.infoRegistry = infoRegistry
self.__stopped = threading.Event()
self.__processInputThread = None
self.__processInputFilteringThread = None
self.__processGuiThread = None
# initialize vars
self.__incomeBuffer = deque(maxlen=self.options.bufferLength)
self.__tmpBuffer = deque(maxlen=self.options.bufferLength)
self.__connectionBuffer = {}
self.__tmpTimestamp = 0
if(len(self.options.filterPorts) < 1):
self.options.filterPorts.append(DEFAULT_FILTER_PORT)
for i in self.options.filterPorts:
self.__connectionBuffer[i] = deque(maxlen=self.options.bufferLength)
if(len(self.options.linesToShow) < 1):
self.options.linesToShow.extend(DEFAULT_LINES_TO_SHOW)
# init threads
self.__processInputThread = threading.Thread(target=self.processInput)
self.__processInputFilteringThread= threading.Thread(target=self.processInputFiltering)
self.__processGuiThread = threading.Thread(target=self.processGui)
self.__processInputThread.daemon = True
self.__processInputFilteringThread.daemon = True
self.__processGuiThread.daemon = True
self.__processInputThread.start()
self.__processInputFilteringThread.start()
self.__processGuiThread.start()
while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
self.__stopped.wait(0.5)
if(self.options.debug):
print("End of main thread reached.")
def processInput(self):
"""Macro to initialize data retrieval and processing from various sources."""
while(not self.__stopped.wait(THREAD_STOPFLAG_WAIT)):
pass
# newSamples = self.inputBackend.retrieveNewSamples()
def processGui(self):
pass
def processInputFiltering(self):
"""Filters retrieved data by selected port. Drops malformed data."""
return
lastTimestamp = 0
while(True):
try:
line = self.__tmpBuffer.popleft()
except IndexError:
time.sleep(0.00001)
else:
tmpData = line.split(" ")
if(len(tmpData) is NUMBER_OF_VALUES):
data = dict(zip(LOG_FORMAT, tmpData))
else:
continue
try:
timestamp = float(data['time'])
port = int(data['dstPort'])
except ValueError:
continue
else:
if(port not in self.options.filterPorts):
continue
filteredData = {}
try:
for val in VALUES_TO_PROCESS:
filteredData[val] = float(data[val])
except ValueError:
continue
else:
timestampDelta = lastTimestamp - timestamp
if(timestampDelta > self.options.plotResolution):
lastTimestamp = timestamp
continue
self.__connectionBuffer[port].append(filteredData)
lastTimestamp = timestamp