diff --git a/src/ethernet/common/camOE_stream_profile.hh b/src/ethernet/common/camOE_stream_profile.hh new file mode 100644 index 0000000000..43272383cd --- /dev/null +++ b/src/ethernet/common/camOE_stream_profile.hh @@ -0,0 +1,35 @@ + +enum stream_type_id +{ + STREAM_DEPTH, + STREAM_COLOR, +}; + +struct profile_resolution +{ + int width; + int hight; +}; + +class camOE_stream_profile +{ + public: + camOE_stream_profile(stream_type_id id, profile_resolution res, int fps) + :m_stream_sensor(id), + m_profile_res(res), + m_stream_fps(fps){} + + //in case we want to have self-contained sdp parser + camOE_stream_profile(char* sdp_line); + + int width(){ return m_profile_res.width; } + int hight(){ return m_profile_res.hight; } + int fps(){ return m_stream_fps; } + stream_type_id stream_sensor(){ return m_stream_sensor; } + + private: + stream_type_id m_stream_sensor; + profile_resolution m_profile_res; + int m_stream_fps; + +}; \ No newline at end of file diff --git a/src/ethernet/common/stream_profile.hh b/src/ethernet/common/stream_profile.hh deleted file mode 100644 index 62664d1c9a..0000000000 --- a/src/ethernet/common/stream_profile.hh +++ /dev/null @@ -1,35 +0,0 @@ - -typedef enum stream_type_id -{ - STREAM_DEPTH, - STREAM_COLOR, -}; - -typedef struct stream_resolution -{ - int width; - int hight; -}; - -class stream_profile -{ - public: - stream_profile(stream_type_id id, stream_resolution res, int fps) - :m_stream_type(id), - m_stream_res(res), - m_stream_fps(fps){} - - //in case we want to have self-contained sdp parser - stream_profile(char* sdp_line); - - int width(){ return m_stream_res.width; } - int hight(){ return m_stream_res.hight; } - int fps(){ return m_stream_fps; } - stream_type_id stream_type(){ return m_stream_type; } - - private: - stream_type_id m_stream_type; - stream_resolution m_stream_res; - int m_stream_fps; - -}; \ No newline at end of file diff --git a/src/ethernet/ethernet-device.cpp b/src/ethernet/ethernet-device.cpp index 0d111d918d..d800c1593e 100644 --- a/src/ethernet/ethernet-device.cpp +++ b/src/ethernet/ethernet-device.cpp @@ -1,5 +1,4 @@ #include "ethernet-device.h" -#include "common/stream_profile.hh" int frame_number = 0; std::chrono::high_resolution_clock::time_point last; @@ -37,6 +36,7 @@ rs2::ethernet_device::ethernet_device(std::string url) : ethernet_device() rs2::ethernet_device::~ethernet_device() { + stop(); rs2_delete_device(dev); } @@ -58,18 +58,16 @@ std::vector rs2::ethernet_device::ethernet_device::query_sensors() } -rs2_intrinsics rs2::ethernet_device::get_intrinsics() +rs2_intrinsics rs2::ethernet_device::get_stream_sensor_intrinsics(camOE_stream_profile stream) { - rs2_intrinsics intrinsics = { W, H, - (float)W / 2, (float)H / 2, - (float)W / 2, (float)H / 2, - RS2_DISTORTION_BROWN_CONRADY ,{ 0,0,0,0,0 } }; - - return intrinsics; + //todo: get intrinsics from rtsp + if (stream.stream_sensor()==0) + return { W, H,(float)W / 2, (float)H / 2,(float)W / 2, (float)H / 2,RS2_DISTORTION_BROWN_CONRADY ,{ 0,0,0,0,0 } }; + return { W, H ,0, 0, 0, 0, RS2_DISTORTION_BROWN_CONRADY, { 0,0,0,0,0 } }; } rs2_software_video_frame& rs2::ethernet_device::get_frame() { - return depth_frame; + return last_frame[0]; } rs2_device* rs2::ethernet_device::get_device() { @@ -78,103 +76,103 @@ rs2_device* rs2::ethernet_device::get_device() { void rs2::ethernet_device::add_frame_to_queue(int type, Frame* raw_frame) { - if(QUEUE_MAX_SIZE>this->depth_frames.size()) + if(QUEUE_MAX_SIZE>this->frame_queues[type].size()) { std::mutex m_mtx; const std::lock_guard lock(m_mtx); - if(type==0) - this->depth_frames.push(raw_frame); - else - { - this->color_frames.push(raw_frame); - } + //TODO: map between type and stream queue + this->frame_queues[type].push(raw_frame); frame_number++; } else { - std::cout<< "queue is full. dropping frame" << std::endl; + std::cout<< "queue is full. dropping frame of type " << type << std::endl; + } +} + + +rs2_video_stream rs2::ethernet_device::rtsp_stream_to_rs_video_stream(camOE_stream_profile profile) +{ + static int streams_count = 0; + + rs2_stream type; + rs2_format format; + rs2_intrinsics intrinsics; + int BPP = 2; + intrinsics = get_stream_sensor_intrinsics(profile); + if (profile.stream_sensor()==0) + { + type = RS2_STREAM_DEPTH; + format = RS2_FORMAT_Z16; + } + else + { + type = RS2_STREAM_COLOR; + format = RS2_FORMAT_YUYV; } + + rs2_video_stream st = {type,0,streams_count,profile.width(),profile.hight(),profile.fps(),BPP,format,intrinsics}; + streams_count++; + + return st; } void rs2::ethernet_device::inject_frames_to_sw_device() { + //todo: replace with input from rtsp client + std::queue streams; + streams.push(camOE_stream_profile(stream_type_id::STREAM_DEPTH,{640,480},30)); + streams.push(camOE_stream_profile(stream_type_id::STREAM_COLOR,{640,480},30)); - /* - TODO: replace the pre-loop code with tream generation according to profile_list - something like: + inject_threads = new std::thread[streams.size()]; - for each stream in streams_list: - rs2_video_stream new_video_stream = { stream.stream_id, 0, 1, stream.W,stream.H, stream.fps, BPP, RS2_FORMAT_YUYV/Z16, stream.intrinsics }; - new_sw_sensor = rs2_software_device_add_sensor(dev, stream.desciption + "(remote)", NULL); - rs2_software_sensor_add_video_stream(new_sensor, st2, NULL); - */ + for (size_t i = 0; i <= streams.size(); i++) + { + rs2_video_stream st = rtsp_stream_to_rs_video_stream(streams.front()); + + if (streams.front().stream_sensor()==0) + sensors[i] = rs2_software_device_add_sensor(dev, "Depth (Remote)", NULL); + else + sensors[i] = rs2_software_device_add_sensor(dev, "Color (Remote)", NULL); - rs2_intrinsics depth_intrinsics = get_intrinsics(); - depth_sensor = rs2_software_device_add_sensor(dev, "Depth (Remote)", NULL); - - rs2_video_stream st = { RS2_STREAM_DEPTH, 0, 1, W, - H, 30, BPP, - RS2_FORMAT_Z16, depth_intrinsics }; - depth_stream = rs2_software_sensor_add_video_stream(depth_sensor, st, NULL); - - depth_frame.bpp = BPP; - depth_frame.profile = depth_stream; - depth_frame.stride = BPP * W; - pixels.resize(depth_frame.stride * H, 0); - depth_frame.pixels = pixels.data(); - depth_frame.deleter = ðernet_device_deleter; - - //color - - rs2_intrinsics color_intrinsics = { W,H, - 0, 0, - 0, 0, - RS2_DISTORTION_BROWN_CONRADY ,{ 0,0,0,0,0 } }; - - rs2_video_stream st2 = { RS2_STREAM_COLOR, 0, 2, W, - H, 30, BPP, - RS2_FORMAT_YUYV, color_intrinsics }; - - color_sensor = rs2_software_device_add_sensor(dev, "Color (Remote)", NULL); - color_stream = rs2_software_sensor_add_video_stream(color_sensor, st2, NULL); - - color_frame.bpp = BPP; - color_frame.profile = color_stream; - color_frame.stride = BPP * W; - pixels.resize(color_frame.stride * H, 0); - color_frame.pixels = pixels.data(); - color_frame.deleter = ðernet_device_deleter; - - while (is_active) + profiles[i] = rs2_software_sensor_add_video_stream(sensors[i], st, NULL); + + last_frame[i].bpp = st.bpp; + last_frame[i].profile = profiles[i]; + last_frame[i].stride = st.bpp * st.width; + pixels_buff[i].resize(last_frame[i].stride * st.height, 0); + last_frame[i].pixels = pixels_buff[i].data(); + last_frame[i].deleter = ðernet_device_deleter; + + + inject_threads[i] = std::thread(&rs2::ethernet_device::pull_from_queue,this,streams.front().stream_sensor()); + + streams.pop(); + } +} + +void rs2::ethernet_device::pull_from_queue(int stream_index) +{ + while(is_active) + { + if (frame_queues[stream_index].empty()) { + /*no data at quque*/; + } else { + const std::lock_guard lock(mtx); + Frame* frame = frame_queues[stream_index].front(); + frame_queues[stream_index].pop(); + memcpy(last_frame[stream_index].pixels, frame->m_buffer, frame->m_size); + // delete frame; + last_frame[stream_index].timestamp = frame->m_timestamp.tv_sec; + last_frame[stream_index].frame_number++; + rs2_software_sensor_on_video_frame(sensors[stream_index], last_frame[stream_index], NULL); + } + } + while(!frame_queues[stream_index].empty()) { - if (depth_frames.empty()) { - //do nothing - } else { - const std::lock_guard lock(mtx); - Frame* frame = depth_frames.front(); - depth_frames.pop(); - memcpy(depth_frame.pixels, frame->m_buffer, frame->m_size); - // delete frame; - depth_frame.timestamp = frame->m_timestamp.tv_sec; - depth_frame.frame_number++; - rs2_software_sensor_on_video_frame(depth_sensor, depth_frame, NULL); - } - - if (color_frames.empty()) { - ;//do nothing - } else { - const std::lock_guard lock(mtx2); - Frame* frame = color_frames.front(); - color_frames.pop(); - memcpy(color_frame.pixels, frame->m_buffer, frame->m_size); - // delete frame; - color_frame.timestamp = frame->m_timestamp.tv_sec; - color_frame.frame_number++; - //color_sensor.on_video_frame(color_frame); - rs2_software_sensor_on_video_frame(color_sensor, color_frame, NULL); - } + frame_queues[stream_index].pop(); } - + std::cout<<"pulling data at stream index " << stream_index <<" is done\n"; } void rs2::ethernet_device::incomming_server_frames_handler() @@ -189,13 +187,9 @@ void rs2::ethernet_device::incomming_server_frames_handler() std::string url = "rtsp://" + ip_address + "/unicast"; //"rtsp://10.12.144.74:8554/unicast"; RS_RTSPFrameCallback rs_cb(this, output); rs_cb.id=0; - //RS_RTSPFrameCallback rs_cb_color(this, output); - //rs_cb_color.id=1; RTSPConnection rtsp_client = RTSPConnection(env, &rs_cb, url.c_str(), timeout, rtptransport); - //RTSPConnection rtsp_client_color = RTSPConnection(env, &rs_cb_color, (url+"2").c_str(), timeout, rtptransport); - signal(SIGINT, sig_handler); std::cout << "Start mainloop" << std::endl; env.mainloop(); @@ -206,13 +200,10 @@ void rs2::ethernet_device::start() { return; is_active = true; - t = std::thread(&rs2::ethernet_device::incomming_server_frames_handler,this); - t2 = std::thread(&rs2::ethernet_device::inject_frames_to_sw_device,this); + incomming_frames_thread = std::thread(&rs2::ethernet_device::incomming_server_frames_handler,this); + inject_frames_to_sw_device(); } void rs2::ethernet_device::stop() { - if (!is_active) - return; is_active = false; - t.join(); - t2.join(); + incomming_frames_thread.join(); } diff --git a/src/ethernet/ethernet-device.h b/src/ethernet/ethernet-device.h index 0222c14158..84d72d942f 100644 --- a/src/ethernet/ethernet-device.h +++ b/src/ethernet/ethernet-device.h @@ -12,6 +12,7 @@ //RTSP_CLIENT +#include "common/camOE_stream_profile.hh" #include "rtsp_client/environment.h" #include "rtsp_client/rtspconnectionclient.h" @@ -34,6 +35,8 @@ #include #endif +#define MAX_ACTIVE_STREAMS_NUMBER 4 + namespace rs2 { class software_sensor; @@ -75,8 +78,8 @@ namespace rs2 #ifdef _WIN32 __declspec(dllexport) #endif - rs2_intrinsics get_intrinsics(); - + rs2_intrinsics get_stream_sensor_intrinsics(camOE_stream_profile stream); + #ifdef _WIN32 __declspec(dllexport) #endif @@ -87,19 +90,22 @@ namespace rs2 #endif rs2_device* get_device(); + //todo: make it private - once rtp client will be ready void add_frame_to_queue(int type,Frame* frame); private: + rs2_video_stream rtsp_stream_to_rs_video_stream(camOE_stream_profile rtsp_stream); + void incomming_server_frames_handler(); void inject_frames_to_sw_device(); - rs2_software_video_frame& get_frame(); + void pull_from_queue(int stream_id); + rs2_software_video_frame& get_frame(); - std::queue depth_frames; - std::queue color_frames; + std::queue frame_queues[MAX_ACTIVE_STREAMS_NUMBER]; unsigned int frame_queue_max_size = 30; @@ -108,21 +114,19 @@ namespace rs2 std::string ip_address; - std::vector pixels; - std::thread t,t2; + std::thread incomming_frames_thread; + std::thread* inject_threads; + std::mutex mtx,mtx2; - rs2_stream_profile* depth_stream; - rs2_stream_profile* color_stream; - rs2_software_video_frame depth_frame; - rs2_software_video_frame color_frame; - rs2_sensor* depth_sensor; - rs2_sensor* color_sensor; - - //software_sensor* color_sensor; + + rs2_sensor* sensors[MAX_ACTIVE_STREAMS_NUMBER]; + rs2_stream_profile* profiles[MAX_ACTIVE_STREAMS_NUMBER]; + rs2_device* dev; // Create software-only device Environment* env; - + rs2_software_video_frame last_frame[2]; + std::vector pixels_buff[2]; }; class RS_RTSPFrameCallback: public RTSPCallback