Commit a7ca66d8 authored by Michael König (Student)'s avatar Michael König (Student)
Browse files

first demo

parents
This software project contains source code from multiple copyright holders,
who published their code under different open source licenses.
These licenses are:
- The BSD 2-Clause License
The full license texts, including the respective copy right holders,
are given below.
================================================================================
Unless otherwise stated the source code of this software has been developed by:
Karlsruhe Institute of Technology
Institute of Telematics
Zirkel 2, 76131 Karlsruhe
Germany
and is licensed under the "BSD 2-Clause License":
--------------------------------------------------------------------------------
Copyright (c) 2015,
Karlsruhe Institute of Technology, Institute of Telematics
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
================================================================================
TCPlog and TCPplot are based on Python3.
Tested on GNU/Linux 4.1 & 4.2
Requires kernel module "tcp_probe"
Requirements TCPlog:
=====================
* python3
* Kernel module "tcp_probe"
Requirements TCPplot:
=====================
* python3
* python-matplotlib
* pyqt5-common
* python-pyqt5
Kernel module "tcp_probe":
==========================
To load module run as root:
# modprobe tcp_probe full=1 port=0 bufsize=512 && chmod 444 /proc/net/tcpprobe
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2015,
# Karlsruhe Institute of Technology, Institute of Telematics
#
# This code is provided under the BSD 2-Clause License.
# Please refer to the LICENSE.txt file for further information.
#
# Author: Michael König
import time
import os
import argparse
import signal
import threading
import socket
# import kmod
from collections import deque
from ipaddress import ip_address
from sys import stdout
# format of "/proc/net/tcpprobe"
#
# 11.30814 192.168.123.42:35120 193.99.144.87:80 1472 0x259a6f5 0x259a6f5 10 2147483647 19840 36604 150784
# ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
# | | | | | | | | | | +-------- [10] RWND
# | | | | | | | | | +-------------- [9] sRTT
# | | | | | | | | +-------------------- [8] Send window
# | | | | | | | +------------------------------- [7] Slow start threshold
# | | | | | | +---------------------------------- [6] Congestion window
# | | | | | +-------------------------------------------- [5] Unacknowledged sequence #
# | | | | +------------------------------------------------------ [4] Next send sequence #
# | | | +----------------------------------------------------------- [3] Bytes in packet
# | | +---------------------------------------------------------------------------- [2] Receiver address:port
# | +------------------------------------------------------------------------------------------------- [1] Sender address:port
# +---------------------------------------------------------------------------------------------------------- [0] Time seconds
QUEUE_LENGTH = 10000
DEFAULT_FILTER_PORT = 9999
PATH_TO_TCP_PROBE = "/proc/net/tcpprobe"
SERVER_ADDRESS = "127.0.0.1"
class TcpLog:
def init(self):
# register signals
signal.signal(signal.SIGINT, self.handleSignals)
signal.signal(signal.SIGTERM, self.handleSignals)
if(len(args.filterPorts) < 1):
args.filterPorts.append(DEFAULT_FILTER_PORT)
try:
self.__tcpProbeFileHandler = open(PATH_TO_TCP_PROBE, 'r')
except:
print ("Error: while parsing " + PATH_TO_TCP_PROBE)
raise SystemExit
return
else:
self.__log = deque(maxlen = QUEUE_LENGTH)
self.__senders= {}
self.__linesProccessed = 0
self.__matchesProccessed = 0
self.__stopped = threading.Event()
self.__processTcpProbeThread = threading.Thread(target=self.proccessTcpProbe)
self.__printStatsThread = threading.Thread(target=self.printStats)
self.__processTcpProbeThread.start()
if(args.quiet != True):
self.printConfig()
self.__printStatsThread.start()
self.processLogData()
def proccessTcpProbe(self):
while(not self.__stopped.wait(0.01)):
if self.__stopped.isSet():
pass
for line in self.__tcpProbeFileHandler:
if self.__stopped.isSet():
pass
self.__linesProccessed += 1
processLine = False
data = line.strip().split(" ")
ip, separator, port = data[2].rpartition(':')
assert separator
port = int(port)
ip = ip_address(ip.strip("[]"))
dst = str(ip) + ":" + str(port)
if(args.noFilter):
processLine = True
if(port in args.filterPorts):
processLine = True
if(processLine):
self.__matchesProccessed += 1
timestamp = float(data[0])
seq = int(data[4], 0)
if(data[2] in self.__senders):
lastValues = self.__senders.get(dst)
lastTimestamp = lastValues['lastTimestamp']
lastSeq = lastValues['lastSeq']
if(timestamp - lastTimestamp < args.logResolution):
continue
# CREDIT: NAME???
# TODO: correct?
bandwidth = (seq - lastSeq) if (seq - lastSeq) >= 0 \
else (int("0xffffffff",0) - lastSeq + seq)
bandwidth /= abs(timestamp - lastTimestamp) # byte/s
bandwidth *= 8 / 1000 / 1000 # Mbps
else:
bandwidth = 0
rtt = int(data[9])
if(rtt > 0):
# TODO: get correct MSS
# TODO: correct?
# bw = cwnd / RTT
bandwidthAprox = int(data[6]) * 1500 * 8 / rtt
## bw = min{CWND,RWND} / RTT
# bandwidthAprox = min(int(data[6]) * 1500, int(data[10])) / rtt
# bandwidthAprox *= 8 #to mbit/s
else:
bandwidthAprox = 0
logLine = ""
logLine += str(timestamp) + " " #time
logLine += str(ip) + " " #dst-IP
logLine += str(port) + " " #dst-port
logLine += data[6] + " " #cwnd
logLine += data[10] + " " #rwnd
logLine += data[7] + " " #slowStart-threshold
logLine += data[9] + " " #sRTT
logLine += str(bandwidth) + " "
logLine += str(bandwidthAprox)
self.__log.append(logLine)
# update/init last values
if(data[2] not in self.__senders):
self.__senders[dst] = {}
self.__senders[dst]['lastSeq'] = seq
self.__senders[dst]['lastTimestamp'] = timestamp
def processLogData(self):
if(args.logType == "socket"):
self.processSocketLog()
elif(args.logType == "file"):
self.processFileLog()
else:
self.processStdLog()
def processSocketLog(self):
self.__socketServer = BroadcastLogServer(SERVER_ADDRESS, args.logPort, self.__log).listen()
def processFileLog(self):
lock = threading.Lock()
lock.acquire()
try:
self.__logFileHandler = open(args.logFilePath, 'w')
except:
print ("Error: while opening log file: " + args.logFilePath)
raise
else:
os.chmod(args.logFilePath, 0o644)
self.__logFileHandler.truncate(0)
while(True):
try:
lineToPrint = self.__log.popleft()
except IndexError:
pass
else:
if self.__stopped.isSet():
return
self.__logFileHandler.write(lineToPrint + "\n")
time.sleep(0.001)
lock.release()
def processStdLog(self):
while(not self.__stopped.wait(0.01)):
try:
lineToPrint = self.__log.popleft()
except IndexError:
time.sleep(0.01)
else:
print (lineToPrint)
time.sleep(0.01)
return
def printConfig(self):
print("\n")
print("Config:\n")
print("Filter by port: " + ', '.join(map(str, args.filterPorts)))
if(args.logType == "socket"):
print("Logging to socket: " + SERVER_ADDRESS + ":" + str(args.logPort))
elif(args.logType == "file"):
print("Logging to file: " + args.logFilePath)
else:
pass
print("\n")
def printStats(self):
print ("Lines processed | Lines filtered | Filtered Connections | Clients | Logbuffer")
while(not self.__stopped.wait(0.3)):
if self.__stopped.isSet():
return
strToPrint = "%d | %d | %d | %d | %d/%d " % (self.__linesProccessed, self.__matchesProccessed, len(self.__senders), 0, len(self.__log), QUEUE_LENGTH)
stdout.write(strToPrint)
for i in range(len(strToPrint)):
stdout.write("\r")
stdout.flush()
return
def handleSignals(self, signal, frame):
self.tearDown()
raise SystemExit
def tearDown(self):
print("\n")
self.__stopped.set()
if(args.logType == "socket"):
print("Closing socket...")
# Sockets gets closed by signal
# self.__socketServer.shutdown()
# self.__socketServer.server_close()
elif(args.logType == "file"):
print("Closing files...")
self.__logFileHandler.close()
print("Finishing up...")
self.__tcpProbeFileHandler.close()
print("Finishasdfasdfasdfp...")
def getLog(self):
return self.__log
class BroadcastLogServer(object):
def __init__(self, host, port, log):
self.log = log
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.host, self.port))
self.__numberOfClients = 0
def getNumberOfClients(self):
return self.getNumberOfClients
def listen(self):
self.sock.listen(50)
while True:
client, address = self.sock.accept()
threading.Thread(target = self.sendToClient, args = (client,address)).start()
def sendToClient(self, client, address):
self.__numberOfClients += 1
while True:
try:
lineToPrint = self.log.popleft()
except IndexError:
time.sleep(0.01)
else:
try:
# TODO: honor (possible) client filter-args
# TODO: re-add value for next client (numberOfThreads)
client.send(bytes(lineToPrint + "\n", 'UTF-8'))
# time.sleep(0.0005)
except:
self.__numberOfClients -= 1
client.close()
return False
if __name__ == "__main__":
#parse args
parser = argparse.ArgumentParser()
parser.add_argument(
"-d",
"--debug", help="Debug",
action="store_true",
dest="debug",
default=False)
parser.add_argument(
"-q",
"--quiet", help="Whether to ouput anything at all (false)",
action="store_true",
dest="quiet",
default=False)
parser.add_argument(
"-a",
"--all", help="Log all traffic (no filters)",
action="store_true",
dest="noFilter",
default=False)
parser.add_argument(
"-t",
"--type", help="Log to socket / file / stdout (socket)",
dest="logType",
default="file")
parser.add_argument(
"-f",
"--filepath",
help="Path where the log file is stored (/tmp/tcplog)",
dest="logFilePath",
default="/tmp/tcplog")
parser.add_argument(
"--log-port",
help="Port number for socket-logging (11337)",
dest="logPort",
type=int,
default=11337)
parser.add_argument(
"--resolution",
help="Log resolution (0.01ms)",
dest="logResolution",
type=float,
default=0.01)
# Filter
parser.add_argument(
"-p"
"--port",
help="Filter by port. Multiple occurrences possible (9999)",
dest="filterPorts",
action='append',
type=int,
default=[])
args = parser.parse_args()
# test for root-permissionsrm
if os.getuid() == 0:
print ("Please do not run this program with root-privileges.")
else:
# test if tcp_probe file is readable
readable = os.access(PATH_TO_TCP_PROBE, os.R_OK)
if(not readable):
print("\n\n")
print("=======================================================================")
print(" TCP-Probe file '" + PATH_TO_TCP_PROBE + "' not readable.")
print(" Kernel module loaded? Permissions set? Try:\n")
print(" sudo modprobe tcp_probe full=1 port=0 bufsize=512 &&\n \\sudo chmod 444 /proc/net/tcpprobe")
print("=======================================================================")
# Kernel Module "tcp_probe" options
#
# modprobe tcp_probe full=1 port=0 bufsize=512
#
# full: 1=every ack packet received, 0=only cwnd change
# fwmark: skb mark to match (0=no mark)
# bufsize: Log buffer size in packets (4096)
# port: Port to match (0=all)
else:
TcpLog().init()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2015,
# Karlsruhe Institute of Technology, Institute of Telematics
#
# This code is provided under the BSD 2-Clause License.
# Please refer to the LICENSE.txt file for further information.
#
# Author: Michael König
import signal
# import os
import argparse
import time
import threading
import socket
import matplotlib
matplotlib.use('QT5Agg')
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.widgets import Button
# from matplotlib.widgets import Slider, Button, RadioButtons
# from matplotlib.ticker import MultipleLocator, FormatStrFormatter
# import numpy as np
from collections import deque
from ipaddress import ip_address
# Constants
DEFAULT_FILTER_PORT = 9999
CLEAR_TIMEOUT = 1.0 # gap in s
# Strings
FIGURE_TITLE = "TCPplot"
PLOT_TITLE = "Data from"
TIME = "Time"
CWND = "CWND"
RTT = "RTT"
SST = "SST"
PAUSE = "Pause"
QUIT = "Quit"
BANDWIDTH = "Bandwidth"
VALUES_TO_PROCESS = ['time', 'cwnd', 'sst', 'rtt', 'bw', 'bw2']
VALUES_TO_PLOT = ['cwnd', 'sst', 'rtt', 'bw', 'bw2']
NUMBER_OF_VALUES = 9
# format of "/tmp/tcplog" resp. socket-data
#
# 14.910653454 46.163.76.98 9999 35 29312 25 39626 10.750693667388724 10.599101599959623
# ^ ^ ^ ^ ^ ^ ^ ^ ^
# | | | | | | | | |
# | | | | | | | | +------- [8] Bandwidth II (mbit/s)
# | | | | | | | +-------------------------- [7] Bandwidth (mbit/s)
# | | | | | | +-------------------------------- [6] sRTT ms
# | | | | | +----------------------------------- [5] SlowStart-Threshold
# | | | | +----------------------------------------- [4] RWND
# | | | +-------------------------------------------- [3] CWND
# | | +------------------------------------------------- [2] Receiver Port
# | +-------------------------------------------------------------- [1] Receiver IP
# +--------------------------------------------------------------------------- [0] Time seconds
class TcpPlot:
def init(self):
# register signals
signal.signal(signal.SIGINT, self.handleSignals)
signal.signal(signal.SIGTERM, self.handleSignals)
# initialize vars
self.__incomeBuffer = deque()
self.__tmpBuffer = deque()
self.__connectionBuffer = {}
if(len(args.filterPorts) < 1):
args.filterPorts.append(DEFAULT_FILTER_PORT)
for i in args.filterPorts:
self.__connectionBuffer[i] = deque(maxlen=args.bufferLength)
# init tasks and run them
self.__retrieveDataThread = threading.Thread(target=self.retrieveLogData)
self.__retrieveDataThread.start()
self.plotGraph()
def retrieveLogData(self):
self.__filterRetrievedLogData= threading.Thread(target=self.filterRetrievedLogData)
self.__filterRetrievedLogData.start()
if(args.logType == "socket"):
self.retrieveLogDataFromSocket()
else:
self.retrieveLogDataFromFile()
def retrieveLogDataFromSocket(self):
self.__processSocketLogData = threading.Thread(target=self.processSocketLogData)
self.__processSocketLogData.start()
ip, separator, port = args.logServer.rpartition(':')
assert separator
logServerPort = int(port)
logServerIp = ip_address(ip.strip("[]"))
dst = str(logServerIp) + ":" + str(logServerPort)
try:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
print ("Failed to create socket")
try:
self.__socket.connect((str(logServerIp), logServerPort))
except socket.error:
print ("Could not connect to " + dst + " (Log-Server running?)")
SystemExit
return
while True:
try:
data = self.__socket.recv(4096)
self.__incomeBuffer.append(data)
except:
print ("Could not retrieve data from " + dst)
self.__socket.close()
SystemExit
return
def retrieveLogDataFromFile(self):
try:
self.__logFileHandler = open(args.logFilePath, 'r')
except:
print ("Error: while parsing " + args.logFilePath)
SystemExit
return
else:
loglines = self.tailFile(self.__logFileHandler)
for line in loglines:
line = line.strip()
self.__tmpBuffer.append(line,)
def tailFile(self, fileHandler):
fileHandler.seek(0,2)
while True:
line = fileHandler.readline()
if not line:
time.sleep(0.1)
continue
yield line
def processSocketLogData(self):
tmpBuffer = ""
while(True):
try:
line = self.__incomeBuffer.popleft()
line = line.decode("UTF-8")
lines = line.split("\n")
except IndexError:
time.sleep(0.01)
else:
for i in lines:
data = i.split(" ")
if(tmpBuffer != ""):
tmpBuffer += i
self.__tmpBuffer.append(tmpBuffer)
tmpBuffer = ""
continue
if(len(data) < NUMBER_OF_VALUES):
tmpBuffer += i
else:
self.__tmpBuffer.append(i)
def filterRetrievedLogData(self):
while(True):
try:
line = self.__tmpBuffer.popleft()
except IndexError:
time.sleep(0.01)
else:
tmp = line.split(" ")
if(len(line) < NUMBER_OF_VALUES):
continue
try:
port = int(tmp[2])
except ValueError:
continue