socket.py 3.44 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# -*- coding: utf-8 -*-

import time
import socket
from ipaddress import ip_address

from .input_base import InputBase

# to remove --> use optioms
DEFAULT_SOCKETSERVER_PORT = 11337
DEFAULT_SOCKETSERVER_LOCATION = 'localhost:' + str(DEFAULT_SOCKETSERVER_PORT)
LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
#at least time & dstPort required
NUMBER_OF_VALUES = len(LOG_FORMAT)

LOGSERVER_CONNECT_RETRY_TIME = 3 #in s

class SocketInput(InputBase):
    def __init__(self, options, infoRegistry):
        self.options = options
        self.infoRegistry = infoRegistry

23 24 25 26 27 28 29 30 31
        ip, separator, port = self.options.logServer.rpartition(':')
        if(':' not in self.options.logServer or port is ''):
            self.logServerPort = DEFAULT_SOCKETSERVER_PORT
            self.logServerIp = ip_address(socket.gethostbyname(self.options.logServer.strip("[]")))
        else:
            self.logServerPort = int(port)
            self.logServerIp = ip_address(socket.gethostbyname(ip.strip("[]")))
        self.dst = str(self.logServerIp) + ":" + str(self.logServerPort)

32 33 34 35
    def startupCheck(self):
        pass

    def startUp(self):
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
        #TODO: eprint + debug..
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            print("Failed to create socket")
        else:
            print("Socket created.")

        try:
            self.socket.connect((str(self.logServerIp), self.logServerPort))
            print("Successfully connected to " + self.dst + "")
        except socket.error:
            print("\
                    Error: Could not connect to " + self.dst + "\
                    Retrying in " + str(LOGSERVER_CONNECT_RETRY_TIME) + "s ...\
                    ")
            time.sleep(LOGSERVER_CONNECT_RETRY_TIME)
        else:
            pass
55 56

    def tearDown(self):
57
        self.socket.close()
58 59 60
        pass

    def retrieveNewSamples(self):
61 62 63
        """
        Reads data (in blocks) from a socket and adds the received data to an temporaray income buffer.
        """
64 65

        try:
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
            data = self.socket.recv(4096)
        except socket.timeout:
            print("Connection timeout.")
            self.socket.close()
            return ""
        except IOError:
            print("Error: Could not retrieve data from " + self.dst)
            self.socket.close()
            return ""
        else:
            if(len(data) == 0):
                print("Connection closed by foreign host.")
                self.socket.close()
            return data
            self.tmpBuffer.append(data)
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97


    def processSocketLogData(self):
        """Reads data from the income buffer and tries to reassemble splitted data."""
        tmpBuffer = ""
        while(True):
            try:
                line = self.__incomeBuffer.popleft()
                line = line.decode("UTF-8")
                lines = line.split("\n")
            except IndexError:
                time.sleep(0.00001)
            else:
                for i in lines:
                    data = i.split(" ")
                    if(tmpBuffer != ""):
                        tmpBuffer += i
98
                        self.incomeBuffer.append(tmpBuffer)
99 100 101 102 103 104
                        tmpBuffer = ""
                        continue

                    if(len(data) < NUMBER_OF_VALUES):
                        tmpBuffer += i
                    else:
105
                        self.incomeBuffer.append(i)