From 6c6389c74ece463179e9f279035d601e5fa86583 Mon Sep 17 00:00:00 2001 From: Dylan Finlay <129635550+DylanFinlay@users.noreply.github.com> Date: Sun, 25 Feb 2024 20:08:22 -0500 Subject: [PATCH] Integrate data merge worker (#166) * Data merge worker added in - logging not in main loop yet * Going back to original temporarily * Changes add to updated main branch * Updated queue names * Structure should be complete. Just need to adjust main loop * Main loop adjusted and print statments correctly grab object data * All PR comments accounted for --- config.yaml | 3 ++ main_2024.py | 81 +++++++++++++++++++++++++++++++--------------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/config.yaml b/config.yaml index c5a441b9..11f54fd1 100644 --- a/config.yaml +++ b/config.yaml @@ -19,3 +19,6 @@ flight_interface: address: "tcp:127.0.0.1:14550" timeout: 10.0 # seconds worker_period: 0.1 # seconds + +data_merge: + timeout: 10.0 # seconds diff --git a/main_2024.py b/main_2024.py index 9dba7564..17d37306 100644 --- a/main_2024.py +++ b/main_2024.py @@ -15,6 +15,7 @@ from modules.detect_target import detect_target_worker from modules.flight_interface import flight_interface_worker from modules.video_input import video_input_worker +from modules.data_merge import data_merge_worker from utilities.workers import queue_proxy_wrapper from utilities.workers import worker_controller from utilities.workers import worker_manager @@ -76,6 +77,8 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"] FLIGHT_INTERFACE_TIMEOUT = config["flight_interface"]["timeout"] FLIGHT_INTERFACE_WORKER_PERIOD = config["flight_interface"]["worker_period"] + + DATA_MERGE_TIMEOUT = config["data_merge"]["timeout"] except KeyError: print("Config key(s) not found") return -1 @@ -90,13 +93,17 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) - detect_target_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + detect_target_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, ) - flight_interface_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + flight_interface_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, - QUEUE_MAX_SIZE + QUEUE_MAX_SIZE, + ) + data_merge_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + QUEUE_MAX_SIZE, ) video_input_manager = worker_manager.WorkerManager() @@ -123,7 +130,7 @@ def main() -> int: DETECT_TARGET_SHOW_ANNOTATED, DETECT_TARGET_SAVE_PREFIX, video_input_to_detect_target_queue, - detect_target_to_main_queue, + detect_target_to_data_merge_queue, controller, ), ) @@ -136,7 +143,20 @@ def main() -> int: FLIGHT_INTERFACE_ADDRESS, FLIGHT_INTERFACE_TIMEOUT, FLIGHT_INTERFACE_WORKER_PERIOD, - flight_interface_to_main_queue, + flight_interface_to_data_merge_queue, + controller, + ), + ) + + data_merge_manager = worker_manager.WorkerManager() + data_merge_manager.create_workers( + 1, + data_merge_worker.data_merge_worker, + ( + DATA_MERGE_TIMEOUT, + detect_target_to_data_merge_queue, + flight_interface_to_data_merge_queue, + data_merge_to_main_queue, controller, ), ) @@ -145,36 +165,29 @@ def main() -> int: video_input_manager.start_workers() detect_target_manager.start_workers() flight_interface_manager.start_workers() + data_merge_manager.start_workers() while True: try: - detections = detect_target_to_main_queue.queue.get_nowait() + merged_data = data_merge_to_main_queue.queue.get_nowait() except queue.Empty: - detections = None - - if detections is not None: - print("timestamp: " + str(detections.timestamp)) - print("detections: " + str(len(detections.detections))) - for detection in detections.detections: - print(" label: " + str(detection.label)) - print(" confidence: " + str(detection.confidence)) - print("") - - odometry_and_time_info: "odometry_and_time.OdometryAndTime | None" = \ - flight_interface_to_main_queue.queue.get() - - if odometry_and_time_info is not None: - timestamp = odometry_and_time_info.timestamp - position = odometry_and_time_info.odometry_data.position - orientation = odometry_and_time_info.odometry_data.orientation.orientation - - print("timestamp: " + str(timestamp)) - print("north: " + str(position.north)) - print("east: " + str(position.east)) - print("down: " + str(position.down)) - print("yaw: " + str(orientation.yaw)) - print("roll: " + str(orientation.roll)) - print("pitch: " + str(orientation.pitch)) + merged_data = None + + if merged_data is not None: + position = merged_data.odometry_local.position + orientation = merged_data.odometry_local.orientation.orientation + detections = merged_data.detections + + print("merged north: " + str(position.north)) + print("merged east: " + str(position.east)) + print("merged down: " + str(position.down)) + print("merged yaw: " + str(orientation.yaw)) + print("merged roll: " + str(orientation.roll)) + print("merged pitch: " + str(orientation.pitch)) + print("merged detections count: " + str(len(detections))) + for detection in detections: + print("merged label: " + str(detection.label)) + print("merged confidence: " + str(detection.confidence)) print("") if cv2.waitKey(1) == ord('q'): @@ -185,12 +198,14 @@ def main() -> int: controller.request_exit() video_input_to_detect_target_queue.fill_and_drain_queue() - detect_target_to_main_queue.fill_and_drain_queue() - flight_interface_to_main_queue.fill_and_drain_queue() + detect_target_to_data_merge_queue.fill_and_drain_queue() + flight_interface_to_data_merge_queue.fill_and_drain_queue() + data_merge_to_main_queue.fill_and_drain_queue() video_input_manager.join_workers() detect_target_manager.join_workers() flight_interface_manager.join_workers() + data_merge_manager.join_workers() cv2.destroyAllWindows()