forked from dsully/pykafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio.py
83 lines (61 loc) · 1.86 KB
/
io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import array
import errno
import socket
class IO(object):
""" Base class for handling socket communication to the Kafka server. """
def __init__(self, host='localhost', port=9092):
self.socket = None
#: Hostname to connect to.
self.host = host
#: Port to connect to.
self.port = port
def connect(self):
""" Connect to the Kafka server. """
self.socket = socket.socket()
self.socket.connect((self.host, self.port))
def reconnect(self):
""" Reconnect to the Kafka server. """
self.disconnect()
self.connect()
def disconnect(self):
""" Disconnect from the remote server & close the socket. """
try:
self.socket.close()
except IOError:
pass
finally:
self.socket = None
def read(self, length):
""" Send a read request to the remote Kafka server. """
# Create a character array to act as the buffer.
buf = bytearray(length)
bytes_left = length
try:
while bytes_left > 0:
read_length = self.socket.recv_into(memoryview(buf)[length-bytes_left:], bytes_left)
bytes_left -= read_length
except errno.EAGAIN:
self.disconnect()
raise IOError, "Timeout reading from the socket."
else:
return str(buf)
def write(self, data):
""" Write `data` to the remote Kafka server. """
if self.socket is None:
self.reconnect()
wrote_length = 0
try:
wrote_length = self.__write(data)
except IOError, e:
# Retry once.
if e.errno in (errno.ECONNRESET, errno.EPIPE, errno.ECONNABORTED):
self.reconnect()
wrote_length = self.__write(data)
finally:
return wrote_length
def __write(self, data):
write_length = len(data)
wrote_length = 0
while write_length > wrote_length:
wrote_length += self.socket.send(data[wrote_length:])
return wrote_length