-
Notifications
You must be signed in to change notification settings - Fork 13
/
main.py
executable file
·113 lines (87 loc) · 3.79 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
import argparse
import os
import io
import tornado.ioloop
import tornado.web
import tornado.websocket
from PIL import Image
import pygame.camera
import pygame.image
parser = argparse.ArgumentParser(description='Start the PyImageStream server.')
parser.add_argument('--port', default=8888, type=int, help='Web server port (default: 8888)')
parser.add_argument('--camera', default=0, type=int, help='Camera index, first camera is 0 (default: 0)')
parser.add_argument('--width', default=640, type=int, help='Width (default: 640)')
parser.add_argument('--height', default=480, type=int, help='Height (default: 480)')
parser.add_argument('--quality', default=70, type=int, help='JPEG Quality 1 (worst) to 100 (best) (default: 70)')
parser.add_argument('--stopdelay', default=7, type=int, help='Delay in seconds before the camera will be stopped after '
'all clients have disconnected (default: 7)')
args = parser.parse_args()
class Camera:
def __init__(self, index, width, height, quality, stopdelay):
print("Initializing camera...")
pygame.camera.init()
camera_name = pygame.camera.list_cameras()[index]
self._cam = pygame.camera.Camera(camera_name, (width, height))
print("Camera initialized")
self.is_started = False
self.stop_requested = False
self.quality = quality
self.stopdelay = stopdelay
def request_start(self):
if self.stop_requested:
print("Camera continues to be in use")
self.stop_requested = False
if not self.is_started:
self._start()
def request_stop(self):
if self.is_started and not self.stop_requested:
self.stop_requested = True
print("Stopping camera in " + str(self.stopdelay) + " seconds...")
tornado.ioloop.IOLoop.current().call_later(self.stopdelay, self._stop)
def _start(self):
print("Starting camera...")
self._cam.start()
print("Camera started")
self.is_started = True
def _stop(self):
if self.stop_requested:
print("Stopping camera now...")
self._cam.stop()
print("Camera stopped")
self.is_started = False
self.stop_requested = False
def get_jpeg_image_bytes(self):
img = self._cam.get_image()
imgstr = pygame.image.tostring(img, "RGB", False)
pimg = Image.frombytes("RGB", img.get_size(), imgstr)
with io.BytesIO() as bytesIO:
pimg.save(bytesIO, "JPEG", quality=self.quality, optimize=True)
return bytesIO.getvalue()
camera = Camera(args.camera, args.width, args.height, args.quality, args.stopdelay)
class ImageWebSocket(tornado.websocket.WebSocketHandler):
clients = set()
def check_origin(self, origin):
# Allow access from every origin
return True
def open(self):
ImageWebSocket.clients.add(self)
print("WebSocket opened from: " + self.request.remote_ip)
camera.request_start()
def on_message(self, message):
jpeg_bytes = camera.get_jpeg_image_bytes()
self.write_message(jpeg_bytes, binary=True)
def on_close(self):
ImageWebSocket.clients.remove(self)
print("WebSocket closed from: " + self.request.remote_ip)
if len(ImageWebSocket.clients) == 0:
camera.request_stop()
script_path = os.path.dirname(os.path.realpath(__file__))
static_path = script_path + '/static/'
app = tornado.web.Application([
(r"/websocket", ImageWebSocket),
(r"/(.*)", tornado.web.StaticFileHandler, {'path': static_path, 'default_filename': 'index.html'}),
])
app.listen(args.port)
print("Starting server: http://localhost:" + str(args.port) + "/")
tornado.ioloop.IOLoop.current().start()