-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathEtherSenseServer.py
executable file
·140 lines (115 loc) · 4.3 KB
/
EtherSenseServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/python
import pyrealsense2 as rs
import sys, getopt
import asyncore
import numpy as np
import pickle
import socket
import struct
import cv2
print('Number of arguments:', len(sys.argv), 'arguments.')
print('Argument List:', str(sys.argv))
mc_ip_address = '224.0.0.1'
port = 1024
chunk_size = 4096
#rs.log_to_console(rs.log_severity.debug)
def getDepthAndTimestamp(pipeline, depth_filter):
frames = pipeline.wait_for_frames()
# take owner ship of the frame for further processing
frames.keep()
depth = frames.get_depth_frame()
if depth:
depth2 = depth_filter.process(depth)
# take owner ship of the frame for further processing
depth2.keep()
# represent the frame as a numpy array
depthData = depth2.as_frame().get_data()
depthMat = np.asanyarray(depthData)
ts = frames.get_timestamp()
return depthMat, ts
else:
return None, None
def openPipeline():
cfg = rs.config()
cfg.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline = rs.pipeline()
pipeline_profile = pipeline.start(cfg)
sensor = pipeline_profile.get_device().first_depth_sensor()
return pipeline
class DevNullHandler(asyncore.dispatcher_with_send):
def handle_read(self):
print(self.recv(1024))
def handle_close(self):
self.close()
class EtherSenseServer(asyncore.dispatcher):
def __init__(self, address):
asyncore.dispatcher.__init__(self)
print("Launching Realsense Camera Server")
try:
self.pipeline = openPipeline()
except:
print("Unexpected error: ", sys.exc_info()[1])
sys.exit(1)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
print('sending acknowledgement to', address)
# reduce the resolution of the depth image using post processing
self.decimate_filter = rs.decimation_filter()
self.decimate_filter.set_option(rs.option.filter_magnitude, 2)
self.frame_data = ''
self.connect((address[0], 1024))
self.packet_id = 0
def handle_connect(self):
print("connection received")
def writable(self):
return True
def update_frame(self):
depth, timestamp = getDepthAndTimestamp(self.pipeline, self.decimate_filter)
if depth is not None:
# convert the depth image to a string for broadcast
data = pickle.dumps(depth)
# capture the lenght of the data portion of the message
length = struct.pack('<I', len(data))
# include the current timestamp for the frame
ts = struct.pack('<d', timestamp)
# for the message for transmission
self.frame_data = ''.join([length, ts, data])
def handle_write(self):
# first time the handle_write is called
if not hasattr(self, 'frame_data'):
self.update_frame()
# the frame has been sent in it entirety so get the latest frame
if len(self.frame_data) == 0:
self.update_frame()
else:
# send the remainder of the frame_data until there is no data remaining for transmition
remaining_size = self.send(self.frame_data)
self.frame_data = self.frame_data[remaining_size:]
def handle_close(self):
self.close()
class MulticastServer(asyncore.dispatcher):
def __init__(self, host = mc_ip_address, port=1024):
asyncore.dispatcher.__init__(self)
server_address = ('', port)
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.bind(server_address)
def handle_read(self):
data, addr = self.socket.recvfrom(42)
print('Recived Multicast message %s bytes from %s' % (data, addr))
# Once the server recives the multicast signal, open the frame server
EtherSenseServer(addr)
print(sys.stderr, data)
def writable(self):
return False # don't want write notifies
def handle_close(self):
self.close()
def handle_accept(self):
channel, addr = self.accept()
print('received %s bytes from %s' % (data, addr))
def main(argv):
# initalise the multicast receiver
server = MulticastServer()
# hand over excicution flow to asyncore
asyncore.loop()
if __name__ == '__main__':
main(sys.argv[1:])