read_card.py 6.4 KB
Newer Older
axel.maurer's avatar
axel.maurer committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python3
import time
import serial
import sys
import logging
from serial.tools.list_ports import comports
from time import sleep

VERSION = '1.8.5'

NO_ERROR = 0
REQUEST_ERROR_TIMEOUT = 1
REQUEST_ERROR_CONNECTION = 2
READER_READ_TIMEOUT = 604

ERROR_READER_INIT = 700
ERROR_READER_WRITE = 701
ERROR_READER_READ = 702
ERROR_HTTP_ERROR = 703
UNKNOWN_READER_ERROR = 800


OK = 0
SER_PORT_NAME = 'Elatec'



class SerialConnection:
    def __init__(self, serPort):
        try:
            self.ser = serial.Serial(
                port=serPort,
                # timeout=200,
                baudrate=9600,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS
            )
            self.returnCode = OK
        except serial.serialutil.SerialException as e:
            logging.debug(f"error on serial connection: {e}")
            self.returnCode = ERROR_READER_INIT
            self.returnMsg = f'No connection on {serPort}'
        except:
            self.returnCode = UNKNOWN_READER_ERROR
            self.returnMsg = f'No connection on {serPort}'

    def getReturnCode(self):
        return(self.returnCode)

    def getReturnMsg(self):
        return(self.returnMsg)

    def close(self):
        self.ser.close()

    def write(self, byteStr, waitSec):
        logging.debug(f"WRITE Serial: {byteStr}")
        self.ser.write(byteStr+'\r'.encode())
        self.ser.flush()
        time.sleep(waitSec)

    def read(self):
        try:
            readResult=self.ser.read()
            time.sleep(0.2)
            if self.ser.in_waiting > 0:
                self.resultStr = readResult + self.ser.read(self.ser.in_waiting)
                self.returnMsg = self.resultStr
                self.returnCode = OK
                self.ser.flushInput()
                return (self.returnCode)
            else:
                self.returnCode = ERROR_READER_WRITE
                self.returnMsg = "Error reading CardReader"
        except:
            self.returnCode = ERROR_READER_WRITE
            self.returnMsg = "Error reading CardReader"


class CardReader:
    def __init__(self):
        self.WAIT_FOR_CARD = "99"
        self.LOGIN_DATA = "51"
        self.SECURE_INIT = "100"
        self.SECURE_READ_APP_FILE = "110"
        self.SECURE_CREATE_APP = "120"
        self.SECURE_WRITE_FILE = "130"
        self.ERROR_UNKNOWN = 1
        self.ERROR_CONNECT_HTTP = 2
        self.ERROR_SERVER_HTTP = 3
        self.ERROR_READER_STOP_READING = 799
        self.serPort = None
        if comports:
            comportsList = list(comports())
            for port in comportsList:
                if port.manufacturer.startswith(SER_PORT_NAME):
                    self.serPort = port[0]
                    break
            if not self.serPort:
                self.returnMsg = f"device with name starts with: {SER_PORT_NAME} not found"
                self.returnCode = ERROR_READER_INIT
                return
        self.serCon = SerialConnection(self.serPort)
        self.returnCode = self.serCon.getReturnCode()
        if self.returnCode != OK:
            self.returnMsg = f"cardReader serial error with message: {self.serCon.getReturnMsg()}"
            logging.debug(self.returnMsg)

    def getReturnCode(self):
        return(self.returnCode)

    def getReturnMsg(self):
        return(self.returnMsg)

    def write(self, opCode, opParmStr, opParmLen):
        self.serCon.ser.reset_input_buffer()
        self.serCon.ser.reset_output_buffer()
        try:
            if opParmStr:
                if opParmLen > 0:
                    self.serCon.write(b"".join([opCode.encode(), ','.encode(), format(
                        opParmLen, '02X').encode(), opParmStr.encode()]), 0.0)
                else:
                    self.serCon.write(b"".join([opCode.encode(), ','.encode(), opParmStr.encode()]), 0.0)
            else:
                self.serCon.write(opCode.encode(), 0.0)
            self.returnCode = OK
        except:
            self.returnCode = ERROR_READER_WRITE
        self.returnMsg = ''
        return(self.returnCode)

    def read(self,timeout=None):
        self.serCon.ser.timeout = timeout
        if self.serCon.read() == OK:
            resultBytes = self.serCon.getReturnMsg()
            for line in resultBytes.decode().splitlines():
                if line[0:6] == "DEBUG:":
                    logging.debug(line)
                else:
                    if ',' in line:
                        returnList = line.split(',')
                        self.returnCode = int(returnList[0])
                        if len(returnList) > 1:
                            self.returnMsg = returnList[1]
                        else:
                            if self.returnCode == 0:
                                self.returnMsg = 'success'
                            else:
                                self.returnMsg = f'error: {self.returnCode}'
                    else:
                        self.returnCode = 0
                        self.returnMsg = line
            # time.sleep(0.1)
            return(OK)
        else:
            if timeout:
                return(READER_READ_TIMEOUT)
            else:
                return(ERROR_READER_READ)

    def doOp(self,  opCode, opParmStr, opParmLen, timeout=None):
        self.serCon.ser.timeout = timeout
        self.write(opCode, opParmStr, opParmLen)
        self.returnMsg = ''
        retCode = self.read(timeout)
        if retCode != OK:
            logging.debug(f'Error on DoOp read: {retCode}')
        return retCode
    
    def cancelOp(self):
        self.serCon.ser.write(b'x'+b'\r')
        sleep(0.1)
        if self.serCon.ser.in_waiting > 0:
            self.serCon.ser.read(self.serCon.ser.in_waiting)
        self.serCon.ser.reset_input_buffer()
        self.serCon.ser.reset_output_buffer()

    
    def clear(self):
        if self.serCon.ser.in_waiting > 0:
            self.read()
        self.serCon.ser.reset_output_buffer()
        self.serCon.ser.reset_input_buffer()
        self.serCon.ser.close()




if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s - %(message)s', level=logging.DEBUG)
    cardReader = CardReader()
    if cardReader.getReturnCode() != OK:
        print(cardReader.getReturnMsg())
        sys.exit()
axel.maurer's avatar
axel.maurer committed
198
199
    # actSeconds = str(int(time.time()))
    cardReader.doOp(f'01',None,0)
axel.maurer's avatar
axel.maurer committed
200
201
    print(cardReader.getReturnCode(),cardReader.getReturnMsg())
    cardReader.clear()