socket.py 5.01 KB
Newer Older
1 2 3 4
# -*- coding: utf-8 -*-

import time
import socket
Michael König's avatar
Michael König committed
5
import threading
6
from ipaddress import ip_address
7
from collections import deque
8 9 10 11 12 13 14 15 16 17

from .input_base import InputBase

# to remove --> use optioms
DEFAULT_SOCKETSERVER_PORT = 11337
DEFAULT_SOCKETSERVER_LOCATION = 'localhost:' + str(DEFAULT_SOCKETSERVER_PORT)
LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)

18 19
LOGSERVER_CONNECT_RETRY_TIME = 1 #in s
LOGSERVER_ERROR_TIMEOUT = 0.5 #in s
20 21 22 23 24

class SocketInput(InputBase):
    def __init__(self, options, infoRegistry):
        self.options = options
        self.infoRegistry = infoRegistry
25 26 27 28
        self.__incomeBuffer = deque()
        self.incomeBuffer = deque()
        self.connectionUp = False
        self.socketUp = False
Michael König's avatar
Michael König committed
29
        self.__stopped = threading.Event()
30

31 32 33 34 35 36 37 38 39
        ip, separator, port = self.options.logServer.rpartition(':')
        if(':' not in self.options.logServer or port is ''):
            self.logServerPort = DEFAULT_SOCKETSERVER_PORT
            self.logServerIp = ip_address(socket.gethostbyname(self.options.logServer.strip("[]")))
        else:
            self.logServerPort = int(port)
            self.logServerIp = ip_address(socket.gethostbyname(ip.strip("[]")))
        self.dst = str(self.logServerIp) + ":" + str(self.logServerPort)

40 41 42 43
    def startupCheck(self):
        pass

    def startUp(self):
44
        #TODO: eprint + debug..
45 46 47
        self.createSocket()

        while not self.connectToServer():
Michael König's avatar
Michael König committed
48 49 50 51
            if self.__stopped:
                return
            else:
                pass
52 53

    def createSocket(self):
54 55 56 57 58
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            print("Failed to create socket")
        else:
59 60 61
            self.socketUp = True
            if(self.options.debug):
                print("Socket created.")
62

63 64 65 66 67 68 69 70
    def connectToServer(self):
        try:
            self.socket.connect((str(self.logServerIp), self.logServerPort))
            self.connectionUp = True
            print("Successfully connected to " + self.dst + "")
            return True
        except socket.error:
            print("Error: Could not connect to " + self.dst + ". Retrying in " + str(LOGSERVER_CONNECT_RETRY_TIME) + "s ...")
Michael König's avatar
Michael König committed
71
            self.__stopped.wait(LOGSERVER_CONNECT_RETRY_TIME)
72 73
            return False
        else:
Michael König's avatar
Michael König committed
74
            self.__stopped.wait(LOGSERVER_CONNECT_RETRY_TIME)
75 76 77 78 79 80 81 82
            return False

    def reconnectToServer(self):
        if not self.socketUp:
            self.socket.close()
            self.createSocket()

        while not self.connectToServer():
Michael König's avatar
Michael König committed
83 84 85 86
            if self.__stopped:
                return
            else:
                pass
87 88

    def tearDown(self):
Michael König's avatar
Michael König committed
89
        self.__stopped.set()
90
        self.socket.close()
91 92 93
        pass

    def retrieveNewSamples(self):
94 95 96
        """
        Reads data (in blocks) from a socket and adds the received data to an temporaray income buffer.
        """
97

98 99 100 101 102 103 104 105 106 107 108 109 110 111
        self.retrieveDataFromSocket();
        self.processDataFromSocket();

        data = []
        while(True):
            try:
                dataLine = self.incomeBuffer.popleft()
            except IndexError:
                return data
            else:
                data.append(dataLine)


    def retrieveDataFromSocket(self):
112 113 114 115 116 117 118 119 120 121 122
        if(self.connectionUp):
            try:
                data = self.socket.recv(4096)
            except socket.timeout:
                print("Connection timeout.")
                self.socket.close()
                self.socketUp = False
                self.connectionUp = False
                return ""
            except IOError:
                print("Error: Could not retrieve data from " + self.dst)
123
                self.socket.close()
124 125 126
                self.socketUp = False
                self.connectionUp = False
                return ""
127
            else:
128 129 130 131 132 133 134 135 136
                if(len(data) == 0):
                    print("Connection closed by foreign host.")
                    self.socket.close()
                    self.socketUp = False
                    self.connectionUp = False
                else:
                    self.__incomeBuffer.append(data)
        else:
            self.reconnectToServer()
137 138


139
    def processDataFromSocket(self):
140 141
        """Reads data from the income buffer and tries to reassemble splitted data."""
        tmpBuffer = ""
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
        try:
            line = self.__incomeBuffer.popleft()
            line = line.decode("UTF-8")
            lines = line.split("\n")
        except IndexError:
            time.sleep(0.00001)
        else:
            for i in lines:
                data = i.split(" ")
                if(tmpBuffer != ""):
                    tmpBuffer += i
                    self.incomeBuffer.append(tmpBuffer)
                    tmpBuffer = ""
                    continue

                if(len(data) < NUMBER_OF_VALUES):
                    tmpBuffer += i
                else:
                    self.incomeBuffer.append(i)