-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathudp_socket.hpp
320 lines (306 loc) · 12.8 KB
/
udp_socket.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#pragma once
#include <optional>
#include <string_view>
#include <vector>
#include "logger.hpp"
#include "socket.hpp"
#include "task.hpp"
// TODO: should this class _contain_ a socket or just create sockets within each
// call?
namespace espp {
/**
* @brief Class for managing sending and receiving data using UDP/IP. Can be
* used to create client or server sockets.
*
* See
* https://github.com/espressif/esp-idf/tree/master/examples/protocols/sockets/udp_multicast
* for more information on udp multicast sockets.
*
* \section udp_ex1 UDP Client Example
* \snippet socket_example.cpp UDP Client example
* \section udp_ex2 UDP Server Example
* \snippet socket_example.cpp UDP Server example
*
* \section udp_ex3 UDP Client Response Example
* \snippet socket_example.cpp UDP Client Response example
* \section udp_ex4 UDP Server Response Example
* \snippet socket_example.cpp UDP Server Response example
*
* \section udp_ex5 UDP Multicast Client Example
* \snippet socket_example.cpp UDP Multicast Client example
* \section udp_ex6 UDP Multicast Server Example
* \snippet socket_example.cpp UDP Multicast Server example
*
*/
class UdpSocket : public Socket {
public:
/**
 * @brief Settings used by start_receiving(): which port to bind, how much data
 *        a single receive may return, optional multicast membership, and the
 *        handler invoked for each datagram.
 *
 * Every field has a default so that a default-constructed config never holds
 * indeterminate values.
 */
struct ReceiveConfig {
  /** Port number to bind to / receive from. */
  size_t port{0};
  /** Max size of data we can receive at one time. */
  size_t buffer_size{0};
  /** Whether this should be a multicast endpoint. */
  bool is_multicast_endpoint{false};
  /** If this is a multicast endpoint, this is the group it belongs to. */
  std::string multicast_group{""};
  /** Function containing business logic to handle data received. */
  receive_callback_fn on_receive_callback{nullptr};
};
/**
 * @brief Settings used by send(): the destination, whether to use multicast,
 *        and whether / how long to wait for a response.
 *
 * Every field has a default so that a default-constructed config never holds
 * indeterminate values.
 */
struct SendConfig {
  /** Address to send data to. */
  std::string ip_address;
  /** Port number to send data to. */
  size_t port{0};
  /** Whether this should be a multicast endpoint. */
  bool is_multicast_endpoint{false};
  /** Whether to wait for a response from the remote or not. */
  bool wait_for_response{false};
  /** If waiting for a response, this is the maximum size response we will receive. */
  size_t response_size{0};
  /** If waiting for a response, this is an optional handler which is provided
   *  the response data. */
  response_callback_fn on_response_callback{nullptr};
  /** If waiting for a response, this is the maximum timeout to wait. */
  std::chrono::duration<float> response_timeout{0.5f};
};
/**
 * @brief Construction-time options for a UdpSocket.
 */
struct Config {
  /** Verbosity level for the UDP socket logger. */
  Logger::Verbosity log_level = Logger::Verbosity::WARN;
};
/**
* @brief Initialize the socket and associated resources.
* @param config Config for the socket.
*/
explicit UdpSocket(const Config &config)
    : Socket(Type::DGRAM,
             Logger::Config{.tag = "UdpSocket", .level = config.log_level}) {
  // All construction work is delegated to the Socket base class, which is
  // given the datagram socket type and a logger tagged "UdpSocket".
}
/**
* @brief Tear down any resources associated with the socket.
*/
~UdpSocket() {
  // we have to explicitly call cleanup here so that the server recvfrom
  // will return and the task can stop.
  cleanup();
  // Release the receive task here instead of leaving it to implicit member
  // destruction. Members are destroyed in reverse declaration order, so
  // server_receive_callback_ would otherwise be destroyed while the task that
  // invokes it still exists.
  // NOTE(review): this relies on Task's destructor stopping its thread before
  // returning - confirm against task.hpp.
  task_.reset();
}
/**
* @brief Send data to the endpoint specified by the send_config.
* Can be configured to multicast (within send_config) and can be
* configured to block waiting for a response from the remote.
*
* @note in the case of multicast, it will block only until the first
* response.
*
* If response is requested, a callback can be provided in
* send_config which will be provided the response data for
* processing.
* @param data vector of bytes to send to the remote endpoint.
* @param send_config SendConfig struct indicating where to send and whether
* to wait for a response.
* @return true if the data was sent and, when a response was requested with a
* non-zero response_size, a response was received; false otherwise.
*/
bool send(const std::vector<uint8_t> &data, const SendConfig &send_config) {
return send(std::string_view{(const char *)data.data(), data.size()}, send_config);
}
/**
* @brief Send data to the endpoint specified by the send_config.
* Can be configured to multicast (within send_config) and can be
* configured to block waiting for a response from the remote.
*
* @note in the case of multicast, it will block only until the first
* response.
*
* If response is requested, a callback can be provided in
* send_config which will be provided the response data for
* processing.
* @param data String view of bytes to send to the remote endpoint.
* @param send_config SendConfig struct indicating where to send and whether
* to wait for a response.
* @return true if the data was sent and, when a response was requested with a
* non-zero response_size, a response was received; false otherwise.
*/
bool send(std::string_view data, const SendConfig &send_config) {
if (!is_valid()) {
logger_.error("Socket invalid, cannot send");
return false;
}
if (send_config.is_multicast_endpoint) {
// configure it for multicast
if (!make_multicast()) {
logger_.error("Cannot make multicast: {} - '{}'", errno, strerror(errno));
return false;
}
}
// set the receive timeout
if (!set_receive_timeout(send_config.response_timeout)) {
logger_.error("Could not set receive timeout to {}: {} - '{}'",
send_config.response_timeout.count(), errno, strerror(errno));
return false;
}
// sendto
Socket::Info server_info;
server_info.init_ipv4(send_config.ip_address, send_config.port);
auto server_address = server_info.ipv4_ptr();
logger_.info("Client sending {} bytes to {}:{}", data.size(), send_config.ip_address,
send_config.port);
int num_bytes_sent = sendto(socket_, data.data(), data.size(), 0,
(struct sockaddr *)server_address, sizeof(*server_address));
if (num_bytes_sent < 0) {
logger_.error("Error occurred during sending: {} - '{}'", errno, strerror(errno));
return false;
}
logger_.debug("Client sent {} bytes", num_bytes_sent);
// we don't need to wait for a response and the socket is good;
if (!send_config.wait_for_response) {
return true;
}
if (send_config.response_size == 0) {
logger_.warn("Response requested, but response_size=0, not waiting for response!");
// NOTE: we did send successfully, so we return true and warn about
// misconfiguration
return true;
}
std::vector<uint8_t> received_data;
logger_.info("Client waiting for response");
if (!receive(send_config.response_size, received_data, server_info)) {
logger_.warn("Client could not get response");
return false;
}
logger_.info("Client got {} bytes of response", received_data.size());
if (send_config.on_response_callback) {
logger_.debug("Client calling response callback");
send_config.on_response_callback(received_data);
}
return true;
}
/**
* @brief Call recvfrom on the socket, assuming it has already been
* configured appropriately.
*
* @param max_num_bytes Maximum number of bytes to receive.
* @param data Vector of bytes of received data.
* @param remote_info Socket::Info containing the sender's information. This
* will be populated with the information about the sender.
* @return true if successfully received, false otherwise.
*/
bool receive(size_t max_num_bytes, std::vector<uint8_t> &data, Socket::Info &remote_info) {
  // Guard: an invalid socket cannot be read from.
  if (!is_valid()) {
    logger_.error("Socket invalid, cannot receive.");
    return false;
  }

  // recvfrom stores the sender's address directly in remote_info's IPv4
  // structure.
  auto *sender_addr = remote_info.ipv4_ptr();
  socklen_t sender_addr_len = sizeof(*sender_addr);

  // Zero-filled scratch buffer kept on the heap so that stack usage does not
  // depend on max_num_bytes.
  const auto scratch = std::make_unique<uint8_t[]>(max_num_bytes);

  logger_.info("Receiving up to {} bytes", max_num_bytes);
  const int received =
      recvfrom(socket_, scratch.get(), max_num_bytes, 0,
               reinterpret_cast<struct sockaddr *>(sender_addr), &sender_addr_len);

  // A negative result is an error (including a receive timeout); `data` is
  // left untouched in that case.
  if (received < 0) {
    logger_.error("Receive failed: {} - '{}'", errno, strerror(errno));
    return false;
  }

  // Copy out only the bytes that actually arrived.
  const uint8_t *first = scratch.get();
  data.assign(first, first + received);

  // NOTE(review): update() presumably refreshes remote_info's cached
  // address/port from the raw sockaddr that recvfrom just filled - confirm
  // against socket.hpp.
  remote_info.update();
  logger_.debug("Received {} bytes from {}", received, remote_info);
  return true;
}
/**
* @brief Configure a server socket and start a thread to continuously
* receive and handle data coming in on that socket.
*
* @param task_config Task::Config struct for configuring the receive task.
* @param receive_config ReceiveConfig struct with socket and callback info.
* @return true if the socket was created and task was started, false otherwise.
*/
bool start_receiving(Task::Config &task_config, const ReceiveConfig &receive_config) {
  // Only one receive task may run per socket.
  if (task_ && task_->is_started()) {
    logger_.error("Server is alrady receiving");
    return false;
  }
  if (!is_valid()) {
    logger_.error("Socket invalid, cannot start receiving.");
    return false;
  }
  // The port is stored as size_t but is only 16 bits wide on the wire. Reject
  // out-of-range values instead of letting htons() silently truncate them to
  // a different port.
  if (receive_config.port > 0xFFFF) {
    logger_.error("Invalid port {}, cannot start receiving.", receive_config.port);
    return false;
  }
  // Remember the handler that server_task_function will invoke.
  server_receive_callback_ = receive_config.on_receive_callback;

  // bind
  // Value-initialize the address so that every field we do not set explicitly
  // (padding, and a length field on stacks that have one) is zero rather than
  // leftover stack contents.
  struct sockaddr_in server_addr {};
  // configure the server socket accordingly - assume IPV4 and bind to the
  // any address "0.0.0.0"
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  server_addr.sin_family = address_family_;
  server_addr.sin_port = htons(static_cast<uint16_t>(receive_config.port));
  const int err =
      bind(socket_, reinterpret_cast<struct sockaddr *>(&server_addr), sizeof(server_addr));
  if (err < 0) {
    logger_.error("Unable to bind: {} - '{}'", errno, strerror(errno));
    return false;
  }

  if (receive_config.is_multicast_endpoint) {
    // enable multicast
    if (!make_multicast()) {
      logger_.error("Unable to make bound socket multicast: {} - '{}'", errno, strerror(errno));
      return false;
    }
    // add multicast group
    if (!add_multicast_group(receive_config.multicast_group)) {
      logger_.error("Unable to add multicast group to bound socket: {} - '{}'", errno,
                    strerror(errno));
      return false;
    }
  }

  // Set the task callback. The caller's task_config is modified in place; the
  // two placeholders forward the arguments supplied by the task (mutex and
  // condition variable) to server_task_function.
  using namespace std::placeholders;
  task_config.callback =
      std::bind(&UdpSocket::server_task_function, this, receive_config.buffer_size, _1, _2);

  // start the thread
  task_ = Task::make_unique(task_config);
  task_->start();
  return true;
}
protected:
/**
* @brief Function run in the task_ when start_receiving is called.
* Continuously receive data on the socket, pass the received data to
* the registered callback function (registered in start_receiving in
* the ReceiveConfig struct), and optionally respond to the sender if
* the registered callback returns data.
* @param buffer_size number of bytes of receive buffer allowed.
* @param m std::mutex provided from the task for use with the
* condition_variable (cv)
* @param cv std::condition_variable from the task for allowing
* interruptible wait / delay.
* @return Return true if the task should stop; false if it should continue.
*/
bool server_task_function(size_t buffer_size, std::mutex &m, std::condition_variable &cv) {
  // receive data
  std::vector<uint8_t> received_data;
  Socket::Info sender_info;
  if (!receive(buffer_size, received_data, sender_info)) {
    // if we failed to receive, then likely we should delay a little bit; the
    // wait goes through the task's condition variable so it can be interrupted.
    using namespace std::chrono_literals;
    std::unique_lock<std::mutex> lk(m);
    cv.wait_for(lk, 1ms);
    return false;
  }
  if (!server_receive_callback_) {
    // The datagram has already been consumed; without a handler it is dropped.
    logger_.error("Server receive callback is invalid");
    return false;
  }
  // callback
  auto maybe_response = server_receive_callback_(received_data, sender_info);
  // send only if the callback returned data
  if (!maybe_response.has_value()) {
    return false;
  }
  // Bind by reference: the response is only read, so there is no need to copy
  // it out of the optional.
  const auto &response = *maybe_response;
  // sendto
  logger_.info("Server responding to {} with message of length {}", sender_info, response.size());
  auto *sender_address = sender_info.ipv4_ptr();
  const int num_bytes_sent =
      sendto(socket_, response.data(), response.size(), 0,
             reinterpret_cast<struct sockaddr *>(sender_address), sizeof(*sender_address));
  if (num_bytes_sent < 0) {
    logger_.error("Error occurred responding: {} - '{}'", errno, strerror(errno));
  } else {
    // Report success only when the send actually succeeded, so a failure is
    // not followed by a misleading "responded with -1 bytes" message.
    logger_.info("Server responded with {} bytes", num_bytes_sent);
  }
  // don't want to stop the task, even if responding failed
  return false;
}
// Task that runs server_task_function; null until start_receiving() creates
// and starts it.
// NOTE: declaration order matters here - members are destroyed in reverse
// order of declaration.
std::unique_ptr<Task> task_;
// Copy of ReceiveConfig::on_receive_callback stored by start_receiving() and
// invoked by server_task_function() for each datagram received.
receive_callback_fn server_receive_callback_;
};
} // namespace espp