-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtcpclient.py
118 lines (100 loc) · 3.91 KB
/
tcpclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import errno
import select
import socket
import logging
import traceback
from abc import ABCMeta, abstractmethod
class TcpClient:
    """Select-driven TCP client that manages a single server connection.

    Subclasses implement :meth:`on_connect` and :meth:`on_read_from_server`.
    Incoming bytes are accumulated in ``received_buffer``; outgoing messages
    are queued by appending to ``output_message_stacks[server_socket]`` and
    are flushed by :meth:`process_sockets` once the socket is writable.
    """

    # Python 2 style metaclass declaration, kept unchanged so that existing
    # code which instantiates or subclasses this class behaves as before.
    __metaclass__ = ABCMeta

    # Timeout (seconds) applied to the server socket when it is created.
    SOCKET_TIMEOUT = 10
    # Maximum number of bytes requested from the socket per read.
    RECV_SIZE = 8192
    # Socket errors that are treated as an ordinary disconnect (not logged).
    EXPECTED_DISCONNECT_ERRNOS = (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN)

    def __init__(self, host, port):
        """Store the target address and initialise empty connection state.

        :param host: server host passed to ``socket.connect``.
        :param port: server port passed to ``socket.connect``.
        """
        self.logger = logging.getLogger(__name__)
        self.server_socket = None
        self.received_buffer = bytearray()
        self.host = host
        self.port = port
        self.select_timeout = 0.5
        self.listener = None
        # Sockets handed to select() for reading / writing.
        self.inputs = []
        self.outputs = []
        # Maps a socket to the list of messages waiting to be sent on it.
        self.output_message_stacks = {}
        # Readable / writable lists returned by the latest select() call.
        self.r = None
        self.w = None

    @staticmethod
    def close_sockets(socket_container):
        """Close every truthy socket found in ``socket_container``."""
        for sock in socket_container:
            if sock:
                sock.close()

    def cleanup(self):
        """Close all tracked sockets and reset the client to disconnected.

        After this call :meth:`is_connected` returns ``False`` and
        :meth:`connect` may be called again.
        """
        TcpClient.close_sockets(self.inputs)
        TcpClient.close_sockets(self.outputs)
        if self.server_socket is not None:
            self.server_socket.close()
        # Drop the references to the closed sockets so they are never handed
        # to select() again and the client no longer reports as connected.
        del self.inputs[:]
        del self.outputs[:]
        self.output_message_stacks.clear()
        self.r = None
        self.w = None
        self.server_socket = None

    def is_connected(self):
        """Return ``True`` while a server socket is held."""
        return self.server_socket is not None

    @abstractmethod
    def on_connect(self):
        """Hook invoked by :meth:`connect` after the connection is set up."""
        pass

    def connect(self):
        """Open the connection to ``(host, port)`` and call :meth:`on_connect`.

        :raises Exception: if a server socket is already held.
        :raises socket.error: if the connection attempt fails; in that case
            the client stays disconnected and the socket is closed.
        """
        if self.server_socket is not None:
            raise Exception("Already connected...")
        sock = socket.socket()
        sock.settimeout(self.SOCKET_TIMEOUT)
        self.logger.info('Connecting on [%s:%s]', self.host, self.port)
        try:
            sock.connect((self.host, self.port))
        except Exception:
            # Do not leak the descriptor or leave a half-initialised socket
            # behind that would make is_connected() report True.
            sock.close()
            raise
        self.server_socket = sock
        self.inputs.append(sock)
        # The socket is deliberately NOT added to self.outputs here: a
        # connected socket is writable almost all the time, so select() would
        # return immediately on every call and spin the CPU. It is registered
        # for writing in process_sockets() only while messages are pending.
        self.output_message_stacks[sock] = []
        self.on_connect()

    def remove_server_socket(self):
        """Close the server socket and forget every reference to it.

        Safe to call when already disconnected and before the first
        :meth:`process_sockets` call (when ``r``/``w`` are still ``None``).
        """
        sock = self.server_socket
        if sock is None:
            return
        try:
            local_name = sock.getsockname()
        except socket.error:
            local_name = 'unknown'
        self.logger.info('Removing server socket [%s]', local_name)
        # self.r / self.w are also purged so that a loop over the current
        # select() results does not touch the socket after it is closed.
        for container in (self.outputs, self.inputs, self.r, self.w):
            if not container:
                continue
            while sock in container:
                container.remove(sock)
        self.output_message_stacks.pop(sock, None)
        sock.close()
        self.server_socket = None

    @abstractmethod
    def on_read_from_server(self):
        """Hook invoked after new data was appended to ``received_buffer``."""
        pass

    def process_sockets(self):
        """Run one select() round: read available data, flush queued output.

        Does nothing when the client is not connected; callers that loop on
        this method should check :meth:`is_connected`.
        """
        sock = self.server_socket
        if sock is None:
            return
        # Register for writing only while there is something to send, and
        # only once, so select() does not keep reporting an idle socket.
        if self.output_message_stacks.get(sock) and sock not in self.outputs:
            self.outputs.append(sock)
        self.r, self.w, _ = select.select(
            self.inputs, self.outputs, self.inputs, self.select_timeout)
        # Iterate over snapshots: a handler may remove the socket from
        # self.r / self.w via remove_server_socket(). The write snapshot is
        # taken after the reads so a socket closed while reading is skipped.
        for readable in list(self.r):
            self.generic_handle(handler=self.read_from_server, sock=readable)
        for writable in list(self.w):
            self.generic_handle(handler=self.write_to_server, sock=writable)

    def read_from_server(self, **kwargs):
        """Read once from ``kwargs['sock']``.

        Non-empty data is appended to ``received_buffer`` and reported via
        :meth:`on_read_from_server`; an empty read means the peer closed the
        connection, so the server socket is removed.
        """
        sock = kwargs.get('sock')
        data = sock.recv(self.RECV_SIZE)
        if data:
            self.logger.debug(
                'Adding server data (%s) to received buffer', len(data))
            self.received_buffer += data
            self.on_read_from_server()
        else:
            self.logger.info(
                'Server [%s] closed its socket', sock.getpeername())
            self.remove_server_socket()

    def write_to_server(self, **kwargs):
        """Send every queued message for ``kwargs['sock']`` in FIFO order.

        :raises KeyError: if no message queue exists for the socket.
        """
        sock = kwargs.get('sock')
        pending = self.output_message_stacks[sock]
        while pending:
            # sendall() keeps writing until the whole message is out, whereas
            # send() may transmit only a prefix. The message is dequeued only
            # after it was sent successfully.
            sock.sendall(pending[0])
            pending.pop(0)
        if sock in self.outputs:
            self.outputs.remove(sock)

    def generic_handle(self, **kwargs):
        """Call ``kwargs['handler'](**kwargs)`` and drop the connection on error.

        ``KeyboardInterrupt`` is re-raised. Any other exception is logged as
        described below and followed by :meth:`remove_server_socket`:

        * ``KeyError`` is noted at debug level only;
        * socket errors listed in ``EXPECTED_DISCONNECT_ERRNOS`` are silent,
          other socket errors are logged with their traceback;
        * every other exception is logged with its traceback.
        """
        try:
            kwargs['handler'](**kwargs)
            return
        except KeyboardInterrupt:
            raise
        except KeyError as exception:
            self.logger.debug('generic_handle: missing key %s', exception)
        except socket.error as exception:
            if exception.errno not in self.EXPECTED_DISCONNECT_ERRNOS:
                self.logger.error(
                    'Server connection lost, unhandled errno [%s]',
                    exception.errno)
                # format_exc() returns the traceback text; print_exc() would
                # return None and only write to stderr.
                self.logger.error(traceback.format_exc())
        except Exception as exception:
            self.logger.error('generic_handle: %s', exception)
            self.logger.error(traceback.format_exc())
        self.remove_server_socket()