-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathrtsp_server.hpp
334 lines (289 loc) · 11.5 KB
/
rtsp_server.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#pragma once
#include <memory>
#include <string>
#include <system_error>
#include <vector>
#if defined(ESP_PLATFORM)
#include <esp_random.h>
#else
#include <random>
#endif
#include "logger.hpp"
#include "task.hpp"
#include "tcp_socket.hpp"
#include "udp_socket.hpp"
#include "jpeg_frame.hpp"
#include "rtcp_packet.hpp"
#include "rtp_jpeg_packet.hpp"
#include "rtp_packet.hpp"
#include "rtsp_session.hpp"
namespace espp {
/// Class for streaming MJPEG data from a camera using RTSP + RTP
/// Starts a TCP socket to listen for RTSP connections, and then spawns off a
/// new RTSP session for each connection.
/// @see RtspSession
/// @note This class does not currently send RTCP packets
///
/// \section RtspServer example
/// \snippet rtsp_example.cpp rtsp_server_example
class RtspServer {
public:
/// @brief Configuration for the RTSP server
struct Config {
std::string server_address; ///< The ip address of the server
int port; ///< The port to listen on
std::string path; ///< The path to the RTSP stream
size_t max_data_size =
1000; ///< The maximum size of RTP packet data for the MJPEG stream. Frames will be broken
///< up into multiple packets if they are larger than this. It seems that 1500 works
///< well for sending, but is too large for the esp32 (camera-display) to receive
///< properly.
Logger::Verbosity log_level = Logger::Verbosity::WARN; ///< The log level for the RTSP server
};
/// @brief Construct an RTSP server
/// @param config The configuration for the RTSP server
explicit RtspServer(const Config &config)
: server_address_(config.server_address), port_(config.port), path_(config.path),
rtsp_socket_({.log_level = espp::Logger::Verbosity::WARN}),
max_data_size_(config.max_data_size),
logger_({.tag = "RTSP Server", .level = config.log_level}) {
// generate a random ssrc
#if defined(ESP_PLATFORM)
ssrc_ = esp_random();
#else
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dis;
ssrc_ = dis(gen);
#endif
}
/// @brief Destroy the RTSP server
~RtspServer() { stop(); }
/// @brief Sets the log level for the RTSP sessions created by this server
/// @note This does not affect the log level of the RTSP server itself
/// @note This does not change the log level of any sessions that have
/// already been created
/// @param log_level The log level to set
void set_session_log_level(Logger::Verbosity log_level) { session_log_level_ = log_level; }
/// @brief Start the RTSP server
/// Starts the accept task, session task, and binds the RTSP socket
/// @return True if the server was started successfully, false otherwise
bool start() {
if (accept_task_ && accept_task_->is_started()) {
logger_.error("Server is already running");
return false;
}
logger_.info("Starting RTSP server on port {}", port_);
if (!rtsp_socket_.bind(port_)) {
logger_.error("Failed to bind to port {}", port_);
return false;
}
int max_pending_connections = 5;
if (!rtsp_socket_.listen(max_pending_connections)) {
logger_.error("Failed to listen on port {}", port_);
return false;
}
using namespace std::placeholders;
accept_task_ = std::make_unique<Task>(Task::Config{
.name = "RTSP Accept Task",
.callback = std::bind(&RtspServer::accept_task_function, this, _1, _2),
.stack_size_bytes = 6 * 1024,
.log_level = espp::Logger::Verbosity::WARN,
});
accept_task_->start();
return true;
}
/// @brief Stop the FTP server
/// Stops the accept task, session task, and closes the RTSP socket
void stop() {
logger_.info("Stopping RTSP server");
// stop the accept task
if (accept_task_) {
accept_task_->stop();
}
// stop the session task
if (session_task_) {
session_task_->stop();
}
// clear the list of sessions
sessions_.clear();
// close the RTSP socket
rtsp_socket_.close();
}
/// @brief Send a frame over the RTSP connection
/// Converts the full JPEG frame into a series of simplified RTP/JPEG
/// packets and stores it to be sent over the RTP socket, but does not
/// actually send it
/// @note Overwrites any existing frame that has not been sent
/// @param frame The frame to send
void send_frame(const JpegFrame &frame) {
// get the frame scan data
auto frame_header = frame.get_header();
auto frame_data = frame.get_scan_data();
auto width = frame_header.get_width();
auto height = frame_header.get_height();
auto q0 = frame_header.get_quantization_table(0);
auto q1 = frame_header.get_quantization_table(1);
// if the frame data is larger than the MTU, then we need to break it up
// into multiple RTP packets
size_t num_packets = frame_data.size() / max_data_size_ + 1;
logger_.debug("Frame data is {} bytes, breaking into {} packets", frame_data.size(),
num_packets);
// create num_packets RtpJpegPackets
// The first packet will have the quantization tables, and the last packet
// will have the end of image marker and the marker bit set
std::vector<std::unique_ptr<RtpJpegPacket>> packets;
packets.reserve(num_packets);
for (size_t i = 0; i < num_packets; i++) {
// get the start and end indices for the current packet
size_t start_index = i * max_data_size_;
size_t end_index = std::min(start_index + max_data_size_, frame_data.size());
static const int type_specific = 0;
static const int fragment_type = 0;
int offset = i * max_data_size_;
std::unique_ptr<RtpJpegPacket> packet;
// if this is the first packet, it has the quantization tables
if (i == 0) {
// use the original q value and include the quantization tables
packet = std::make_unique<RtpJpegPacket>(
type_specific, fragment_type, 128, width, height, q0, q1,
frame_data.substr(start_index, end_index - start_index));
} else {
// use a different q value (less than 128) and don't include the
// quantization tables
packet = std::make_unique<RtpJpegPacket>(
type_specific, offset, fragment_type, 96, width, height,
frame_data.substr(start_index, end_index - start_index));
}
// set the payload type to 26 (JPEG)
packet->set_payload_type(26);
// set the sequence number
packet->set_sequence_number(sequence_number_++);
// set the timestamp
static auto start_time = std::chrono::steady_clock::now();
auto now = std::chrono::steady_clock::now();
auto timestamp =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time).count();
packet->set_timestamp(timestamp * 90);
// set the ssrc
packet->set_ssrc(ssrc_);
// auto mjpeg_header = packet->get_mjpeg_header();
// std::vector<char> mjpeg_vec(mjpeg_header.begin(), mjpeg_header.end());
// if it's the last packet, set the marker bit
if (i == num_packets - 1) {
packet->set_marker(true);
}
// make sure the packet header has been serialized
packet->serialize();
// add the packet to the list of packets
packets.emplace_back(std::move(packet));
}
// now move the packets into the rtp_packets_ vector
{
std::unique_lock<std::mutex> lock(rtp_packets_mutex_);
// move the new packets into the list
rtp_packets_ = std::move(packets);
}
}
protected:
bool accept_task_function(std::mutex &m, std::condition_variable &cv) {
// accept a new connection
auto control_socket = rtsp_socket_.accept();
if (!control_socket) {
logger_.error("Failed to accept new connection");
return false;
}
logger_.info("Accepted new connection");
// create a new session
auto session = std::make_unique<RtspSession>(
std::move(control_socket),
RtspSession::Config{.server_address = fmt::format("{}:{}", server_address_, port_),
.rtsp_path = path_,
.log_level = session_log_level_});
// add the session to the list of sessions
auto session_id = session->get_session_id();
sessions_.emplace(session_id, std::move(session));
// start the session task if it is not already running
using namespace std::placeholders;
if (!session_task_ || !session_task_->is_started()) {
logger_.info("Starting session task");
session_task_ = std::make_unique<Task>(Task::Config{
.name = "RtspSessionTask",
.callback = std::bind(&RtspServer::session_task_function, this, _1, _2),
.stack_size_bytes = 6 * 1024,
.log_level = espp::Logger::Verbosity::WARN,
});
session_task_->start();
}
// we do not want to stop the task
return false;
}
bool session_task_function(std::mutex &m, std::condition_variable &cv) {
// sleep between frames
{
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, 10ms);
}
// when this function returns, the vector of pointers will go out of scope
// and the pointers will be deleted (which is good because it means we
// won't send the same frame twice)
std::vector<std::unique_ptr<RtpJpegPacket>> packets;
{
// copy the rtp packets into a local vector
std::unique_lock<std::mutex> lock(rtp_packets_mutex_);
if (rtp_packets_.empty()) {
// if there is not a new frame (no packets), then simply return
// we do not want to stop the task
return false;
}
// move the packets into the local vector
packets = std::move(rtp_packets_);
}
logger_.debug("Sending frame data to clients");
// for each session in sessions_
// if the session is active
// send the latest frame to the client
std::lock_guard<std::mutex> lk(session_mutex_);
for (auto &session : sessions_) {
[[maybe_unused]] auto session_id = session.first;
auto &session_ptr = session.second;
// send the packets to the client
for (auto &packet : packets) {
// if the session is not active or is closed, then stop sending
if (!session_ptr->is_active() || session_ptr->is_closed()) {
break;
}
session_ptr->send_rtp_packet(*packet);
}
}
// loop over the sessions and erase ones which are closed
for (auto it = sessions_.begin(); it != sessions_.end();) {
auto &session = it->second;
if (session->is_closed()) {
logger_.info("Removing session {}", session->get_session_id());
it = sessions_.erase(it);
} else {
++it;
}
}
// we do not want to stop the task
return false;
}
uint32_t ssrc_; ///< the ssrc (synchronization source identifier) for the RTP packets
uint16_t sequence_number_{0}; ///< the sequence number for the RTP packets
std::string server_address_; ///< the address of the server
int port_; ///< the port of the RTSP server
std::string path_; ///< the path of the RTSP server, e.g. rtsp:://<ip>:<port>/<path>
TcpSocket rtsp_socket_;
size_t max_data_size_;
std::mutex rtp_packets_mutex_;
std::vector<std::unique_ptr<RtpJpegPacket>> rtp_packets_;
Logger::Verbosity session_log_level_{Logger::Verbosity::WARN};
std::mutex session_mutex_;
std::unordered_map<int, std::unique_ptr<RtspSession>> sessions_;
Logger logger_;
std::unique_ptr<Task> accept_task_;
std::unique_ptr<Task> session_task_;
};
} // namespace espp