socket.py 3.54 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# -*- coding: utf-8 -*-

import socket
import threading
import time
from collections import deque
from ipaddress import ip_address

from .input_base import InputBase

# TODO: remove these module-level defaults and take the values from the options instead.
DEFAULT_SOCKETSERVER_PORT = 11337
DEFAULT_SOCKETSERVER_LOCATION = 'localhost:' + str(DEFAULT_SOCKETSERVER_PORT)
# Presumably the names of the space-separated fields of one received log line
# (the line processing below splits on " " and compares the field count with
# NUMBER_OF_VALUES) -- TODO confirm against the log server's output format.
LOG_FORMAT = ['time', 'srcIp', 'srcPort', 'dstIp', 'dstPort', 'cwnd', 'rwnd', 'sst', 'rtt', 'bw', 'loss']
# At least 'time' and 'dstPort' are required.
NUMBER_OF_VALUES = len(LOG_FORMAT)

LOGSERVER_CONNECT_RETRY_TIME = 3 # seconds to wait between two connection attempts

class SocketInput(InputBase):
    """Input backend that receives log lines from a log server over TCP.

    retrieveNewSamples() connects to ``options.logServer`` and pushes every
    received chunk of bytes into an income buffer.  A worker thread running
    processSocketLogData() reassembles those chunks into complete,
    newline-terminated lines and appends them to a second buffer.

    NOTE(review): no consumer of the reassembled lines is visible in this
    file, so that buffer grows until something outside this class drains it.
    """

    def __init__(self, options, infoRegistry):
        """Store the configuration and create the state shared between threads.

        Args:
            options: configuration object; ``options.logServer`` is read by
                retrieveNewSamples().
            infoRegistry: stored as-is; not used inside this class.
        """
        self.options = options
        self.infoRegistry = infoRegistry

        # Raw chunks as returned by recv(); filled by retrieveNewSamples() and
        # drained by processSocketLogData().  deque.append()/popleft() are
        # thread-safe, so no additional lock is needed for the hand-over.
        self.__incomeBuffer = deque()
        # Complete, decoded log lines.
        self.__tmpBuffer = deque()
        # Set by tearDown(); every loop in this class polls it to terminate.
        self.__stopped = threading.Event()
        # Currently connected socket (None while disconnected).
        self.__socket = None
        # Worker thread running processSocketLogData() (None until started).
        self.__processSocketLogData = None

    def startupCheck(self):
        """No start-up checks are performed by this backend."""
        pass

    def startUp(self):
        """Nothing to prepare; the connection is opened by retrieveNewSamples()."""
        pass

    def tearDown(self):
        """Signal all loops to stop and unblock a pending recv().

        NOTE(review): assumes tearDown() is the framework's shutdown hook --
        it is the only place that sets the stop event, so without it neither
        the receive loop nor the worker thread can terminate.
        """
        self.__stopped.set()
        sock = self.__socket
        if sock is not None:
            try:
                # shutdown() makes a recv() that is blocked in another thread
                # return, so the receive loop notices the stop request.
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Best effort: the socket is not connected or already closed.
                pass

    def retrieveNewSamples(self):
        """Reads data (in blocks) from a socket and adds the received data to an income buffer.

        Starts the line-reassembly worker thread (once), connects to the log
        server -- retrying every LOGSERVER_CONNECT_RETRY_TIME seconds -- and
        then blocks, receiving data until the peer closes the connection, an
        I/O error occurs or tearDown() is called.

        Raises:
            socket.gaierror: if the configured host name cannot be resolved.
            ValueError: if the configured port is not an integer.
        """
        worker = self.__processSocketLogData
        if worker is None or not worker.is_alive():
            # Only one worker may drain the income buffer, otherwise the
            # chunks of a single line could end up in different threads.
            self.__processSocketLogData = threading.Thread(target=self.processSocketLogData)
            self.__processSocketLogData.start()

        host, logServerPort = self.__parseLogServerLocation(self.options.logServer)
        # Only IPv4 is supported: gethostbyname() and AF_INET are IPv4-only.
        logServerIp = ip_address(socket.gethostbyname(host))
        dst = str(logServerIp) + ":" + str(logServerPort)

        sock = self.__connectToLogServer(str(logServerIp), logServerPort, dst)
        if sock is None:
            return

        self.__socket = sock
        try:
            while not self.__stopped.is_set():
                try:
                    data = sock.recv(4096)
                except socket.timeout:
                    print("Connection timeout.")
                    return
                except IOError:
                    print("Error: Could not retrieve data from " + dst)
                    return

                if not data:
                    # An empty read after tearDown() is our own shutdown().
                    if not self.__stopped.is_set():
                        print("Connection closed by foreign host.")
                    return
                self.__incomeBuffer.append(data)
        finally:
            # Single clean-up point for every way out of the receive loop.
            self.__socket = None
            sock.close()
            # Terminate a possibly unfinished last line so it is never glued
            # to the first line of a later connection.
            self.__incomeBuffer.append(b"\n")

    @staticmethod
    def __parseLogServerLocation(location):
        """Split ``host[:port]`` into ``(host, port)``.

        A missing or empty port yields DEFAULT_SOCKETSERVER_PORT.  Square
        brackets around the host part are stripped.
        """
        host, separator, port = location.rpartition(':')
        if not separator:
            # No colon at all: the whole string is the host.
            host, port = location, ''
        portNumber = int(port) if port else DEFAULT_SOCKETSERVER_PORT
        return host.strip("[]"), portNumber

    def __connectToLogServer(self, host, port, dst):
        """Return a socket connected to (host, port), or None.

        Retries until the connection succeeds or the stop event is set.  None
        is returned when stopped or when no socket could be created.
        """
        while not self.__stopped.is_set():
            try:
                # A fresh socket per attempt: the state of a socket after a
                # failed connect() is unspecified.
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            except socket.error:
                print("Failed to create socket")
                return None

            try:
                sock.connect((host, port))
            except socket.error:
                sock.close()
                print("Error: Could not connect to " + dst + " Retrying in " + str(LOGSERVER_CONNECT_RETRY_TIME) + "s ...")
                # Interruptible sleep: returns early once tearDown() is called.
                self.__stopped.wait(LOGSERVER_CONNECT_RETRY_TIME)
            else:
                print("Successfully connected to " + dst)
                return sock
        return None

    def processSocketLogData(self):
        """Reads data from the income buffer and tries to reassemble split data.

        Chunks are concatenated as bytes and cut at newline characters, so a
        line (or a multi-byte UTF-8 character) that was split across two
        recv() calls is only decoded once it is complete.  Blank lines and
        lines with fewer than NUMBER_OF_VALUES space-separated fields are
        dropped.  Runs until the stop event is set.
        """
        pending = b""
        while not self.__stopped.is_set():
            try:
                chunk = self.__incomeBuffer.popleft()
            except IndexError:
                # Nothing to do yet; back off briefly (wakes up early on stop).
                self.__stopped.wait(0.00001)
                continue

            pending += chunk
            # Everything before the last newline is complete; the remainder
            # stays in `pending` until its terminating newline arrives.
            *completeLines, pending = pending.split(b"\n")
            for rawLine in completeLines:
                line = rawLine.decode("UTF-8", errors="replace")
                if len(line.split(" ")) < NUMBER_OF_VALUES:
                    continue
                self.__tmpBuffer.append(line)