Skip to content

Commit

Permalink
Integrate data merge worker (#166)
Browse files Browse the repository at this point in the history
* Data merge worker added in - logging not in main loop yet

* Going back to original temporarily

* Changes add to updated main branch

* Updated queue names

* Structure should be complete. Just need to adjust main loop

* Main loop adjusted and print statments correctly grab object data

* All PR comments accounted for
  • Loading branch information
DylanFinlay authored Feb 26, 2024
1 parent 5783995 commit 6c6389c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 33 deletions.
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ flight_interface:
address: "tcp:127.0.0.1:14550"
timeout: 10.0 # seconds
worker_period: 0.1 # seconds

data_merge:
timeout: 10.0 # seconds
81 changes: 48 additions & 33 deletions main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from modules.detect_target import detect_target_worker
from modules.flight_interface import flight_interface_worker
from modules.video_input import video_input_worker
from modules.data_merge import data_merge_worker
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from utilities.workers import worker_manager
Expand Down Expand Up @@ -76,6 +77,8 @@ def main() -> int:
FLIGHT_INTERFACE_ADDRESS = config["flight_interface"]["address"]
FLIGHT_INTERFACE_TIMEOUT = config["flight_interface"]["timeout"]
FLIGHT_INTERFACE_WORKER_PERIOD = config["flight_interface"]["worker_period"]

DATA_MERGE_TIMEOUT = config["data_merge"]["timeout"]
except KeyError:
print("Config key(s) not found")
return -1
Expand All @@ -90,13 +93,17 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
detect_target_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
detect_target_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)
flight_interface_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
flight_interface_to_data_merge_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE
QUEUE_MAX_SIZE,
)
data_merge_to_main_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
)

video_input_manager = worker_manager.WorkerManager()
Expand All @@ -123,7 +130,7 @@ def main() -> int:
DETECT_TARGET_SHOW_ANNOTATED,
DETECT_TARGET_SAVE_PREFIX,
video_input_to_detect_target_queue,
detect_target_to_main_queue,
detect_target_to_data_merge_queue,
controller,
),
)
Expand All @@ -136,7 +143,20 @@ def main() -> int:
FLIGHT_INTERFACE_ADDRESS,
FLIGHT_INTERFACE_TIMEOUT,
FLIGHT_INTERFACE_WORKER_PERIOD,
flight_interface_to_main_queue,
flight_interface_to_data_merge_queue,
controller,
),
)

data_merge_manager = worker_manager.WorkerManager()
data_merge_manager.create_workers(
1,
data_merge_worker.data_merge_worker,
(
DATA_MERGE_TIMEOUT,
detect_target_to_data_merge_queue,
flight_interface_to_data_merge_queue,
data_merge_to_main_queue,
controller,
),
)
Expand All @@ -145,36 +165,29 @@ def main() -> int:
video_input_manager.start_workers()
detect_target_manager.start_workers()
flight_interface_manager.start_workers()
data_merge_manager.start_workers()

while True:
try:
detections = detect_target_to_main_queue.queue.get_nowait()
merged_data = data_merge_to_main_queue.queue.get_nowait()
except queue.Empty:
detections = None

if detections is not None:
print("timestamp: " + str(detections.timestamp))
print("detections: " + str(len(detections.detections)))
for detection in detections.detections:
print(" label: " + str(detection.label))
print(" confidence: " + str(detection.confidence))
print("")

odometry_and_time_info: "odometry_and_time.OdometryAndTime | None" = \
flight_interface_to_main_queue.queue.get()

if odometry_and_time_info is not None:
timestamp = odometry_and_time_info.timestamp
position = odometry_and_time_info.odometry_data.position
orientation = odometry_and_time_info.odometry_data.orientation.orientation

print("timestamp: " + str(timestamp))
print("north: " + str(position.north))
print("east: " + str(position.east))
print("down: " + str(position.down))
print("yaw: " + str(orientation.yaw))
print("roll: " + str(orientation.roll))
print("pitch: " + str(orientation.pitch))
merged_data = None

if merged_data is not None:
position = merged_data.odometry_local.position
orientation = merged_data.odometry_local.orientation.orientation
detections = merged_data.detections

print("merged north: " + str(position.north))
print("merged east: " + str(position.east))
print("merged down: " + str(position.down))
print("merged yaw: " + str(orientation.yaw))
print("merged roll: " + str(orientation.roll))
print("merged pitch: " + str(orientation.pitch))
print("merged detections count: " + str(len(detections)))
for detection in detections:
print("merged label: " + str(detection.label))
print("merged confidence: " + str(detection.confidence))
print("")

if cv2.waitKey(1) == ord('q'):
Expand All @@ -185,12 +198,14 @@ def main() -> int:
controller.request_exit()

video_input_to_detect_target_queue.fill_and_drain_queue()
detect_target_to_main_queue.fill_and_drain_queue()
flight_interface_to_main_queue.fill_and_drain_queue()
detect_target_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_data_merge_queue.fill_and_drain_queue()
data_merge_to_main_queue.fill_and_drain_queue()

video_input_manager.join_workers()
detect_target_manager.join_workers()
flight_interface_manager.join_workers()
data_merge_manager.join_workers()

cv2.destroyAllWindows()

Expand Down

0 comments on commit 6c6389c

Please sign in to comment.